libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

78.2% Lines (390/499) 85.4% Functions (41/48) 63.6% Branches (203/319)
libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/scheduler.hpp"
15 #include "src/detail/epoll/op.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/posix/resolver_service.hpp"
18 #include "src/detail/posix/signals.hpp"
19
20 #include <boost/corosio/detail/except.hpp>
21 #include <boost/corosio/detail/thread_local_ptr.hpp>
22
23 #include <atomic>
24 #include <chrono>
25 #include <limits>
26 #include <utility>
27
28 #include <errno.h>
29 #include <fcntl.h>
30 #include <sys/epoll.h>
31 #include <sys/eventfd.h>
32 #include <sys/socket.h>
33 #include <sys/timerfd.h>
34 #include <unistd.h>
35
36 /*
37 epoll Scheduler - Single Reactor Model
38 ======================================
39
40 This scheduler uses a thread coordination strategy to provide handler
41 parallelism and avoid the thundering herd problem.
42 Instead of all threads blocking on epoll_wait(), one thread becomes the
43 "reactor" while others wait on a condition variable for handler work.
44
45 Thread Model
46 ------------
47 - ONE thread runs epoll_wait() at a time (the reactor thread)
48 - OTHER threads wait on cond_ (condition variable) for handlers
49 - When work is posted, exactly one waiting thread wakes via notify_one()
50 - This matches Windows IOCP semantics where N posted items wake N threads
51
52 Event Loop Structure (do_one)
53 -----------------------------
54 1. Lock mutex, try to pop handler from queue
55 2. If got handler: execute it (unlocked), return
56 3. If queue empty and no reactor running: become reactor
57 - Run epoll_wait (unlocked), queue I/O completions, loop back
58 4. If queue empty and reactor running: wait on condvar for work
59
60 The task_running_ flag ensures only one thread owns epoll_wait().
61 After the reactor queues I/O completions, it loops back to try getting
62 a handler, giving priority to handler execution over more I/O polling.
63
64 Signaling State (state_)
65 ------------------------
66 The state_ variable encodes two pieces of information:
67 - Bit 0: signaled flag (1 = signaled, persists until cleared)
68 - Upper bits: waiter count (each waiter adds 2 before blocking)
69
70 This allows efficient coordination:
71 - Signalers only call notify when waiters exist (state_ > 1)
72 - Waiters check if already signaled before blocking (fast-path)
73
74 Wake Coordination (wake_one_thread_and_unlock)
75 ----------------------------------------------
76 When posting work:
77 - If waiters exist (state_ > 1): signal and notify_one()
78 - Else if reactor running: interrupt via eventfd write
79 - Else: no-op (thread will find work when it checks queue)
80
81 This avoids waking threads unnecessarily. With cascading wakes,
82 each handler execution wakes at most one additional thread if
83 more work exists in the queue.
84
85 Work Counting
86 -------------
87 outstanding_work_ tracks pending operations. When it hits zero, run()
88 returns. Each operation increments on start, decrements on completion.
89
90 Timer Integration
91 -----------------
92 Timers are handled by timer_service. The reactor adjusts epoll_wait
93 timeout to wake for the nearest timer expiry. When a new timer is
94 scheduled earlier than current, timer_service calls interrupt_reactor()
95 to re-evaluate the timeout.
96 */
97
98 namespace boost::corosio::detail {
99
100 struct scheduler_context
101 {
102 epoll_scheduler const* key;
103 scheduler_context* next;
104 op_queue private_queue;
105 long private_outstanding_work;
106
107 158 scheduler_context(epoll_scheduler const* k, scheduler_context* n)
108 158 : key(k)
109 158 , next(n)
110 158 , private_outstanding_work(0)
111 {
112 158 }
113 };
114
115 namespace {
116
117 corosio::detail::thread_local_ptr<scheduler_context> context_stack;
118
119 struct thread_context_guard
120 {
121 scheduler_context frame_;
122
123 158 explicit thread_context_guard(
124 epoll_scheduler const* ctx) noexcept
125 158 : frame_(ctx, context_stack.get())
126 {
127 158 context_stack.set(&frame_);
128 158 }
129
130 158 ~thread_context_guard() noexcept
131 {
132
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 158 times.
158 if (!frame_.private_queue.empty())
133 frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
134 158 context_stack.set(frame_.next);
135 158 }
136 };
137
138 scheduler_context*
139 226890 find_context(epoll_scheduler const* self) noexcept
140 {
141
2/2
✓ Branch 1 taken 225241 times.
✓ Branch 2 taken 1649 times.
226890 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
142
1/2
✓ Branch 0 taken 225241 times.
✗ Branch 1 not taken.
225241 if (c->key == self)
143 225241 return c;
144 1649 return nullptr;
145 }
146
147 /// Flush private work count to global counter.
148 void
149 flush_private_work(
150 scheduler_context* ctx,
151 std::atomic<long>& outstanding_work) noexcept
152 {
153 if (ctx && ctx->private_outstanding_work > 0)
154 {
155 outstanding_work.fetch_add(
156 ctx->private_outstanding_work, std::memory_order_relaxed);
157 ctx->private_outstanding_work = 0;
158 }
159 }
160
161 /// Drain private queue to global queue, flushing work count first.
162 ///
163 /// @return True if any ops were drained.
164 bool
165 drain_private_queue(
166 scheduler_context* ctx,
167 std::atomic<long>& outstanding_work,
168 op_queue& completed_ops) noexcept
169 {
170 if (!ctx || ctx->private_queue.empty())
171 return false;
172
173 flush_private_work(ctx, outstanding_work);
174 completed_ops.splice(ctx->private_queue);
175 return true;
176 }
177
178 } // namespace
179
180 void
181 78482 descriptor_state::
182 operator()()
183 {
184 78482 is_enqueued_.store(false, std::memory_order_relaxed);
185
186 // Take ownership of impl ref set by close_socket() to prevent
187 // the owning impl from being freed while we're executing
188 78482 auto prevent_impl_destruction = std::move(impl_ref_);
189
190 78482 std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
191
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 78482 times.
78482 if (ev == 0)
192 {
193 scheduler_->compensating_work_started();
194 return;
195 }
196
197 78482 op_queue local_ops;
198
199 78482 int err = 0;
200
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 78481 times.
78482 if (ev & EPOLLERR)
201 {
202 1 socklen_t len = sizeof(err);
203
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
204 err = errno;
205
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if (err == 0)
206 1 err = EIO;
207 }
208
209 78482 epoll_op* rd = nullptr;
210 78482 epoll_op* wr = nullptr;
211 78482 epoll_op* cn = nullptr;
212 {
213
1/1
✓ Branch 1 taken 78482 times.
78482 std::lock_guard lock(mutex);
214
2/2
✓ Branch 0 taken 30327 times.
✓ Branch 1 taken 48155 times.
78482 if (ev & EPOLLIN)
215 {
216 30327 rd = std::exchange(read_op, nullptr);
217
2/2
✓ Branch 0 taken 27637 times.
✓ Branch 1 taken 2690 times.
30327 if (!rd)
218 27637 read_ready = true;
219 }
220
2/2
✓ Branch 0 taken 75842 times.
✓ Branch 1 taken 2640 times.
78482 if (ev & EPOLLOUT)
221 {
222 75842 cn = std::exchange(connect_op, nullptr);
223 75842 wr = std::exchange(write_op, nullptr);
224
3/4
✓ Branch 0 taken 73199 times.
✓ Branch 1 taken 2643 times.
✓ Branch 2 taken 73199 times.
✗ Branch 3 not taken.
75842 if (!cn && !wr)
225 73199 write_ready = true;
226 }
227
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 78481 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
78482 if (err && !(ev & (EPOLLIN | EPOLLOUT)))
228 {
229 rd = std::exchange(read_op, nullptr);
230 wr = std::exchange(write_op, nullptr);
231 cn = std::exchange(connect_op, nullptr);
232 }
233 78482 }
234
235 // Non-null after I/O means EAGAIN; re-register under lock below
236
2/2
✓ Branch 0 taken 2690 times.
✓ Branch 1 taken 75792 times.
78482 if (rd)
237 {
238
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2690 times.
2690 if (err)
239 rd->complete(err, 0);
240 else
241 2690 rd->perform_io();
242
243
2/4
✓ Branch 0 taken 2690 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2690 times.
2690 if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
244 {
245 rd->errn = 0;
246 }
247 else
248 {
249 2690 local_ops.push(rd);
250 2690 rd = nullptr;
251 }
252 }
253
254
2/2
✓ Branch 0 taken 2643 times.
✓ Branch 1 taken 75839 times.
78482 if (cn)
255 {
256
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2643 times.
2643 if (err)
257 cn->complete(err, 0);
258 else
259 2643 cn->perform_io();
260 2643 local_ops.push(cn);
261 2643 cn = nullptr;
262 }
263
264
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 78482 times.
78482 if (wr)
265 {
266 if (err)
267 wr->complete(err, 0);
268 else
269 wr->perform_io();
270
271 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
272 {
273 wr->errn = 0;
274 }
275 else
276 {
277 local_ops.push(wr);
278 wr = nullptr;
279 }
280 }
281
282
2/4
✓ Branch 0 taken 78482 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 78482 times.
78482 if (rd || wr)
283 {
284 std::lock_guard lock(mutex);
285 if (rd)
286 read_op = rd;
287 if (wr)
288 write_op = wr;
289 }
290
291 // Execute first handler inline — the scheduler's work_cleanup
292 // accounts for this as the "consumed" work item
293 78482 scheduler_op* first = local_ops.pop();
294
2/2
✓ Branch 0 taken 5333 times.
✓ Branch 1 taken 73149 times.
78482 if (first)
295 {
296
1/1
✓ Branch 1 taken 5333 times.
5333 scheduler_->post_deferred_completions(local_ops);
297
1/1
✓ Branch 1 taken 5333 times.
5333 (*first)();
298 }
299 else
300 {
301 73149 scheduler_->compensating_work_started();
302 }
303 78482 }
304
305 189 epoll_scheduler::
306 epoll_scheduler(
307 capy::execution_context& ctx,
308 189 int)
309 189 : epoll_fd_(-1)
310 189 , event_fd_(-1)
311 189 , timer_fd_(-1)
312 189 , outstanding_work_(0)
313 189 , stopped_(false)
314 189 , shutdown_(false)
315 189 , task_running_(false)
316 189 , task_interrupted_(false)
317 378 , state_(0)
318 {
319 189 epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
320
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (epoll_fd_ < 0)
321 detail::throw_system_error(make_err(errno), "epoll_create1");
322
323 189 event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
324
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (event_fd_ < 0)
325 {
326 int errn = errno;
327 ::close(epoll_fd_);
328 detail::throw_system_error(make_err(errn), "eventfd");
329 }
330
331 189 timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
332
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (timer_fd_ < 0)
333 {
334 int errn = errno;
335 ::close(event_fd_);
336 ::close(epoll_fd_);
337 detail::throw_system_error(make_err(errn), "timerfd_create");
338 }
339
340 189 epoll_event ev{};
341 189 ev.events = EPOLLIN | EPOLLET;
342 189 ev.data.ptr = nullptr;
343
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 189 times.
189 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
344 {
345 int errn = errno;
346 ::close(timer_fd_);
347 ::close(event_fd_);
348 ::close(epoll_fd_);
349 detail::throw_system_error(make_err(errn), "epoll_ctl");
350 }
351
352 189 epoll_event timer_ev{};
353 189 timer_ev.events = EPOLLIN | EPOLLERR;
354 189 timer_ev.data.ptr = &timer_fd_;
355
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 189 times.
189 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
356 {
357 int errn = errno;
358 ::close(timer_fd_);
359 ::close(event_fd_);
360 ::close(epoll_fd_);
361 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
362 }
363
364
1/1
✓ Branch 1 taken 189 times.
189 timer_svc_ = &get_timer_service(ctx, *this);
365
1/1
✓ Branch 3 taken 189 times.
189 timer_svc_->set_on_earliest_changed(
366 timer_service::callback(
367 this,
368 2882 [](void* p) { static_cast<epoll_scheduler*>(p)->update_timerfd(); }));
369
370 // Initialize resolver service
371
1/1
✓ Branch 1 taken 189 times.
189 get_resolver_service(ctx, *this);
372
373 // Initialize signal service
374
1/1
✓ Branch 1 taken 189 times.
189 get_signal_service(ctx, *this);
375
376 // Push task sentinel to interleave reactor runs with handler execution
377 189 completed_ops_.push(&task_op_);
378 189 }
379
380 378 epoll_scheduler::
381 189 ~epoll_scheduler()
382 {
383
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (timer_fd_ >= 0)
384 189 ::close(timer_fd_);
385
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (event_fd_ >= 0)
386 189 ::close(event_fd_);
387
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (epoll_fd_ >= 0)
388 189 ::close(epoll_fd_);
389 378 }
390
391 void
392 189 epoll_scheduler::
393 shutdown()
394 {
395 {
396
1/1
✓ Branch 1 taken 189 times.
189 std::unique_lock lock(mutex_);
397 189 shutdown_ = true;
398
399
2/2
✓ Branch 1 taken 189 times.
✓ Branch 2 taken 189 times.
378 while (auto* h = completed_ops_.pop())
400 {
401
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (h == &task_op_)
402 189 continue;
403 lock.unlock();
404 h->destroy();
405 lock.lock();
406 189 }
407
408 189 signal_all(lock);
409 189 }
410
411 189 outstanding_work_.store(0, std::memory_order_release);
412
413
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (event_fd_ >= 0)
414 189 interrupt_reactor();
415 189 }
416
417 void
418 4513 epoll_scheduler::
419 post(capy::coro h) const
420 {
421 struct post_handler final
422 : scheduler_op
423 {
424 capy::coro h_;
425
426 explicit
427 4513 post_handler(capy::coro h)
428 4513 : h_(h)
429 {
430 4513 }
431
432 9026 ~post_handler() = default;
433
434 4513 void operator()() override
435 {
436 4513 auto h = h_;
437
1/2
✓ Branch 0 taken 4513 times.
✗ Branch 1 not taken.
4513 delete this;
438 std::atomic_thread_fence(std::memory_order_acquire);
439
1/1
✓ Branch 1 taken 4513 times.
4513 h.resume();
440 4513 }
441
442 void destroy() override
443 {
444 delete this;
445 }
446 };
447
448
1/1
✓ Branch 1 taken 4513 times.
4513 auto ph = std::make_unique<post_handler>(h);
449
450 // Fast path: same thread posts to private queue
451 // Only count locally; work_cleanup batches to global counter
452
2/2
✓ Branch 1 taken 2890 times.
✓ Branch 2 taken 1623 times.
4513 if (auto* ctx = find_context(this))
453 {
454 2890 ++ctx->private_outstanding_work;
455 2890 ctx->private_queue.push(ph.release());
456 2890 return;
457 }
458
459 // Slow path: cross-thread post requires mutex
460 1623 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
461
462
1/1
✓ Branch 1 taken 1623 times.
1623 std::unique_lock lock(mutex_);
463 1623 completed_ops_.push(ph.release());
464
1/1
✓ Branch 1 taken 1623 times.
1623 wake_one_thread_and_unlock(lock);
465 4513 }
466
467 void
468 149228 epoll_scheduler::
469 post(scheduler_op* h) const
470 {
471 // Fast path: same thread posts to private queue
472 // Only count locally; work_cleanup batches to global counter
473
2/2
✓ Branch 1 taken 149202 times.
✓ Branch 2 taken 26 times.
149228 if (auto* ctx = find_context(this))
474 {
475 149202 ++ctx->private_outstanding_work;
476 149202 ctx->private_queue.push(h);
477 149202 return;
478 }
479
480 // Slow path: cross-thread post requires mutex
481 26 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
482
483
1/1
✓ Branch 1 taken 26 times.
26 std::unique_lock lock(mutex_);
484 26 completed_ops_.push(h);
485
1/1
✓ Branch 1 taken 26 times.
26 wake_one_thread_and_unlock(lock);
486 26 }
487
488 void
489 2905 epoll_scheduler::
490 on_work_started() noexcept
491 {
492 2905 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
493 2905 }
494
495 void
496 2873 epoll_scheduler::
497 on_work_finished() noexcept
498 {
499
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2873 times.
5746 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
500 stop();
501 2873 }
502
503 bool
504 3118 epoll_scheduler::
505 running_in_this_thread() const noexcept
506 {
507
2/2
✓ Branch 1 taken 2908 times.
✓ Branch 2 taken 210 times.
3118 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
508
1/2
✓ Branch 0 taken 2908 times.
✗ Branch 1 not taken.
2908 if (c->key == this)
509 2908 return true;
510 210 return false;
511 }
512
513 void
514 40 epoll_scheduler::
515 stop()
516 {
517
1/1
✓ Branch 1 taken 40 times.
40 std::unique_lock lock(mutex_);
518
2/2
✓ Branch 0 taken 20 times.
✓ Branch 1 taken 20 times.
40 if (!stopped_)
519 {
520 20 stopped_ = true;
521 20 signal_all(lock);
522
1/1
✓ Branch 1 taken 20 times.
20 interrupt_reactor();
523 }
524 40 }
525
526 bool
527 16 epoll_scheduler::
528 stopped() const noexcept
529 {
530 16 std::unique_lock lock(mutex_);
531 32 return stopped_;
532 16 }
533
534 void
535 49 epoll_scheduler::
536 restart()
537 {
538
1/1
✓ Branch 1 taken 49 times.
49 std::unique_lock lock(mutex_);
539 49 stopped_ = false;
540 49 }
541
542 std::size_t
543 175 epoll_scheduler::
544 run()
545 {
546
2/2
✓ Branch 1 taken 31 times.
✓ Branch 2 taken 144 times.
350 if (outstanding_work_.load(std::memory_order_acquire) == 0)
547 {
548
1/1
✓ Branch 1 taken 31 times.
31 stop();
549 31 return 0;
550 }
551
552 144 thread_context_guard ctx(this);
553
1/1
✓ Branch 1 taken 144 times.
144 std::unique_lock lock(mutex_);
554
555 144 std::size_t n = 0;
556 for (;;)
557 {
558
3/3
✓ Branch 1 taken 232352 times.
✓ Branch 3 taken 144 times.
✓ Branch 4 taken 232208 times.
232352 if (!do_one(lock, -1, &ctx.frame_))
559 144 break;
560
1/2
✓ Branch 1 taken 232208 times.
✗ Branch 2 not taken.
232208 if (n != (std::numeric_limits<std::size_t>::max)())
561 232208 ++n;
562
2/2
✓ Branch 1 taken 83033 times.
✓ Branch 2 taken 149175 times.
232208 if (!lock.owns_lock())
563
1/1
✓ Branch 1 taken 83033 times.
83033 lock.lock();
564 }
565 144 return n;
566 144 }
567
568 std::size_t
569 2 epoll_scheduler::
570 run_one()
571 {
572
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
573 {
574 stop();
575 return 0;
576 }
577
578 2 thread_context_guard ctx(this);
579
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
580
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, -1, &ctx.frame_);
581 2 }
582
583 std::size_t
584 14 epoll_scheduler::
585 wait_one(long usec)
586 {
587
2/2
✓ Branch 1 taken 5 times.
✓ Branch 2 taken 9 times.
28 if (outstanding_work_.load(std::memory_order_acquire) == 0)
588 {
589
1/1
✓ Branch 1 taken 5 times.
5 stop();
590 5 return 0;
591 }
592
593 9 thread_context_guard ctx(this);
594
1/1
✓ Branch 1 taken 9 times.
9 std::unique_lock lock(mutex_);
595
1/1
✓ Branch 1 taken 9 times.
9 return do_one(lock, usec, &ctx.frame_);
596 9 }
597
598 std::size_t
599 2 epoll_scheduler::
600 poll()
601 {
602
2/2
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
603 {
604
1/1
✓ Branch 1 taken 1 time.
1 stop();
605 1 return 0;
606 }
607
608 1 thread_context_guard ctx(this);
609
1/1
✓ Branch 1 taken 1 time.
1 std::unique_lock lock(mutex_);
610
611 1 std::size_t n = 0;
612 for (;;)
613 {
614
3/3
✓ Branch 1 taken 3 times.
✓ Branch 3 taken 1 time.
✓ Branch 4 taken 2 times.
3 if (!do_one(lock, 0, &ctx.frame_))
615 1 break;
616
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (n != (std::numeric_limits<std::size_t>::max)())
617 2 ++n;
618
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (!lock.owns_lock())
619
1/1
✓ Branch 1 taken 2 times.
2 lock.lock();
620 }
621 1 return n;
622 1 }
623
624 std::size_t
625 4 epoll_scheduler::
626 poll_one()
627 {
628
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
8 if (outstanding_work_.load(std::memory_order_acquire) == 0)
629 {
630
1/1
✓ Branch 1 taken 2 times.
2 stop();
631 2 return 0;
632 }
633
634 2 thread_context_guard ctx(this);
635
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
636
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, 0, &ctx.frame_);
637 2 }
638
639 void
640 5357 epoll_scheduler::
641 register_descriptor(int fd, descriptor_state* desc) const
642 {
643 5357 epoll_event ev{};
644 5357 ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
645 5357 ev.data.ptr = desc;
646
647
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5357 times.
5357 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
648 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
649
650 5357 desc->registered_events = ev.events;
651 5357 desc->fd = fd;
652 5357 desc->scheduler_ = this;
653
654
1/1
✓ Branch 1 taken 5357 times.
5357 std::lock_guard lock(desc->mutex);
655 5357 desc->read_ready = false;
656 5357 desc->write_ready = false;
657 5357 }
658
659 void
660 5357 epoll_scheduler::
661 deregister_descriptor(int fd) const
662 {
663 5357 ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
664 5357 }
665
666 void
667 5458 epoll_scheduler::
668 work_started() const noexcept
669 {
670 5458 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
671 5458 }
672
673 void
674 10053 epoll_scheduler::
675 work_finished() const noexcept
676 {
677
2/2
✓ Branch 0 taken 148 times.
✓ Branch 1 taken 9905 times.
20106 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
678 {
679 // Last work item completed - wake all threads so they can exit.
680 // signal_all() wakes threads waiting on the condvar.
681 // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
682 // Both are needed because they target different blocking mechanisms.
683 148 std::unique_lock lock(mutex_);
684 148 signal_all(lock);
685
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 147 times.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
148 if (task_running_ && !task_interrupted_)
686 {
687 1 task_interrupted_ = true;
688 1 lock.unlock();
689 1 interrupt_reactor();
690 }
691 148 }
692 10053 }
693
694 void
695 73149 epoll_scheduler::
696 compensating_work_started() const noexcept
697 {
698 73149 auto* ctx = find_context(this);
699
1/2
✓ Branch 0 taken 73149 times.
✗ Branch 1 not taken.
73149 if (ctx)
700 73149 ++ctx->private_outstanding_work;
701 73149 }
702
703 void
704 epoll_scheduler::
705 drain_thread_queue(op_queue& queue, long count) const
706 {
707 // Note: outstanding_work_ was already incremented when posting
708 std::unique_lock lock(mutex_);
709 completed_ops_.splice(queue);
710 if (count > 0)
711 maybe_unlock_and_signal_one(lock);
712 }
713
714 void
715 5333 epoll_scheduler::
716 post_deferred_completions(op_queue& ops) const
717 {
718
1/2
✓ Branch 1 taken 5333 times.
✗ Branch 2 not taken.
5333 if (ops.empty())
719 5333 return;
720
721 // Fast path: if on scheduler thread, use private queue
722 if (auto* ctx = find_context(this))
723 {
724 ctx->private_queue.splice(ops);
725 return;
726 }
727
728 // Slow path: add to global queue and wake a thread
729 std::unique_lock lock(mutex_);
730 completed_ops_.splice(ops);
731 wake_one_thread_and_unlock(lock);
732 }
733
734 void
735 236 epoll_scheduler::
736 interrupt_reactor() const
737 {
738 // Only write if not already armed to avoid redundant writes
739 236 bool expected = false;
740
2/2
✓ Branch 1 taken 222 times.
✓ Branch 2 taken 14 times.
236 if (eventfd_armed_.compare_exchange_strong(expected, true,
741 std::memory_order_release, std::memory_order_relaxed))
742 {
743 222 std::uint64_t val = 1;
744
1/1
✓ Branch 1 taken 222 times.
222 [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
745 }
746 236 }
747
748 void
749 357 epoll_scheduler::
750 signal_all(std::unique_lock<std::mutex>&) const
751 {
752 357 state_ |= 1;
753 357 cond_.notify_all();
754 357 }
755
756 bool
757 46274 epoll_scheduler::
758 maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
759 {
760 46274 state_ |= 1;
761
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 46274 times.
46274 if (state_ > 1)
762 {
763 lock.unlock();
764 cond_.notify_one();
765 return true;
766 }
767 46274 return false;
768 }
769
770 void
771 302021 epoll_scheduler::
772 unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
773 {
774 302021 state_ |= 1;
775 302021 bool have_waiters = state_ > 1;
776 302021 lock.unlock();
777
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 302021 times.
302021 if (have_waiters)
778 cond_.notify_one();
779 302021 }
780
781 void
782 epoll_scheduler::
783 clear_signal() const
784 {
785 state_ &= ~std::size_t(1);
786 }
787
788 void
789 epoll_scheduler::
790 wait_for_signal(std::unique_lock<std::mutex>& lock) const
791 {
792 while ((state_ & 1) == 0)
793 {
794 state_ += 2;
795 cond_.wait(lock);
796 state_ -= 2;
797 }
798 }
799
800 void
801 epoll_scheduler::
802 wait_for_signal_for(
803 std::unique_lock<std::mutex>& lock,
804 long timeout_us) const
805 {
806 if ((state_ & 1) == 0)
807 {
808 state_ += 2;
809 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
810 state_ -= 2;
811 }
812 }
813
814 void
815 1649 epoll_scheduler::
816 wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
817 {
818
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1649 times.
1649 if (maybe_unlock_and_signal_one(lock))
819 return;
820
821
3/4
✓ Branch 0 taken 26 times.
✓ Branch 1 taken 1623 times.
✓ Branch 2 taken 26 times.
✗ Branch 3 not taken.
1649 if (task_running_ && !task_interrupted_)
822 {
823 26 task_interrupted_ = true;
824 26 lock.unlock();
825 26 interrupt_reactor();
826 }
827 else
828 {
829 1623 lock.unlock();
830 }
831 }
832
833 /** RAII guard for handler execution work accounting.
834
835 Handler consumes 1 work item, may produce N new items via fast-path posts.
836 Net change = N - 1:
837 - If N > 1: add (N-1) to global (more work produced than consumed)
838 - If N == 1: net zero, do nothing
839 - If N < 1: call work_finished() (work consumed, may trigger stop)
840
841 Also drains private queue to global for other threads to process.
842 */
843 struct work_cleanup
844 {
845 epoll_scheduler const* scheduler;
846 std::unique_lock<std::mutex>* lock;
847 scheduler_context* ctx;
848
849 232223 ~work_cleanup()
850 {
851
1/2
✓ Branch 0 taken 232223 times.
✗ Branch 1 not taken.
232223 if (ctx)
852 {
853 232223 long produced = ctx->private_outstanding_work;
854
2/2
✓ Branch 0 taken 46 times.
✓ Branch 1 taken 232177 times.
232223 if (produced > 1)
855 46 scheduler->outstanding_work_.fetch_add(produced - 1, std::memory_order_relaxed);
856
2/2
✓ Branch 0 taken 9896 times.
✓ Branch 1 taken 222281 times.
232177 else if (produced < 1)
857 9896 scheduler->work_finished();
858 // produced == 1: net zero, handler consumed what it produced
859 232223 ctx->private_outstanding_work = 0;
860
861
2/2
✓ Branch 1 taken 149178 times.
✓ Branch 2 taken 83045 times.
232223 if (!ctx->private_queue.empty())
862 {
863 149178 lock->lock();
864 149178 scheduler->completed_ops_.splice(ctx->private_queue);
865 }
866 }
867 else
868 {
869 // No thread context - slow-path op was already counted globally
870 scheduler->work_finished();
871 }
872 232223 }
873 };
874
875 /** RAII guard for reactor work accounting.
876
877 Reactor only produces work via timer/signal callbacks posting handlers.
878 Unlike handler execution which consumes 1, the reactor consumes nothing.
879 All produced work must be flushed to global counter.
880 */
881 struct task_cleanup
882 {
883 epoll_scheduler const* scheduler;
884 scheduler_context* ctx;
885
886 75329 ~task_cleanup()
887 {
888
3/4
✓ Branch 0 taken 75329 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 2867 times.
✓ Branch 3 taken 72462 times.
75329 if (ctx && ctx->private_outstanding_work > 0)
889 {
890 2867 scheduler->outstanding_work_.fetch_add(
891 2867 ctx->private_outstanding_work, std::memory_order_relaxed);
892 2867 ctx->private_outstanding_work = 0;
893 }
894 75329 }
895 };
896
897 void
898 5752 epoll_scheduler::
899 update_timerfd() const
900 {
901 5752 auto nearest = timer_svc_->nearest_expiry();
902
903 5752 itimerspec ts{};
904 5752 int flags = 0;
905
906
3/3
✓ Branch 2 taken 5752 times.
✓ Branch 4 taken 5708 times.
✓ Branch 5 taken 44 times.
5752 if (nearest == timer_service::time_point::max())
907 {
908 // No timers - disarm by setting to 0 (relative)
909 }
910 else
911 {
912 5708 auto now = std::chrono::steady_clock::now();
913
3/3
✓ Branch 1 taken 5708 times.
✓ Branch 4 taken 37 times.
✓ Branch 5 taken 5671 times.
5708 if (nearest <= now)
914 {
915 // Use 1ns instead of 0 - zero disarms the timerfd
916 37 ts.it_value.tv_nsec = 1;
917 }
918 else
919 {
920 5671 auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
921
1/1
✓ Branch 1 taken 5671 times.
11342 nearest - now).count();
922 5671 ts.it_value.tv_sec = nsec / 1000000000;
923 5671 ts.it_value.tv_nsec = nsec % 1000000000;
924 // Ensure non-zero to avoid disarming if duration rounds to 0
925
3/4
✓ Branch 0 taken 5659 times.
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 5659 times.
5671 if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
926 ts.it_value.tv_nsec = 1;
927 }
928 }
929
930
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5752 times.
5752 if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
931 detail::throw_system_error(make_err(errno), "timerfd_settime");
932 5752 }
933
934 void
935 75329 epoll_scheduler::
936 run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
937 {
938
2/2
✓ Branch 0 taken 69798 times.
✓ Branch 1 taken 5531 times.
75329 int timeout_ms = task_interrupted_ ? 0 : -1;
939
940
2/2
✓ Branch 1 taken 5531 times.
✓ Branch 2 taken 69798 times.
75329 if (lock.owns_lock())
941
1/1
✓ Branch 1 taken 5531 times.
5531 lock.unlock();
942
943 // Flush private work count when reactor completes
944 75329 task_cleanup on_exit{this, ctx};
945 (void)on_exit;
946
947 // Event loop runs without mutex held
948
949 epoll_event events[128];
950
1/1
✓ Branch 1 taken 75329 times.
75329 int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
951 75329 int saved_errno = errno;
952
953
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 75329 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
75329 if (nfds < 0 && saved_errno != EINTR)
954 detail::throw_system_error(make_err(saved_errno), "epoll_wait");
955
956 75329 bool check_timers = false;
957 75329 op_queue local_ops;
958 75329 int completions_queued = 0;
959
960 // Process events without holding the mutex
961
2/2
✓ Branch 0 taken 81385 times.
✓ Branch 1 taken 75329 times.
156714 for (int i = 0; i < nfds; ++i)
962 {
963
2/2
✓ Branch 0 taken 33 times.
✓ Branch 1 taken 81352 times.
81385 if (events[i].data.ptr == nullptr)
964 {
965 std::uint64_t val;
966
1/1
✓ Branch 1 taken 33 times.
33 [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
967 33 eventfd_armed_.store(false, std::memory_order_relaxed);
968 33 continue;
969 33 }
970
971
2/2
✓ Branch 0 taken 2870 times.
✓ Branch 1 taken 78482 times.
81352 if (events[i].data.ptr == &timer_fd_)
972 {
973 std::uint64_t expirations;
974
1/1
✓ Branch 1 taken 2870 times.
2870 [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
975 2870 check_timers = true;
976 2870 continue;
977 2870 }
978
979 // Deferred I/O: just set ready events and enqueue descriptor
980 // No per-descriptor mutex locking in reactor hot path!
981 78482 auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
982 78482 desc->add_ready_events(events[i].events);
983
984 // Only enqueue if not already enqueued
985 78482 bool expected = false;
986
1/2
✓ Branch 1 taken 78482 times.
✗ Branch 2 not taken.
78482 if (desc->is_enqueued_.compare_exchange_strong(expected, true,
987 std::memory_order_release, std::memory_order_relaxed))
988 {
989 78482 local_ops.push(desc);
990 78482 ++completions_queued;
991 }
992 }
993
994 // Process timers only when timerfd fires
995
2/2
✓ Branch 0 taken 2870 times.
✓ Branch 1 taken 72459 times.
75329 if (check_timers)
996 {
997
1/1
✓ Branch 1 taken 2870 times.
2870 timer_svc_->process_expired();
998
1/1
✓ Branch 1 taken 2870 times.
2870 update_timerfd();
999 }
1000
1001 // --- Acquire mutex only for queue operations ---
1002
1/1
✓ Branch 1 taken 75329 times.
75329 lock.lock();
1003
1004
2/2
✓ Branch 1 taken 41759 times.
✓ Branch 2 taken 33570 times.
75329 if (!local_ops.empty())
1005 41759 completed_ops_.splice(local_ops);
1006
1007 // Drain private queue to global (work count handled by task_cleanup)
1008
5/6
✓ Branch 0 taken 75329 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 2867 times.
✓ Branch 4 taken 72462 times.
✓ Branch 5 taken 2867 times.
✓ Branch 6 taken 72462 times.
75329 if (ctx && !ctx->private_queue.empty())
1009 {
1010 2867 completions_queued += ctx->private_outstanding_work;
1011 2867 completed_ops_.splice(ctx->private_queue);
1012 }
1013
1014 // Signal and wake one waiter if work is queued
1015
2/2
✓ Branch 0 taken 44625 times.
✓ Branch 1 taken 30704 times.
75329 if (completions_queued > 0)
1016 {
1017
2/3
✓ Branch 1 taken 44625 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 44625 times.
44625 if (maybe_unlock_and_signal_one(lock))
1018 lock.lock();
1019 }
1020 75329 }
1021
// Execute at most one completion handler (or run the reactor on the way
// to finding one). Returns 1 if a handler was executed, 0 if the
// scheduler is stopped, there is no outstanding work, or a non-blocking
// poll (timeout_us == 0) found nothing to do.
//
// Contract (from the visible code):
//   - `lock` guards the scheduler queues and must be HELD on entry.
//   - timeout_us < 0  : block indefinitely waiting for work.
//     timeout_us == 0 : non-blocking poll.
//     timeout_us > 0  : bounded wait via wait_for_signal_for().
//   - `ctx` is this thread's per-thread context; may be null (checked
//     before every private_queue access).
std::size_t
 1023 232368 epoll_scheduler::
 1024 do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
 1025 {
 1026 for (;;)
 1027 {
 1028
 // Stop request wins over any queued work.
 2/2
 ✓ Branch 0 taken 4 times.
 ✓ Branch 1 taken 307693 times.
 307697 if (stopped_)
 1029 4 return 0;
 1030
 1031 307693 scheduler_op* op = completed_ops_.pop();
 1032
 1033 // Handle reactor sentinel - time to poll for I/O
 // task_op_ is a marker op circulating in completed_ops_; popping it
 // means this thread should become the reactor (see HEAD comment:
 // only ONE thread runs epoll_wait at a time).
 2/2
 ✓ Branch 0 taken 75470 times.
 ✓ Branch 1 taken 232223 times.
 307693 if (op == &task_op_)
 1034 {
 1035
 1036
 3/4
 ✓ Branch 1 taken 5672 times.
 ✓ Branch 2 taken 69798 times.
 ✓ Branch 3 taken 5672 times.
 ✗ Branch 4 not taken.
 81142 bool more_handlers = !completed_ops_.empty() ||
 1037
 1/2
 ✗ Branch 1 not taken.
 ✓ Branch 2 taken 5672 times.
 5672 (ctx && !ctx->private_queue.empty());
 1038
 1039 // Nothing to run the reactor for: no pending work to wait on,
 1040 // or caller requested a non-blocking poll
 // The sentinel is re-pushed before returning so the next caller
 // can still take the reactor role.
 4/4
 ✓ Branch 0 taken 5672 times.
 ✓ Branch 1 taken 69798 times.
 ✓ Branch 2 taken 141 times.
 ✓ Branch 3 taken 75329 times.
 81142 if (!more_handlers &&
 1041
 1042
 3/4
 ✓ Branch 1 taken 5531 times.
 ✓ Branch 2 taken 141 times.
 ✗ Branch 3 not taken.
 ✓ Branch 4 taken 5531 times.
 11344 (outstanding_work_.load(std::memory_order_acquire) == 0 ||
 1043 timeout_us == 0))
 1044 {
 1045 141 completed_ops_.push(&task_op_);
 1046 141 return 0;
 1047 }
 1048
 1049
 // If handlers are already queued (or a poll was requested) the
 // reactor must not block in epoll_wait — mark it interrupted so
 // run_task uses a zero timeout.
 3/4
 ✓ Branch 0 taken 5531 times.
 ✓ Branch 1 taken 69798 times.
 ✗ Branch 2 not taken.
 ✓ Branch 3 taken 5531 times.
 75329 task_interrupted_ = more_handlers || timeout_us == 0;
 1050 75329 task_running_ = true;
 1051
 1052
 // Queued handlers exist: wake one waiting thread to run them
 // while this thread goes off to poll I/O.
 2/2
 ✓ Branch 0 taken 69798 times.
 ✓ Branch 1 taken 5531 times.
 75329 if (more_handlers)
 1053 69798 unlock_and_signal_one(lock);
 1054
 // run_task re-acquires `lock` before returning (reactor loop
 // above: "Acquire mutex only for queue operations").
 1055 75329 run_task(lock, ctx);
 1056
 1057 75329 task_running_ = false;
 // Recycle the sentinel and retry: the reactor may have queued
 // completions for this thread to pick up.
 1058 75329 completed_ops_.push(&task_op_);
 1059 75329 continue;
 1060 75329 }
 1061
 1062 // Handle operation
 1063
 1/2
 ✓ Branch 0 taken 232223 times.
 ✗ Branch 1 not taken.
 232223 if (op != nullptr)
 1064 {
 1065
 // More work remains: hand it to another thread before running
 // this handler outside the lock.
 1/2
 ✓ Branch 1 taken 232223 times.
 ✗ Branch 2 not taken.
 232223 if (!completed_ops_.empty())
 1066
 1/1
 ✓ Branch 1 taken 232223 times.
 232223 unlock_and_signal_one(lock);
 1067 else
 1068 lock.unlock();
 1069
 // RAII guard: performs work-count / lock bookkeeping when the
 // handler returns (or throws).
 1070 232223 work_cleanup on_exit{this, &lock, ctx};
 1071 (void)on_exit;
 1072
 1073
 // Invoke the handler with the mutex released.
 1/1
 ✓ Branch 1 taken 232223 times.
 232223 (*op)();
 1074 232223 return 1;
 1075 232223 }
 1076
 1077 // No work from global queue - try private queue before blocking
 1078 if (drain_private_queue(ctx, outstanding_work_, completed_ops_))
 1079 continue;
 1080
 1081 // No pending work to wait on, or caller requested non-blocking poll
 1082 if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
 1083 timeout_us == 0)
 1084 return 0;
 1085
 // Clear any stale wakeup, then block until another thread signals
 // new work (bounded wait when the caller gave a positive timeout).
 1086 clear_signal();
 1087 if (timeout_us < 0)
 1088 wait_for_signal(lock);
 1089 else
 1090 wait_for_signal_for(lock, timeout_us);
 1091 75329 }
 1092 }
1093
1094 } // namespace boost::corosio::detail
1095
1096 #endif
1097