libs/corosio/src/corosio/src/detail/select/sockets.cpp

Coverage: 73.4% of lines (273/372), 94.1% of functions (32/34), 57.4% of branches (113/197)

//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_SELECT

#include "src/detail/select/sockets.hpp"
#include "src/detail/endpoint_convert.hpp"
#include "src/detail/dispatch_coro.hpp"
#include "src/detail/make_err.hpp"

#include <boost/capy/buffers.hpp>

#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <unistd.h>

namespace boost::corosio::detail {

void
select_op::canceller::
operator()() const noexcept
{
    op->cancel();
}

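// Note: the canceller functor above is what each op installs as its
// std::stop_token callback (see op.start(token, this) below); it simply
// forwards to the op's cancel(), which routes through the owning socket
// impl when one is attached so registration bookkeeping stays consistent.
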
void
select_connect_op::
cancel() noexcept
{
    if (socket_impl_)
        socket_impl_->cancel_single_op(*this);
    else
        request_cancel();
}

void
select_read_op::
cancel() noexcept
{
    if (socket_impl_)
        socket_impl_->cancel_single_op(*this);
    else
        request_cancel();
}

void
select_write_op::
cancel() noexcept
{
    if (socket_impl_)
        socket_impl_->cancel_single_op(*this);
    else
        request_cancel();
}

void
select_connect_op::
operator()()
{
    stop_cb.reset();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        // Query local endpoint via getsockname (may fail, but remote is always known)
        endpoint local_ep;
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_ep = from_sockaddr_in(local_addr);
        // Always cache remote endpoint; local may be default if getsockname failed
        static_cast<select_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
    }

    if (ec_out)
    {
        if (cancelled.load(std::memory_order_acquire))
            *ec_out = capy::error::canceled;
        else if (errn != 0)
            *ec_out = make_err(errn);
        else
            *ec_out = {};
    }

    if (bytes_out)
        *bytes_out = bytes_transferred;

    // Move to stack before destroying the frame
    capy::executor_ref saved_ex( std::move( ex ) );
    std::coroutine_handle<> saved_h( std::move( h ) );
    impl_ptr.reset();
    dispatch_coro(saved_ex, saved_h).resume();
}

select_socket_impl::
select_socket_impl(select_socket_service& svc) noexcept
    : svc_(svc)
{
}

void
select_socket_impl::
release()
{
    close_socket();
    svc_.destroy_impl(*this);
}

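// Registration protocol shared by connect/read_some/write_some below: an op
// moves unregistered -> registering (before register_fd) -> registered. The
// reactor and the cancel paths "claim" an op by exchanging the state back to
// unregistered; only the claimer that observes a previous value other than
// unregistered deregisters the fd and posts the completion, so the completion
// is posted exactly once.
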
std::coroutine_handle<>
select_socket_impl::
connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep; // Store target for endpoint caching
    op.start(token, this);

    sockaddr_in addr = detail::to_sockaddr_in(ep);
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));

    if (result == 0)
    {
        // Sync success - cache endpoints immediately
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr_in(local_addr);
        remote_endpoint_ = ep;

        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    if (errno == EINPROGRESS)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered. The reactor treats
        // registering the same as registered when claiming the op.
        op.registered.store(select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
            expected, select_registration_state::registered, std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            // completion is always posted to scheduler queue, never inline.
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered, std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    // completion is always posted to scheduler queue, never inline.
    return std::noop_coroutine();
}

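// read_some/write_some follow the same pattern as connect: attempt the I/O
// immediately on the non-blocking fd, post the completion if it finished (or
// failed) synchronously, and only register with the reactor when the call
// would block (EAGAIN/EWOULDBLOCK).
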
std::coroutine_handle<>
select_socket_impl::
read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_read_op::max_buffers];
    op.iovec_count = static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.empty_buffer_read = true;
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (n == 0)
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
            expected, select_registration_state::registered, std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered, std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}

std::coroutine_handle<>
select_socket_impl::
write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count = static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

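    // Gather the iovecs through sendmsg(); MSG_NOSIGNAL asks the kernel to
    // report a closed peer as EPIPE instead of raising SIGPIPE.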
    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
            expected, select_registration_state::registered, std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered, std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}

std::error_code
select_socket_impl::
shutdown(tcp_socket::shutdown_type what) noexcept
{
    int how;
    switch (what)
    {
    case tcp_socket::shutdown_receive: how = SHUT_RD; break;
    case tcp_socket::shutdown_send: how = SHUT_WR; break;
    case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
    default:
        return make_err(EINVAL);
    }
    if (::shutdown(fd_, how) != 0)
        return make_err(errno);
    return {};
}

std::error_code
select_socket_impl::
set_no_delay(bool value) noexcept
{
    int flag = value ? 1 : 0;
    if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
        return make_err(errno);
    return {};
}

bool
select_socket_impl::
no_delay(std::error_code& ec) const noexcept
{
    int flag = 0;
    socklen_t len = sizeof(flag);
    if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
    {
        ec = make_err(errno);
        return false;
    }
    ec = {};
    return flag != 0;
}

std::error_code
select_socket_impl::
set_keep_alive(bool value) noexcept
{
    int flag = value ? 1 : 0;
    if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
        return make_err(errno);
    return {};
}

bool
select_socket_impl::
keep_alive(std::error_code& ec) const noexcept
{
    int flag = 0;
    socklen_t len = sizeof(flag);
    if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
    {
        ec = make_err(errno);
        return false;
    }
    ec = {};
    return flag != 0;
}

std::error_code
select_socket_impl::
set_receive_buffer_size(int size) noexcept
{
    if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
        return make_err(errno);
    return {};
}

int
select_socket_impl::
receive_buffer_size(std::error_code& ec) const noexcept
{
    int size = 0;
    socklen_t len = sizeof(size);
    if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
    {
        ec = make_err(errno);
        return 0;
    }
    ec = {};
    return size;
}

std::error_code
select_socket_impl::
set_send_buffer_size(int size) noexcept
{
    if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
        return make_err(errno);
    return {};
}

int
select_socket_impl::
send_buffer_size(std::error_code& ec) const noexcept
{
    int size = 0;
    socklen_t len = sizeof(size);
    if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
    {
        ec = make_err(errno);
        return 0;
    }
    ec = {};
    return size;
}

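// SO_LINGER: l_linger is expressed in seconds, so negative timeouts are
// rejected with EINVAL before touching the socket.
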
std::error_code
select_socket_impl::
set_linger(bool enabled, int timeout) noexcept
{
    if (timeout < 0)
        return make_err(EINVAL);
    struct ::linger lg;
    lg.l_onoff = enabled ? 1 : 0;
    lg.l_linger = timeout;
    if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
        return make_err(errno);
    return {};
}

tcp_socket::linger_options
select_socket_impl::
linger(std::error_code& ec) const noexcept
{
    struct ::linger lg{};
    socklen_t len = sizeof(lg);
    if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
    {
        ec = make_err(errno);
        return {};
    }
    ec = {};
    return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
}

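// Cancellation: whichever path (reactor, cancel(), or cancel_single_op())
// flips `registered` back to unregistered is the one that deregisters the fd,
// posts the op, and calls work_finished() to balance the work_started() done
// when the op was registered.
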
void
select_socket_impl::
cancel() noexcept
{
    std::shared_ptr<select_socket_impl> self;
    try {
        self = shared_from_this();
    } catch (const std::bad_weak_ptr&) {
        return;
    }

    auto cancel_op = [this, &self](select_op& op, int events) {
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            svc_.scheduler().deregister_fd(fd_, events);
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    };

    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}

void
select_socket_impl::
cancel_single_op(select_op& op) noexcept
{
    // Called from stop_token callback to cancel a specific pending operation.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        // Keep impl alive until op completes
        try {
            op.impl_ptr = shared_from_this();
        } catch (const std::bad_weak_ptr&) {
            // Impl is being destroyed, op will be orphaned but that's ok
        }

        svc_.post(&op);
        svc_.work_finished();
    }
}

void
select_socket_impl::
close_socket() noexcept
{
    cancel();

    if (fd_ >= 0)
    {
        // Unconditionally remove from registered_fds_ to handle edge cases
        // where the fd might be registered but cancel() didn't clean it up
        // due to race conditions.
        svc_.scheduler().deregister_fd(fd_,
            select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1;
    }

    // Clear cached endpoints
    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}

select_socket_service::
select_socket_service(capy::execution_context& ctx)
    : state_(std::make_unique<select_socket_state>(ctx.use_service<select_scheduler>()))
{
}

select_socket_service::
~select_socket_service()
{
}

void
select_socket_service::
shutdown()
{
    std::lock_guard lock(state_->mutex_);

    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. Letting
    // ~state_ release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}

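// Impl ownership: the service map (socket_ptrs_) holds the owning shared_ptr
// for each impl, and in-flight ops pin it additionally through impl_ptr, so an
// impl is only destroyed once destroy_impl() removes it and no queued op still
// references it.
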
tcp_socket::socket_impl&
select_socket_service::
create_impl()
{
    auto impl = std::make_shared<select_socket_impl>(*this);
    auto* raw = impl.get();

    {
        std::lock_guard lock(state_->mutex_);
        state_->socket_list_.push_back(raw);
        state_->socket_ptrs_.emplace(raw, std::move(impl));
    }

    return *raw;
}

void
select_socket_service::
destroy_impl(tcp_socket::socket_impl& impl)
{
    auto* select_impl = static_cast<select_socket_impl*>(&impl);
    std::lock_guard lock(state_->mutex_);
    state_->socket_list_.remove(select_impl);
    state_->socket_ptrs_.erase(select_impl);
}

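// open_socket() creates a non-blocking, close-on-exec AF_INET stream socket.
// Descriptors at or above FD_SETSIZE are rejected up front because select()
// cannot watch them; EMFILE is reported as the closest matching error.
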
std::error_code
select_socket_service::
open_socket(tcp_socket::socket_impl& impl)
{
    auto* select_impl = static_cast<select_socket_impl*>(&impl);
    select_impl->close_socket();

    int fd = ::socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)
        return make_err(errno);

    // Set non-blocking and close-on-exec
    int flags = ::fcntl(fd, F_GETFL, 0);
    if (flags == -1)
    {
        int errn = errno;
        ::close(fd);
        return make_err(errn);
    }
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
    {
        int errn = errno;
        ::close(fd);
        return make_err(errn);
    }
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
    {
        int errn = errno;
        ::close(fd);
        return make_err(errn);
    }

    // Check fd is within select() limits
    if (fd >= FD_SETSIZE)
    {
        ::close(fd);
        return make_err(EMFILE); // Too many open files
    }

    select_impl->fd_ = fd;
    return {};
}

void
select_socket_service::
post(select_op* op)
{
    state_->sched_.post(op);
}

void
select_socket_service::
work_started() noexcept
{
    state_->sched_.work_started();
}

void
select_socket_service::
work_finished() noexcept
{
    state_->sched_.work_finished();
}

} // namespace boost::corosio::detail

#endif // BOOST_COROSIO_HAS_SELECT