libs/corosio/src/corosio/src/detail/epoll/sockets.cpp

80.6% Lines (333/413) 94.4% Functions (34/36) 64.3% Branches (142/221)
libs/corosio/src/corosio/src/detail/epoll/sockets.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/sockets.hpp"
15 #include "src/detail/endpoint_convert.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/dispatch_coro.hpp"
18
19 #include <boost/corosio/detail/except.hpp>
20 #include <boost/capy/buffers.hpp>
21
22 #include <utility>
23
24 #include <errno.h>
25 #include <netinet/in.h>
26 #include <netinet/tcp.h>
27 #include <sys/epoll.h>
28 #include <sys/socket.h>
29 #include <unistd.h>
30
31 namespace boost::corosio::detail {
32
33 // Register an op with the reactor, handling cached edge events.
34 // Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
35 void
36 5164 epoll_socket_impl::
37 register_op(
38 epoll_op& op,
39 epoll_op*& desc_slot,
40 bool& ready_flag) noexcept
41 {
42 5164 svc_.work_started();
43
44 5164 std::lock_guard lock(desc_state_.mutex);
45 5164 bool io_done = false;
46
2/2
✓ Branch 0 taken 93 times.
✓ Branch 1 taken 5071 times.
5164 if (ready_flag)
47 {
48 93 ready_flag = false;
49 93 op.perform_io();
50
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 93 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
93 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
51
1/2
✓ Branch 0 taken 93 times.
✗ Branch 1 not taken.
93 if (!io_done)
52 93 op.errn = 0;
53 }
54
55
3/6
✓ Branch 0 taken 5164 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 5164 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 5164 times.
5164 if (io_done || op.cancelled.load(std::memory_order_acquire))
56 {
57 svc_.post(&op);
58 svc_.work_finished();
59 }
60 else
61 {
62 5164 desc_slot = &op;
63 }
64 5164 }
65
66 void
67 104 epoll_op::canceller::
68 operator()() const noexcept
69 {
70 104 op->cancel();
71 104 }
72
73 void
74 epoll_connect_op::
75 cancel() noexcept
76 {
77 if (socket_impl_)
78 socket_impl_->cancel_single_op(*this);
79 else
80 request_cancel();
81 }
82
83 void
84 98 epoll_read_op::
85 cancel() noexcept
86 {
87
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (socket_impl_)
88 98 socket_impl_->cancel_single_op(*this);
89 else
90 request_cancel();
91 98 }
92
93 void
94 epoll_write_op::
95 cancel() noexcept
96 {
97 if (socket_impl_)
98 socket_impl_->cancel_single_op(*this);
99 else
100 request_cancel();
101 }
102
103 void
104 74933 epoll_op::
105 operator()()
106 {
107 74933 stop_cb.reset();
108
109 74933 socket_impl_->svc_.scheduler().reset_inline_budget();
110
111
2/2
✓ Branch 1 taken 157 times.
✓ Branch 2 taken 74776 times.
74933 if (cancelled.load(std::memory_order_acquire))
112 157 *ec_out = capy::error::canceled;
113
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 74776 times.
74776 else if (errn != 0)
114 *ec_out = make_err(errn);
115
4/6
✓ Branch 1 taken 37406 times.
✓ Branch 2 taken 37370 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 37406 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 74776 times.
74776 else if (is_read_operation() && bytes_transferred == 0)
116 *ec_out = capy::error::eof;
117 else
118 74776 *ec_out = {};
119
120 74933 *bytes_out = bytes_transferred;
121
122 // Move to stack before resuming coroutine. The coroutine might close
123 // the socket, releasing the last wrapper ref. If impl_ptr were the
124 // last ref and we destroyed it while still in operator(), we'd have
125 // use-after-free. Moving to local ensures destruction happens at
126 // function exit, after all member accesses are complete.
127 74933 capy::executor_ref saved_ex( std::move( ex ) );
128 74933 std::coroutine_handle<> saved_h( std::move( h ) );
129 74933 auto prevent_premature_destruction = std::move(impl_ptr);
130
2/2
✓ Branch 1 taken 74933 times.
✓ Branch 4 taken 74933 times.
74933 dispatch_coro(saved_ex, saved_h).resume();
131 74933 }
132
133 void
134 4963 epoll_connect_op::
135 operator()()
136 {
137 4963 stop_cb.reset();
138
139 4963 socket_impl_->svc_.scheduler().reset_inline_budget();
140
141
3/4
✓ Branch 0 taken 4962 times.
✓ Branch 1 taken 1 time.
✓ Branch 3 taken 4962 times.
✗ Branch 4 not taken.
4963 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
142
143 // Cache endpoints on successful connect
144
3/4
✓ Branch 0 taken 4962 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 4962 times.
✗ Branch 3 not taken.
4963 if (success && socket_impl_)
145 {
146 // Query local endpoint via getsockname (may fail, but remote is always known)
147 4962 endpoint local_ep;
148 4962 sockaddr_in local_addr{};
149 4962 socklen_t local_len = sizeof(local_addr);
150
1/2
✓ Branch 1 taken 4962 times.
✗ Branch 2 not taken.
4962 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
151 4962 local_ep = from_sockaddr_in(local_addr);
152 // Always cache remote endpoint; local may be default if getsockname failed
153 4962 static_cast<epoll_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
154 }
155
156
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4963 times.
4963 if (cancelled.load(std::memory_order_acquire))
157 *ec_out = capy::error::canceled;
158
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 4962 times.
4963 else if (errn != 0)
159 1 *ec_out = make_err(errn);
160 else
161 4962 *ec_out = {};
162
163 // Move to stack before resuming. See epoll_op::operator()() for rationale.
164 4963 capy::executor_ref saved_ex( std::move( ex ) );
165 4963 std::coroutine_handle<> saved_h( std::move( h ) );
166 4963 auto prevent_premature_destruction = std::move(impl_ptr);
167
2/2
✓ Branch 1 taken 4963 times.
✓ Branch 4 taken 4963 times.
4963 dispatch_coro(saved_ex, saved_h).resume();
168 4963 }
169
170 9936 epoll_socket_impl::
171 9936 epoll_socket_impl(epoll_socket_service& svc) noexcept
172 9936 : svc_(svc)
173 {
174 9936 }
175
176 9936 epoll_socket_impl::
177 ~epoll_socket_impl() = default;
178
179 void
180 9936 epoll_socket_impl::
181 release()
182 {
183 9936 close_socket();
184 9936 svc_.destroy_impl(*this);
185 9936 }
186
187 std::coroutine_handle<>
188 4963 epoll_socket_impl::
189 connect(
190 std::coroutine_handle<> h,
191 capy::executor_ref ex,
192 endpoint ep,
193 std::stop_token token,
194 std::error_code* ec)
195 {
196 4963 auto& op = conn_;
197
198 4963 sockaddr_in addr = detail::to_sockaddr_in(ep);
199
1/1
✓ Branch 1 taken 4963 times.
4963 int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
200
201
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4963 times.
4963 if (result == 0)
202 {
203 sockaddr_in local_addr{};
204 socklen_t local_len = sizeof(local_addr);
205 if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
206 local_endpoint_ = detail::from_sockaddr_in(local_addr);
207 remote_endpoint_ = ep;
208 }
209
210
2/4
✓ Branch 0 taken 4963 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 4963 times.
4963 if (result == 0 || errno != EINPROGRESS)
211 {
212 int err = (result < 0) ? errno : 0;
213 if (svc_.scheduler().try_consume_inline_budget())
214 {
215 *ec = err ? make_err(err) : std::error_code{};
216 return ex.dispatch(h);
217 }
218 op.reset();
219 op.h = h;
220 op.ex = ex;
221 op.ec_out = ec;
222 op.fd = fd_;
223 op.target_endpoint = ep;
224 op.start(token, this);
225 op.impl_ptr = shared_from_this();
226 op.complete(err, 0);
227 svc_.post(&op);
228 return std::noop_coroutine();
229 }
230
231 // EINPROGRESS — register with reactor
232 4963 op.reset();
233 4963 op.h = h;
234 4963 op.ex = ex;
235 4963 op.ec_out = ec;
236 4963 op.fd = fd_;
237 4963 op.target_endpoint = ep;
238 4963 op.start(token, this);
239
1/1
✓ Branch 1 taken 4963 times.
4963 op.impl_ptr = shared_from_this();
240
241 4963 register_op(op, desc_state_.connect_op, desc_state_.write_ready);
242 4963 return std::noop_coroutine();
243 }
244
245 std::coroutine_handle<>
246 112378 epoll_socket_impl::
247 read_some(
248 std::coroutine_handle<> h,
249 capy::executor_ref ex,
250 io_buffer_param param,
251 std::stop_token token,
252 std::error_code* ec,
253 std::size_t* bytes_out)
254 {
255 112378 auto& op = rd_;
256 112378 op.reset();
257
258 112378 capy::mutable_buffer bufs[epoll_read_op::max_buffers];
259 112378 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
260
261
6/8
✓ Branch 0 taken 112377 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 112377 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 112377 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 112377 times.
112378 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
262 {
263 1 op.empty_buffer_read = true;
264 1 op.h = h;
265 1 op.ex = ex;
266 1 op.ec_out = ec;
267 1 op.bytes_out = bytes_out;
268 1 op.start(token, this);
269
1/1
✓ Branch 1 taken 1 time.
1 op.impl_ptr = shared_from_this();
270 1 op.complete(0, 0);
271
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
272 1 return std::noop_coroutine();
273 }
274
275
2/2
✓ Branch 0 taken 112377 times.
✓ Branch 1 taken 112377 times.
224754 for (int i = 0; i < op.iovec_count; ++i)
276 {
277 112377 op.iovecs[i].iov_base = bufs[i].data();
278 112377 op.iovecs[i].iov_len = bufs[i].size();
279 }
280
281 // Speculative read
282 ssize_t n;
283 do {
284
1/1
✓ Branch 1 taken 112377 times.
112377 n = ::readv(fd_, op.iovecs, op.iovec_count);
285
3/4
✓ Branch 0 taken 201 times.
✓ Branch 1 taken 112176 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 201 times.
112377 } while (n < 0 && errno == EINTR);
286
287
3/6
✓ Branch 0 taken 201 times.
✓ Branch 1 taken 112176 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 201 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
112377 if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
288 {
289
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 112176 times.
112176 int err = (n < 0) ? errno : 0;
290 112176 auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
291
292
2/2
✓ Branch 2 taken 74818 times.
✓ Branch 3 taken 37358 times.
112176 if (svc_.scheduler().try_consume_inline_budget())
293 {
294
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 74818 times.
74818 if (err)
295 *ec = make_err(err);
296
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 74813 times.
74818 else if (n == 0)
297 5 *ec = capy::error::eof;
298 else
299 74813 *ec = {};
300 74818 *bytes_out = bytes;
301
1/1
✓ Branch 1 taken 74818 times.
74818 return ex.dispatch(h);
302 }
303 37358 op.h = h;
304 37358 op.ex = ex;
305 37358 op.ec_out = ec;
306 37358 op.bytes_out = bytes_out;
307 37358 op.start(token, this);
308
1/1
✓ Branch 1 taken 37358 times.
37358 op.impl_ptr = shared_from_this();
309 37358 op.complete(err, bytes);
310
1/1
✓ Branch 1 taken 37358 times.
37358 svc_.post(&op);
311 37358 return std::noop_coroutine();
312 }
313
314 // EAGAIN — register with reactor
315 201 op.h = h;
316 201 op.ex = ex;
317 201 op.ec_out = ec;
318 201 op.bytes_out = bytes_out;
319 201 op.fd = fd_;
320 201 op.start(token, this);
321
1/1
✓ Branch 1 taken 201 times.
201 op.impl_ptr = shared_from_this();
322
323 201 register_op(op, desc_state_.read_op, desc_state_.read_ready);
324 201 return std::noop_coroutine();
325 }
326
327 std::coroutine_handle<>
328 112225 epoll_socket_impl::
329 write_some(
330 std::coroutine_handle<> h,
331 capy::executor_ref ex,
332 io_buffer_param param,
333 std::stop_token token,
334 std::error_code* ec,
335 std::size_t* bytes_out)
336 {
337 112225 auto& op = wr_;
338 112225 op.reset();
339
340 112225 capy::mutable_buffer bufs[epoll_write_op::max_buffers];
341 112225 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
342
343
6/8
✓ Branch 0 taken 112224 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 112224 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 112224 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 112224 times.
112225 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
344 {
345 1 op.h = h;
346 1 op.ex = ex;
347 1 op.ec_out = ec;
348 1 op.bytes_out = bytes_out;
349 1 op.start(token, this);
350
1/1
✓ Branch 1 taken 1 time.
1 op.impl_ptr = shared_from_this();
351 1 op.complete(0, 0);
352
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
353 1 return std::noop_coroutine();
354 }
355
356
2/2
✓ Branch 0 taken 112224 times.
✓ Branch 1 taken 112224 times.
224448 for (int i = 0; i < op.iovec_count; ++i)
357 {
358 112224 op.iovecs[i].iov_base = bufs[i].data();
359 112224 op.iovecs[i].iov_len = bufs[i].size();
360 }
361
362 // Speculative write
363 112224 msghdr msg{};
364 112224 msg.msg_iov = op.iovecs;
365 112224 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
366
367 ssize_t n;
368 do {
369
1/1
✓ Branch 1 taken 112224 times.
112224 n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
370
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 112223 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
112224 } while (n < 0 && errno == EINTR);
371
372
4/6
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 112223 times.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 time.
✗ Branch 5 not taken.
112224 if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
373 {
374
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 112223 times.
112224 int err = (n < 0) ? errno : 0;
375 112224 auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
376
377
2/2
✓ Branch 2 taken 74852 times.
✓ Branch 3 taken 37372 times.
112224 if (svc_.scheduler().try_consume_inline_budget())
378 {
379
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 74851 times.
74852 *ec = err ? make_err(err) : std::error_code{};
380 74852 *bytes_out = bytes;
381
1/1
✓ Branch 1 taken 74852 times.
74852 return ex.dispatch(h);
382 }
383 37372 op.h = h;
384 37372 op.ex = ex;
385 37372 op.ec_out = ec;
386 37372 op.bytes_out = bytes_out;
387 37372 op.start(token, this);
388
1/1
✓ Branch 1 taken 37372 times.
37372 op.impl_ptr = shared_from_this();
389 37372 op.complete(err, bytes);
390
1/1
✓ Branch 1 taken 37372 times.
37372 svc_.post(&op);
391 37372 return std::noop_coroutine();
392 }
393
394 // EAGAIN — register with reactor
395 op.h = h;
396 op.ex = ex;
397 op.ec_out = ec;
398 op.bytes_out = bytes_out;
399 op.fd = fd_;
400 op.start(token, this);
401 op.impl_ptr = shared_from_this();
402
403 register_op(op, desc_state_.write_op, desc_state_.write_ready);
404 return std::noop_coroutine();
405 }
406
407 std::error_code
408 3 epoll_socket_impl::
409 shutdown(tcp_socket::shutdown_type what) noexcept
410 {
411 int how;
412
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
3 switch (what)
413 {
414 1 case tcp_socket::shutdown_receive: how = SHUT_RD; break;
415 1 case tcp_socket::shutdown_send: how = SHUT_WR; break;
416 1 case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
417 default:
418 return make_err(EINVAL);
419 }
420
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::shutdown(fd_, how) != 0)
421 return make_err(errno);
422 3 return {};
423 }
424
425 std::error_code
426 5 epoll_socket_impl::
427 set_no_delay(bool value) noexcept
428 {
429
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 time.
5 int flag = value ? 1 : 0;
430
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
431 return make_err(errno);
432 5 return {};
433 }
434
435 bool
436 5 epoll_socket_impl::
437 no_delay(std::error_code& ec) const noexcept
438 {
439 5 int flag = 0;
440 5 socklen_t len = sizeof(flag);
441
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
442 {
443 ec = make_err(errno);
444 return false;
445 }
446 5 ec = {};
447 5 return flag != 0;
448 }
449
450 std::error_code
451 4 epoll_socket_impl::
452 set_keep_alive(bool value) noexcept
453 {
454
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4 int flag = value ? 1 : 0;
455
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
456 return make_err(errno);
457 4 return {};
458 }
459
460 bool
461 4 epoll_socket_impl::
462 keep_alive(std::error_code& ec) const noexcept
463 {
464 4 int flag = 0;
465 4 socklen_t len = sizeof(flag);
466
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
467 {
468 ec = make_err(errno);
469 return false;
470 }
471 4 ec = {};
472 4 return flag != 0;
473 }
474
475 std::error_code
476 1 epoll_socket_impl::
477 set_receive_buffer_size(int size) noexcept
478 {
479
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
480 return make_err(errno);
481 1 return {};
482 }
483
484 int
485 3 epoll_socket_impl::
486 receive_buffer_size(std::error_code& ec) const noexcept
487 {
488 3 int size = 0;
489 3 socklen_t len = sizeof(size);
490
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
491 {
492 ec = make_err(errno);
493 return 0;
494 }
495 3 ec = {};
496 3 return size;
497 }
498
499 std::error_code
500 1 epoll_socket_impl::
501 set_send_buffer_size(int size) noexcept
502 {
503
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
504 return make_err(errno);
505 1 return {};
506 }
507
508 int
509 3 epoll_socket_impl::
510 send_buffer_size(std::error_code& ec) const noexcept
511 {
512 3 int size = 0;
513 3 socklen_t len = sizeof(size);
514
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
515 {
516 ec = make_err(errno);
517 return 0;
518 }
519 3 ec = {};
520 3 return size;
521 }
522
523 std::error_code
524 8 epoll_socket_impl::
525 set_linger(bool enabled, int timeout) noexcept
526 {
527
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 7 times.
8 if (timeout < 0)
528 1 return make_err(EINVAL);
529 struct ::linger lg;
530
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 1 time.
7 lg.l_onoff = enabled ? 1 : 0;
531 7 lg.l_linger = timeout;
532
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 7 times.
7 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
533 return make_err(errno);
534 7 return {};
535 }
536
537 tcp_socket::linger_options
538 3 epoll_socket_impl::
539 linger(std::error_code& ec) const noexcept
540 {
541 3 struct ::linger lg{};
542 3 socklen_t len = sizeof(lg);
543
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
544 {
545 ec = make_err(errno);
546 return {};
547 }
548 3 ec = {};
549 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
550 }
551
552 void
553 15006 epoll_socket_impl::
554 cancel() noexcept
555 {
556 15006 std::shared_ptr<epoll_socket_impl> self;
557 try {
558
1/1
✓ Branch 1 taken 15006 times.
15006 self = shared_from_this();
559 } catch (const std::bad_weak_ptr&) {
560 return;
561 }
562
563 15006 conn_.request_cancel();
564 15006 rd_.request_cancel();
565 15006 wr_.request_cancel();
566
567 15006 epoll_op* conn_claimed = nullptr;
568 15006 epoll_op* rd_claimed = nullptr;
569 15006 epoll_op* wr_claimed = nullptr;
570 {
571 15006 std::lock_guard lock(desc_state_.mutex);
572
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15006 times.
15006 if (desc_state_.connect_op == &conn_)
573 conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
574
2/2
✓ Branch 0 taken 51 times.
✓ Branch 1 taken 14955 times.
15006 if (desc_state_.read_op == &rd_)
575 51 rd_claimed = std::exchange(desc_state_.read_op, nullptr);
576
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15006 times.
15006 if (desc_state_.write_op == &wr_)
577 wr_claimed = std::exchange(desc_state_.write_op, nullptr);
578 15006 }
579
580
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15006 times.
15006 if (conn_claimed)
581 {
582 conn_.impl_ptr = self;
583 svc_.post(&conn_);
584 svc_.work_finished();
585 }
586
2/2
✓ Branch 0 taken 51 times.
✓ Branch 1 taken 14955 times.
15006 if (rd_claimed)
587 {
588 51 rd_.impl_ptr = self;
589 51 svc_.post(&rd_);
590 51 svc_.work_finished();
591 }
592
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15006 times.
15006 if (wr_claimed)
593 {
594 wr_.impl_ptr = self;
595 svc_.post(&wr_);
596 svc_.work_finished();
597 }
598 15006 }
599
600 void
601 98 epoll_socket_impl::
602 cancel_single_op(epoll_op& op) noexcept
603 {
604 98 op.request_cancel();
605
606 98 epoll_op** desc_op_ptr = nullptr;
607
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 98 times.
98 if (&op == &conn_) desc_op_ptr = &desc_state_.connect_op;
608
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 else if (&op == &rd_) desc_op_ptr = &desc_state_.read_op;
609 else if (&op == &wr_) desc_op_ptr = &desc_state_.write_op;
610
611
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (desc_op_ptr)
612 {
613 98 epoll_op* claimed = nullptr;
614 {
615 98 std::lock_guard lock(desc_state_.mutex);
616
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (*desc_op_ptr == &op)
617 98 claimed = std::exchange(*desc_op_ptr, nullptr);
618 98 }
619
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (claimed)
620 {
621 try {
622
1/1
✓ Branch 1 taken 98 times.
98 op.impl_ptr = shared_from_this();
623 } catch (const std::bad_weak_ptr&) {}
624 98 svc_.post(&op);
625 98 svc_.work_finished();
626 }
627 }
628 98 }
629
630 void
631 14910 epoll_socket_impl::
632 close_socket() noexcept
633 {
634 14910 cancel();
635
636 // Keep impl alive if descriptor_state is queued in the scheduler.
637 // Without this, destroy_impl() drops the last shared_ptr while
638 // the queued descriptor_state node would become dangling.
639
2/2
✓ Branch 1 taken 7 times.
✓ Branch 2 taken 14903 times.
14910 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
640 {
641 try {
642
1/1
✓ Branch 1 taken 7 times.
7 desc_state_.impl_ref_ = shared_from_this();
643 } catch (std::bad_weak_ptr const&) {}
644 }
645
646
2/2
✓ Branch 0 taken 9936 times.
✓ Branch 1 taken 4974 times.
14910 if (fd_ >= 0)
647 {
648
1/2
✓ Branch 0 taken 9936 times.
✗ Branch 1 not taken.
9936 if (desc_state_.registered_events != 0)
649 9936 svc_.scheduler().deregister_descriptor(fd_);
650 9936 ::close(fd_);
651 9936 fd_ = -1;
652 }
653
654 14910 desc_state_.fd = -1;
655 {
656 14910 std::lock_guard lock(desc_state_.mutex);
657 14910 desc_state_.read_op = nullptr;
658 14910 desc_state_.write_op = nullptr;
659 14910 desc_state_.connect_op = nullptr;
660 14910 desc_state_.read_ready = false;
661 14910 desc_state_.write_ready = false;
662 14910 }
663 14910 desc_state_.registered_events = 0;
664
665 14910 local_endpoint_ = endpoint{};
666 14910 remote_endpoint_ = endpoint{};
667 14910 }
668
669 203 epoll_socket_service::
670 203 epoll_socket_service(capy::execution_context& ctx)
671
2/2
✓ Branch 2 taken 203 times.
✓ Branch 5 taken 203 times.
203 : state_(std::make_unique<epoll_socket_state>(ctx.use_service<epoll_scheduler>()))
672 {
673 203 }
674
675 406 epoll_socket_service::
676 203 ~epoll_socket_service()
677 {
678 406 }
679
680 void
681 203 epoll_socket_service::
682 shutdown()
683 {
684
1/1
✓ Branch 2 taken 203 times.
203 std::lock_guard lock(state_->mutex_);
685
686
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 203 times.
203 while (auto* impl = state_->socket_list_.pop_front())
687 impl->close_socket();
688
689 // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
690 // drains completed_ops_, calling destroy() on each queued op. If we
691 // released our shared_ptrs now, an epoll_op::destroy() could free the
692 // last ref to an impl whose embedded descriptor_state is still linked
693 // in the queue — use-after-free on the next pop(). Letting ~state_
694 // release the ptrs (during service destruction, after scheduler
695 // shutdown) keeps every impl alive until all ops have been drained.
696 203 }
697
698 tcp_socket::socket_impl&
699 9936 epoll_socket_service::
700 create_impl()
701 {
702
1/1
✓ Branch 1 taken 9936 times.
9936 auto impl = std::make_shared<epoll_socket_impl>(*this);
703 9936 auto* raw = impl.get();
704
705 {
706
1/1
✓ Branch 2 taken 9936 times.
9936 std::lock_guard lock(state_->mutex_);
707 9936 state_->socket_list_.push_back(raw);
708
1/1
✓ Branch 3 taken 9936 times.
9936 state_->socket_ptrs_.emplace(raw, std::move(impl));
709 9936 }
710
711 9936 return *raw;
712 9936 }
713
714 void
715 9936 epoll_socket_service::
716 destroy_impl(tcp_socket::socket_impl& impl)
717 {
718 9936 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
719
1/1
✓ Branch 2 taken 9936 times.
9936 std::lock_guard lock(state_->mutex_);
720 9936 state_->socket_list_.remove(epoll_impl);
721
1/1
✓ Branch 2 taken 9936 times.
9936 state_->socket_ptrs_.erase(epoll_impl);
722 9936 }
723
724 std::error_code
725 4974 epoll_socket_service::
726 open_socket(tcp_socket::socket_impl& impl)
727 {
728 4974 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
729 4974 epoll_impl->close_socket();
730
731 4974 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
732
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4974 times.
4974 if (fd < 0)
733 return make_err(errno);
734
735 4974 epoll_impl->fd_ = fd;
736
737 // Register fd with epoll (edge-triggered mode)
738 4974 epoll_impl->desc_state_.fd = fd;
739 {
740
1/1
✓ Branch 1 taken 4974 times.
4974 std::lock_guard lock(epoll_impl->desc_state_.mutex);
741 4974 epoll_impl->desc_state_.read_op = nullptr;
742 4974 epoll_impl->desc_state_.write_op = nullptr;
743 4974 epoll_impl->desc_state_.connect_op = nullptr;
744 4974 }
745 4974 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
746
747 4974 return {};
748 }
749
750 void
751 74881 epoll_socket_service::
752 post(epoll_op* op)
753 {
754 74881 state_->sched_.post(op);
755 74881 }
756
757 void
758 5164 epoll_socket_service::
759 work_started() noexcept
760 {
761 5164 state_->sched_.work_started();
762 5164 }
763
764 void
765 149 epoll_socket_service::
766 work_finished() noexcept
767 {
768 149 state_->sched_.work_finished();
769 149 }
770
771 } // namespace boost::corosio::detail
772
773 #endif // BOOST_COROSIO_HAS_EPOLL
774