libs/corosio/src/corosio/src/detail/select/acceptors.cpp

63.7% Lines (158/248) 88.9% Functions (16/18) 45.9% Branches (67/146)
libs/corosio/src/corosio/src/detail/select/acceptors.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_SELECT
13
14 #include "src/detail/select/acceptors.hpp"
15 #include "src/detail/select/sockets.hpp"
16 #include "src/detail/endpoint_convert.hpp"
17 #include "src/detail/dispatch_coro.hpp"
18 #include "src/detail/make_err.hpp"
19
20 #include <errno.h>
21 #include <fcntl.h>
22 #include <netinet/in.h>
23 #include <sys/socket.h>
24 #include <unistd.h>
25
26 namespace boost::corosio::detail {
27
28 void
29 select_accept_op::
30 cancel() noexcept
31 {
32 if (acceptor_impl_)
33 acceptor_impl_->cancel_single_op(*this);
34 else
35 request_cancel();
36 }
37
38 void
39 3542 select_accept_op::
40 operator()()
41 {
42 3542 stop_cb.reset();
43
44
3/4
✓ Branch 0 taken 3542 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 3539 times.
✓ Branch 4 taken 3 times.
3542 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
45
46
1/2
✓ Branch 0 taken 3542 times.
✗ Branch 1 not taken.
3542 if (ec_out)
47 {
48
2/2
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 3539 times.
3542 if (cancelled.load(std::memory_order_acquire))
49 3 *ec_out = capy::error::canceled;
50
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3539 times.
3539 else if (errn != 0)
51 *ec_out = make_err(errn);
52 else
53 3539 *ec_out = {};
54 }
55
56
3/4
✓ Branch 0 taken 3539 times.
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 3539 times.
✗ Branch 3 not taken.
3542 if (success && accepted_fd >= 0)
57 {
58
1/2
✓ Branch 0 taken 3539 times.
✗ Branch 1 not taken.
3539 if (acceptor_impl_)
59 {
60 3539 auto* socket_svc = static_cast<select_acceptor_impl*>(acceptor_impl_)
61 3539 ->service().socket_service();
62
1/2
✓ Branch 0 taken 3539 times.
✗ Branch 1 not taken.
3539 if (socket_svc)
63 {
64
1/1
✓ Branch 1 taken 3539 times.
3539 auto& impl = static_cast<select_socket_impl&>(socket_svc->create_impl());
65 3539 impl.set_socket(accepted_fd);
66
67 3539 sockaddr_in local_addr{};
68 3539 socklen_t local_len = sizeof(local_addr);
69 3539 sockaddr_in remote_addr{};
70 3539 socklen_t remote_len = sizeof(remote_addr);
71
72 3539 endpoint local_ep, remote_ep;
73
1/2
✓ Branch 1 taken 3539 times.
✗ Branch 2 not taken.
3539 if (::getsockname(accepted_fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
74 3539 local_ep = from_sockaddr_in(local_addr);
75
1/2
✓ Branch 1 taken 3539 times.
✗ Branch 2 not taken.
3539 if (::getpeername(accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr), &remote_len) == 0)
76 3539 remote_ep = from_sockaddr_in(remote_addr);
77
78 3539 impl.set_endpoints(local_ep, remote_ep);
79
80
1/2
✓ Branch 0 taken 3539 times.
✗ Branch 1 not taken.
3539 if (impl_out)
81 3539 *impl_out = &impl;
82
83 3539 accepted_fd = -1;
84 }
85 else
86 {
87 if (ec_out && !*ec_out)
88 *ec_out = make_err(ENOENT);
89 ::close(accepted_fd);
90 accepted_fd = -1;
91 if (impl_out)
92 *impl_out = nullptr;
93 }
94 }
95 else
96 {
97 ::close(accepted_fd);
98 accepted_fd = -1;
99 if (impl_out)
100 *impl_out = nullptr;
101 }
102 3539 }
103 else
104 {
105
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 2 times.
3 if (accepted_fd >= 0)
106 {
107
1/1
✓ Branch 1 taken 1 time.
1 ::close(accepted_fd);
108 1 accepted_fd = -1;
109 }
110
111
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (peer_impl)
112 {
113 peer_impl->release();
114 peer_impl = nullptr;
115 }
116
117
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if (impl_out)
118 3 *impl_out = nullptr;
119 }
120
121 // Move to stack before destroying the frame
122 3542 capy::executor_ref saved_ex( std::move( ex ) );
123 3542 std::coroutine_handle<> saved_h( std::move( h ) );
124 3542 impl_ptr.reset();
125
2/2
✓ Branch 1 taken 3542 times.
✓ Branch 4 taken 3542 times.
3542 dispatch_coro(saved_ex, saved_h).resume();
126 3542 }
127
128 42 select_acceptor_impl::
129 42 select_acceptor_impl(select_acceptor_service& svc) noexcept
130 42 : svc_(svc)
131 {
132 42 }
133
134 void
135 42 select_acceptor_impl::
136 release()
137 {
138 42 close_socket();
139 42 svc_.destroy_acceptor_impl(*this);
140 42 }
141
142 std::coroutine_handle<>
143 3542 select_acceptor_impl::
144 accept(
145 std::coroutine_handle<> h,
146 capy::executor_ref ex,
147 std::stop_token token,
148 std::error_code* ec,
149 io_object::io_object_impl** impl_out)
150 {
151 3542 auto& op = acc_;
152 3542 op.reset();
153 3542 op.h = h;
154 3542 op.ex = ex;
155 3542 op.ec_out = ec;
156 3542 op.impl_out = impl_out;
157 3542 op.fd = fd_;
158 3542 op.start(token, this);
159
160 3542 sockaddr_in addr{};
161 3542 socklen_t addrlen = sizeof(addr);
162
1/1
✓ Branch 1 taken 3542 times.
3542 int accepted = ::accept(fd_, reinterpret_cast<sockaddr*>(&addr), &addrlen);
163
164
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 3540 times.
3542 if (accepted >= 0)
165 {
166 // Reject fds that exceed select()'s FD_SETSIZE limit.
167 // Better to fail now than during later async operations.
168
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (accepted >= FD_SETSIZE)
169 {
170 ::close(accepted);
171 op.accepted_fd = -1;
172 op.complete(EINVAL, 0);
173 op.impl_ptr = shared_from_this();
174 svc_.post(&op);
175 // completion is always posted to scheduler queue, never inline.
176 return std::noop_coroutine();
177 }
178
179 // Set non-blocking and close-on-exec flags.
180 // A non-blocking socket is essential for the async reactor;
181 // if we can't configure it, fail rather than risk blocking.
182
1/1
✓ Branch 1 taken 2 times.
2 int flags = ::fcntl(accepted, F_GETFL, 0);
183
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (flags == -1)
184 {
185 int err = errno;
186 ::close(accepted);
187 op.accepted_fd = -1;
188 op.complete(err, 0);
189 op.impl_ptr = shared_from_this();
190 svc_.post(&op);
191 // completion is always posted to scheduler queue, never inline.
192 return std::noop_coroutine();
193 }
194
195
2/3
✓ Branch 1 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 2 times.
2 if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
196 {
197 int err = errno;
198 ::close(accepted);
199 op.accepted_fd = -1;
200 op.complete(err, 0);
201 op.impl_ptr = shared_from_this();
202 svc_.post(&op);
203 // completion is always posted to scheduler queue, never inline.
204 return std::noop_coroutine();
205 }
206
207
2/3
✓ Branch 1 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 2 times.
2 if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
208 {
209 int err = errno;
210 ::close(accepted);
211 op.accepted_fd = -1;
212 op.complete(err, 0);
213 op.impl_ptr = shared_from_this();
214 svc_.post(&op);
215 // completion is always posted to scheduler queue, never inline.
216 return std::noop_coroutine();
217 }
218
219 2 op.accepted_fd = accepted;
220 2 op.complete(0, 0);
221
1/1
✓ Branch 1 taken 2 times.
2 op.impl_ptr = shared_from_this();
222
1/1
✓ Branch 1 taken 2 times.
2 svc_.post(&op);
223 // completion is always posted to scheduler queue, never inline.
224 2 return std::noop_coroutine();
225 }
226
227
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 3540 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
3540 if (errno == EAGAIN || errno == EWOULDBLOCK)
228 {
229 3540 svc_.work_started();
230
1/1
✓ Branch 1 taken 3540 times.
3540 op.impl_ptr = shared_from_this();
231
232 // Set registering BEFORE register_fd to close the race window where
233 // reactor sees an event before we set registered.
234 3540 op.registered.store(select_registration_state::registering, std::memory_order_release);
235
1/1
✓ Branch 2 taken 3540 times.
3540 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
236
237 // Transition to registered. If this fails, reactor or cancel already
238 // claimed the op (state is now unregistered), so we're done. However,
239 // we must still deregister the fd because cancel's deregister_fd may
240 // have run before our register_fd, leaving the fd orphaned.
241 3540 auto expected = select_registration_state::registering;
242
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3540 times.
3540 if (!op.registered.compare_exchange_strong(
243 expected, select_registration_state::registered, std::memory_order_acq_rel))
244 {
245 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
246 // completion is always posted to scheduler queue, never inline.
247 return std::noop_coroutine();
248 }
249
250 // If cancelled was set before we registered, handle it now.
251
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3540 times.
3540 if (op.cancelled.load(std::memory_order_acquire))
252 {
253 auto prev = op.registered.exchange(
254 select_registration_state::unregistered, std::memory_order_acq_rel);
255 if (prev != select_registration_state::unregistered)
256 {
257 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
258 op.impl_ptr = shared_from_this();
259 svc_.post(&op);
260 svc_.work_finished();
261 }
262 }
263 // completion is always posted to scheduler queue, never inline.
264 3540 return std::noop_coroutine();
265 }
266
267 op.complete(errno, 0);
268 op.impl_ptr = shared_from_this();
269 svc_.post(&op);
270 // completion is always posted to scheduler queue, never inline.
271 return std::noop_coroutine();
272 }
273
274 void
275 85 select_acceptor_impl::
276 cancel() noexcept
277 {
278 85 std::shared_ptr<select_acceptor_impl> self;
279 try {
280
1/1
✓ Branch 1 taken 85 times.
85 self = shared_from_this();
281 } catch (const std::bad_weak_ptr&) {
282 return;
283 }
284
285 85 auto prev = acc_.registered.exchange(
286 select_registration_state::unregistered, std::memory_order_acq_rel);
287 85 acc_.request_cancel();
288
289
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 83 times.
85 if (prev != select_registration_state::unregistered)
290 {
291 2 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
292 2 acc_.impl_ptr = self;
293 2 svc_.post(&acc_);
294 2 svc_.work_finished();
295 }
296 85 }
297
298 void
299 select_acceptor_impl::
300 cancel_single_op(select_op& op) noexcept
301 {
302 // Called from stop_token callback to cancel a specific pending operation.
303 auto prev = op.registered.exchange(
304 select_registration_state::unregistered, std::memory_order_acq_rel);
305 op.request_cancel();
306
307 if (prev != select_registration_state::unregistered)
308 {
309 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
310
311 // Keep impl alive until op completes
312 try {
313 op.impl_ptr = shared_from_this();
314 } catch (const std::bad_weak_ptr&) {
315 // Impl is being destroyed, op will be orphaned but that's ok
316 }
317
318 svc_.post(&op);
319 svc_.work_finished();
320 }
321 }
322
323 void
324 84 select_acceptor_impl::
325 close_socket() noexcept
326 {
327 84 cancel();
328
329
2/2
✓ Branch 0 taken 42 times.
✓ Branch 1 taken 42 times.
84 if (fd_ >= 0)
330 {
331 // Unconditionally remove from registered_fds_ to handle edge cases
332 42 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
333 42 ::close(fd_);
334 42 fd_ = -1;
335 }
336
337 // Clear cached endpoint
338 84 local_endpoint_ = endpoint{};
339 84 }
340
341 133 select_acceptor_service::
342 133 select_acceptor_service(capy::execution_context& ctx)
343 133 : ctx_(ctx)
344
2/2
✓ Branch 2 taken 133 times.
✓ Branch 5 taken 133 times.
133 , state_(std::make_unique<select_acceptor_state>(ctx.use_service<select_scheduler>()))
345 {
346 133 }
347
348 266 select_acceptor_service::
349 133 ~select_acceptor_service()
350 {
351 266 }
352
353 void
354 133 select_acceptor_service::
355 shutdown()
356 {
357
1/1
✓ Branch 2 taken 133 times.
133 std::lock_guard lock(state_->mutex_);
358
359
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 133 times.
133 while (auto* impl = state_->acceptor_list_.pop_front())
360 impl->close_socket();
361
362 // Don't clear acceptor_ptrs_ here — same rationale as
363 // select_socket_service::shutdown(). Let ~state_ release ptrs
364 // after scheduler shutdown has drained all queued ops.
365 133 }
366
367 tcp_acceptor::acceptor_impl&
368 42 select_acceptor_service::
369 create_acceptor_impl()
370 {
371
1/1
✓ Branch 1 taken 42 times.
42 auto impl = std::make_shared<select_acceptor_impl>(*this);
372 42 auto* raw = impl.get();
373
374
1/1
✓ Branch 2 taken 42 times.
42 std::lock_guard lock(state_->mutex_);
375 42 state_->acceptor_list_.push_back(raw);
376
1/1
✓ Branch 3 taken 42 times.
42 state_->acceptor_ptrs_.emplace(raw, std::move(impl));
377
378 42 return *raw;
379 42 }
380
381 void
382 42 select_acceptor_service::
383 destroy_acceptor_impl(tcp_acceptor::acceptor_impl& impl)
384 {
385 42 auto* select_impl = static_cast<select_acceptor_impl*>(&impl);
386
1/1
✓ Branch 2 taken 42 times.
42 std::lock_guard lock(state_->mutex_);
387 42 state_->acceptor_list_.remove(select_impl);
388
1/1
✓ Branch 2 taken 42 times.
42 state_->acceptor_ptrs_.erase(select_impl);
389 42 }
390
391 std::error_code
392 42 select_acceptor_service::
393 open_acceptor(
394 tcp_acceptor::acceptor_impl& impl,
395 endpoint ep,
396 int backlog)
397 {
398 42 auto* select_impl = static_cast<select_acceptor_impl*>(&impl);
399 42 select_impl->close_socket();
400
401 42 int fd = ::socket(AF_INET, SOCK_STREAM, 0);
402
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42 times.
42 if (fd < 0)
403 return make_err(errno);
404
405 // Set non-blocking and close-on-exec
406
1/1
✓ Branch 1 taken 42 times.
42 int flags = ::fcntl(fd, F_GETFL, 0);
407
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42 times.
42 if (flags == -1)
408 {
409 int errn = errno;
410 ::close(fd);
411 return make_err(errn);
412 }
413
2/3
✓ Branch 1 taken 42 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 42 times.
42 if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
414 {
415 int errn = errno;
416 ::close(fd);
417 return make_err(errn);
418 }
419
2/3
✓ Branch 1 taken 42 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 42 times.
42 if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
420 {
421 int errn = errno;
422 ::close(fd);
423 return make_err(errn);
424 }
425
426 // Check fd is within select() limits
427
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42 times.
42 if (fd >= FD_SETSIZE)
428 {
429 ::close(fd);
430 return make_err(EMFILE);
431 }
432
433 42 int reuse = 1;
434 42 ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
435
436 42 sockaddr_in addr = detail::to_sockaddr_in(ep);
437
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 42 times.
42 if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
438 {
439 int errn = errno;
440 ::close(fd);
441 return make_err(errn);
442 }
443
444
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 42 times.
42 if (::listen(fd, backlog) < 0)
445 {
446 int errn = errno;
447 ::close(fd);
448 return make_err(errn);
449 }
450
451 42 select_impl->fd_ = fd;
452
453 // Cache the local endpoint (queries OS for ephemeral port if port was 0)
454 42 sockaddr_in local_addr{};
455 42 socklen_t local_len = sizeof(local_addr);
456
1/2
✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
42 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
457 42 select_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
458
459 42 return {};
460 }
461
462 void
463 4 select_acceptor_service::
464 post(select_op* op)
465 {
466 4 state_->sched_.post(op);
467 4 }
468
469 void
470 3540 select_acceptor_service::
471 work_started() noexcept
472 {
473 3540 state_->sched_.work_started();
474 3540 }
475
476 void
477 2 select_acceptor_service::
478 work_finished() noexcept
479 {
480 2 state_->sched_.work_finished();
481 2 }
482
483 select_socket_service*
484 3539 select_acceptor_service::
485 socket_service() const noexcept
486 {
487 3539 auto* svc = ctx_.find_service<detail::socket_service>();
488
2/4
✓ Branch 0 taken 3539 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 3539 times.
✗ Branch 3 not taken.
3539 return svc ? dynamic_cast<select_socket_service*>(svc) : nullptr;
489 }
490
491 } // namespace boost::corosio::detail
492
493 #endif // BOOST_COROSIO_HAS_SELECT
494