Line data Source code
1 : //
2 : // Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_TCP_SERVER_HPP
11 : #define BOOST_COROSIO_TCP_SERVER_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/corosio/detail/except.hpp>
15 : #include <boost/corosio/tcp_acceptor.hpp>
16 : #include <boost/corosio/tcp_socket.hpp>
17 : #include <boost/corosio/io_context.hpp>
18 : #include <boost/corosio/endpoint.hpp>
19 : #include <boost/capy/task.hpp>
20 : #include <boost/capy/concept/execution_context.hpp>
21 : #include <boost/capy/concept/io_awaitable.hpp>
22 : #include <boost/capy/concept/executor.hpp>
23 : #include <boost/capy/ex/any_executor.hpp>
24 : #include <boost/capy/ex/frame_allocator.hpp>
25 : #include <boost/capy/ex/io_env.hpp>
26 : #include <boost/capy/ex/run_async.hpp>
27 :
28 : #include <coroutine>
29 : #include <memory>
30 : #include <ranges>
31 : #include <vector>
32 :
33 : namespace boost::corosio {
34 :
35 : #ifdef _MSC_VER
36 : #pragma warning(push)
37 : #pragma warning(disable: 4251) // class needs to have dll-interface
38 : #endif
39 :
40 : /** TCP server with pooled workers.
41 :
42 : This class manages a pool of reusable worker objects that handle
43 : incoming connections. When a connection arrives, an idle worker
44 : is dispatched to handle it. After the connection completes, the
45 : worker returns to the pool for reuse, avoiding allocation overhead
46 : per connection.
47 :
48 : Workers are set via @ref set_workers as a forward range of
49 : pointer-like objects (e.g., `unique_ptr<worker_base>`). The server
50 : takes ownership of the container via type erasure.
51 :
52 : @par Thread Safety
53 : Distinct objects: Safe.
54 : Shared objects: Unsafe.
55 :
56 : @par Lifecycle
57 : The server operates in three states:
58 :
59 : - **Stopped**: Initial state, or after @ref join completes.
60 : - **Running**: After @ref start, actively accepting connections.
61 : - **Stopping**: After @ref stop, draining active work.
62 :
63 : State transitions:
64 : @code
65 : [Stopped] --start()--> [Running] --stop()--> [Stopping] --join()--> [Stopped]
66 : @endcode
67 :
68 : @par Running the Server
69 : @code
70 : io_context ioc;
71 : tcp_server srv(ioc, ioc.get_executor());
72 : srv.set_workers(make_workers(ioc, 100));
73 : srv.bind(endpoint{address_v4::any(), 8080});
74 : srv.start();
75 : ioc.run(); // Blocks until all work completes
76 : @endcode
77 :
78 : @par Graceful Shutdown
79 : To shut down gracefully, call @ref stop then drain the io_context:
80 : @code
81 : // From a signal handler or timer callback:
82 : srv.stop();
83 :
84 : // ioc.run() returns after pending work drains.
85 : // Then from the thread that called ioc.run():
86 : srv.join(); // Wait for accept loops to finish
87 : @endcode
88 :
89 : @par Restart After Stop
90 : The server can be restarted after a complete shutdown cycle.
91 : You must drain the io_context and call @ref join before restarting:
92 : @code
93 : srv.start();
94 : ioc.run_for( 10s ); // Run for a while
95 : srv.stop(); // Signal shutdown
96 : ioc.run(); // REQUIRED: drain pending completions
97 : srv.join(); // REQUIRED: wait for accept loops
98 :
99 : // Now safe to restart
100 : srv.start();
101 : ioc.run();
102 : @endcode
103 :
104 : @par WARNING: What NOT to Do
105 : - Do NOT call @ref join from inside a worker coroutine (deadlock).
106 : - Do NOT call @ref join from a thread running `ioc.run()` (deadlock).
107 : - Do NOT call @ref start without completing @ref join after @ref stop.
108 : - Do NOT call `ioc.stop()` for graceful shutdown; use @ref stop instead.
109 :
110 : @par Example
111 : @code
112 : class my_worker : public tcp_server::worker_base
113 : {
114 : corosio::tcp_socket sock_;
115 : capy::any_executor ex_;
116 : public:
117 : my_worker(io_context& ctx)
118 : : sock_(ctx)
119 : , ex_(ctx.get_executor())
120 : {
121 : }
122 :
123 : corosio::tcp_socket& socket() override { return sock_; }
124 :
125 : void run(launcher launch) override
126 : {
127 : launch(ex_, [](corosio::tcp_socket* sock) -> capy::task<>
128 : {
129 : // handle connection using sock
130 : co_return;
131 : }(&sock_));
132 : }
133 : };
134 :
135 : auto make_workers(io_context& ctx, int n)
136 : {
137 : std::vector<std::unique_ptr<tcp_server::worker_base>> v;
138 : v.reserve(n);
139 : for(int i = 0; i < n; ++i)
140 : v.push_back(std::make_unique<my_worker>(ctx));
141 : return v;
142 : }
143 :
144 : io_context ioc;
145 : tcp_server srv(ioc, ioc.get_executor());
146 : srv.set_workers(make_workers(ioc, 100));
147 : @endcode
148 :
149 : @see worker_base, set_workers, launcher
150 : */
class BOOST_COROSIO_DECL
tcp_server
{
public:
    class worker_base;      ///< Abstract base for connection handlers.
    class launcher;         ///< Move-only handle to launch worker coroutines.

private:
    // Parking node for a do_accept coroutine that found no idle worker.
    // The node lives inside pop_awaitable (i.e. on the awaiting coroutine's
    // frame), so waiting requires no allocation.
    struct waiter
    {
        waiter* next;               // Intrusive link: next parked waiter
        std::coroutine_handle<> h;  // Coroutine to resume when a worker frees up
        worker_base* w;             // Worker handed directly to the waiter by push_awaitable
    };

    // Private implementation (pimpl); defined in the source file.
    struct impl;

    // Allocate the impl bound to the given execution context.
    static impl* make_impl(capy::execution_context& ctx);

    impl* impl_;                         // Owned pimpl state (acceptors, join machinery)
    capy::any_executor ex_;              // Server executor: serializes list mutation
    waiter* waiters_ = nullptr;          // Accept loops parked waiting for an idle worker
    worker_base* idle_head_ = nullptr;   // Forward list: available workers
    worker_base* active_head_ = nullptr; // Doubly linked: workers handling connections
    worker_base* active_tail_ = nullptr; // Tail for O(1) push_back
    std::size_t active_accepts_ = 0;     // Number of active do_accept coroutines
    std::shared_ptr<void> storage_;      // Owns the worker container (type-erased)
    bool running_ = false;               // True between start() and full shutdown

    // Idle list (forward/singly linked) - push front, pop front.
    // Reuses worker_base::next_; must only be touched on the server executor.
    void idle_push(worker_base* w) noexcept
    {
        w->next_ = idle_head_;
        idle_head_ = w;
    }

    // Pop the most recently returned worker, or nullptr if the list is empty.
    worker_base* idle_pop() noexcept
    {
        auto* w = idle_head_;
        if(w) idle_head_ = w->next_;
        return w;
    }

    bool idle_empty() const noexcept { return idle_head_ == nullptr; }

    // Active list (doubly linked) - push back, remove anywhere.
    // Tracks workers currently dispatched so stop() can reach their stop tokens.
    void active_push(worker_base* w) noexcept
    {
        w->next_ = nullptr;
        w->prev_ = active_tail_;
        if(active_tail_)
            active_tail_->next_ = w;
        else
            active_head_ = w;
        active_tail_ = w;
    }

    // Unlink w from the active list. Safe to call on a worker that is not
    // linked: a non-head worker with null prev_ is treated as "not in list".
    void active_remove(worker_base* w) noexcept
    {
        // Skip if not in active list (e.g., after failed accept)
        if(w != active_head_ && w->prev_ == nullptr)
            return;
        if(w->prev_)
            w->prev_->next_ = w->next_;
        else
            active_head_ = w->next_;
        if(w->next_)
            w->next_->prev_ = w->prev_;
        else
            active_tail_ = w->prev_;
        w->prev_ = nullptr; // Mark as not in active list
    }

    // Hand-rolled coroutine type that wraps a worker's connection task.
    // Its promise owns the executor (by value, so it outlives child tasks)
    // and an io_env injected into every awaited IoAwaitable.
    template<capy::Executor Ex>
    struct launch_wrapper
    {
        struct promise_type
        {
            Ex ex; // Executor stored directly in frame (outlives child tasks)
            capy::io_env env_;

            // For regular coroutines: first arg is executor, second is stop token
            template<class E, class S, class... Args>
                requires capy::Executor<std::decay_t<E>>
            promise_type(E e, S s, Args&&...)
                : ex(std::move(e))
                , env_{capy::executor_ref(ex), std::move(s),
                    capy::current_frame_allocator()}
            {
            }

            // For lambda coroutines: first arg is closure, second is executor, third is stop token
            template<class Closure, class E, class S, class... Args>
                requires (!capy::Executor<std::decay_t<Closure>> &&
                    capy::Executor<std::decay_t<E>>)
            promise_type(Closure&&, E e, S s, Args&&...)
                : ex(std::move(e))
                , env_{capy::executor_ref(ex), std::move(s),
                    capy::current_frame_allocator()}
            {
            }

            launch_wrapper get_return_object() noexcept {
                return {std::coroutine_handle<promise_type>::from_promise(*this)};
            }
            // Lazy start: the launcher posts the handle to the executor itself.
            std::suspend_always initial_suspend() noexcept { return {}; }
            // Self-destroying at completion; no one awaits this wrapper.
            std::suspend_never final_suspend() noexcept { return {}; }
            void return_void() noexcept {}
            void unhandled_exception() { std::terminate(); }

            // Inject io_env for IoAwaitable
            template<capy::IoAwaitable Awaitable>
            auto await_transform(Awaitable&& a)
            {
                using AwaitableT = std::decay_t<Awaitable>;
                // Adapter forwards ready/resume and passes the promise's
                // io_env to the awaitable's two-argument await_suspend.
                struct adapter
                {
                    AwaitableT aw;
                    capy::io_env const* env;

                    bool await_ready() { return aw.await_ready(); }
                    decltype(auto) await_resume() { return aw.await_resume(); }

                    auto await_suspend(std::coroutine_handle<promise_type> h)
                    {
                        return aw.await_suspend(h, env);
                    }
                };
                return adapter{std::forward<Awaitable>(a), &env_};
            }
        };

        std::coroutine_handle<promise_type> h;

        launch_wrapper(std::coroutine_handle<promise_type> handle) noexcept
            : h(handle)
        {
        }

        // Destroys the frame only if it was never posted (h released via
        // std::exchange before ex.post in launcher::operator()).
        ~launch_wrapper()
        {
            if(h)
                h.destroy();
        }

        launch_wrapper(launch_wrapper&& o) noexcept
            : h(std::exchange(o.h, nullptr))
        {
        }

        launch_wrapper(launch_wrapper const&) = delete;
        launch_wrapper& operator=(launch_wrapper const&) = delete;
        launch_wrapper& operator=(launch_wrapper&&) = delete;
    };

    // Named functor to avoid incomplete lambda type in coroutine promise
    template<class Executor>
    struct launch_coro
    {
        launch_wrapper<Executor> operator()(
            Executor,
            std::stop_token,
            tcp_server* self,
            capy::task<void> t,
            worker_base* wp)
        {
            // Executor and stop token stored in promise via constructor
            co_await std::move(t);
            co_await self->push(*wp); // worker goes back to idle list
        }
    };

    // Awaitable that returns a worker to the pool. Transfers execution to
    // the server executor first so list mutation needs no locking.
    class push_awaitable
    {
        tcp_server& self_;
        worker_base& w_;

    public:
        push_awaitable(
            tcp_server& self,
            worker_base& w) noexcept
            : self_(self)
            , w_(w)
        {
        }

        // Always suspend: the hop to the server executor is the whole point.
        bool await_ready() const noexcept
        {
            return false;
        }

        std::coroutine_handle<>
        await_suspend(
            std::coroutine_handle<> h,
            capy::io_env const*) noexcept
        {
            // Symmetric transfer to server's executor
            return self_.ex_.dispatch(h);
        }

        void await_resume() noexcept
        {
            // Running on server executor - safe to modify lists
            // Remove from active (if present), then wake waiter or add to idle
            self_.active_remove(&w_);
            if(self_.waiters_)
            {
                // Hand the worker directly to a parked accept loop and
                // reschedule it; the worker never touches the idle list.
                auto* wait = self_.waiters_;
                self_.waiters_ = wait->next;
                wait->w = &w_;
                self_.ex_.post(wait->h);
            }
            else
            {
                self_.idle_push(&w_);
            }
        }
    };

    // Awaitable that obtains an idle worker, parking the caller in the
    // waiters_ list when the pool is empty. Must be awaited on the server
    // executor (do_accept runs there).
    class pop_awaitable
    {
        tcp_server& self_;
        waiter wait_; // Intrusive parking node; lives on the awaiting frame

    public:
        pop_awaitable(tcp_server& self) noexcept
            : self_(self)
            , wait_{}
        {
        }

        // Skip suspension entirely when an idle worker is available.
        bool await_ready() const noexcept
        {
            return !self_.idle_empty();
        }

        bool
        await_suspend(
            std::coroutine_handle<> h,
            capy::io_env const*) noexcept
        {
            // Running on server executor (do_accept runs there)
            wait_.h = h;
            wait_.w = nullptr;
            wait_.next = self_.waiters_;
            self_.waiters_ = &wait_;
            return true;
        }

        worker_base& await_resume() noexcept
        {
            // Running on server executor
            if(wait_.w)
                return *wait_.w; // Woken by push_awaitable
            return *self_.idle_pop();
        }
    };

    // Return a worker to the pool (asynchronously, on the server executor).
    push_awaitable push(worker_base& w)
    {
        return push_awaitable{*this, w};
    }

    // Synchronous version for destructor/guard paths
    // Must be called from server executor context
    void push_sync(worker_base& w) noexcept
    {
        active_remove(&w);
        if(waiters_)
        {
            auto* wait = waiters_;
            waiters_ = wait->next;
            wait->w = &w;
            ex_.post(wait->h);
        }
        else
        {
            idle_push(&w);
        }
    }

    // Obtain an idle worker, suspending the caller until one is available.
    pop_awaitable pop()
    {
        return pop_awaitable{*this};
    }

    // Accept loop for a single bound acceptor; defined in the source file.
    capy::task<void> do_accept(tcp_acceptor& acc);

public:
    /** Abstract base class for connection handlers.

        Derive from this class to implement custom connection handling.
        Each worker owns a socket and is reused across multiple
        connections to avoid per-connection allocation.

        @see tcp_server, launcher
    */
    class BOOST_COROSIO_DECL
    worker_base
    {
        // Ordered largest to smallest for optimal packing
        std::stop_source stop_;         // ~16 bytes - reset per connection by launcher
        worker_base* next_ = nullptr;   // 8 bytes - used by idle and active lists
        worker_base* prev_ = nullptr;   // 8 bytes - used only by active list

        friend class tcp_server;

    public:
        /// Destroy the worker.
        virtual ~worker_base() = default;

        /** Handle an accepted connection.

            Called when this worker is dispatched to handle a new
            connection. The implementation must invoke the launcher
            exactly once to start the handling coroutine.

            @param launch Handle to launch the connection coroutine.
        */
        virtual void run(launcher launch) = 0;

        /// Return the socket used for connections.
        virtual corosio::tcp_socket& socket() = 0;
    };

    /** Move-only handle to launch a worker coroutine.

        Passed to @ref worker_base::run to start the connection-handling
        coroutine. The launcher ensures the worker returns to the idle
        pool when the coroutine completes or if launching fails.

        The launcher must be invoked exactly once via `operator()`.
        If destroyed without invoking, the worker is returned to the
        idle pool automatically.

        @see worker_base::run
    */
    class BOOST_COROSIO_DECL
    launcher
    {
        tcp_server* srv_;
        worker_base* w_;    // Null once invoked (ownership transferred)

        friend class tcp_server;

        launcher(tcp_server& srv, worker_base& w) noexcept
            : srv_(&srv)
            , w_(&w)
        {
        }

    public:
        /// Return the worker to the pool if not launched.
        ~launcher()
        {
            if(w_)
                srv_->push_sync(*w_);
        }

        launcher(launcher&& o) noexcept
            : srv_(o.srv_)
            , w_(std::exchange(o.w_, nullptr))
        {
        }
        launcher(launcher const&) = delete;
        launcher& operator=(launcher const&) = delete;
        launcher& operator=(launcher&&) = delete;

        /** Launch the connection-handling coroutine.

            Starts the given coroutine on the specified executor. When
            the coroutine completes, the worker is automatically returned
            to the idle pool.

            @param ex The executor to run the coroutine on.
            @param task The coroutine to execute.

            @throws std::logic_error If this launcher was already invoked.
        */
        template<class Executor>
        void operator()(Executor const& ex, capy::task<void> task)
        {
            if(! w_)
                detail::throw_logic_error(); // launcher already invoked

            auto* w = std::exchange(w_, nullptr);

            // Worker is being dispatched - add to active list
            srv_->active_push(w);

            // Return worker to pool if coroutine setup throws
            struct guard_t {
                tcp_server* srv;
                worker_base* w;
                ~guard_t() { if(w) srv->push_sync(*w); }
            } guard{srv_, w};

            // Reset worker's stop source for this connection
            w->stop_ = {};
            auto st = w->stop_.get_token();

            auto wrapper = launch_coro<Executor>{}(
                ex, st, srv_, std::move(task), w);

            // Executor and stop token stored in promise via constructor
            ex.post(std::exchange(wrapper.h, nullptr)); // Release before post
            guard.w = nullptr; // Success - dismiss guard
        }
    };

    /** Construct a TCP server.

        @tparam Ctx Execution context type satisfying ExecutionContext.
        @tparam Ex Executor type satisfying Executor.

        @param ctx The execution context for socket operations.
        @param ex The executor for dispatching coroutines.

        @par Example
        @code
        tcp_server srv(ctx, ctx.get_executor());
        srv.set_workers(make_workers(ctx, 100));
        srv.bind(endpoint{...});
        srv.start();
        @endcode
    */
    template<
        capy::ExecutionContext Ctx,
        capy::Executor Ex>
    tcp_server(Ctx& ctx, Ex ex)
        : impl_(make_impl(ctx))
        , ex_(std::move(ex))
    {
    }

public:
    ~tcp_server();
    tcp_server(tcp_server const&) = delete;
    tcp_server& operator=(tcp_server const&) = delete;
    tcp_server(tcp_server&& o) noexcept;
    tcp_server& operator=(tcp_server&& o) noexcept;

    /** Bind to a local endpoint.

        Creates an acceptor listening on the specified endpoint.
        Multiple endpoints can be bound by calling this method
        multiple times before @ref start.

        @param ep The local endpoint to bind to.

        @return The error code if binding fails.
    */
    std::error_code
    bind(endpoint ep);

    /** Set the worker pool.

        Replaces any existing workers with the given range. Any
        previous workers are released and the idle/active lists
        are cleared before populating with new workers.

        @tparam Range Forward range of pointer-like objects to worker_base.

        @param workers Range of workers to manage. Each element must
        support `std::to_address()` yielding `worker_base*`.

        @par Example
        @code
        std::vector<std::unique_ptr<my_worker>> workers;
        for(int i = 0; i < 100; ++i)
            workers.push_back(std::make_unique<my_worker>(ctx));
        srv.set_workers(std::move(workers));
        @endcode
    */
    template<std::ranges::forward_range Range>
        requires std::convertible_to<
            decltype(std::to_address(
                std::declval<std::ranges::range_value_t<Range>&>())),
            worker_base*>
    void
    set_workers(Range&& workers)
    {
        // Clear existing state
        storage_.reset();
        idle_head_ = nullptr;
        active_head_ = nullptr;
        active_tail_ = nullptr;

        // Take ownership and populate idle list
        using StorageType = std::decay_t<Range>;
        auto* p = new StorageType(std::forward<Range>(workers));
        // shared_ptr<void> erases the container type; the custom deleter
        // restores it for destruction. (shared_ptr invokes the deleter on
        // p even if its own construction throws, so p cannot leak.)
        storage_ = std::shared_ptr<void>(p, [](void* ptr) {
            delete static_cast<StorageType*>(ptr);
        });
        for(auto&& elem : *static_cast<StorageType*>(p))
            idle_push(std::to_address(elem));
    }

    /** Start accepting connections.

        Launches accept loops for all bound endpoints. Incoming
        connections are dispatched to idle workers from the pool.

        Calling `start()` on an already-running server has no effect.

        @par Preconditions
        - At least one endpoint bound via @ref bind.
        - Workers provided via @ref set_workers.
        - If restarting, @ref join must have completed first.

        @par Effects
        Creates one accept coroutine per bound endpoint. Each coroutine
        runs on the server's executor, waiting for connections and
        dispatching them to idle workers.

        @par Restart Sequence
        To restart after stopping, complete the full shutdown cycle:
        @code
        srv.start();
        ioc.run_for( 1s );
        srv.stop();   // 1. Signal shutdown
        ioc.run();    // 2. Drain remaining completions
        srv.join();   // 3. Wait for accept loops

        // Now safe to restart
        srv.start();
        ioc.run();
        @endcode

        @par Thread Safety
        Not thread safe.

        @throws std::logic_error If a previous session has not been
        joined (accept loops still active).
    */
    void start();

    /** Stop accepting connections.

        Signals all listening ports to stop accepting new connections
        and requests cancellation of active workers via their stop tokens.

        This function returns immediately; it does not wait for workers
        to finish. Pending I/O operations complete asynchronously.

        Calling `stop()` on a non-running server has no effect.

        @par Effects
        - Closes all acceptors (pending accepts complete with error).
        - Requests stop on each active worker's stop token.
        - Workers observing their stop token should exit promptly.

        @par Postconditions
        No new connections will be accepted. Active workers continue
        until they observe their stop token or complete naturally.

        @par What Happens Next
        After calling `stop()`:
        1. Let `ioc.run()` return (drains pending completions).
        2. Call @ref join to wait for accept loops to finish.
        3. Only then is it safe to restart or destroy the server.

        @par Thread Safety
        Not thread safe.

        @see join, start
    */
    void stop();

    /** Block until all accept loops complete.

        Blocks the calling thread until all accept coroutines launched
        by @ref start have finished executing. This synchronizes the
        shutdown sequence, ensuring the server is fully stopped before
        restarting or destroying it.

        @par Preconditions
        @ref stop has been called and `ioc.run()` has returned.

        @par Postconditions
        All accept loops have completed. The server is in the stopped
        state and may be restarted via @ref start.

        @par Example (Correct Usage)
        @code
        // main thread
        srv.start();
        ioc.run();    // Blocks until work completes
        srv.join();   // Safe: called after ioc.run() returns
        @endcode

        @par WARNING: Deadlock Scenarios
        Calling `join()` from the wrong context causes deadlock:

        @code
        // WRONG: calling join() from inside a worker coroutine
        void run( launcher launch ) override
        {
            launch( ex, [this]() -> capy::task<>
            {
                srv_.join(); // DEADLOCK: blocks the executor
                co_return;
            }());
        }

        // WRONG: calling join() while ioc.run() is still active
        std::thread t( [&]{ ioc.run(); } );
        srv.stop();
        srv.join(); // DEADLOCK: ioc.run() still running in thread t
        @endcode

        @par Thread Safety
        May be called from any thread, but will deadlock if called
        from within the io_context event loop or from a worker coroutine.

        @see stop, start
    */
    void join();

private:
    // Coroutine that performs the asynchronous part of stop();
    // defined in the source file.
    capy::task<> do_stop();
};
773 :
774 : #ifdef _MSC_VER
775 : #pragma warning(pop)
776 : #endif
777 :
778 : } // namespace boost::corosio
779 :
780 : #endif
|