include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp

81.4% Lines (250/307) 88.1% List of functions (37/42)
f(x) Functions (42)
Function Calls Lines Blocks
boost::corosio::detail::reactor_scheduler_context::reactor_scheduler_context(boost::corosio::detail::reactor_scheduler_base const*, boost::corosio::detail::reactor_scheduler_context*) :65 394x 100.0% 100.0% boost::corosio::detail::reactor_find_context(boost::corosio::detail::reactor_scheduler_base const*) :82 821946x 100.0% 86.0% boost::corosio::detail::reactor_flush_private_work(boost::corosio::detail::reactor_scheduler_context*, std::atomic<long>&) :94 0 0.0% 0.0% boost::corosio::detail::reactor_drain_private_queue(boost::corosio::detail::reactor_scheduler_context*, std::atomic<long>&, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :111 1x 50.0% 64.0% boost::corosio::detail::reactor_scheduler_base::reactor_scheduler_base() :237 464x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::task_op::operator()() :275 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler_base::task_op::destroy() :276 0 0.0% 0.0% boost::corosio::detail::reactor_thread_context_guard::reactor_thread_context_guard(boost::corosio::detail::reactor_scheduler_base const*) :321 394x 100.0% 100.0% boost::corosio::detail::reactor_thread_context_guard::~reactor_thread_context_guard() :329 394x 66.7% 80.0% boost::corosio::detail::reactor_scheduler_base::reset_inline_budget() const :341 104637x 54.5% 45.0% boost::corosio::detail::reactor_scheduler_base::try_consume_inline_budget() const :362 441472x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) const :376 10668x 100.0% 84.0% boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :382 10668x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) const::post_handler::~post_handler() :383 21336x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) 
const::post_handler::operator()() :385 10659x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) const::post_handler::destroy() :394 9x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::post(boost::corosio::detail::scheduler_op*) const :419 97103x 100.0% 87.0% boost::corosio::detail::reactor_scheduler_base::running_in_this_thread() const :436 1147x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::stop() :442 386x 100.0% 78.0% boost::corosio::detail::reactor_scheduler_base::stopped() const :454 21x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::restart() :461 91x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::run() :468 363x 100.0% 76.0% boost::corosio::detail::reactor_scheduler_base::run_one() :493 2x 75.0% 64.0% boost::corosio::detail::reactor_scheduler_base::wait_one(long) :507 61x 100.0% 70.0% boost::corosio::detail::reactor_scheduler_base::poll() :521 6x 100.0% 76.0% boost::corosio::detail::reactor_scheduler_base::poll_one() :546 4x 100.0% 70.0% boost::corosio::detail::reactor_scheduler_base::work_started() :560 26009x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::work_finished() :566 36423x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::compensating_work_started() const :573 166919x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::drain_thread_queue(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&, long) const :581 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler_base::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) const :594 16056x 30.0% 35.0% boost::corosio::detail::reactor_scheduler_base::shutdown_drain() :611 464x 100.0% 88.0% boost::corosio::detail::reactor_scheduler_base::signal_all(std::unique_lock<std::mutex>&) const :628 813x 100.0% 100.0% 
boost::corosio::detail::reactor_scheduler_base::maybe_unlock_and_signal_one(std::unique_lock<std::mutex>&) const :635 2008x 57.1% 50.0% boost::corosio::detail::reactor_scheduler_base::unlock_and_signal_one(std::unique_lock<std::mutex>&) const :649 345025x 85.7% 80.0% boost::corosio::detail::reactor_scheduler_base::clear_signal() const :661 1x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::wait_for_signal(std::unique_lock<std::mutex>&) const :667 1x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::wait_for_signal_for(std::unique_lock<std::mutex>&, long) const :679 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler_base::wake_one_thread_and_unlock(std::unique_lock<std::mutex>&) const :691 2008x 87.5% 92.0% boost::corosio::detail::reactor_scheduler_base::work_cleanup::~work_cleanup() :709 290729x 92.3% 92.0% boost::corosio::detail::reactor_scheduler_base::task_cleanup::~task_cleanup() :733 201627x 83.3% 86.0% boost::corosio::detail::reactor_scheduler_base::do_one(std::unique_lock<std::mutex>&, long, boost::corosio::detail::reactor_scheduler_context*) :754 291068x 78.6% 74.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12
#include <boost/corosio/detail/config.hpp>
#include <boost/capy/ex/execution_context.hpp>

#include <boost/corosio/native/native_scheduler.hpp>
#include <boost/corosio/detail/scheduler_op.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <coroutine>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <mutex>
29
30 namespace boost::corosio::detail {
31
32 // Forward declaration
33 class reactor_scheduler_base;
34
35 /** Per-thread state for a reactor scheduler.
36
37 Each thread running a scheduler's event loop has one of these
38 on a thread-local stack. It holds a private work queue and
39 inline completion budget for speculative I/O fast paths.
40 */
41 struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
42 {
43 /// Scheduler this context belongs to.
44 reactor_scheduler_base const* key;
45
46 /// Next context frame on this thread's stack.
47 reactor_scheduler_context* next;
48
49 /// Private work queue for reduced contention.
50 op_queue private_queue;
51
52 /// Unflushed work count for the private queue.
53 std::int64_t private_outstanding_work;
54
55 /// Remaining inline completions allowed this cycle.
56 int inline_budget;
57
58 /// Maximum inline budget (adaptive, 2-16).
59 int inline_budget_max;
60
61 /// True if no other thread absorbed queued work last cycle.
62 bool unassisted;
63
64 /// Construct a context frame linked to @a n.
65 394x reactor_scheduler_context(
66 reactor_scheduler_base const* k, reactor_scheduler_context* n)
67 394x : key(k)
68 394x , next(n)
69 394x , private_outstanding_work(0)
70 394x , inline_budget(0)
71 394x , inline_budget_max(2)
72 394x , unassisted(false)
73 {
74 394x }
75 };
76
77 /// Thread-local context stack for reactor schedulers.
78 inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
79
80 /// Find the context frame for a scheduler on this thread.
81 inline reactor_scheduler_context*
82 821946x reactor_find_context(reactor_scheduler_base const* self) noexcept
83 {
84 821946x for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
85 {
86 819386x if (c->key == self)
87 819386x return c;
88 }
89 2560x return nullptr;
90 }
91
92 /// Flush private work count to global counter.
93 inline void
94 reactor_flush_private_work(
95 reactor_scheduler_context* ctx,
96 std::atomic<std::int64_t>& outstanding_work) noexcept
97 {
98 if (ctx && ctx->private_outstanding_work > 0)
99 {
100 outstanding_work.fetch_add(
101 ctx->private_outstanding_work, std::memory_order_relaxed);
102 ctx->private_outstanding_work = 0;
103 }
104 }
105
106 /** Drain private queue to global queue, flushing work count first.
107
108 @return True if any ops were drained.
109 */
110 inline bool
111 1x reactor_drain_private_queue(
112 reactor_scheduler_context* ctx,
113 std::atomic<std::int64_t>& outstanding_work,
114 op_queue& completed_ops) noexcept
115 {
116 1x if (!ctx || ctx->private_queue.empty())
117 1x return false;
118
119 reactor_flush_private_work(ctx, outstanding_work);
120 completed_ops.splice(ctx->private_queue);
121 return true;
122 }
123
/** Non-template base for reactor-backed scheduler implementations.

    Provides the complete threading model shared by epoll, kqueue,
    and select schedulers: signal state machine, inline completion
    budget, work counting, run/poll methods, and the do_one event
    loop.

    Derived classes provide platform-specific hooks by overriding:
    - `run_task(lock, ctx)` to run the reactor poll
    - `interrupt_reactor()` to wake a blocked reactor

    De-templated from the original CRTP design to eliminate
    duplicate instantiations when multiple backends are compiled
    into the same binary. Virtual dispatch for run_task (called
    once per reactor cycle, before a blocking syscall) has
    negligible overhead.

    @par Thread Safety
    All public member functions are thread-safe.
*/
class reactor_scheduler_base
    : public native_scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;
    using context_type = reactor_scheduler_context;

    /// Post a coroutine for deferred execution.
    void post(std::coroutine_handle<> h) const override;

    /// Post a scheduler operation for deferred execution.
    void post(scheduler_op* h) const override;

    /// Return true if called from a thread running this scheduler.
    bool running_in_this_thread() const noexcept override;

    /// Request the scheduler to stop dispatching handlers.
    void stop() override;

    /// Return true if the scheduler has been stopped.
    bool stopped() const noexcept override;

    /// Reset the stopped state so `run()` can resume.
    void restart() override;

    /// Run the event loop until no work remains.
    std::size_t run() override;

    /// Run until one handler completes or no work remains.
    std::size_t run_one() override;

    /// Run until one handler completes or @a usec elapses.
    std::size_t wait_one(long usec) override;

    /// Run ready handlers without blocking.
    std::size_t poll() override;

    /// Run at most one ready handler without blocking.
    std::size_t poll_one() override;

    /// Increment the outstanding work count.
    void work_started() noexcept override;

    /// Decrement the outstanding work count, stopping on zero.
    void work_finished() noexcept override;

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
    */
    bool try_consume_inline_budget() const noexcept;

    /** Offset a forthcoming work_finished from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and
        no handler will be executed. Must be called from a scheduler
        thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from thread context's private queue to global queue.

        Flushes private work count to the global counter, then
        transfers the queue under mutex protection.

        @param queue The private queue to drain.
        @param count Private work count to flush before draining.
    */
    void drain_thread_queue(op_queue& queue, std::int64_t count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations
        go to the thread's private queue (fast path). Otherwise,
        operations are added to the global queue under mutex and a
        waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
    */
    void post_deferred_completions(op_queue& ops) const;

protected:
    reactor_scheduler_base() = default;

    /** Drain completed_ops during shutdown.

        Pops all operations from the global queue and destroys them,
        skipping the task sentinel. Signals all waiting threads.
        Derived classes call this from their shutdown() override
        before performing platform-specific cleanup.
    */
    void shutdown_drain();

    /// RAII guard that re-inserts the task sentinel after `run_task`.
    struct task_cleanup
    {
        reactor_scheduler_base const* sched;
        std::unique_lock<std::mutex>* lock;
        context_type* ctx;
        ~task_cleanup();
    };

    /// Guards completed_ops_, stopped_, task_interrupted_, and state_.
    mutable std::mutex mutex_;

    /// Wakes threads blocked in wait_for_signal[_for]().
    mutable std::condition_variable cond_;

    /// Global queue of ready operations; also holds the task sentinel.
    mutable op_queue completed_ops_;

    /// Count of unfinished work items; reaching zero stops the scheduler.
    mutable std::atomic<std::int64_t> outstanding_work_{0};

    /// True after stop(); cleared by restart(). Guarded by mutex_.
    bool stopped_ = false;

    /// True while a thread is inside run_task().
    mutable std::atomic<bool> task_running_{false};

    /// True when the reactor has been (or need not be) interrupted.
    /// Guarded by mutex_.
    mutable bool task_interrupted_ = false;

    /// Bit 0 of `state_`: set when the condvar should be signaled.
    static constexpr std::size_t signaled_bit = 1;

    /// Increment per waiting thread in `state_`.
    static constexpr std::size_t waiter_increment = 2;

    /// Packed signal flag (bit 0) and waiter count (upper bits).
    /// Guarded by mutex_.
    mutable std::size_t state_ = 0;

    /// Sentinel op that triggers a reactor poll when dequeued.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;

    /// Run the platform-specific reactor poll.
    virtual void
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) = 0;

    /// Wake a blocked reactor (e.g. write to eventfd or pipe).
    virtual void interrupt_reactor() const = 0;

private:
    /// RAII guard run after each executed handler; reconciles the
    /// thread's private work production with the global count.
    struct work_cleanup
    {
        reactor_scheduler_base* sched;
        std::unique_lock<std::mutex>* lock;
        context_type* ctx;
        ~work_cleanup();
    };

    /// Core event loop step: run one handler or one reactor cycle.
    std::size_t do_one(
        std::unique_lock<std::mutex>& lock, long timeout_us, context_type* ctx);

    void signal_all(std::unique_lock<std::mutex>& lock) const;
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
    void clear_signal() const;
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
    void wait_for_signal_for(
        std::unique_lock<std::mutex>& lock, long timeout_us) const;
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
};
308
309 /** RAII guard that pushes/pops a scheduler context frame.
310
311 On construction, pushes a new context frame onto the
312 thread-local stack. On destruction, drains any remaining
313 private queue items to the global queue and pops the frame.
314 */
315 struct reactor_thread_context_guard
316 {
317 /// The context frame managed by this guard.
318 reactor_scheduler_context frame_;
319
320 /// Construct the guard, pushing a frame for @a sched.
321 394x explicit reactor_thread_context_guard(
322 reactor_scheduler_base const* sched) noexcept
323 394x : frame_(sched, reactor_context_stack.get())
324 {
325 394x reactor_context_stack.set(&frame_);
326 394x }
327
328 /// Destroy the guard, draining private work and popping the frame.
329 394x ~reactor_thread_context_guard() noexcept
330 {
331 394x if (!frame_.private_queue.empty())
332 frame_.key->drain_thread_queue(
333 frame_.private_queue, frame_.private_outstanding_work);
334 394x reactor_context_stack.set(frame_.next);
335 394x }
336 };
337
338 // ---- Inline implementations ------------------------------------------------
339
340 inline void
341 104637x reactor_scheduler_base::reset_inline_budget() const noexcept
342 {
343 104637x if (auto* ctx = reactor_find_context(this))
344 {
345 // Cap when no other thread absorbed queued work
346 104637x if (ctx->unassisted)
347 {
348 104637x ctx->inline_budget_max = 4;
349 104637x ctx->inline_budget = 4;
350 104637x return;
351 }
352 // Ramp up when previous cycle fully consumed budget
353 if (ctx->inline_budget == 0)
354 ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
355 else if (ctx->inline_budget < ctx->inline_budget_max)
356 ctx->inline_budget_max = 2;
357 ctx->inline_budget = ctx->inline_budget_max;
358 }
359 }
360
361 inline bool
362 441472x reactor_scheduler_base::try_consume_inline_budget() const noexcept
363 {
364 441472x if (auto* ctx = reactor_find_context(this))
365 {
366 441472x if (ctx->inline_budget > 0)
367 {
368 353206x --ctx->inline_budget;
369 353206x return true;
370 }
371 }
372 88266x return false;
373 }
374
375 inline void
376 10668x reactor_scheduler_base::post(std::coroutine_handle<> h) const
377 {
378 struct post_handler final : scheduler_op
379 {
380 std::coroutine_handle<> h_;
381
382 10668x explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
383 21336x ~post_handler() override = default;
384
385 10659x void operator()() override
386 {
387 10659x auto saved = h_;
388 10659x delete this;
389 // Ensure stores from the posting thread are visible
390 std::atomic_thread_fence(std::memory_order_acquire);
391 10659x saved.resume();
392 10659x }
393
394 9x void destroy() override
395 {
396 9x auto saved = h_;
397 9x delete this;
398 9x saved.destroy();
399 9x }
400 };
401
402 10668x auto ph = std::make_unique<post_handler>(h);
403
404 10668x if (auto* ctx = reactor_find_context(this))
405 {
406 8694x ++ctx->private_outstanding_work;
407 8694x ctx->private_queue.push(ph.release());
408 8694x return;
409 }
410
411 1974x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
412
413 1974x std::unique_lock lock(mutex_);
414 1974x completed_ops_.push(ph.release());
415 1974x wake_one_thread_and_unlock(lock);
416 10668x }
417
418 inline void
419 97103x reactor_scheduler_base::post(scheduler_op* h) const
420 {
421 97103x if (auto* ctx = reactor_find_context(this))
422 {
423 97069x ++ctx->private_outstanding_work;
424 97069x ctx->private_queue.push(h);
425 97069x return;
426 }
427
428 34x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
429
430 34x std::unique_lock lock(mutex_);
431 34x completed_ops_.push(h);
432 34x wake_one_thread_and_unlock(lock);
433 34x }
434
435 inline bool
436 1147x reactor_scheduler_base::running_in_this_thread() const noexcept
437 {
438 1147x return reactor_find_context(this) != nullptr;
439 }
440
441 inline void
442 386x reactor_scheduler_base::stop()
443 {
444 386x std::unique_lock lock(mutex_);
445 386x if (!stopped_)
446 {
447 349x stopped_ = true;
448 349x signal_all(lock);
449 349x interrupt_reactor();
450 }
451 386x }
452
453 inline bool
454 21x reactor_scheduler_base::stopped() const noexcept
455 {
456 21x std::unique_lock lock(mutex_);
457 42x return stopped_;
458 21x }
459
460 inline void
461 91x reactor_scheduler_base::restart()
462 {
463 91x std::unique_lock lock(mutex_);
464 91x stopped_ = false;
465 91x }
466
467 inline std::size_t
468 363x reactor_scheduler_base::run()
469 {
470 726x if (outstanding_work_.load(std::memory_order_acquire) == 0)
471 {
472 29x stop();
473 29x return 0;
474 }
475
476 334x reactor_thread_context_guard ctx(this);
477 334x std::unique_lock lock(mutex_);
478
479 334x std::size_t n = 0;
480 for (;;)
481 {
482 291002x if (!do_one(lock, -1, &ctx.frame_))
483 334x break;
484 290668x if (n != (std::numeric_limits<std::size_t>::max)())
485 290668x ++n;
486 290668x if (!lock.owns_lock())
487 193350x lock.lock();
488 }
489 334x return n;
490 334x }
491
492 inline std::size_t
493 2x reactor_scheduler_base::run_one()
494 {
495 4x if (outstanding_work_.load(std::memory_order_acquire) == 0)
496 {
497 stop();
498 return 0;
499 }
500
501 2x reactor_thread_context_guard ctx(this);
502 2x std::unique_lock lock(mutex_);
503 2x return do_one(lock, -1, &ctx.frame_);
504 2x }
505
506 inline std::size_t
507 61x reactor_scheduler_base::wait_one(long usec)
508 {
509 122x if (outstanding_work_.load(std::memory_order_acquire) == 0)
510 {
511 10x stop();
512 10x return 0;
513 }
514
515 51x reactor_thread_context_guard ctx(this);
516 51x std::unique_lock lock(mutex_);
517 51x return do_one(lock, usec, &ctx.frame_);
518 51x }
519
520 inline std::size_t
521 6x reactor_scheduler_base::poll()
522 {
523 12x if (outstanding_work_.load(std::memory_order_acquire) == 0)
524 {
525 1x stop();
526 1x return 0;
527 }
528
529 5x reactor_thread_context_guard ctx(this);
530 5x std::unique_lock lock(mutex_);
531
532 5x std::size_t n = 0;
533 for (;;)
534 {
535 11x if (!do_one(lock, 0, &ctx.frame_))
536 5x break;
537 6x if (n != (std::numeric_limits<std::size_t>::max)())
538 6x ++n;
539 6x if (!lock.owns_lock())
540 6x lock.lock();
541 }
542 5x return n;
543 5x }
544
545 inline std::size_t
546 4x reactor_scheduler_base::poll_one()
547 {
548 8x if (outstanding_work_.load(std::memory_order_acquire) == 0)
549 {
550 2x stop();
551 2x return 0;
552 }
553
554 2x reactor_thread_context_guard ctx(this);
555 2x std::unique_lock lock(mutex_);
556 2x return do_one(lock, 0, &ctx.frame_);
557 2x }
558
559 inline void
560 26009x reactor_scheduler_base::work_started() noexcept
561 {
562 26009x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
563 26009x }
564
565 inline void
566 36423x reactor_scheduler_base::work_finished() noexcept
567 {
568 72846x if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
569 341x stop();
570 36423x }
571
572 inline void
573 166919x reactor_scheduler_base::compensating_work_started() const noexcept
574 {
575 166919x auto* ctx = reactor_find_context(this);
576 166919x if (ctx)
577 166919x ++ctx->private_outstanding_work;
578 166919x }
579
580 inline void
581 reactor_scheduler_base::drain_thread_queue(
582 op_queue& queue, std::int64_t count) const
583 {
584 if (count > 0)
585 outstanding_work_.fetch_add(count, std::memory_order_relaxed);
586
587 std::unique_lock lock(mutex_);
588 completed_ops_.splice(queue);
589 if (count > 0)
590 maybe_unlock_and_signal_one(lock);
591 }
592
593 inline void
594 16056x reactor_scheduler_base::post_deferred_completions(op_queue& ops) const
595 {
596 16056x if (ops.empty())
597 16056x return;
598
599 if (auto* ctx = reactor_find_context(this))
600 {
601 ctx->private_queue.splice(ops);
602 return;
603 }
604
605 std::unique_lock lock(mutex_);
606 completed_ops_.splice(ops);
607 wake_one_thread_and_unlock(lock);
608 }
609
610 inline void
611 464x reactor_scheduler_base::shutdown_drain()
612 {
613 464x std::unique_lock lock(mutex_);
614
615 1016x while (auto* h = completed_ops_.pop())
616 {
617 552x if (h == &task_op_)
618 464x continue;
619 88x lock.unlock();
620 88x h->destroy();
621 88x lock.lock();
622 552x }
623
624 464x signal_all(lock);
625 464x }
626
inline void
reactor_scheduler_base::signal_all(std::unique_lock<std::mutex>&) const
{
    // Latch the signaled bit first so threads not yet waiting still
    // observe the signal, then wake everyone blocked on cond_. The
    // lock parameter documents that mutex_ must be held by the caller.
    state_ |= signaled_bit;
    cond_.notify_all();
}
633
634 inline bool
635 2008x reactor_scheduler_base::maybe_unlock_and_signal_one(
636 std::unique_lock<std::mutex>& lock) const
637 {
638 2008x state_ |= signaled_bit;
639 2008x if (state_ > signaled_bit)
640 {
641 lock.unlock();
642 cond_.notify_one();
643 return true;
644 }
645 2008x return false;
646 }
647
648 inline bool
649 345025x reactor_scheduler_base::unlock_and_signal_one(
650 std::unique_lock<std::mutex>& lock) const
651 {
652 345025x state_ |= signaled_bit;
653 345025x bool have_waiters = state_ > signaled_bit;
654 345025x lock.unlock();
655 345025x if (have_waiters)
656 cond_.notify_one();
657 345025x return have_waiters;
658 }
659
inline void
reactor_scheduler_base::clear_signal() const
{
    // Reset the latched signal before blocking; the waiter count in
    // the upper bits is preserved. Caller must hold mutex_.
    state_ &= ~signaled_bit;
}
665
inline void
reactor_scheduler_base::wait_for_signal(
    std::unique_lock<std::mutex>& lock) const
{
    // Loop guards against spurious wakeups: return only once the
    // signaled bit is set. While blocked, this thread is registered
    // as a waiter in the upper bits of state_.
    while ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait(lock);
        state_ -= waiter_increment;
    }
}
677
inline void
reactor_scheduler_base::wait_for_signal_for(
    std::unique_lock<std::mutex>& lock, long timeout_us) const
{
    // Single timed wait (no loop): a timeout or spurious wakeup simply
    // returns to the caller, which re-checks its own conditions. The
    // waiter registration brackets the wait exactly.
    if ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= waiter_increment;
    }
}
689
690 inline void
691 2008x reactor_scheduler_base::wake_one_thread_and_unlock(
692 std::unique_lock<std::mutex>& lock) const
693 {
694 2008x if (maybe_unlock_and_signal_one(lock))
695 return;
696
697 2008x if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
698 {
699 21x task_interrupted_ = true;
700 21x lock.unlock();
701 21x interrupt_reactor();
702 }
703 else
704 {
705 1987x lock.unlock();
706 }
707 }
708
// Runs after each executed handler (constructed in do_one with the
// lock released). Reconciles the thread's private work production
// against the one unit of work the handler itself consumed.
inline reactor_scheduler_base::work_cleanup::~work_cleanup()
{
    if (ctx)
    {
        // Net change to the global count is (produced - 1): the
        // handler consumed one unit while producing `produced` more.
        std::int64_t produced = ctx->private_outstanding_work;
        if (produced > 1)
            sched->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            // No surplus produced: retire one unit via work_finished(),
            // which stops the scheduler when the count reaches zero.
            // (produced == 1 nets to zero and needs no update.)
            sched->work_finished();
        ctx->private_outstanding_work = 0;

        if (!ctx->private_queue.empty())
        {
            // Publish leftover private ops; splicing into the global
            // queue requires the scheduler lock, which do_one released
            // before invoking the handler.
            lock->lock();
            sched->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        // No thread context: just retire this handler's unit of work.
        sched->work_finished();
    }
}
732
733 403254x inline reactor_scheduler_base::task_cleanup::~task_cleanup()
734 {
735 201627x if (!ctx)
736 return;
737
738 201627x if (ctx->private_outstanding_work > 0)
739 {
740 8398x sched->outstanding_work_.fetch_add(
741 8398x ctx->private_outstanding_work, std::memory_order_relaxed);
742 8398x ctx->private_outstanding_work = 0;
743 }
744
745 201627x if (!ctx->private_queue.empty())
746 {
747 8398x if (!lock->owns_lock())
748 lock->lock();
749 8398x sched->completed_ops_.splice(ctx->private_queue);
750 }
751 201627x }
752
/** Execute one handler or run one reactor cycle.

    Entered with @a lock held. Returns 1 after exactly one handler has
    run (with the lock released), or 0 when the scheduler is stopped
    or no work remains. Callers (run/poll loops) re-acquire the lock
    between iterations when needed.
*/
inline std::size_t
reactor_scheduler_base::do_one(
    std::unique_lock<std::mutex>& lock, long timeout_us, context_type* ctx)
{
    for (;;)
    {
        // stopped_ is guarded by mutex_, held at the top of each pass.
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel — time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers =
                !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());

            // No queued handlers and either no outstanding work or a
            // non-blocking (poll) call: put the sentinel back and stop.
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            // With handlers pending (or when polling) the reactor must
            // not block, so mark it pre-interrupted before it starts.
            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Let another thread pick up the queued handlers while
            // this thread is inside the reactor.
            if (more_handlers)
                unlock_and_signal_one(lock);

            try
            {
                run_task(lock, ctx);
            }
            catch (...)
            {
                // Keep task_running_ consistent even when the poll throws.
                task_running_.store(false, std::memory_order_relaxed);
                throw;
            }

            // Re-insert the sentinel so a later pass polls again.
            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            // More handlers remain: wake a peer to absorb them and
            // record whether one was available (unassisted feeds the
            // inline-budget heuristic in reset_inline_budget).
            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // work_cleanup reconciles work counts and publishes private
            // ops when the handler returns — or throws.
            work_cleanup on_exit{this, &lock, ctx};
            (void)on_exit;

            // Invoke the handler without holding the lock.
            (*op)();
            return 1;
        }

        // Try private queue before blocking
        if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
            continue;

        // Out of work, or the caller asked not to block.
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // Block until signaled; negative timeout means wait forever.
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
834
835 } // namespace boost::corosio::detail
836
837 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
838