Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_IO_STREAM_HPP
11 : #define BOOST_COROSIO_IO_STREAM_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/corosio/io_object.hpp>
15 : #include <boost/capy/io_result.hpp>
16 : #include <boost/corosio/io_buffer_param.hpp>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 : #include <boost/capy/ex/io_env.hpp>
19 : #include <system_error>
20 :
21 : #include <coroutine>
22 : #include <cstddef>
23 : #include <stop_token>
24 :
25 : namespace boost::corosio {
26 :
27 : /** Platform stream with read/write operations.
28 :
29 : This base class provides the fundamental async read and write
30 : operations for kernel-level stream I/O. Derived classes wrap
31 : OS-specific stream implementations (sockets, pipes, etc.) and
32 : satisfy @ref capy::ReadStream and @ref capy::WriteStream concepts.
33 :
34 : @par Semantics
35 : Concrete classes wrap direct platform I/O completed by the kernel.
36 : Functions taking `io_stream&` signal "platform implementation
37 : required" - use this when you need actual kernel I/O rather than
38 : a mock or test double.
39 :
40 : For generic stream algorithms that work with test mocks,
41 : use `template<capy::Stream S>` instead of `io_stream&`.
42 :
43 : @par Thread Safety
44 : Distinct objects: Safe.
45 : Shared objects: Unsafe. All calls to a single stream must be made
46 : from the same implicit or explicit serialization context.
47 :
48 : @par Example
49 : @code
50 : // Read until buffer full or EOF
51 : capy::task<> read_all( io_stream& stream, std::span<char> buf )
52 : {
53 : std::size_t total = 0;
54 : while( total < buf.size() )
55 : {
56 : auto [ec, n] = co_await stream.read_some(
57 : capy::buffer( buf.data() + total, buf.size() - total ) );
58 : if( ec == capy::cond::eof )
59 : break;
60 : if( ec.failed() )
61 : capy::detail::throw_system_error( ec );
62 : total += n;
63 : }
64 : }
65 : @endcode
66 :
67 : @see capy::Stream, capy::ReadStream, capy::WriteStream, tcp_socket
68 : */
69 : class BOOST_COROSIO_DECL io_stream : public io_object
70 : {
71 : public:
72 : /** Asynchronously read data from the stream.
73 :
74 : This operation suspends the calling coroutine and initiates a
75 : kernel-level read. The coroutine resumes when the operation
76 : completes.
77 :
78 : @li The operation completes when:
79 : @li At least one byte has been read into the buffer sequence
80 : @li The peer closes the connection (EOF)
81 : @li An error occurs
82 : @li The operation is cancelled via stop token or `cancel()`
83 :
84 : @par Concurrency
85 : At most one write operation may be in flight concurrently with
86 : this read. No other read operations may be in flight until this
87 : operation completes. Note that concurrent in-flight operations
88 : does not imply the initiating calls may be made concurrently;
89 : all calls must be serialized.
90 :
91 : @par Cancellation
92 : Supports cancellation via `std::stop_token` propagated through
93 : the IoAwaitable protocol, or via the I/O object's `cancel()`
94 : member. When cancelled, the operation completes with an error
95 : that compares equal to `capy::cond::canceled`.
96 :
97 : @par Preconditions
98 : The stream must be open and connected.
99 :
100 : @param buffers The buffer sequence to read data into. The caller
101 : retains ownership and must ensure validity until the
102 : operation completes.
103 :
104 : @return An awaitable yielding `(error_code, std::size_t)`.
105 : On success, `bytes_transferred` contains the number of bytes
106 : read. Compare error codes to conditions, not specific values:
107 : @li `capy::cond::eof` - Peer closed connection (TCP FIN)
108 : @li `capy::cond::canceled` - Operation was cancelled
109 :
110 : @par Example
111 : @code
112 : // Simple read with error handling
113 : auto [ec, n] = co_await stream.read_some( capy::buffer( buf ) );
114 : if( ec == capy::cond::eof )
115 : co_return; // Connection closed gracefully
116 : if( ec.failed() )
117 : capy::detail::throw_system_error( ec );
118 : process( buf, n );
119 : @endcode
120 :
121 : @note This operation may read fewer bytes than the buffer
122 : capacity. Use a loop or `capy::async_read` to read an
123 : exact amount.
124 :
125 : @see write_some, capy::async_read
126 : */
127 : template<capy::MutableBufferSequence MB>
128 190483 : auto read_some(MB const& buffers)
129 : {
130 190483 : return read_some_awaitable<MB>(*this, buffers);
131 : }
132 :
133 : /** Asynchronously write data to the stream.
134 :
135 : This operation suspends the calling coroutine and initiates a
136 : kernel-level write. The coroutine resumes when the operation
137 : completes.
138 :
139 : @li The operation completes when:
140 : @li At least one byte has been written from the buffer sequence
141 : @li An error occurs (including connection reset by peer)
142 : @li The operation is cancelled via stop token or `cancel()`
143 :
144 : @par Concurrency
145 : At most one read operation may be in flight concurrently with
146 : this write. No other write operations may be in flight until
147 : this operation completes. Note that concurrent in-flight
148 : operations does not imply the initiating calls may be made
149 : concurrently; all calls must be serialized.
150 :
151 : @par Cancellation
152 : Supports cancellation via `std::stop_token` propagated through
153 : the IoAwaitable protocol, or via the I/O object's `cancel()`
154 : member. When cancelled, the operation completes with an error
155 : that compares equal to `capy::cond::canceled`.
156 :
157 : @par Preconditions
158 : The stream must be open and connected.
159 :
160 : @param buffers The buffer sequence containing data to write.
161 : The caller retains ownership and must ensure validity
162 : until the operation completes.
163 :
164 : @return An awaitable yielding `(error_code, std::size_t)`.
165 : On success, `bytes_transferred` contains the number of bytes
166 : written. Compare error codes to conditions, not specific
167 : values:
168 : @li `capy::cond::canceled` - Operation was cancelled
169 : @li `std::errc::broken_pipe` - Peer closed connection
170 :
171 : @par Example
172 : @code
173 : // Write all data
174 : std::string_view data = "Hello, World!";
175 : std::size_t written = 0;
176 : while( written < data.size() )
177 : {
178 : auto [ec, n] = co_await stream.write_some(
179 : capy::buffer( data.data() + written,
180 : data.size() - written ) );
181 : if( ec.failed() )
182 : capy::detail::throw_system_error( ec );
183 : written += n;
184 : }
185 : @endcode
186 :
187 : @note This operation may write fewer bytes than the buffer
188 : contains. Use a loop or `capy::async_write` to write
189 : all data.
190 :
191 : @see read_some, capy::async_write
192 : */
193 : template<capy::ConstBufferSequence CB>
194 190167 : auto write_some(CB const& buffers)
195 : {
196 190167 : return write_some_awaitable<CB>(*this, buffers);
197 : }
198 :
199 : protected:
200 : /// Awaitable for async read operations.
201 : template<class MutableBufferSequence>
202 : struct read_some_awaitable
203 : {
204 : io_stream& ios_;
205 : MutableBufferSequence buffers_;
206 : std::stop_token token_;
207 : mutable std::error_code ec_;
208 : mutable std::size_t bytes_transferred_ = 0;
209 :
210 190483 : read_some_awaitable(
211 : io_stream& ios,
212 : MutableBufferSequence buffers) noexcept
213 190483 : : ios_(ios)
214 190483 : , buffers_(std::move(buffers))
215 : {
216 190483 : }
217 :
218 190483 : bool await_ready() const noexcept
219 : {
220 190483 : return token_.stop_requested();
221 : }
222 :
223 190483 : capy::io_result<std::size_t> await_resume() const noexcept
224 : {
225 190483 : if (token_.stop_requested())
226 196 : return {make_error_code(std::errc::operation_canceled), 0};
227 190287 : return {ec_, bytes_transferred_};
228 : }
229 :
230 190483 : auto await_suspend(
231 : std::coroutine_handle<> h,
232 : capy::io_env const* env) -> std::coroutine_handle<>
233 : {
234 190483 : token_ = env->stop_token;
235 190483 : return ios_.get().read_some(h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
236 : }
237 : };
238 :
239 : /// Awaitable for async write operations.
240 : template<class ConstBufferSequence>
241 : struct write_some_awaitable
242 : {
243 : io_stream& ios_;
244 : ConstBufferSequence buffers_;
245 : std::stop_token token_;
246 : mutable std::error_code ec_;
247 : mutable std::size_t bytes_transferred_ = 0;
248 :
249 190167 : write_some_awaitable(
250 : io_stream& ios,
251 : ConstBufferSequence buffers) noexcept
252 190167 : : ios_(ios)
253 190167 : , buffers_(std::move(buffers))
254 : {
255 190167 : }
256 :
257 190167 : bool await_ready() const noexcept
258 : {
259 190167 : return token_.stop_requested();
260 : }
261 :
262 190167 : capy::io_result<std::size_t> await_resume() const noexcept
263 : {
264 190167 : if (token_.stop_requested())
265 0 : return {make_error_code(std::errc::operation_canceled), 0};
266 190167 : return {ec_, bytes_transferred_};
267 : }
268 :
269 190167 : auto await_suspend(
270 : std::coroutine_handle<> h,
271 : capy::io_env const* env) -> std::coroutine_handle<>
272 : {
273 190167 : token_ = env->stop_token;
274 190167 : return ios_.get().write_some(h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
275 : }
276 : };
277 :
278 : public:
279 : /** Platform-specific stream implementation interface.
280 :
281 : Derived classes implement this interface to provide kernel-level
282 : read and write operations for each supported platform (IOCP,
283 : epoll, kqueue, io_uring).
284 : */
285 : struct io_stream_impl : io_object_impl
286 : {
287 : /// Initiate platform read operation.
288 : virtual std::coroutine_handle<> read_some(
289 : std::coroutine_handle<>,
290 : capy::executor_ref,
291 : io_buffer_param,
292 : std::stop_token,
293 : std::error_code*,
294 : std::size_t*) = 0;
295 :
296 : /// Initiate platform write operation.
297 : virtual std::coroutine_handle<> write_some(
298 : std::coroutine_handle<>,
299 : capy::executor_ref,
300 : io_buffer_param,
301 : std::stop_token,
302 : std::error_code*,
303 : std::size_t*) = 0;
304 : };
305 :
306 : protected:
307 : /// Construct stream bound to the given execution context.
308 : explicit
309 17258 : io_stream(
310 : capy::execution_context& ctx) noexcept
311 17258 : : io_object(ctx)
312 : {
313 17258 : }
314 :
315 : private:
316 : /// Return implementation downcasted to stream interface.
317 380650 : io_stream_impl& get() const noexcept
318 : {
319 380650 : return *static_cast<io_stream_impl*>(impl_);
320 : }
321 : };
322 :
323 : } // namespace boost::corosio
324 :
325 : #endif
|