libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

Coverage: 79.9% lines (397/497), 89.6% functions (43/48), 66.7% branches (202/303)
//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include "src/detail/epoll/scheduler.hpp"
#include "src/detail/epoll/op.hpp"
#include "src/detail/timer_service.hpp"
#include "src/detail/make_err.hpp"
#include "src/detail/posix/resolver_service.hpp"
#include "src/detail/posix/signals.hpp"

#include <boost/corosio/detail/except.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <chrono>
#include <limits>
#include <utility>

#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <unistd.h>

/*
epoll Scheduler - Single Reactor Model
======================================

This scheduler uses a single-reactor thread coordination strategy to
provide handler parallelism while avoiding the thundering herd problem.
Instead of all threads blocking in epoll_wait(), one thread becomes the
"reactor" while the others wait on a condition variable for handler work.

Thread Model
------------
- ONE thread runs epoll_wait() at a time (the reactor thread)
- OTHER threads wait on cond_ (condition variable) for handlers
- When work is posted, exactly one waiting thread wakes via notify_one()
- This matches Windows IOCP semantics, where N posted items wake N threads

Event Loop Structure (do_one)
-----------------------------
1. Lock mutex, try to pop a handler from the queue
2. If a handler was popped: execute it (unlocked), return
3. If the queue is empty and no reactor is running: become the reactor
   - Run epoll_wait (unlocked), queue I/O completions, loop back
4. If the queue is empty and a reactor is running: wait on the condvar

The task_running_ flag ensures only one thread owns epoll_wait().
After the reactor queues I/O completions, it loops back to try to get
a handler, giving priority to handler execution over more I/O polling.

Signaling State (state_)
------------------------
The state_ variable encodes two pieces of information:
- Bit 0: signaled flag (1 = signaled, persists until cleared)
- Upper bits: waiter count (each waiter adds 2 before blocking)

This allows efficient coordination:
- Signalers only call notify when waiters exist (state_ > 1)
- Waiters check whether they were already signaled before blocking
  (the fast path)

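
Worked example: with two threads blocked in wait_for_signal() and no
signal pending, state_ == 4 (two waiters times 2, bit 0 clear).
Posting work sets bit 0, making state_ == 5; since 5 > 1 there are
waiters, so notify_one() runs. The woken waiter sees bit 0 set, stops
waiting, and subtracts 2, leaving state_ == 3 until clear_signal()
resets bit 0.
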
Wake Coordination (wake_one_thread_and_unlock)
----------------------------------------------
When posting work:
- If waiters exist (state_ > 1): signal and notify_one()
- Else if the reactor is running: interrupt it via an eventfd write
- Else: no-op (the thread will find the work when it checks the queue)

This avoids waking threads unnecessarily. With cascading wakes, each
handler execution wakes at most one additional thread if more work
exists in the queue.

Work Counting
-------------
outstanding_work_ tracks pending operations. When it hits zero, run()
returns. Each operation increments the count on start and decrements
it on completion.

Timer Integration
-----------------
Timers are handled by timer_service. The reactor programs a timerfd
(registered with the epoll set) for the nearest timer expiry rather
than adjusting the epoll_wait timeout. When a new timer is scheduled
earlier than the current nearest, timer_service marks the timerfd
stale and calls interrupt_reactor() so it gets reprogrammed.
*/

namespace boost::corosio::detail {

struct scheduler_context
{
    epoll_scheduler const* key;
    scheduler_context* next;
    op_queue private_queue;
    long private_outstanding_work;
    int inline_budget;

    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
        : key(k)
        , next(n)
        , private_outstanding_work(0)
        , inline_budget(0)
    {
    }
};

namespace {

corosio::detail::thread_local_ptr<scheduler_context> context_stack;

struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(
        epoll_scheduler const* ctx) noexcept
        : frame_(ctx, context_stack.get())
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(
                frame_.private_queue, frame_.private_outstanding_work);
        context_stack.set(frame_.next);
    }
};

scheduler_context*
find_context(epoll_scheduler const* self) noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == self)
            return c;
    return nullptr;
}

} // namespace
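
// Example: with two nested event loops on one thread (scheduler B's
// run() invoked from a handler running inside scheduler A's run()),
// the thread-local stack looks like
//
//     context_stack -> frame(B) -> frame(A) -> nullptr
//
// find_context(&A) walks past B's frame and still finds A's, so the
// same-thread fast paths below (post, inline budget, work batching)
// pick the right frame for whichever scheduler is asked about.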

void
epoll_scheduler::
reset_inline_budget() const noexcept
{
    if (auto* ctx = find_context(this))
        ctx->inline_budget = max_inline_budget_;
}

bool
epoll_scheduler::
try_consume_inline_budget() const noexcept
{
    if (auto* ctx = find_context(this))
    {
        if (ctx->inline_budget > 0)
        {
            --ctx->inline_budget;
            return true;
        }
    }
    return false;
}
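
// A sketch of the intended gating pattern (illustrative only; `sched`
// and `h` are placeholders): a completion path can resume a coroutine
// inline while budget remains, bounding recursion depth per thread.
//
//     if (sched.try_consume_inline_budget())
//         h.resume();          // bounded inline resumption
//     else
//         sched.post(h);       // fall back to the scheduler queue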

void
descriptor_state::
operator()()
{
    is_enqueued_.store(false, std::memory_order_relaxed);

    // Take ownership of impl ref set by close_socket() to prevent
    // the owning impl from being freed while we're executing
    auto prevent_impl_destruction = std::move(impl_ref_);

    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
    if (ev == 0)
    {
        scheduler_->compensating_work_started();
        return;
    }

    op_queue local_ops;

    int err = 0;
    if (ev & EPOLLERR)
    {
        socklen_t len = sizeof(err);
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
            err = errno;
        if (err == 0)
            err = EIO;
    }

    {
        std::lock_guard lock(mutex);
        if (ev & EPOLLIN)
        {
            if (read_op)
            {
                auto* rd = read_op;
                if (err)
                    rd->complete(err, 0);
                else
                    rd->perform_io();

                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
                {
                    rd->errn = 0;
                }
                else
                {
                    read_op = nullptr;
                    local_ops.push(rd);
                }
            }
            else
            {
                read_ready = true;
            }
        }
        if (ev & EPOLLOUT)
        {
            bool had_write_op = (connect_op || write_op);
            if (connect_op)
            {
                auto* cn = connect_op;
                if (err)
                    cn->complete(err, 0);
                else
                    cn->perform_io();
                connect_op = nullptr;
                local_ops.push(cn);
            }
            if (write_op)
            {
                auto* wr = write_op;
                if (err)
                    wr->complete(err, 0);
                else
                    wr->perform_io();

                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
                {
                    wr->errn = 0;
                }
                else
                {
                    write_op = nullptr;
                    local_ops.push(wr);
                }
            }
            if (!had_write_op)
                write_ready = true;
        }
        if (err)
        {
            if (read_op)
            {
                read_op->complete(err, 0);
                local_ops.push(std::exchange(read_op, nullptr));
            }
            if (write_op)
            {
                write_op->complete(err, 0);
                local_ops.push(std::exchange(write_op, nullptr));
            }
            if (connect_op)
            {
                connect_op->complete(err, 0);
                local_ops.push(std::exchange(connect_op, nullptr));
            }
        }
    }

    // Execute first handler inline; the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        scheduler_->compensating_work_started();
    }
}

epoll_scheduler::
epoll_scheduler(
    capy::execution_context& ctx,
    int)
    : epoll_fd_(-1)
    , event_fd_(-1)
    , timer_fd_(-1)
    , outstanding_work_(0)
    , stopped_(false)
    , shutdown_(false)
    , task_running_{false}
    , task_interrupted_(false)
    , state_(0)
{
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ < 0)
        detail::throw_system_error(make_err(errno), "epoll_create1");

    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (event_fd_ < 0)
    {
        int errn = errno;
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "eventfd");
    }

    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timer_fd_ < 0)
    {
        int errn = errno;
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "timerfd_create");
    }

    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = nullptr;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl");
    }

    epoll_event timer_ev{};
    timer_ev.events = EPOLLIN | EPOLLERR;
    timer_ev.data.ptr = &timer_fd_;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
    }

    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(
            this,
            [](void* p) {
                auto* self = static_cast<epoll_scheduler*>(p);
                self->timerfd_stale_.store(true, std::memory_order_release);
                if (self->task_running_.load(std::memory_order_acquire))
                    self->interrupt_reactor();
            }));

    // Initialize resolver service
    get_resolver_service(ctx, *this);

    // Initialize signal service
    get_signal_service(ctx, *this);

    // Push task sentinel to interleave reactor runs with handler execution
    completed_ops_.push(&task_op_);
}

epoll_scheduler::
~epoll_scheduler()
{
    if (timer_fd_ >= 0)
        ::close(timer_fd_);
    if (event_fd_ >= 0)
        ::close(event_fd_);
    if (epoll_fd_ >= 0)
        ::close(epoll_fd_);
}

void
epoll_scheduler::
shutdown()
{
    {
        std::unique_lock lock(mutex_);
        shutdown_ = true;

        while (auto* h = completed_ops_.pop())
        {
            if (h == &task_op_)
                continue;
            lock.unlock();
            h->destroy();
            lock.lock();
        }

        signal_all(lock);
    }

    outstanding_work_.store(0, std::memory_order_release);

    if (event_fd_ >= 0)
        interrupt_reactor();
}

void
epoll_scheduler::
post(std::coroutine_handle<> h) const
{
    struct post_handler final
        : scheduler_op
    {
        std::coroutine_handle<> h_;

        explicit
        post_handler(std::coroutine_handle<> h)
            : h_(h)
        {
        }

        ~post_handler() = default;

        void operator()() override
        {
            auto h = h_;
            delete this;
            h.resume();
        }

        void destroy() override
        {
            delete this;
        }
    };

    auto ph = std::make_unique<post_handler>(h);

    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}
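
// A minimal awaiter sketch showing how this overload is typically
// reached (illustrative only; corosio's real awaitables live elsewhere):
//
//     struct schedule_awaiter
//     {
//         epoll_scheduler const& s;
//         bool await_ready() const noexcept { return false; }
//         void await_suspend(std::coroutine_handle<> h) const
//         {
//             s.post(h);  // coroutine resumes on a run() thread
//         }
//         void await_resume() const noexcept {}
//     };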

void
epoll_scheduler::
post(scheduler_op* h) const
{
    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(h);
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(h);
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
on_work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}

void
epoll_scheduler::
on_work_finished() noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}

bool
epoll_scheduler::
running_in_this_thread() const noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == this)
            return true;
    return false;
}

void
epoll_scheduler::
stop()
{
    std::unique_lock lock(mutex_);
    if (!stopped_)
    {
        stopped_ = true;
        signal_all(lock);
        interrupt_reactor();
    }
}

bool
epoll_scheduler::
stopped() const noexcept
{
    std::unique_lock lock(mutex_);
    return stopped_;
}

void
epoll_scheduler::
restart()
{
    std::unique_lock lock(mutex_);
    stopped_ = false;
}

std::size_t
epoll_scheduler::
run()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, -1, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

std::size_t
epoll_scheduler::
run_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, -1, &ctx.frame_);
}

std::size_t
epoll_scheduler::
wait_one(long usec)
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, usec, &ctx.frame_);
}

std::size_t
epoll_scheduler::
poll()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, 0, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

std::size_t
epoll_scheduler::
poll_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, 0, &ctx.frame_);
}

void
epoll_scheduler::
register_descriptor(int fd, descriptor_state* desc) const
{
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
    ev.data.ptr = desc;

    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");

    desc->registered_events = ev.events;
    desc->fd = fd;
    desc->scheduler_ = this;

    std::lock_guard lock(desc->mutex);
    desc->read_ready = false;
    desc->write_ready = false;
}
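
// Registration uses edge-triggered epoll (EPOLLET): the kernel reports
// a readiness transition once and will not report that direction again
// until the socket is drained to EAGAIN/EWOULDBLOCK. This is why
// descriptor_state::operator()() above re-arms an op whose perform_io()
// ends in EAGAIN instead of completing it, and why read_ready /
// write_ready cache readiness observed while no operation was pending.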

void
epoll_scheduler::
deregister_descriptor(int fd) const
{
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}

void
epoll_scheduler::
work_started() const noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}

void
epoll_scheduler::
work_finished() const noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
    {
        // Last work item completed - wake all threads so they can exit.
        // signal_all() wakes threads waiting on the condvar.
        // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
        // Both are needed because they target different blocking mechanisms.
        std::unique_lock lock(mutex_);
        signal_all(lock);
        if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
        {
            task_interrupted_ = true;
            lock.unlock();
            interrupt_reactor();
        }
    }
}

void
epoll_scheduler::
compensating_work_started() const noexcept
{
    auto* ctx = find_context(this);
    if (ctx)
        ++ctx->private_outstanding_work;
}

void
epoll_scheduler::
drain_thread_queue(op_queue& queue, long count) const
{
    // Note: outstanding_work_ was already incremented when posting
    std::unique_lock lock(mutex_);
    completed_ops_.splice(queue);
    if (count > 0)
        maybe_unlock_and_signal_one(lock);
}

void
epoll_scheduler::
post_deferred_completions(op_queue& ops) const
{
    if (ops.empty())
        return;

    // Fast path: if on scheduler thread, use private queue
    if (auto* ctx = find_context(this))
    {
        ctx->private_queue.splice(ops);
        return;
    }

    // Slow path: add to global queue and wake a thread
    std::unique_lock lock(mutex_);
    completed_ops_.splice(ops);
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
interrupt_reactor() const
{
    // Only write if not already armed to avoid redundant writes
    bool expected = false;
    if (eventfd_armed_.compare_exchange_strong(expected, true,
        std::memory_order_release, std::memory_order_relaxed))
    {
        std::uint64_t val = 1;
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
    }
}

void
epoll_scheduler::
signal_all(std::unique_lock<std::mutex>&) const
{
    state_ |= 1;
    cond_.notify_all();
}

bool
epoll_scheduler::
maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    if (state_ > 1)
    {
        lock.unlock();
        cond_.notify_one();
        return true;
    }
    return false;
}

void
epoll_scheduler::
unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    bool have_waiters = state_ > 1;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
}

void
epoll_scheduler::
clear_signal() const
{
    state_ &= ~std::size_t(1);
}

void
epoll_scheduler::
wait_for_signal(std::unique_lock<std::mutex>& lock) const
{
    while ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait(lock);
        state_ -= 2;
    }
}

void
epoll_scheduler::
wait_for_signal_for(
    std::unique_lock<std::mutex>& lock,
    long timeout_us) const
{
    if ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= 2;
    }
}

void
epoll_scheduler::
wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
{
    if (maybe_unlock_and_signal_one(lock))
        return;

    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        lock.unlock();
    }
}

/** RAII guard for handler execution work accounting.

    A handler consumes 1 work item and may produce N new items via
    fast-path posts. Net change = N - 1:
    - If N > 1: add (N - 1) to the global count (more work produced
      than consumed)
    - If N == 1: net zero, do nothing
    - If N < 1: call work_finished() (work consumed, may trigger stop)

    Also drains the private queue to the global queue so other threads
    can process it.
*/
struct work_cleanup
{
    epoll_scheduler const* scheduler;
    std::unique_lock<std::mutex>* lock;
    scheduler_context* ctx;

    ~work_cleanup()
    {
        if (ctx)
        {
            long produced = ctx->private_outstanding_work;
            if (produced > 1)
                scheduler->outstanding_work_.fetch_add(
                    produced - 1, std::memory_order_relaxed);
            else if (produced < 1)
                scheduler->work_finished();
            // produced == 1: net zero, handler consumed what it produced
            ctx->private_outstanding_work = 0;

            if (!ctx->private_queue.empty())
            {
                lock->lock();
                scheduler->completed_ops_.splice(ctx->private_queue);
            }
        }
        else
        {
            // No thread context - slow-path op was already counted globally
            scheduler->work_finished();
        }
    }
};
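
// Worked example: a handler that fast-path posts three new handlers
// leaves private_outstanding_work == 3, so the destructor adds
// 3 - 1 == 2 to outstanding_work_ (the item the handler consumed
// cancels one production). A handler that posts nothing leaves the
// count at 0, so work_finished() decrements the global counter and
// may stop the scheduler when it reaches zero.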

/** RAII guard for reactor work accounting.

    The reactor only produces work, via timer/signal callbacks posting
    handlers. Unlike handler execution, which consumes 1 item, the
    reactor consumes nothing, so all produced work must be flushed to
    the global counter.
*/
struct task_cleanup
{
    epoll_scheduler const* scheduler;
    std::unique_lock<std::mutex>* lock;
    scheduler_context* ctx;

    ~task_cleanup()
    {
        if (!ctx)
            return;

        if (ctx->private_outstanding_work > 0)
        {
            scheduler->outstanding_work_.fetch_add(
                ctx->private_outstanding_work, std::memory_order_relaxed);
            ctx->private_outstanding_work = 0;
        }

        if (!ctx->private_queue.empty())
        {
            if (!lock->owns_lock())
                lock->lock();
            scheduler->completed_ops_.splice(ctx->private_queue);
        }
    }
};

void
epoll_scheduler::
update_timerfd() const
{
    auto nearest = timer_svc_->nearest_expiry();

    itimerspec ts{};
    int flags = 0;

    if (nearest == timer_service::time_point::max())
    {
        // No timers - disarm by setting to 0 (relative)
    }
    else
    {
        auto now = std::chrono::steady_clock::now();
        if (nearest <= now)
        {
            // Use 1ns instead of 0 - zero disarms the timerfd
            ts.it_value.tv_nsec = 1;
        }
        else
        {
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
                nearest - now).count();
            ts.it_value.tv_sec = nsec / 1000000000;
            ts.it_value.tv_nsec = nsec % 1000000000;
            // Ensure non-zero to avoid disarming if duration rounds to 0
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
                ts.it_value.tv_nsec = 1;
        }
    }

    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
        detail::throw_system_error(make_err(errno), "timerfd_settime");
}
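
// Example: a timer due in 2.5ms gives nsec == 2'500'000, so
// tv_sec == 0 and tv_nsec == 2'500'000; one due in 1.2s gives
// nsec == 1'200'000'000, so tv_sec == 1 and tv_nsec == 200'000'000.
// flags == 0 means it_value is an offset relative to now.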

void
epoll_scheduler::
run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
{
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    // Event loop runs without mutex held
    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    // Process events without holding the mutex
    for (int i = 0; i < nfds; ++i)
    {
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Deferred I/O: just set ready events and enqueue descriptor
        // No per-descriptor mutex locking in reactor hot path!
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        // Only enqueue if not already enqueued
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(expected, true,
            std::memory_order_release, std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Process timers only when timerfd fires
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}

std::size_t
epoll_scheduler::
do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            // Nothing to run the reactor for: no pending work to wait on,
            // or caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                    timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            if (!completed_ops_.empty())
                unlock_and_signal_one(lock);
            else
                lock.unlock();

            work_cleanup on_exit{this, &lock, ctx};

            (*op)();
            return 1;
        }

        // No pending work to wait on, or caller requested non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
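
// A minimal driver sketch (illustrative; in real use the scheduler is
// owned by an execution context and `sched` is obtained from it):
//
//     std::vector<std::thread> pool;
//     for (int i = 0; i < 4; ++i)
//         pool.emplace_back([&] { sched.run(); });
//     // one thread becomes the reactor in epoll_wait(); the others
//     // block on cond_ until handlers are queued
//     for (auto& t : pool)
//         t.join();  // run() returns once outstanding_work_ hits zero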

} // namespace boost::corosio::detail

#endif