libs/corosio/src/corosio/src/detail/timer_service.cpp

85.4% Lines (309/362) 93.0% Functions (40/43) 69.3% Branches (122/176)
libs/corosio/src/corosio/src/detail/timer_service.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include "src/detail/timer_service.hpp"
11 #include "src/detail/scheduler_impl.hpp"
12
13 #include <boost/corosio/basic_io_context.hpp>
14 #include <boost/corosio/detail/thread_local_ptr.hpp>
15 #include "src/detail/scheduler_op.hpp"
16 #include "src/detail/intrusive.hpp"
17 #include <boost/capy/error.hpp>
18 #include <boost/capy/ex/executor_ref.hpp>
19 #include <system_error>
20
21 #include <atomic>
22 #include <coroutine>
23 #include <limits>
24 #include <mutex>
25 #include <optional>
26 #include <stop_token>
27 #include <vector>
28
29 /*
30 Timer Service
31 =============
32
33 The public timer class holds an opaque timer_impl* and forwards
34 all operations through extern free functions defined at the bottom
35 of this file.
36
37 Data Structures
38 ---------------
39 waiter_node holds per-waiter state: coroutine handle, executor,
40 error output, stop_token, embedded completion_op. Each concurrent
41 co_await t.wait() allocates one waiter_node.
42
43 timer_impl holds per-timer state: expiry, heap index, and an
44 intrusive_list of waiter_nodes. Multiple coroutines can wait on
45 the same timer simultaneously.
46
47 timer_service_impl owns a min-heap of active timers, a free list
48 of recycled impls, and a free list of recycled waiter_nodes. The
49 heap is ordered by expiry time; the scheduler queries
50 nearest_expiry() to set the epoll/timerfd timeout.
51
52 Optimization Strategy
53 ---------------------
54 The common timer lifecycle is: construct, set expiry, cancel or
55 wait, destroy. Several optimizations target this path:
56
57 1. Deferred heap insertion — expires_after() stores the expiry
58 but does not insert into the heap. Insertion happens in
59 wait(). If the timer is cancelled or destroyed before wait(),
60 the heap is never touched and no mutex is taken. This also
61 enables the already-expired fast path: when wait() sees
62 expiry <= now before inserting, it posts the coroutine
63 handle to the executor and returns noop_coroutine — no
64 heap, no mutex, no epoll. This is only possible because
65 the coroutine API guarantees wait() always follows
66 expires_after(); callback APIs cannot assume this call
67 order.
68
69 2. Thread-local impl cache — A single-slot per-thread cache of
70 timer_impl avoids the mutex on create/destroy for the common
71 create-then-destroy-on-same-thread pattern. On pop, if the
72 cached impl's svc_ doesn't match the current service, the
73 stale impl is deleted eagerly rather than reused.
74
75 3. Embedded completion_op — Each waiter_node embeds a
76 scheduler_op subclass, eliminating heap allocation per
77 fire/cancel. Its destroy() is a no-op since the waiter_node
78 owns the lifetime.
79
80 4. Cached nearest expiry — An atomic<int64_t> mirrors the heap
81 root's time, updated under the lock. nearest_expiry() and
82 empty() read the atomic without locking.
83
84 5. might_have_pending_waits_ flag — Set on wait(), cleared on
85 cancel. Lets cancel_timer() return without locking when no
86 wait was ever issued.
87
88 6. Thread-local waiter cache — Single-slot per-thread cache of
89 waiter_node avoids the free-list mutex for the common
90 wait-then-complete-on-same-thread pattern.
91
92 With all fast paths hit (idle timer, same thread), the
93 schedule/cancel cycle takes zero mutex locks.
94
95 Concurrency
96 -----------
97 stop_token callbacks can fire from any thread. The impl_
98 pointer on waiter_node is used as a "still in list" marker:
99 set to nullptr under the mutex when a waiter is removed by
100 cancel_timer() or process_expired(). cancel_waiter() checks
101 this under the mutex to avoid double-removal races.
102
103 Multiple io_contexts in the same program are safe. The
104 service pointer is obtained directly from the scheduler,
105 and TL-cached impls are validated by comparing svc_ against
106 the current service pointer. Waiter nodes have no service
107 affinity and can safely migrate between contexts.
108 */
109
110 namespace boost::corosio::detail {
111
112 class timer_service_impl;
113 struct timer_impl;
114 struct waiter_node;
115
116 void timer_service_invalidate_cache() noexcept;
117
// Per-waiter state for one co_await t.wait(). One node is allocated
// (or recycled) per concurrent waiter and linked into the owning
// timer_impl's intrusive waiter list.
struct waiter_node
    : intrusive_list<waiter_node>::node
{
    // Embedded completion op — avoids heap allocation per fire/cancel
    struct completion_op final : scheduler_op
    {
        // Back-pointer to the owning waiter; set in waiter_node's ctor.
        waiter_node* waiter_ = nullptr;

        static void do_complete(
            void* owner,
            scheduler_op* base,
            std::uint32_t,
            std::uint32_t);

        completion_op() noexcept
            : scheduler_op(&do_complete)
        {
        }

        void operator()() override;
        // No-op — lifetime owned by waiter_node, not the scheduler queue
        void destroy() override {}
    };

    // Per-waiter stop_token cancellation
    struct canceller
    {
        waiter_node* waiter_;
        void operator()() const;
    };

    // nullptr once removed from timer's waiter list (concurrency marker)
    timer_impl* impl_ = nullptr;
    // Service that owns the timer; used by the completion/cancel paths.
    timer_service_impl* svc_ = nullptr;
    // Coroutine to resume; posted to d_ when the wait completes.
    std::coroutine_handle<> h_;
    // Executor the coroutine is resumed on.
    capy::executor_ref d_;
    // Caller's error-code slot (may be null); receives ec_value_.
    std::error_code* ec_out_ = nullptr;
    std::stop_token token_;
    // Engaged only when token_.stop_possible(); must be reset before
    // the waiter is recycled.
    std::optional<std::stop_callback<canceller>> stop_cb_;
    completion_op op_;
    // Result to deliver: {} on expiry, canceled on cancellation.
    std::error_code ec_value_;
    // Free-list linkage, used only while the node sits on a free list.
    waiter_node* next_free_ = nullptr;

    waiter_node() noexcept
    {
        op_.waiter_ = this;
    }
};
166
// Per-timer state behind the public timer's opaque pointer. Multiple
// coroutines may wait on the same timer simultaneously via waiters_.
struct timer_impl
    : timer::timer_impl
{
    using clock_type = std::chrono::steady_clock;
    using time_point = clock_type::time_point;
    using duration = clock_type::duration;

    // Owning service; set at construction and refreshed on reuse.
    timer_service_impl* svc_ = nullptr;
    // Waiters currently parked on this timer (FIFO order).
    intrusive_list<waiter_node> waiters_;

    // Free list linkage (reused when impl is on free_list)
    timer_impl* next_free_ = nullptr;

    explicit timer_impl(timer_service_impl& svc) noexcept;


    void release() override;

    std::coroutine_handle<> wait(
        std::coroutine_handle<>,
        capy::executor_ref,
        std::stop_token,
        std::error_code*) override;
};
191
192 timer_impl* try_pop_tl_cache(timer_service_impl*) noexcept;
193 bool try_push_tl_cache(timer_impl*) noexcept;
194 waiter_node* try_pop_waiter_tl_cache() noexcept;
195 bool try_push_waiter_tl_cache(waiter_node*) noexcept;
196
// Owns the min-heap of armed timers plus free lists of recycled
// timer_impls and waiter_nodes. All shared state is guarded by
// mutex_; completions are always posted to the scheduler OUTSIDE
// the lock to avoid lock-order issues with scheduler internals.
class timer_service_impl : public timer_service
{
public:
    using clock_type = std::chrono::steady_clock;
    using time_point = clock_type::time_point;
    using key_type = timer_service;

private:
    // One heap slot: expiry time plus back-pointer to the timer,
    // whose heap_index_ tracks its position in heap_.
    struct heap_entry
    {
        time_point time_;
        timer_impl* timer_;
    };

    scheduler* sched_ = nullptr;
    mutable std::mutex mutex_;
    // Binary min-heap ordered by time_; heap_[0] is the nearest expiry.
    std::vector<heap_entry> heap_;
    timer_impl* free_list_ = nullptr;
    waiter_node* waiter_free_list_ = nullptr;
    // Invoked (outside the lock) whenever the heap root may have moved
    // earlier, so the scheduler can re-arm its timeout.
    callback on_earliest_changed_;
    // Avoids mutex in nearest_expiry() and empty()
    mutable std::atomic<std::int64_t> cached_nearest_ns_{
        (std::numeric_limits<std::int64_t>::max)()};

public:
    timer_service_impl(capy::execution_context&, scheduler& sched)
        : timer_service()
        , sched_(&sched)
    {
    }

    scheduler& get_scheduler() noexcept { return *sched_; }

    ~timer_service_impl() = default;

    timer_service_impl(timer_service_impl const&) = delete;
    timer_service_impl& operator=(timer_service_impl const&) = delete;

    void set_on_earliest_changed(callback cb) override
    {
        on_earliest_changed_ = cb;
    }

    // Tear down all timer state. Runs single-threaded at context
    // shutdown, so no locking is performed here.
    void shutdown() override
    {
        timer_service_invalidate_cache();

        // Cancel waiting timers still in the heap
        for (auto& entry : heap_)
        {
            auto* impl = entry.timer_;
            while (auto* w = impl->waiters_.pop_front())
            {
                w->stop_cb_.reset();
                // Destroy (not resume) the suspended coroutine: the
                // context is going away and must not run user code.
                w->h_.destroy();
                sched_->on_work_finished();
                delete w;
            }
            impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
            delete impl;
        }
        heap_.clear();
        cached_nearest_ns_.store(
            (std::numeric_limits<std::int64_t>::max)(),
            std::memory_order_release);

        // Delete free-listed impls
        while (free_list_)
        {
            auto* next = free_list_->next_free_;
            delete free_list_;
            free_list_ = next;
        }

        // Delete free-listed waiters
        while (waiter_free_list_)
        {
            auto* next = waiter_free_list_->next_free_;
            delete waiter_free_list_;
            waiter_free_list_ = next;
        }
    }

    // Acquisition order: thread-local cache (no lock), then the
    // locked free list, then operator new. Reused impls are re-armed
    // with fresh bookkeeping fields.
    timer::timer_impl* create_impl() override
    {
        timer_impl* impl = try_pop_tl_cache(this);
        if (impl)
        {
            impl->svc_ = this;
            impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
            impl->might_have_pending_waits_ = false;
            return impl;
        }

        std::lock_guard lock(mutex_);
        if (free_list_)
        {
            impl = free_list_;
            free_list_ = impl->next_free_;
            impl->next_free_ = nullptr;
            impl->svc_ = this;
            impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
            impl->might_have_pending_waits_ = false;
        }
        else
        {
            impl = new timer_impl(*this);
        }
        return impl;
    }

    // Cancels outstanding waits, unlinks from the heap if needed,
    // then recycles the impl (TL cache first, free list second).
    void destroy_impl(timer_impl& impl)
    {
        cancel_timer(impl);

        if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
        {
            std::lock_guard lock(mutex_);
            remove_timer_impl(impl);
            refresh_cached_nearest();
        }

        if (try_push_tl_cache(&impl))
            return;

        std::lock_guard lock(mutex_);
        impl.next_free_ = free_list_;
        free_list_ = &impl;
    }

    // Same tiered acquisition as create_impl(): TL cache, locked
    // free list, then new.
    waiter_node* create_waiter()
    {
        if (auto* w = try_pop_waiter_tl_cache())
            return w;

        std::lock_guard lock(mutex_);
        if (waiter_free_list_)
        {
            auto* w = waiter_free_list_;
            waiter_free_list_ = w->next_free_;
            w->next_free_ = nullptr;
            return w;
        }

        return new waiter_node();
    }

    // Recycle a waiter: TL cache if the slot is empty, otherwise
    // push onto the locked free list.
    void destroy_waiter(waiter_node* w)
    {
        if (try_push_waiter_tl_cache(w))
            return;

        std::lock_guard lock(mutex_);
        w->next_free_ = waiter_free_list_;
        waiter_free_list_ = w;
    }

    // Heap insertion deferred to wait() — avoids lock when timer is idle
    std::size_t update_timer(timer_impl& impl, time_point new_time)
    {
        bool in_heap =
            (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
        // Idle timer: nothing to cancel, nothing to reschedule.
        if (!in_heap && impl.waiters_.empty())
            return 0;

        bool notify = false;
        intrusive_list<waiter_node> canceled;

        {
            std::lock_guard lock(mutex_);

            // Changing the expiry cancels all current waiters.
            while (auto* w = impl.waiters_.pop_front())
            {
                w->impl_ = nullptr;
                canceled.push_back(w);
            }

            if (impl.heap_index_ < heap_.size())
            {
                time_point old_time = heap_[impl.heap_index_].time_;
                heap_[impl.heap_index_].time_ = new_time;

                // Reheapify in the direction the key moved.
                if (new_time < old_time)
                    up_heap(impl.heap_index_);
                else
                    down_heap(impl.heap_index_);

                notify = (impl.heap_index_ == 0);
            }

            refresh_cached_nearest();
        }

        // Post cancellations outside the lock.
        std::size_t count = 0;
        while (auto* w = canceled.pop_front())
        {
            w->ec_value_ = make_error_code(capy::error::canceled);
            sched_->post(&w->op_);
            ++count;
        }

        if (notify)
            on_earliest_changed_();

        return count;
    }

    // Inserts timer into heap if needed and pushes waiter, all under
    // one lock to prevent races with cancel_waiter/process_expired
    void insert_waiter(timer_impl& impl, waiter_node* w)
    {
        bool notify = false;
        {
            std::lock_guard lock(mutex_);
            if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
            {
                impl.heap_index_ = heap_.size();
                heap_.push_back({impl.expiry_, &impl});
                up_heap(heap_.size() - 1);
                // New root means the scheduler's timeout must shrink.
                notify = (impl.heap_index_ == 0);
                refresh_cached_nearest();
            }
            impl.waiters_.push_back(w);
        }
        if (notify)
            on_earliest_changed_();
    }

    // Cancel every waiter on this timer and remove it from the heap.
    // Returns the number of waits canceled.
    std::size_t cancel_timer(timer_impl& impl)
    {
        // Fast path: no wait() was ever issued — nothing to do, no lock.
        if (!impl.might_have_pending_waits_)
            return 0;

        // Not in heap and no waiters — just clear the flag
        if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)()
            && impl.waiters_.empty())
        {
            impl.might_have_pending_waits_ = false;
            return 0;
        }

        intrusive_list<waiter_node> canceled;

        {
            std::lock_guard lock(mutex_);
            remove_timer_impl(impl);
            while (auto* w = impl.waiters_.pop_front())
            {
                w->impl_ = nullptr;
                canceled.push_back(w);
            }
            refresh_cached_nearest();
        }

        impl.might_have_pending_waits_ = false;

        // Post cancellations outside the lock.
        std::size_t count = 0;
        while (auto* w = canceled.pop_front())
        {
            w->ec_value_ = make_error_code(capy::error::canceled);
            sched_->post(&w->op_);
            ++count;
        }

        return count;
    }

    // Cancel a single waiter (called from stop_token callback, any thread)
    void cancel_waiter(waiter_node* w)
    {
        {
            std::lock_guard lock(mutex_);
            // Already removed by cancel_timer or process_expired
            if (!w->impl_)
                return;
            auto* impl = w->impl_;
            w->impl_ = nullptr;
            impl->waiters_.remove(w);
            // Last waiter gone — pull the timer out of the heap too.
            if (impl->waiters_.empty())
            {
                remove_timer_impl(*impl);
                impl->might_have_pending_waits_ = false;
            }
            refresh_cached_nearest();
        }

        w->ec_value_ = make_error_code(capy::error::canceled);
        sched_->post(&w->op_);
    }

    // Cancel front waiter only (FIFO), return 0 or 1
    std::size_t cancel_one_waiter(timer_impl& impl)
    {
        if (!impl.might_have_pending_waits_)
            return 0;

        waiter_node* w = nullptr;

        {
            std::lock_guard lock(mutex_);
            w = impl.waiters_.pop_front();
            if (!w)
                return 0;
            w->impl_ = nullptr;
            if (impl.waiters_.empty())
            {
                remove_timer_impl(impl);
                impl.might_have_pending_waits_ = false;
            }
            refresh_cached_nearest();
        }

        w->ec_value_ = make_error_code(capy::error::canceled);
        sched_->post(&w->op_);
        return 1;
    }

    // Lock-free: reads the cached nearest-expiry atomic; max() is the
    // sentinel for "heap empty".
    bool empty() const noexcept override
    {
        return cached_nearest_ns_.load(std::memory_order_acquire)
            == (std::numeric_limits<std::int64_t>::max)();
    }

    // Lock-free: reconstructs the root's time_point from the cached
    // nanosecond count.
    time_point nearest_expiry() const noexcept override
    {
        auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
        return time_point(time_point::duration(ns));
    }

    // Pop every timer whose expiry is <= now, collect its waiters
    // under the lock, then post their completions outside it.
    // Returns the number of waits completed.
    std::size_t process_expired() override
    {
        intrusive_list<waiter_node> expired;

        {
            std::lock_guard lock(mutex_);
            auto now = clock_type::now();

            while (!heap_.empty() && heap_[0].time_ <= now)
            {
                timer_impl* t = heap_[0].timer_;
                remove_timer_impl(*t);
                while (auto* w = t->waiters_.pop_front())
                {
                    w->impl_ = nullptr;
                    // Success: completion delivers a cleared error code.
                    w->ec_value_ = {};
                    expired.push_back(w);
                }
                t->might_have_pending_waits_ = false;
            }

            refresh_cached_nearest();
        }

        std::size_t count = 0;
        while (auto* w = expired.pop_front())
        {
            sched_->post(&w->op_);
            ++count;
        }

        return count;
    }

private:
    // Mirror the heap root's expiry into the atomic. Caller must hold
    // mutex_ (every call site in this class does).
    void refresh_cached_nearest() noexcept
    {
        auto ns = heap_.empty()
            ? (std::numeric_limits<std::int64_t>::max)()
            : heap_[0].time_.time_since_epoch().count();
        cached_nearest_ns_.store(ns, std::memory_order_release);
    }

    // Unlink a timer from the heap (no-op if absent) and mark its
    // heap_index_ as "not in heap". Caller must hold mutex_.
    void remove_timer_impl(timer_impl& impl)
    {
        std::size_t index = impl.heap_index_;
        if (index >= heap_.size())
            return; // Not in heap

        if (index == heap_.size() - 1)
        {
            // Last element, just pop
            impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
            heap_.pop_back();
        }
        else
        {
            // Swap with last and reheapify
            swap_heap(index, heap_.size() - 1);
            impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
            heap_.pop_back();

            // The moved-in entry may violate the heap property in
            // either direction; compare against the parent to decide.
            if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
                up_heap(index);
            else
                down_heap(index);
        }
    }

    // Sift the entry at index toward the root while it is earlier
    // than its parent. Caller must hold mutex_.
    void up_heap(std::size_t index)
    {
        while (index > 0)
        {
            std::size_t parent = (index - 1) / 2;
            if (!(heap_[index].time_ < heap_[parent].time_))
                break;
            swap_heap(index, parent);
            index = parent;
        }
    }

    // Sift the entry at index toward the leaves while a child is
    // earlier. Caller must hold mutex_.
    void down_heap(std::size_t index)
    {
        std::size_t child = index * 2 + 1;
        while (child < heap_.size())
        {
            std::size_t min_child = (child + 1 == heap_.size() ||
                heap_[child].time_ < heap_[child + 1].time_)
                ? child : child + 1;

            if (heap_[index].time_ < heap_[min_child].time_)
                break;

            swap_heap(index, min_child);
            index = min_child;
            child = index * 2 + 1;
        }
    }

    // Swap two heap slots and keep each timer's heap_index_ in sync.
    void swap_heap(std::size_t i1, std::size_t i2)
    {
        heap_entry tmp = heap_[i1];
        heap_[i1] = heap_[i2];
        heap_[i2] = tmp;
        heap_[i1].timer_->heap_index_ = i1;
        heap_[i2].timer_->heap_index_ = i2;
    }
};
634
// Bind the impl to its owning service; all other members use their
// in-class default initializers.
timer_impl::
timer_impl(timer_service_impl& svc) noexcept
    : svc_(&svc)
{
}
640
641 void
642 4 waiter_node::canceller::
643 operator()() const
644 {
645 4 waiter_->svc_->cancel_waiter(waiter_);
646 4 }
647
648 void
649 waiter_node::completion_op::
650 do_complete(
651 void* owner,
652 scheduler_op* base,
653 std::uint32_t,
654 std::uint32_t)
655 {
656 if (!owner)
657 return;
658 static_cast<completion_op*>(base)->operator()();
659 }
660
// Deliver a completed (fired or canceled) wait: publish the error
// code, recycle the waiter, then resume the coroutine on its executor.
void
waiter_node::completion_op::
operator()()
{
    auto* w = waiter_;
    // Tear down the stop callback first so no cancellation can race
    // with the completion below.
    w->stop_cb_.reset();
    if (w->ec_out_)
        *w->ec_out_ = w->ec_value_;

    // Copy everything needed after this point: destroy_waiter() may
    // recycle w into a cache/free list, so w must not be touched again.
    auto h = w->h_;
    auto d = w->d_;
    auto* svc = w->svc_;
    auto& sched = svc->get_scheduler();

    svc->destroy_waiter(w);

    d.post(h);
    sched.on_work_finished();
}
680
// Called when the public timer lets go of this impl; the service
// cancels any outstanding waits and recycles the impl.
void
timer_impl::
release()
{
    svc_->destroy_impl(*this);
}
687
// Suspend the calling coroutine until the timer fires or is canceled.
// h: coroutine to resume; d: executor to resume it on; token: optional
// cancellation source; ec: receives {} on expiry, canceled otherwise.
// Always returns noop_coroutine() — resumption happens via the
// scheduler, never inline.
std::coroutine_handle<>
timer_impl::
wait(
    std::coroutine_handle<> h,
    capy::executor_ref d,
    std::stop_token token,
    std::error_code* ec)
{
    // Already-expired fast path — no waiter_node, no mutex.
    // Post instead of dispatch so the coroutine yields to the
    // scheduler, allowing other queued work to run.
    if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
    {
        if (expiry_ == (time_point::min)() ||
            expiry_ <= clock_type::now())
        {
            if (ec)
                *ec = {};
            d.post(h);
            return std::noop_coroutine();
        }
    }

    // Slow path: park a waiter on this timer.
    auto* w = svc_->create_waiter();
    w->impl_ = this;
    w->svc_ = svc_;
    w->h_ = h;
    w->d_ = std::move(d);
    w->token_ = std::move(token);
    w->ec_out_ = ec;

    svc_->insert_waiter(*this, w);
    might_have_pending_waits_ = true;
    svc_->get_scheduler().on_work_started();

    // Install the stop callback only after the waiter is linked, so a
    // stop that fires immediately finds the waiter in the list.
    if (w->token_.stop_possible())
        w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});

    return std::noop_coroutine();
}
728
// Extern free functions called from timer.cpp
//
// Two thread-local caches avoid hot-path mutex acquisitions:
//
// 1. Impl cache — single-slot, validated by comparing svc_ on the
//    impl against the current service pointer.
//
// 2. Waiter cache — single-slot, no service affinity.
//
// The service pointer is obtained from the scheduler_impl's
// timer_svc_ member, avoiding find_service() on the hot path.
// All caches are cleared by timer_service_invalidate_cache()
// during shutdown.

// Single-slot per-thread caches (see above).
thread_local_ptr<timer_impl> tl_cached_impl;
thread_local_ptr<waiter_node> tl_cached_waiter;
745
746 timer_impl*
747 9198 try_pop_tl_cache(timer_service_impl* svc) noexcept
748 {
749 9198 auto* impl = tl_cached_impl.get();
750
2/2
✓ Branch 0 taken 9025 times.
✓ Branch 1 taken 173 times.
9198 if (impl)
751 {
752 9025 tl_cached_impl.set(nullptr);
753
1/2
✓ Branch 0 taken 9025 times.
✗ Branch 1 not taken.
9025 if (impl->svc_ == svc)
754 9025 return impl;
755 // Stale impl from a destroyed service
756 delete impl;
757 }
758 173 return nullptr;
759 }
760
761 bool
762 9198 try_push_tl_cache(timer_impl* impl) noexcept
763 {
764
2/2
✓ Branch 1 taken 9150 times.
✓ Branch 2 taken 48 times.
9198 if (!tl_cached_impl.get())
765 {
766 9150 tl_cached_impl.set(impl);
767 9150 return true;
768 }
769 48 return false;
770 }
771
772 waiter_node*
773 8961 try_pop_waiter_tl_cache() noexcept
774 {
775 8961 auto* w = tl_cached_waiter.get();
776
2/2
✓ Branch 0 taken 8819 times.
✓ Branch 1 taken 142 times.
8961 if (w)
777 {
778 8819 tl_cached_waiter.set(nullptr);
779 8819 return w;
780 }
781 142 return nullptr;
782 }
783
784 bool
785 8961 try_push_waiter_tl_cache(waiter_node* w) noexcept
786 {
787
2/2
✓ Branch 1 taken 8903 times.
✓ Branch 2 taken 58 times.
8961 if (!tl_cached_waiter.get())
788 {
789 8903 tl_cached_waiter.set(w);
790 8903 return true;
791 }
792 58 return false;
793 }
794
795 void
796 336 timer_service_invalidate_cache() noexcept
797 {
798
2/2
✓ Branch 1 taken 125 times.
✓ Branch 2 taken 211 times.
336 delete tl_cached_impl.get();
799 336 tl_cached_impl.set(nullptr);
800
801
2/2
✓ Branch 1 taken 84 times.
✓ Branch 2 taken 252 times.
336 delete tl_cached_waiter.get();
802 336 tl_cached_waiter.set(nullptr);
803 336 }
804
// Befriended accessor for basic_io_context's private scheduler pointer.
struct timer_service_access
{
    static scheduler_impl& get_scheduler(basic_io_context& ctx) noexcept
    {
        return static_cast<scheduler_impl&>(*ctx.sched_);
    }
};
812
813 timer::timer_impl*
814 9198 timer_service_create(capy::execution_context& ctx)
815 {
816
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9198 times.
9198 if (!ctx.target<basic_io_context>())
817 detail::throw_logic_error();
818 9198 auto& ioctx = static_cast<basic_io_context&>(ctx);
819 auto* svc = static_cast<timer_service_impl*>(
820 9198 timer_service_access::get_scheduler(ioctx).timer_svc_);
821
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9198 times.
9198 if (!svc)
822 detail::throw_logic_error();
823 9198 return svc->create_impl();
824 }
825
826 void
827 9198 timer_service_destroy(timer::timer_impl& base) noexcept
828 {
829 9198 static_cast<timer_impl&>(base).release();
830 9198 }
831
832 std::size_t
833 6 timer_service_update_expiry(timer::timer_impl& base)
834 {
835 6 auto& impl = static_cast<timer_impl&>(base);
836 6 return impl.svc_->update_timer(impl, impl.expiry_);
837 }
838
839 std::size_t
840 8 timer_service_cancel(timer::timer_impl& base) noexcept
841 {
842 8 auto& impl = static_cast<timer_impl&>(base);
843 8 return impl.svc_->cancel_timer(impl);
844 }
845
846 std::size_t
847 2 timer_service_cancel_one(timer::timer_impl& base) noexcept
848 {
849 2 auto& impl = static_cast<timer_impl&>(base);
850 2 return impl.svc_->cancel_one_waiter(impl);
851 }
852
// Create-or-get the context's timer service, binding it to sched on
// first construction.
timer_service&
get_timer_service(capy::execution_context& ctx, scheduler& sched)
{
    return ctx.make_service<timer_service_impl>(sched);
}
858
859 } // namespace boost::corosio::detail
860