libs/corosio/src/corosio/src/detail/epoll/acceptors.cpp

79.7% Lines (177/222) 100.0% Functions (18/18) 55.7% Branches (68/122)
libs/corosio/src/corosio/src/detail/epoll/acceptors.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/acceptors.hpp"
15 #include "src/detail/epoll/sockets.hpp"
16 #include "src/detail/endpoint_convert.hpp"
17 #include "src/detail/dispatch_coro.hpp"
18 #include "src/detail/make_err.hpp"
19
20 #include <utility>
21
22 #include <errno.h>
23 #include <netinet/in.h>
24 #include <sys/epoll.h>
25 #include <sys/socket.h>
26 #include <unistd.h>
27
28 namespace boost::corosio::detail {
29
30 void
31 6 epoll_accept_op::
32 cancel() noexcept
33 {
34
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (acceptor_impl_)
35 6 acceptor_impl_->cancel_single_op(*this);
36 else
37 request_cancel();
38 6 }
39
40 void
41 4971 epoll_accept_op::
42 operator()()
43 {
44 4971 stop_cb.reset();
45
46 4971 static_cast<epoll_acceptor_impl*>(acceptor_impl_)
47 4971 ->service().scheduler().reset_inline_budget();
48
49
3/4
✓ Branch 0 taken 4971 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 4962 times.
✓ Branch 4 taken 9 times.
4971 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
50
51
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 4962 times.
4971 if (cancelled.load(std::memory_order_acquire))
52 9 *ec_out = capy::error::canceled;
53
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4962 times.
4962 else if (errn != 0)
54 *ec_out = make_err(errn);
55 else
56 4962 *ec_out = {};
57
58 // Set up the peer socket on success
59
4/6
✓ Branch 0 taken 4962 times.
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 4962 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 4962 times.
✗ Branch 5 not taken.
4971 if (success && accepted_fd >= 0 && acceptor_impl_)
60 {
61 4962 auto* socket_svc = static_cast<epoll_acceptor_impl*>(acceptor_impl_)
62 4962 ->service().socket_service();
63
1/2
✓ Branch 0 taken 4962 times.
✗ Branch 1 not taken.
4962 if (socket_svc)
64 {
65
1/1
✓ Branch 1 taken 4962 times.
4962 auto& impl = static_cast<epoll_socket_impl&>(socket_svc->create_impl());
66 4962 impl.set_socket(accepted_fd);
67
68 4962 impl.desc_state_.fd = accepted_fd;
69 {
70
1/1
✓ Branch 1 taken 4962 times.
4962 std::lock_guard lock(impl.desc_state_.mutex);
71 4962 impl.desc_state_.read_op = nullptr;
72 4962 impl.desc_state_.write_op = nullptr;
73 4962 impl.desc_state_.connect_op = nullptr;
74 4962 }
75
1/1
✓ Branch 2 taken 4962 times.
4962 socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
76
77 4962 impl.set_endpoints(
78 4962 static_cast<epoll_acceptor_impl*>(acceptor_impl_)->local_endpoint(),
79 4962 from_sockaddr_in(peer_addr));
80
81
1/2
✓ Branch 0 taken 4962 times.
✗ Branch 1 not taken.
4962 if (impl_out)
82 4962 *impl_out = &impl;
83 4962 accepted_fd = -1;
84 }
85 else
86 {
87 // No socket service — treat as error
88 *ec_out = make_err(ENOENT);
89 success = false;
90 }
91 }
92
93
3/4
✓ Branch 0 taken 4962 times.
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 4962 times.
4971 if (!success || !acceptor_impl_)
94 {
95
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (accepted_fd >= 0)
96 {
97 ::close(accepted_fd);
98 accepted_fd = -1;
99 }
100
1/2
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
9 if (impl_out)
101 9 *impl_out = nullptr;
102 }
103
104 // Move to stack before resuming. See epoll_op::operator()() for rationale.
105 4971 capy::executor_ref saved_ex( std::move( ex ) );
106 4971 std::coroutine_handle<> saved_h( std::move( h ) );
107 4971 auto prevent_premature_destruction = std::move(impl_ptr);
108
2/2
✓ Branch 1 taken 4971 times.
✓ Branch 4 taken 4971 times.
4971 dispatch_coro(saved_ex, saved_h).resume();
109 4971 }
110
111 64 epoll_acceptor_impl::
112 64 epoll_acceptor_impl(epoll_acceptor_service& svc) noexcept
113 64 : svc_(svc)
114 {
115 64 }
116
117 void
118 64 epoll_acceptor_impl::
119 release()
120 {
121 64 close_socket();
122 64 svc_.destroy_acceptor_impl(*this);
123 64 }
124
125 std::coroutine_handle<>
126 4971 epoll_acceptor_impl::
127 accept(
128 std::coroutine_handle<> h,
129 capy::executor_ref ex,
130 std::stop_token token,
131 std::error_code* ec,
132 io_object::io_object_impl** impl_out)
133 {
134 4971 auto& op = acc_;
135 4971 op.reset();
136 4971 op.h = h;
137 4971 op.ex = ex;
138 4971 op.ec_out = ec;
139 4971 op.impl_out = impl_out;
140 4971 op.fd = fd_;
141 4971 op.start(token, this);
142
143 4971 sockaddr_in addr{};
144 4971 socklen_t addrlen = sizeof(addr);
145 int accepted;
146 do {
147
1/1
✓ Branch 1 taken 4971 times.
4971 accepted = ::accept4(fd_, reinterpret_cast<sockaddr*>(&addr),
148 &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
149
3/4
✓ Branch 0 taken 4969 times.
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 4969 times.
4971 } while (accepted < 0 && errno == EINTR);
150
151
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 4969 times.
4971 if (accepted >= 0)
152 {
153 {
154
1/1
✓ Branch 1 taken 2 times.
2 std::lock_guard lock(desc_state_.mutex);
155 2 desc_state_.read_ready = false;
156 2 }
157
158
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
2 if (svc_.scheduler().try_consume_inline_budget())
159 {
160 auto* socket_svc = svc_.socket_service();
161 if (socket_svc)
162 {
163 auto& impl = static_cast<epoll_socket_impl&>(socket_svc->create_impl());
164 impl.set_socket(accepted);
165
166 impl.desc_state_.fd = accepted;
167 {
168 std::lock_guard lock(impl.desc_state_.mutex);
169 impl.desc_state_.read_op = nullptr;
170 impl.desc_state_.write_op = nullptr;
171 impl.desc_state_.connect_op = nullptr;
172 }
173 socket_svc->scheduler().register_descriptor(accepted, &impl.desc_state_);
174
175 impl.set_endpoints(local_endpoint_, from_sockaddr_in(addr));
176
177 *ec = {};
178 if (impl_out)
179 *impl_out = &impl;
180 }
181 else
182 {
183 ::close(accepted);
184 *ec = make_err(ENOENT);
185 if (impl_out)
186 *impl_out = nullptr;
187 }
188 return ex.dispatch(h);
189 }
190
191 2 op.accepted_fd = accepted;
192 2 op.peer_addr = addr;
193 2 op.complete(0, 0);
194
1/1
✓ Branch 1 taken 2 times.
2 op.impl_ptr = shared_from_this();
195
1/1
✓ Branch 1 taken 2 times.
2 svc_.post(&op);
196 2 return std::noop_coroutine();
197 }
198
199
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 4969 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
4969 if (errno == EAGAIN || errno == EWOULDBLOCK)
200 {
201
1/1
✓ Branch 1 taken 4969 times.
4969 op.impl_ptr = shared_from_this();
202 4969 svc_.work_started();
203
204
1/1
✓ Branch 1 taken 4969 times.
4969 std::lock_guard lock(desc_state_.mutex);
205 4969 bool io_done = false;
206
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4969 times.
4969 if (desc_state_.read_ready)
207 {
208 desc_state_.read_ready = false;
209 op.perform_io();
210 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
211 if (!io_done)
212 op.errn = 0;
213 }
214
215
3/6
✓ Branch 0 taken 4969 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 4969 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 4969 times.
4969 if (io_done || op.cancelled.load(std::memory_order_acquire))
216 {
217 svc_.post(&op);
218 svc_.work_finished();
219 }
220 else
221 {
222 4969 desc_state_.read_op = &op;
223 }
224 4969 return std::noop_coroutine();
225 4969 }
226
227 op.complete(errno, 0);
228 op.impl_ptr = shared_from_this();
229 svc_.post(&op);
230 // completion is always posted to scheduler queue, never inline.
231 return std::noop_coroutine();
232 }
233
234 void
235 129 epoll_acceptor_impl::
236 cancel() noexcept
237 {
238 129 cancel_single_op(acc_);
239 129 }
240
241 void
242 135 epoll_acceptor_impl::
243 cancel_single_op(epoll_op& op) noexcept
244 {
245 135 op.request_cancel();
246
247 135 epoll_op* claimed = nullptr;
248 {
249 135 std::lock_guard lock(desc_state_.mutex);
250
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 126 times.
135 if (desc_state_.read_op == &op)
251 9 claimed = std::exchange(desc_state_.read_op, nullptr);
252 135 }
253
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 126 times.
135 if (claimed)
254 {
255 try {
256
1/1
✓ Branch 1 taken 9 times.
9 op.impl_ptr = shared_from_this();
257 } catch (const std::bad_weak_ptr&) {}
258 9 svc_.post(&op);
259 9 svc_.work_finished();
260 }
261 135 }
262
263 void
264 128 epoll_acceptor_impl::
265 close_socket() noexcept
266 {
267 128 cancel();
268
269
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 128 times.
128 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
270 {
271 try {
272 desc_state_.impl_ref_ = shared_from_this();
273 } catch (std::bad_weak_ptr const&) {}
274 }
275
276
2/2
✓ Branch 0 taken 62 times.
✓ Branch 1 taken 66 times.
128 if (fd_ >= 0)
277 {
278
1/2
✓ Branch 0 taken 62 times.
✗ Branch 1 not taken.
62 if (desc_state_.registered_events != 0)
279 62 svc_.scheduler().deregister_descriptor(fd_);
280 62 ::close(fd_);
281 62 fd_ = -1;
282 }
283
284 128 desc_state_.fd = -1;
285 {
286 128 std::lock_guard lock(desc_state_.mutex);
287 128 desc_state_.read_op = nullptr;
288 128 desc_state_.read_ready = false;
289 128 desc_state_.write_ready = false;
290 128 }
291 128 desc_state_.registered_events = 0;
292
293 // Clear cached endpoint
294 128 local_endpoint_ = endpoint{};
295 128 }
296
297 203 epoll_acceptor_service::
298 203 epoll_acceptor_service(capy::execution_context& ctx)
299 203 : ctx_(ctx)
300
2/2
✓ Branch 2 taken 203 times.
✓ Branch 5 taken 203 times.
203 , state_(std::make_unique<epoll_acceptor_state>(ctx.use_service<epoll_scheduler>()))
301 {
302 203 }
303
304 406 epoll_acceptor_service::
305 203 ~epoll_acceptor_service()
306 {
307 406 }
308
309 void
310 203 epoll_acceptor_service::
311 shutdown()
312 {
313
1/1
✓ Branch 2 taken 203 times.
203 std::lock_guard lock(state_->mutex_);
314
315
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 203 times.
203 while (auto* impl = state_->acceptor_list_.pop_front())
316 impl->close_socket();
317
318 // Don't clear acceptor_ptrs_ here — same rationale as
319 // epoll_socket_service::shutdown(). Let ~state_ release ptrs
320 // after scheduler shutdown has drained all queued ops.
321 203 }
322
323 tcp_acceptor::acceptor_impl&
324 64 epoll_acceptor_service::
325 create_acceptor_impl()
326 {
327
1/1
✓ Branch 1 taken 64 times.
64 auto impl = std::make_shared<epoll_acceptor_impl>(*this);
328 64 auto* raw = impl.get();
329
330
1/1
✓ Branch 2 taken 64 times.
64 std::lock_guard lock(state_->mutex_);
331 64 state_->acceptor_list_.push_back(raw);
332
1/1
✓ Branch 3 taken 64 times.
64 state_->acceptor_ptrs_.emplace(raw, std::move(impl));
333
334 64 return *raw;
335 64 }
336
337 void
338 64 epoll_acceptor_service::
339 destroy_acceptor_impl(tcp_acceptor::acceptor_impl& impl)
340 {
341 64 auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
342
1/1
✓ Branch 2 taken 64 times.
64 std::lock_guard lock(state_->mutex_);
343 64 state_->acceptor_list_.remove(epoll_impl);
344
1/1
✓ Branch 2 taken 64 times.
64 state_->acceptor_ptrs_.erase(epoll_impl);
345 64 }
346
347 std::error_code
348 64 epoll_acceptor_service::
349 open_acceptor(
350 tcp_acceptor::acceptor_impl& impl,
351 endpoint ep,
352 int backlog)
353 {
354 64 auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
355 64 epoll_impl->close_socket();
356
357 64 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
358
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 64 times.
64 if (fd < 0)
359 return make_err(errno);
360
361 64 int reuse = 1;
362 64 ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
363
364 64 sockaddr_in addr = detail::to_sockaddr_in(ep);
365
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 62 times.
64 if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
366 {
367 2 int errn = errno;
368
1/1
✓ Branch 1 taken 2 times.
2 ::close(fd);
369 2 return make_err(errn);
370 }
371
372
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 62 times.
62 if (::listen(fd, backlog) < 0)
373 {
374 int errn = errno;
375 ::close(fd);
376 return make_err(errn);
377 }
378
379 62 epoll_impl->fd_ = fd;
380
381 // Register fd with epoll (edge-triggered mode)
382 62 epoll_impl->desc_state_.fd = fd;
383 {
384
1/1
✓ Branch 1 taken 62 times.
62 std::lock_guard lock(epoll_impl->desc_state_.mutex);
385 62 epoll_impl->desc_state_.read_op = nullptr;
386 62 }
387
1/1
✓ Branch 2 taken 62 times.
62 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
388
389 // Cache the local endpoint (queries OS for ephemeral port if port was 0)
390 62 sockaddr_in local_addr{};
391 62 socklen_t local_len = sizeof(local_addr);
392
1/2
✓ Branch 1 taken 62 times.
✗ Branch 2 not taken.
62 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
393 62 epoll_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
394
395 62 return {};
396 }
397
398 void
399 11 epoll_acceptor_service::
400 post(epoll_op* op)
401 {
402 11 state_->sched_.post(op);
403 11 }
404
405 void
406 4969 epoll_acceptor_service::
407 work_started() noexcept
408 {
409 4969 state_->sched_.work_started();
410 4969 }
411
412 void
413 9 epoll_acceptor_service::
414 work_finished() noexcept
415 {
416 9 state_->sched_.work_finished();
417 9 }
418
419 epoll_socket_service*
420 4962 epoll_acceptor_service::
421 socket_service() const noexcept
422 {
423 4962 auto* svc = ctx_.find_service<detail::socket_service>();
424
2/4
✓ Branch 0 taken 4962 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 4962 times.
✗ Branch 3 not taken.
4962 return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
425 }
426
427 } // namespace boost::corosio::detail
428
429 #endif // BOOST_COROSIO_HAS_EPOLL
430