//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//

10  
#ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
18  
#include <boost/capy/concept/io_awaitable.hpp>
18  
#include <boost/capy/concept/io_awaitable.hpp>
19  
#include <boost/capy/concept/write_sink.hpp>
19  
#include <boost/capy/concept/write_sink.hpp>
20  
#include <coroutine>
20  
#include <coroutine>
21  
#include <boost/capy/ex/io_env.hpp>
21  
#include <boost/capy/ex/io_env.hpp>
22  
#include <boost/capy/io_result.hpp>
22  
#include <boost/capy/io_result.hpp>
23  
#include <boost/capy/io_task.hpp>
23  
#include <boost/capy/io_task.hpp>
24  

24  

25  
#include <concepts>
25  
#include <concepts>
26  
#include <coroutine>
26  
#include <coroutine>
27  
#include <cstddef>
27  
#include <cstddef>
28  
#include <exception>
28  
#include <exception>
29  
#include <new>
29  
#include <new>
30  
#include <span>
30  
#include <span>
31  
#include <stop_token>
31  
#include <stop_token>
32  
#include <system_error>
32  
#include <system_error>
33  
#include <utility>
33  
#include <utility>
34  

34  

35  
namespace boost {
35  
namespace boost {
36  
namespace capy {
36  
namespace capy {
37  

37  

38  
/** Type-erased wrapper for any WriteSink.
38  
/** Type-erased wrapper for any WriteSink.
39  

39  

40  
    This class provides type erasure for any type satisfying the
40  
    This class provides type erasure for any type satisfying the
41  
    @ref WriteSink concept, enabling runtime polymorphism for
41  
    @ref WriteSink concept, enabling runtime polymorphism for
42  
    sink write operations. It uses cached awaitable storage to achieve
42  
    sink write operations. It uses cached awaitable storage to achieve
43  
    zero steady-state allocation after construction.
43  
    zero steady-state allocation after construction.
44  

44  

45  
    The wrapper supports two construction modes:
45  
    The wrapper supports two construction modes:
46  
    - **Owning**: Pass by value to transfer ownership. The wrapper
46  
    - **Owning**: Pass by value to transfer ownership. The wrapper
47  
      allocates storage and owns the sink.
47  
      allocates storage and owns the sink.
48  
    - **Reference**: Pass a pointer to wrap without ownership. The
48  
    - **Reference**: Pass a pointer to wrap without ownership. The
49  
      pointed-to sink must outlive this wrapper.
49  
      pointed-to sink must outlive this wrapper.
50  

50  

51  
    @par Awaitable Preallocation
51  
    @par Awaitable Preallocation
52  
    The constructor preallocates storage for the type-erased awaitable.
52  
    The constructor preallocates storage for the type-erased awaitable.
53  
    This reserves all virtual address space at server startup
53  
    This reserves all virtual address space at server startup
54  
    so memory usage can be measured up front, rather than
54  
    so memory usage can be measured up front, rather than
55  
    allocating piecemeal as traffic arrives.
55  
    allocating piecemeal as traffic arrives.
56  

56  

57  
    @par Immediate Completion
57  
    @par Immediate Completion
58  
    Operations complete immediately without suspending when the
58  
    Operations complete immediately without suspending when the
59  
    buffer sequence is empty, or when the underlying sink's
59  
    buffer sequence is empty, or when the underlying sink's
60  
    awaitable reports readiness via `await_ready`.
60  
    awaitable reports readiness via `await_ready`.
61  

61  

62  
    @par Thread Safety
62  
    @par Thread Safety
63  
    Not thread-safe. Concurrent operations on the same wrapper
63  
    Not thread-safe. Concurrent operations on the same wrapper
64  
    are undefined behavior.
64  
    are undefined behavior.
65  

65  

66  
    @par Example
66  
    @par Example
67  
    @code
67  
    @code
68  
    // Owning - takes ownership of the sink
68  
    // Owning - takes ownership of the sink
69  
    any_write_sink ws(some_sink{args...});
69  
    any_write_sink ws(some_sink{args...});
70  

70  

71  
    // Reference - wraps without ownership
71  
    // Reference - wraps without ownership
72  
    some_sink sink;
72  
    some_sink sink;
73  
    any_write_sink ws(&sink);
73  
    any_write_sink ws(&sink);
74  

74  

75  
    const_buffer buf(data, size);
75  
    const_buffer buf(data, size);
76  
    auto [ec, n] = co_await ws.write(std::span(&buf, 1));
76  
    auto [ec, n] = co_await ws.write(std::span(&buf, 1));
77  
    auto [ec2] = co_await ws.write_eof();
77  
    auto [ec2] = co_await ws.write_eof();
78  
    @endcode
78  
    @endcode
79  

79  

80  
    @see any_write_stream, WriteSink
80  
    @see any_write_stream, WriteSink
81  
*/
81  
*/
82  
class any_write_sink
82  
class any_write_sink
83  
{
83  
{
84  
    struct vtable;
84  
    struct vtable;
85  
    struct write_awaitable_ops;
85  
    struct write_awaitable_ops;
86  
    struct eof_awaitable_ops;
86  
    struct eof_awaitable_ops;
87  

87  

88  
    template<WriteSink S>
88  
    template<WriteSink S>
89  
    struct vtable_for_impl;
89  
    struct vtable_for_impl;
90  

90  

91  
    void* sink_ = nullptr;
91  
    void* sink_ = nullptr;
92  
    vtable const* vt_ = nullptr;
92  
    vtable const* vt_ = nullptr;
93  
    void* cached_awaitable_ = nullptr;
93  
    void* cached_awaitable_ = nullptr;
94  
    void* storage_ = nullptr;
94  
    void* storage_ = nullptr;
95  
    write_awaitable_ops const* active_write_ops_ = nullptr;
95  
    write_awaitable_ops const* active_write_ops_ = nullptr;
96  
    eof_awaitable_ops const* active_eof_ops_ = nullptr;
96  
    eof_awaitable_ops const* active_eof_ops_ = nullptr;
97  

97  

98  
public:
98  
public:
99  
    /** Destructor.
99  
    /** Destructor.
100  

100  

101  
        Destroys the owned sink (if any) and releases the cached
101  
        Destroys the owned sink (if any) and releases the cached
102  
        awaitable storage.
102  
        awaitable storage.
103  
    */
103  
    */
104  
    ~any_write_sink();
104  
    ~any_write_sink();
105  

105  

106  
    /** Construct a default instance.
106  
    /** Construct a default instance.
107  

107  

108  
        Constructs an empty wrapper. Operations on a default-constructed
108  
        Constructs an empty wrapper. Operations on a default-constructed
109  
        wrapper result in undefined behavior.
109  
        wrapper result in undefined behavior.
110  
    */
110  
    */
111  
    any_write_sink() = default;
111  
    any_write_sink() = default;
112  

112  

113  
    /** Non-copyable.
113  
    /** Non-copyable.
114  

114  

115  
        The awaitable cache is per-instance and cannot be shared.
115  
        The awaitable cache is per-instance and cannot be shared.
116  
    */
116  
    */
117  
    any_write_sink(any_write_sink const&) = delete;
117  
    any_write_sink(any_write_sink const&) = delete;
118  
    any_write_sink& operator=(any_write_sink const&) = delete;
118  
    any_write_sink& operator=(any_write_sink const&) = delete;
119  

119  

120  
    /** Construct by moving.
120  
    /** Construct by moving.
121  

121  

122  
        Transfers ownership of the wrapped sink (if owned) and
122  
        Transfers ownership of the wrapped sink (if owned) and
123  
        cached awaitable storage from `other`. After the move, `other` is
123  
        cached awaitable storage from `other`. After the move, `other` is
124  
        in a default-constructed state.
124  
        in a default-constructed state.
125  

125  

126  
        @param other The wrapper to move from.
126  
        @param other The wrapper to move from.
127  
    */
127  
    */
128  
    any_write_sink(any_write_sink&& other) noexcept
128  
    any_write_sink(any_write_sink&& other) noexcept
129  
        : sink_(std::exchange(other.sink_, nullptr))
129  
        : sink_(std::exchange(other.sink_, nullptr))
130  
        , vt_(std::exchange(other.vt_, nullptr))
130  
        , vt_(std::exchange(other.vt_, nullptr))
131  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
131  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
132  
        , storage_(std::exchange(other.storage_, nullptr))
132  
        , storage_(std::exchange(other.storage_, nullptr))
133  
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
133  
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
134  
        , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
134  
        , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
135  
    {
135  
    {
136  
    }
136  
    }
137  

137  

138  
    /** Assign by moving.
138  
    /** Assign by moving.
139  

139  

140  
        Destroys any owned sink and releases existing resources,
140  
        Destroys any owned sink and releases existing resources,
141  
        then transfers ownership from `other`.
141  
        then transfers ownership from `other`.
142  

142  

143  
        @param other The wrapper to move from.
143  
        @param other The wrapper to move from.
144  
        @return Reference to this wrapper.
144  
        @return Reference to this wrapper.
145  
    */
145  
    */
146  
    any_write_sink&
146  
    any_write_sink&
147  
    operator=(any_write_sink&& other) noexcept;
147  
    operator=(any_write_sink&& other) noexcept;
148  

148  

149  
    /** Construct by taking ownership of a WriteSink.
149  
    /** Construct by taking ownership of a WriteSink.
150  

150  

151  
        Allocates storage and moves the sink into this wrapper.
151  
        Allocates storage and moves the sink into this wrapper.
152  
        The wrapper owns the sink and will destroy it.
152  
        The wrapper owns the sink and will destroy it.
153  

153  

154  
        @param s The sink to take ownership of.
154  
        @param s The sink to take ownership of.
155  
    */
155  
    */
156  
    template<WriteSink S>
156  
    template<WriteSink S>
157  
        requires (!std::same_as<std::decay_t<S>, any_write_sink>)
157  
        requires (!std::same_as<std::decay_t<S>, any_write_sink>)
158  
    any_write_sink(S s);
158  
    any_write_sink(S s);
159  

159  

160  
    /** Construct by wrapping a WriteSink without ownership.
160  
    /** Construct by wrapping a WriteSink without ownership.
161  

161  

162  
        Wraps the given sink by pointer. The sink must remain
162  
        Wraps the given sink by pointer. The sink must remain
163  
        valid for the lifetime of this wrapper.
163  
        valid for the lifetime of this wrapper.
164  

164  

165  
        @param s Pointer to the sink to wrap.
165  
        @param s Pointer to the sink to wrap.
166  
    */
166  
    */
167  
    template<WriteSink S>
167  
    template<WriteSink S>
168  
    any_write_sink(S* s);
168  
    any_write_sink(S* s);
169  

169  

170  
    /** Check if the wrapper contains a valid sink.
170  
    /** Check if the wrapper contains a valid sink.
171  

171  

172  
        @return `true` if wrapping a sink, `false` if default-constructed
172  
        @return `true` if wrapping a sink, `false` if default-constructed
173  
            or moved-from.
173  
            or moved-from.
174  
    */
174  
    */
175  
    bool
175  
    bool
176  
    has_value() const noexcept
176  
    has_value() const noexcept
177  
    {
177  
    {
178  
        return sink_ != nullptr;
178  
        return sink_ != nullptr;
179  
    }
179  
    }
180  

180  

181  
    /** Check if the wrapper contains a valid sink.
181  
    /** Check if the wrapper contains a valid sink.
182  

182  

183  
        @return `true` if wrapping a sink, `false` if default-constructed
183  
        @return `true` if wrapping a sink, `false` if default-constructed
184  
            or moved-from.
184  
            or moved-from.
185  
    */
185  
    */
186  
    explicit
186  
    explicit
187  
    operator bool() const noexcept
187  
    operator bool() const noexcept
188  
    {
188  
    {
189  
        return has_value();
189  
        return has_value();
190  
    }
190  
    }
191  

191  

192  
    /** Initiate a partial write operation.
192  
    /** Initiate a partial write operation.
193  

193  

194  
        Attempt to write up to `buffer_size( buffers )` bytes from
194  
        Attempt to write up to `buffer_size( buffers )` bytes from
195  
        the provided buffer sequence. May consume less than the
195  
        the provided buffer sequence. May consume less than the
196  
        full sequence.
196  
        full sequence.
197  

197  

198  
        @param buffers The buffer sequence containing data to write.
198  
        @param buffers The buffer sequence containing data to write.
199  

199  

200  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
200  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
201  

201  

202  
        @par Immediate Completion
202  
        @par Immediate Completion
203  
        The operation completes immediately without suspending
203  
        The operation completes immediately without suspending
204  
        the calling coroutine when:
204  
        the calling coroutine when:
205  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
205  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
206  
        @li The underlying sink's awaitable reports immediate
206  
        @li The underlying sink's awaitable reports immediate
207  
            readiness via `await_ready`.
207  
            readiness via `await_ready`.
208  

208  

209  
        @note This is a partial operation and may not process the
209  
        @note This is a partial operation and may not process the
210  
        entire buffer sequence. Use @ref write for guaranteed
210  
        entire buffer sequence. Use @ref write for guaranteed
211  
        complete transfer.
211  
        complete transfer.
212  

212  

213  
        @par Preconditions
213  
        @par Preconditions
214  
        The wrapper must contain a valid sink (`has_value() == true`).
214  
        The wrapper must contain a valid sink (`has_value() == true`).
215  
    */
215  
    */
216  
    template<ConstBufferSequence CB>
216  
    template<ConstBufferSequence CB>
217  
    auto
217  
    auto
218  
    write_some(CB buffers);
218  
    write_some(CB buffers);
219  

219  

220  
    /** Initiate a complete write operation.
220  
    /** Initiate a complete write operation.
221  

221  

222  
        Writes data from the provided buffer sequence. The operation
222  
        Writes data from the provided buffer sequence. The operation
223  
        completes when all bytes have been consumed, or an error
223  
        completes when all bytes have been consumed, or an error
224  
        occurs. Forwards to the underlying sink's `write` operation,
224  
        occurs. Forwards to the underlying sink's `write` operation,
225  
        windowed through @ref buffer_param when the sequence exceeds
225  
        windowed through @ref buffer_param when the sequence exceeds
226  
        the per-call buffer limit.
226  
        the per-call buffer limit.
227  

227  

228  
        @param buffers The buffer sequence containing data to write.
228  
        @param buffers The buffer sequence containing data to write.
229  

229  

230  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
230  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
231  

231  

232  
        @par Immediate Completion
232  
        @par Immediate Completion
233  
        The operation completes immediately without suspending
233  
        The operation completes immediately without suspending
234  
        the calling coroutine when:
234  
        the calling coroutine when:
235  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
235  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
236  
        @li Every underlying `write` call completes
236  
        @li Every underlying `write` call completes
237  
            immediately (the wrapped sink reports readiness
237  
            immediately (the wrapped sink reports readiness
238  
            via `await_ready` on each iteration).
238  
            via `await_ready` on each iteration).
239  

239  

240  
        @par Preconditions
240  
        @par Preconditions
241  
        The wrapper must contain a valid sink (`has_value() == true`).
241  
        The wrapper must contain a valid sink (`has_value() == true`).
242  
    */
242  
    */
243  
    template<ConstBufferSequence CB>
243  
    template<ConstBufferSequence CB>
244  
    io_task<std::size_t>
244  
    io_task<std::size_t>
245  
    write(CB buffers);
245  
    write(CB buffers);
246  

246  

247  
    /** Atomically write data and signal end-of-stream.
247  
    /** Atomically write data and signal end-of-stream.
248  

248  

249  
        Writes all data from the buffer sequence and then signals
249  
        Writes all data from the buffer sequence and then signals
250  
        end-of-stream. The implementation decides how to partition
250  
        end-of-stream. The implementation decides how to partition
251  
        the data across calls to the underlying sink's @ref write
251  
        the data across calls to the underlying sink's @ref write
252  
        and `write_eof`. When the caller's buffer sequence is
252  
        and `write_eof`. When the caller's buffer sequence is
253  
        non-empty, the final call to the underlying sink is always
253  
        non-empty, the final call to the underlying sink is always
254  
        `write_eof` with a non-empty buffer sequence. When the
254  
        `write_eof` with a non-empty buffer sequence. When the
255  
        caller's buffer sequence is empty, only `write_eof()` with
255  
        caller's buffer sequence is empty, only `write_eof()` with
256  
        no data is called.
256  
        no data is called.
257  

257  

258  
        @param buffers The buffer sequence containing data to write.
258  
        @param buffers The buffer sequence containing data to write.
259  

259  

260  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
260  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
261  

261  

262  
        @par Immediate Completion
262  
        @par Immediate Completion
263  
        The operation completes immediately without suspending
263  
        The operation completes immediately without suspending
264  
        the calling coroutine when:
264  
        the calling coroutine when:
265  
        @li The buffer sequence is empty. Only the @ref write_eof()
265  
        @li The buffer sequence is empty. Only the @ref write_eof()
266  
            call is performed.
266  
            call is performed.
267  
        @li All underlying operations complete immediately (the
267  
        @li All underlying operations complete immediately (the
268  
            wrapped sink reports readiness via `await_ready`).
268  
            wrapped sink reports readiness via `await_ready`).
269  

269  

270  
        @par Preconditions
270  
        @par Preconditions
271  
        The wrapper must contain a valid sink (`has_value() == true`).
271  
        The wrapper must contain a valid sink (`has_value() == true`).
272  
    */
272  
    */
273  
    template<ConstBufferSequence CB>
273  
    template<ConstBufferSequence CB>
274  
    io_task<std::size_t>
274  
    io_task<std::size_t>
275  
    write_eof(CB buffers);
275  
    write_eof(CB buffers);
276  

276  

277  
    /** Signal end of data.
277  
    /** Signal end of data.
278  

278  

279  
        Indicates that no more data will be written to the sink.
279  
        Indicates that no more data will be written to the sink.
280  
        The operation completes when the sink is finalized, or
280  
        The operation completes when the sink is finalized, or
281  
        an error occurs.
281  
        an error occurs.
282  

282  

283  
        @return An awaitable that await-returns `(error_code)`.
283  
        @return An awaitable that await-returns `(error_code)`.
284  

284  

285  
        @par Immediate Completion
285  
        @par Immediate Completion
286  
        The operation completes immediately without suspending
286  
        The operation completes immediately without suspending
287  
        the calling coroutine when the underlying sink's awaitable
287  
        the calling coroutine when the underlying sink's awaitable
288  
        reports immediate readiness via `await_ready`.
288  
        reports immediate readiness via `await_ready`.
289  

289  

290  
        @par Preconditions
290  
        @par Preconditions
291  
        The wrapper must contain a valid sink (`has_value() == true`).
291  
        The wrapper must contain a valid sink (`has_value() == true`).
292  
    */
292  
    */
293  
    auto
293  
    auto
294  
    write_eof();
294  
    write_eof();
295  

295  

296  
protected:
296  
protected:
297  
    /** Rebind to a new sink after move.
297  
    /** Rebind to a new sink after move.
298  

298  

299  
        Updates the internal pointer to reference a new sink object.
299  
        Updates the internal pointer to reference a new sink object.
300  
        Used by owning wrappers after move assignment when the owned
300  
        Used by owning wrappers after move assignment when the owned
301  
        object has moved to a new location.
301  
        object has moved to a new location.
302  

302  

303  
        @param new_sink The new sink to bind to. Must be the same
303  
        @param new_sink The new sink to bind to. Must be the same
304  
            type as the original sink.
304  
            type as the original sink.
305  

305  

306  
        @note Terminates if called with a sink of different type
306  
        @note Terminates if called with a sink of different type
307  
            than the original.
307  
            than the original.
308  
    */
308  
    */
309  
    template<WriteSink S>
309  
    template<WriteSink S>
310  
    void
310  
    void
311  
    rebind(S& new_sink) noexcept
311  
    rebind(S& new_sink) noexcept
312  
    {
312  
    {
313  
        if(vt_ != &vtable_for_impl<S>::value)
313  
        if(vt_ != &vtable_for_impl<S>::value)
314  
            std::terminate();
314  
            std::terminate();
315  
        sink_ = &new_sink;
315  
        sink_ = &new_sink;
316  
    }
316  
    }
317  

317  

318  
private:
318  
private:
319  
    auto
319  
    auto
320  
    write_some_(std::span<const_buffer const> buffers);
320  
    write_some_(std::span<const_buffer const> buffers);
321  

321  

322  
    auto
322  
    auto
323  
    write_(std::span<const_buffer const> buffers);
323  
    write_(std::span<const_buffer const> buffers);
324  

324  

325  
    auto
325  
    auto
326  
    write_eof_buffers_(std::span<const_buffer const> buffers);
326  
    write_eof_buffers_(std::span<const_buffer const> buffers);
327  
};
327  
};
328  

328  

329  
struct any_write_sink::write_awaitable_ops
329  
struct any_write_sink::write_awaitable_ops
330  
{
330  
{
331  
    bool (*await_ready)(void*);
331  
    bool (*await_ready)(void*);
332  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
332  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
333  
    io_result<std::size_t> (*await_resume)(void*);
333  
    io_result<std::size_t> (*await_resume)(void*);
334  
    void (*destroy)(void*) noexcept;
334  
    void (*destroy)(void*) noexcept;
335  
};
335  
};
336  

336  

337  
struct any_write_sink::eof_awaitable_ops
337  
struct any_write_sink::eof_awaitable_ops
338  
{
338  
{
339  
    bool (*await_ready)(void*);
339  
    bool (*await_ready)(void*);
340  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
340  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
341  
    io_result<> (*await_resume)(void*);
341  
    io_result<> (*await_resume)(void*);
342  
    void (*destroy)(void*) noexcept;
342  
    void (*destroy)(void*) noexcept;
343  
};
343  
};
344  

344  

345  
struct any_write_sink::vtable
345  
struct any_write_sink::vtable
346  
{
346  
{
347  
    write_awaitable_ops const* (*construct_write_some_awaitable)(
347  
    write_awaitable_ops const* (*construct_write_some_awaitable)(
348  
        void* sink,
348  
        void* sink,
349  
        void* storage,
349  
        void* storage,
350  
        std::span<const_buffer const> buffers);
350  
        std::span<const_buffer const> buffers);
351  
    write_awaitable_ops const* (*construct_write_awaitable)(
351  
    write_awaitable_ops const* (*construct_write_awaitable)(
352  
        void* sink,
352  
        void* sink,
353  
        void* storage,
353  
        void* storage,
354  
        std::span<const_buffer const> buffers);
354  
        std::span<const_buffer const> buffers);
355  
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
355  
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
356  
        void* sink,
356  
        void* sink,
357  
        void* storage,
357  
        void* storage,
358  
        std::span<const_buffer const> buffers);
358  
        std::span<const_buffer const> buffers);
359  
    eof_awaitable_ops const* (*construct_eof_awaitable)(
359  
    eof_awaitable_ops const* (*construct_eof_awaitable)(
360  
        void* sink,
360  
        void* sink,
361  
        void* storage);
361  
        void* storage);
362  
    std::size_t awaitable_size;
362  
    std::size_t awaitable_size;
363  
    std::size_t awaitable_align;
363  
    std::size_t awaitable_align;
364  
    void (*destroy)(void*) noexcept;
364  
    void (*destroy)(void*) noexcept;
365  
};
365  
};
366  

366  

367  
template<WriteSink S>
367  
template<WriteSink S>
368  
struct any_write_sink::vtable_for_impl
368  
struct any_write_sink::vtable_for_impl
369  
{
369  
{
370  
    using WriteSomeAwaitable = decltype(std::declval<S&>().write_some(
370  
    using WriteSomeAwaitable = decltype(std::declval<S&>().write_some(
371  
        std::span<const_buffer const>{}));
371  
        std::span<const_buffer const>{}));
372  
    using WriteAwaitable = decltype(std::declval<S&>().write(
372  
    using WriteAwaitable = decltype(std::declval<S&>().write(
373  
        std::span<const_buffer const>{}));
373  
        std::span<const_buffer const>{}));
374  
    using WriteEofBuffersAwaitable = decltype(std::declval<S&>().write_eof(
374  
    using WriteEofBuffersAwaitable = decltype(std::declval<S&>().write_eof(
375  
        std::span<const_buffer const>{}));
375  
        std::span<const_buffer const>{}));
376  
    using EofAwaitable = decltype(std::declval<S&>().write_eof());
376  
    using EofAwaitable = decltype(std::declval<S&>().write_eof());
377  

377  

378  
    static void
378  
    static void
379  
    do_destroy_impl(void* sink) noexcept
379  
    do_destroy_impl(void* sink) noexcept
380  
    {
380  
    {
381  
        static_cast<S*>(sink)->~S();
381  
        static_cast<S*>(sink)->~S();
382  
    }
382  
    }
383  

383  

384  
    static write_awaitable_ops const*
384  
    static write_awaitable_ops const*
385  
    construct_write_some_awaitable_impl(
385  
    construct_write_some_awaitable_impl(
386  
        void* sink,
386  
        void* sink,
387  
        void* storage,
387  
        void* storage,
388  
        std::span<const_buffer const> buffers)
388  
        std::span<const_buffer const> buffers)
389  
    {
389  
    {
390  
        auto& s = *static_cast<S*>(sink);
390  
        auto& s = *static_cast<S*>(sink);
391  
        ::new(storage) WriteSomeAwaitable(s.write_some(buffers));
391  
        ::new(storage) WriteSomeAwaitable(s.write_some(buffers));
392  

392  

393  
        static constexpr write_awaitable_ops ops = {
393  
        static constexpr write_awaitable_ops ops = {
394  
            +[](void* p) {
394  
            +[](void* p) {
395  
                return static_cast<WriteSomeAwaitable*>(p)->await_ready();
395  
                return static_cast<WriteSomeAwaitable*>(p)->await_ready();
396  
            },
396  
            },
397  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
397  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
398  
                return detail::call_await_suspend(
398  
                return detail::call_await_suspend(
399  
                    static_cast<WriteSomeAwaitable*>(p), h, env);
399  
                    static_cast<WriteSomeAwaitable*>(p), h, env);
400  
            },
400  
            },
401  
            +[](void* p) {
401  
            +[](void* p) {
402  
                return static_cast<WriteSomeAwaitable*>(p)->await_resume();
402  
                return static_cast<WriteSomeAwaitable*>(p)->await_resume();
403  
            },
403  
            },
404  
            +[](void* p) noexcept {
404  
            +[](void* p) noexcept {
405  
                static_cast<WriteSomeAwaitable*>(p)->~WriteSomeAwaitable();
405  
                static_cast<WriteSomeAwaitable*>(p)->~WriteSomeAwaitable();
406  
            }
406  
            }
407  
        };
407  
        };
408  
        return &ops;
408  
        return &ops;
409  
    }
409  
    }
410  

410  

411  
    static write_awaitable_ops const*
411  
    static write_awaitable_ops const*
412  
    construct_write_awaitable_impl(
412  
    construct_write_awaitable_impl(
413  
        void* sink,
413  
        void* sink,
414  
        void* storage,
414  
        void* storage,
415  
        std::span<const_buffer const> buffers)
415  
        std::span<const_buffer const> buffers)
416  
    {
416  
    {
417  
        auto& s = *static_cast<S*>(sink);
417  
        auto& s = *static_cast<S*>(sink);
418  
        ::new(storage) WriteAwaitable(s.write(buffers));
418  
        ::new(storage) WriteAwaitable(s.write(buffers));
419  

419  

420  
        static constexpr write_awaitable_ops ops = {
420  
        static constexpr write_awaitable_ops ops = {
421  
            +[](void* p) {
421  
            +[](void* p) {
422  
                return static_cast<WriteAwaitable*>(p)->await_ready();
422  
                return static_cast<WriteAwaitable*>(p)->await_ready();
423  
            },
423  
            },
424  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
424  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
425  
                return detail::call_await_suspend(
425  
                return detail::call_await_suspend(
426  
                    static_cast<WriteAwaitable*>(p), h, env);
426  
                    static_cast<WriteAwaitable*>(p), h, env);
427  
            },
427  
            },
428  
            +[](void* p) {
428  
            +[](void* p) {
429  
                return static_cast<WriteAwaitable*>(p)->await_resume();
429  
                return static_cast<WriteAwaitable*>(p)->await_resume();
430  
            },
430  
            },
431  
            +[](void* p) noexcept {
431  
            +[](void* p) noexcept {
432  
                static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
432  
                static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
433  
            }
433  
            }
434  
        };
434  
        };
435  
        return &ops;
435  
        return &ops;
436  
    }
436  
    }
437  

437  

438  
    static write_awaitable_ops const*
438  
    static write_awaitable_ops const*
439  
    construct_write_eof_buffers_awaitable_impl(
439  
    construct_write_eof_buffers_awaitable_impl(
440  
        void* sink,
440  
        void* sink,
441  
        void* storage,
441  
        void* storage,
442  
        std::span<const_buffer const> buffers)
442  
        std::span<const_buffer const> buffers)
443  
    {
443  
    {
444  
        auto& s = *static_cast<S*>(sink);
444  
        auto& s = *static_cast<S*>(sink);
445  
        ::new(storage) WriteEofBuffersAwaitable(s.write_eof(buffers));
445  
        ::new(storage) WriteEofBuffersAwaitable(s.write_eof(buffers));
446  

446  

447  
        static constexpr write_awaitable_ops ops = {
447  
        static constexpr write_awaitable_ops ops = {
448  
            +[](void* p) {
448  
            +[](void* p) {
449  
                return static_cast<WriteEofBuffersAwaitable*>(p)->await_ready();
449  
                return static_cast<WriteEofBuffersAwaitable*>(p)->await_ready();
450  
            },
450  
            },
451  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
451  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
452  
                return detail::call_await_suspend(
452  
                return detail::call_await_suspend(
453  
                    static_cast<WriteEofBuffersAwaitable*>(p), h, env);
453  
                    static_cast<WriteEofBuffersAwaitable*>(p), h, env);
454  
            },
454  
            },
455  
            +[](void* p) {
455  
            +[](void* p) {
456  
                return static_cast<WriteEofBuffersAwaitable*>(p)->await_resume();
456  
                return static_cast<WriteEofBuffersAwaitable*>(p)->await_resume();
457  
            },
457  
            },
458  
            +[](void* p) noexcept {
458  
            +[](void* p) noexcept {
459  
                static_cast<WriteEofBuffersAwaitable*>(p)->~WriteEofBuffersAwaitable();
459  
                static_cast<WriteEofBuffersAwaitable*>(p)->~WriteEofBuffersAwaitable();
460  
            }
460  
            }
461  
        };
461  
        };
462  
        return &ops;
462  
        return &ops;
463  
    }
463  
    }
464  

464  

465  
    static eof_awaitable_ops const*
465  
    static eof_awaitable_ops const*
466  
    construct_eof_awaitable_impl(
466  
    construct_eof_awaitable_impl(
467  
        void* sink,
467  
        void* sink,
468  
        void* storage)
468  
        void* storage)
469  
    {
469  
    {
470  
        auto& s = *static_cast<S*>(sink);
470  
        auto& s = *static_cast<S*>(sink);
471  
        ::new(storage) EofAwaitable(s.write_eof());
471  
        ::new(storage) EofAwaitable(s.write_eof());
472  

472  

473  
        static constexpr eof_awaitable_ops ops = {
473  
        static constexpr eof_awaitable_ops ops = {
474  
            +[](void* p) {
474  
            +[](void* p) {
475  
                return static_cast<EofAwaitable*>(p)->await_ready();
475  
                return static_cast<EofAwaitable*>(p)->await_ready();
476  
            },
476  
            },
477  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
477  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
478  
                return detail::call_await_suspend(
478  
                return detail::call_await_suspend(
479  
                    static_cast<EofAwaitable*>(p), h, env);
479  
                    static_cast<EofAwaitable*>(p), h, env);
480  
            },
480  
            },
481  
            +[](void* p) {
481  
            +[](void* p) {
482  
                return static_cast<EofAwaitable*>(p)->await_resume();
482  
                return static_cast<EofAwaitable*>(p)->await_resume();
483  
            },
483  
            },
484  
            +[](void* p) noexcept {
484  
            +[](void* p) noexcept {
485  
                static_cast<EofAwaitable*>(p)->~EofAwaitable();
485  
                static_cast<EofAwaitable*>(p)->~EofAwaitable();
486  
            }
486  
            }
487  
        };
487  
        };
488  
        return &ops;
488  
        return &ops;
489  
    }
489  
    }
490  

490  

491  
    static constexpr std::size_t max4(
491  
    static constexpr std::size_t max4(
492  
        std::size_t a, std::size_t b,
492  
        std::size_t a, std::size_t b,
493  
        std::size_t c, std::size_t d) noexcept
493  
        std::size_t c, std::size_t d) noexcept
494  
    {
494  
    {
495  
        std::size_t ab = a > b ? a : b;
495  
        std::size_t ab = a > b ? a : b;
496  
        std::size_t cd = c > d ? c : d;
496  
        std::size_t cd = c > d ? c : d;
497  
        return ab > cd ? ab : cd;
497  
        return ab > cd ? ab : cd;
498  
    }
498  
    }
499  

499  

500  
    static constexpr std::size_t max_awaitable_size =
500  
    static constexpr std::size_t max_awaitable_size =
501  
        max4(sizeof(WriteSomeAwaitable),
501  
        max4(sizeof(WriteSomeAwaitable),
502  
             sizeof(WriteAwaitable),
502  
             sizeof(WriteAwaitable),
503  
             sizeof(WriteEofBuffersAwaitable),
503  
             sizeof(WriteEofBuffersAwaitable),
504  
             sizeof(EofAwaitable));
504  
             sizeof(EofAwaitable));
505  

505  

506  
    static constexpr std::size_t max_awaitable_align =
506  
    static constexpr std::size_t max_awaitable_align =
507  
        max4(alignof(WriteSomeAwaitable),
507  
        max4(alignof(WriteSomeAwaitable),
508  
             alignof(WriteAwaitable),
508  
             alignof(WriteAwaitable),
509  
             alignof(WriteEofBuffersAwaitable),
509  
             alignof(WriteEofBuffersAwaitable),
510  
             alignof(EofAwaitable));
510  
             alignof(EofAwaitable));
511  

511  

512  
    static constexpr vtable value = {
512  
    static constexpr vtable value = {
513  
        &construct_write_some_awaitable_impl,
513  
        &construct_write_some_awaitable_impl,
514  
        &construct_write_awaitable_impl,
514  
        &construct_write_awaitable_impl,
515  
        &construct_write_eof_buffers_awaitable_impl,
515  
        &construct_write_eof_buffers_awaitable_impl,
516  
        &construct_eof_awaitable_impl,
516  
        &construct_eof_awaitable_impl,
517  
        max_awaitable_size,
517  
        max_awaitable_size,
518  
        max_awaitable_align,
518  
        max_awaitable_align,
519  
        &do_destroy_impl
519  
        &do_destroy_impl
520  
    };
520  
    };
521  
};
521  
};
522  

522  

523  
inline
523  
inline
524  
any_write_sink::~any_write_sink()
524  
any_write_sink::~any_write_sink()
525  
{
525  
{
526  
    if(storage_)
526  
    if(storage_)
527  
    {
527  
    {
528  
        vt_->destroy(sink_);
528  
        vt_->destroy(sink_);
529  
        ::operator delete(storage_);
529  
        ::operator delete(storage_);
530  
    }
530  
    }
531  
    if(cached_awaitable_)
531  
    if(cached_awaitable_)
532  
    {
532  
    {
533  
        if(active_write_ops_)
533  
        if(active_write_ops_)
534  
            active_write_ops_->destroy(cached_awaitable_);
534  
            active_write_ops_->destroy(cached_awaitable_);
535  
        else if(active_eof_ops_)
535  
        else if(active_eof_ops_)
536  
            active_eof_ops_->destroy(cached_awaitable_);
536  
            active_eof_ops_->destroy(cached_awaitable_);
537  
        ::operator delete(cached_awaitable_);
537  
        ::operator delete(cached_awaitable_);
538  
    }
538  
    }
539  
}
539  
}
540  

540  

541  
/** Move-assign from another wrapper.

    Releases the current contents, then takes over every pointer of
    `other`, leaving `other` with all members null. Self-assignment
    is a no-op.

    @param other The wrapper to take the state from.
    @return `*this`.
*/
inline any_write_sink&
any_write_sink::operator=(any_write_sink&& other) noexcept
{
    if(this != &other)
    {
        // Destroy a still-live awaitable before the sink that
        // produced it, since the awaitable may refer to the sink.
        if(cached_awaitable_)
        {
            if(active_write_ops_)
                active_write_ops_->destroy(cached_awaitable_);
            else if(active_eof_ops_)
                active_eof_ops_->destroy(cached_awaitable_);
            ::operator delete(cached_awaitable_);
        }
        // storage_ is only set when the sink is owned by the wrapper.
        if(storage_)
        {
            vt_->destroy(sink_);
            ::operator delete(storage_);
        }
        sink_ = std::exchange(other.sink_, nullptr);
        vt_ = std::exchange(other.vt_, nullptr);
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
        storage_ = std::exchange(other.storage_, nullptr);
        active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
        active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
    }
    return *this;
}
568  

568  

569  
template<WriteSink S>
569  
template<WriteSink S>
570  
    requires (!std::same_as<std::decay_t<S>, any_write_sink>)
570  
    requires (!std::same_as<std::decay_t<S>, any_write_sink>)
571  
any_write_sink::any_write_sink(S s)
571  
any_write_sink::any_write_sink(S s)
572  
    : vt_(&vtable_for_impl<S>::value)
572  
    : vt_(&vtable_for_impl<S>::value)
573  
{
573  
{
574  
    struct guard {
574  
    struct guard {
575  
        any_write_sink* self;
575  
        any_write_sink* self;
576  
        bool committed = false;
576  
        bool committed = false;
577  
        ~guard() {
577  
        ~guard() {
578  
            if(!committed && self->storage_) {
578  
            if(!committed && self->storage_) {
579  
                self->vt_->destroy(self->sink_);
579  
                self->vt_->destroy(self->sink_);
580  
                ::operator delete(self->storage_);
580  
                ::operator delete(self->storage_);
581  
                self->storage_ = nullptr;
581  
                self->storage_ = nullptr;
582  
                self->sink_ = nullptr;
582  
                self->sink_ = nullptr;
583  
            }
583  
            }
584  
        }
584  
        }
585  
    } g{this};
585  
    } g{this};
586  

586  

587  
    storage_ = ::operator new(sizeof(S));
587  
    storage_ = ::operator new(sizeof(S));
588  
    sink_ = ::new(storage_) S(std::move(s));
588  
    sink_ = ::new(storage_) S(std::move(s));
589  

589  

590  
    // Preallocate the awaitable storage (sized for max of write/eof)
590  
    // Preallocate the awaitable storage (sized for max of write/eof)
591  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
591  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
592  

592  

593  
    g.committed = true;
593  
    g.committed = true;
594  
}
594  
}
595  

595  

596  
template<WriteSink S>
596  
template<WriteSink S>
597  
any_write_sink::any_write_sink(S* s)
597  
any_write_sink::any_write_sink(S* s)
598  
    : sink_(s)
598  
    : sink_(s)
599  
    , vt_(&vtable_for_impl<S>::value)
599  
    , vt_(&vtable_for_impl<S>::value)
600  
{
600  
{
601  
    // Preallocate the awaitable storage (sized for max of write/eof)
601  
    // Preallocate the awaitable storage (sized for max of write/eof)
602  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
602  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
603  
}
603  
}
604  

604  

605  
inline auto
605  
inline auto
606  
any_write_sink::write_some_(
606  
any_write_sink::write_some_(
607  
    std::span<const_buffer const> buffers)
607  
    std::span<const_buffer const> buffers)
608  
{
608  
{
609  
    struct awaitable
609  
    struct awaitable
610  
    {
610  
    {
611  
        any_write_sink* self_;
611  
        any_write_sink* self_;
612  
        std::span<const_buffer const> buffers_;
612  
        std::span<const_buffer const> buffers_;
613  

613  

614  
        bool
614  
        bool
615  
        await_ready() const noexcept
615  
        await_ready() const noexcept
616  
        {
616  
        {
617  
            return false;
617  
            return false;
618  
        }
618  
        }
619  

619  

620  
        std::coroutine_handle<>
620  
        std::coroutine_handle<>
621  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
621  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
622  
        {
622  
        {
623  
            self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
623  
            self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
624  
                self_->sink_,
624  
                self_->sink_,
625  
                self_->cached_awaitable_,
625  
                self_->cached_awaitable_,
626  
                buffers_);
626  
                buffers_);
627  

627  

628  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
628  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
629  
                return h;
629  
                return h;
630  

630  

631  
            return self_->active_write_ops_->await_suspend(
631  
            return self_->active_write_ops_->await_suspend(
632  
                self_->cached_awaitable_, h, env);
632  
                self_->cached_awaitable_, h, env);
633  
        }
633  
        }
634  

634  

635  
        io_result<std::size_t>
635  
        io_result<std::size_t>
636  
        await_resume()
636  
        await_resume()
637  
        {
637  
        {
638  
            struct guard {
638  
            struct guard {
639  
                any_write_sink* self;
639  
                any_write_sink* self;
640  
                ~guard() {
640  
                ~guard() {
641  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
641  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
642  
                    self->active_write_ops_ = nullptr;
642  
                    self->active_write_ops_ = nullptr;
643  
                }
643  
                }
644  
            } g{self_};
644  
            } g{self_};
645  
            return self_->active_write_ops_->await_resume(
645  
            return self_->active_write_ops_->await_resume(
646  
                self_->cached_awaitable_);
646  
                self_->cached_awaitable_);
647  
        }
647  
        }
648  
    };
648  
    };
649  
    return awaitable{this, buffers};
649  
    return awaitable{this, buffers};
650  
}
650  
}
651  

651  

652  
inline auto
652  
inline auto
653  
any_write_sink::write_(
653  
any_write_sink::write_(
654  
    std::span<const_buffer const> buffers)
654  
    std::span<const_buffer const> buffers)
655  
{
655  
{
656  
    struct awaitable
656  
    struct awaitable
657  
    {
657  
    {
658  
        any_write_sink* self_;
658  
        any_write_sink* self_;
659  
        std::span<const_buffer const> buffers_;
659  
        std::span<const_buffer const> buffers_;
660  

660  

661  
        bool
661  
        bool
662  
        await_ready() const noexcept
662  
        await_ready() const noexcept
663  
        {
663  
        {
664  
            return false;
664  
            return false;
665  
        }
665  
        }
666  

666  

667  
        std::coroutine_handle<>
667  
        std::coroutine_handle<>
668  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
668  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
669  
        {
669  
        {
670  
            self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
670  
            self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
671  
                self_->sink_,
671  
                self_->sink_,
672  
                self_->cached_awaitable_,
672  
                self_->cached_awaitable_,
673  
                buffers_);
673  
                buffers_);
674  

674  

675  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
675  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
676  
                return h;
676  
                return h;
677  

677  

678  
            return self_->active_write_ops_->await_suspend(
678  
            return self_->active_write_ops_->await_suspend(
679  
                self_->cached_awaitable_, h, env);
679  
                self_->cached_awaitable_, h, env);
680  
        }
680  
        }
681  

681  

682  
        io_result<std::size_t>
682  
        io_result<std::size_t>
683  
        await_resume()
683  
        await_resume()
684  
        {
684  
        {
685  
            struct guard {
685  
            struct guard {
686  
                any_write_sink* self;
686  
                any_write_sink* self;
687  
                ~guard() {
687  
                ~guard() {
688  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
688  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
689  
                    self->active_write_ops_ = nullptr;
689  
                    self->active_write_ops_ = nullptr;
690  
                }
690  
                }
691  
            } g{self_};
691  
            } g{self_};
692  
            return self_->active_write_ops_->await_resume(
692  
            return self_->active_write_ops_->await_resume(
693  
                self_->cached_awaitable_);
693  
                self_->cached_awaitable_);
694  
        }
694  
        }
695  
    };
695  
    };
696  
    return awaitable{this, buffers};
696  
    return awaitable{this, buffers};
697  
}
697  
}
698  

698  

699  
inline auto
699  
inline auto
700  
any_write_sink::write_eof()
700  
any_write_sink::write_eof()
701  
{
701  
{
702  
    struct awaitable
702  
    struct awaitable
703  
    {
703  
    {
704  
        any_write_sink* self_;
704  
        any_write_sink* self_;
705  

705  

706  
        bool
706  
        bool
707  
        await_ready() const noexcept
707  
        await_ready() const noexcept
708  
        {
708  
        {
709  
            return false;
709  
            return false;
710  
        }
710  
        }
711  

711  

712  
        std::coroutine_handle<>
712  
        std::coroutine_handle<>
713  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
713  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
714  
        {
714  
        {
715  
            // Construct the underlying awaitable into cached storage
715  
            // Construct the underlying awaitable into cached storage
716  
            self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
716  
            self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
717  
                self_->sink_,
717  
                self_->sink_,
718  
                self_->cached_awaitable_);
718  
                self_->cached_awaitable_);
719  

719  

720  
            // Check if underlying is immediately ready
720  
            // Check if underlying is immediately ready
721  
            if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
721  
            if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
722  
                return h;
722  
                return h;
723  

723  

724  
            // Forward to underlying awaitable
724  
            // Forward to underlying awaitable
725  
            return self_->active_eof_ops_->await_suspend(
725  
            return self_->active_eof_ops_->await_suspend(
726  
                self_->cached_awaitable_, h, env);
726  
                self_->cached_awaitable_, h, env);
727  
        }
727  
        }
728  

728  

729  
        io_result<>
729  
        io_result<>
730  
        await_resume()
730  
        await_resume()
731  
        {
731  
        {
732  
            struct guard {
732  
            struct guard {
733  
                any_write_sink* self;
733  
                any_write_sink* self;
734  
                ~guard() {
734  
                ~guard() {
735  
                    self->active_eof_ops_->destroy(self->cached_awaitable_);
735  
                    self->active_eof_ops_->destroy(self->cached_awaitable_);
736  
                    self->active_eof_ops_ = nullptr;
736  
                    self->active_eof_ops_ = nullptr;
737  
                }
737  
                }
738  
            } g{self_};
738  
            } g{self_};
739  
            return self_->active_eof_ops_->await_resume(
739  
            return self_->active_eof_ops_->await_resume(
740  
                self_->cached_awaitable_);
740  
                self_->cached_awaitable_);
741  
        }
741  
        }
742  
    };
742  
    };
743  
    return awaitable{this};
743  
    return awaitable{this};
744  
}
744  
}
745  

745  

746  
inline auto
746  
inline auto
747  
any_write_sink::write_eof_buffers_(
747  
any_write_sink::write_eof_buffers_(
748  
    std::span<const_buffer const> buffers)
748  
    std::span<const_buffer const> buffers)
749  
{
749  
{
750  
    struct awaitable
750  
    struct awaitable
751  
    {
751  
    {
752  
        any_write_sink* self_;
752  
        any_write_sink* self_;
753  
        std::span<const_buffer const> buffers_;
753  
        std::span<const_buffer const> buffers_;
754  

754  

755  
        bool
755  
        bool
756  
        await_ready() const noexcept
756  
        await_ready() const noexcept
757  
        {
757  
        {
758  
            return false;
758  
            return false;
759  
        }
759  
        }
760  

760  

761  
        std::coroutine_handle<>
761  
        std::coroutine_handle<>
762  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
762  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
763  
        {
763  
        {
764  
            self_->active_write_ops_ =
764  
            self_->active_write_ops_ =
765  
                self_->vt_->construct_write_eof_buffers_awaitable(
765  
                self_->vt_->construct_write_eof_buffers_awaitable(
766  
                    self_->sink_,
766  
                    self_->sink_,
767  
                    self_->cached_awaitable_,
767  
                    self_->cached_awaitable_,
768  
                    buffers_);
768  
                    buffers_);
769  

769  

770  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
770  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
771  
                return h;
771  
                return h;
772  

772  

773  
            return self_->active_write_ops_->await_suspend(
773  
            return self_->active_write_ops_->await_suspend(
774  
                self_->cached_awaitable_, h, env);
774  
                self_->cached_awaitable_, h, env);
775  
        }
775  
        }
776  

776  

777  
        io_result<std::size_t>
777  
        io_result<std::size_t>
778  
        await_resume()
778  
        await_resume()
779  
        {
779  
        {
780  
            struct guard {
780  
            struct guard {
781  
                any_write_sink* self;
781  
                any_write_sink* self;
782  
                ~guard() {
782  
                ~guard() {
783  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
783  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
784  
                    self->active_write_ops_ = nullptr;
784  
                    self->active_write_ops_ = nullptr;
785  
                }
785  
                }
786  
            } g{self_};
786  
            } g{self_};
787  
            return self_->active_write_ops_->await_resume(
787  
            return self_->active_write_ops_->await_resume(
788  
                self_->cached_awaitable_);
788  
                self_->cached_awaitable_);
789  
        }
789  
        }
790  
    };
790  
    };
791  
    return awaitable{this, buffers};
791  
    return awaitable{this, buffers};
792  
}
792  
}
793  

793  

794  
template<ConstBufferSequence CB>
794  
template<ConstBufferSequence CB>
795  
auto
795  
auto
796  
any_write_sink::write_some(CB buffers)
796  
any_write_sink::write_some(CB buffers)
797  
{
797  
{
798  
    struct awaitable
798  
    struct awaitable
799  
    {
799  
    {
800  
        any_write_sink* self_;
800  
        any_write_sink* self_;
801  
        const_buffer_array<detail::max_iovec_> ba_;
801  
        const_buffer_array<detail::max_iovec_> ba_;
802  

802  

803  
        awaitable(
803  
        awaitable(
804  
            any_write_sink* self,
804  
            any_write_sink* self,
805  
            CB const& buffers)
805  
            CB const& buffers)
806  
            : self_(self)
806  
            : self_(self)
807  
            , ba_(buffers)
807  
            , ba_(buffers)
808  
        {
808  
        {
809  
        }
809  
        }
810  

810  

811  
        bool
811  
        bool
812  
        await_ready() const noexcept
812  
        await_ready() const noexcept
813  
        {
813  
        {
814  
            return ba_.to_span().empty();
814  
            return ba_.to_span().empty();
815  
        }
815  
        }
816  

816  

817  
        std::coroutine_handle<>
817  
        std::coroutine_handle<>
818  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
818  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
819  
        {
819  
        {
820  
            self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
820  
            self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
821  
                self_->sink_,
821  
                self_->sink_,
822  
                self_->cached_awaitable_,
822  
                self_->cached_awaitable_,
823  
                ba_.to_span());
823  
                ba_.to_span());
824  

824  

825  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
825  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
826  
                return h;
826  
                return h;
827  

827  

828  
            return self_->active_write_ops_->await_suspend(
828  
            return self_->active_write_ops_->await_suspend(
829  
                self_->cached_awaitable_, h, env);
829  
                self_->cached_awaitable_, h, env);
830  
        }
830  
        }
831  

831  

832  
        io_result<std::size_t>
832  
        io_result<std::size_t>
833  
        await_resume()
833  
        await_resume()
834  
        {
834  
        {
835  
            if(ba_.to_span().empty())
835  
            if(ba_.to_span().empty())
836  
                return {{}, 0};
836  
                return {{}, 0};
837  

837  

838  
            struct guard {
838  
            struct guard {
839  
                any_write_sink* self;
839  
                any_write_sink* self;
840  
                ~guard() {
840  
                ~guard() {
841  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
841  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
842  
                    self->active_write_ops_ = nullptr;
842  
                    self->active_write_ops_ = nullptr;
843  
                }
843  
                }
844  
            } g{self_};
844  
            } g{self_};
845  
            return self_->active_write_ops_->await_resume(
845  
            return self_->active_write_ops_->await_resume(
846  
                self_->cached_awaitable_);
846  
                self_->cached_awaitable_);
847  
        }
847  
        }
848  
    };
848  
    };
849  
    return awaitable{this, buffers};
849  
    return awaitable{this, buffers};
850  
}
850  
}
851  

851  

852  
template<ConstBufferSequence CB>
852  
template<ConstBufferSequence CB>
853  
io_task<std::size_t>
853  
io_task<std::size_t>
854  
any_write_sink::write(CB buffers)
854  
any_write_sink::write(CB buffers)
855  
{
855  
{
856  
    buffer_param<CB> bp(buffers);
856  
    buffer_param<CB> bp(buffers);
857  
    std::size_t total = 0;
857  
    std::size_t total = 0;
858  

858  

859  
    for(;;)
859  
    for(;;)
860  
    {
860  
    {
861  
        auto bufs = bp.data();
861  
        auto bufs = bp.data();
862  
        if(bufs.empty())
862  
        if(bufs.empty())
863  
            break;
863  
            break;
864  

864  

865  
        auto [ec, n] = co_await write_(bufs);
865  
        auto [ec, n] = co_await write_(bufs);
866  
        total += n;
866  
        total += n;
867  
        if(ec)
867  
        if(ec)
868  
            co_return {ec, total};
868  
            co_return {ec, total};
869  
        bp.consume(n);
869  
        bp.consume(n);
870  
    }
870  
    }
871  

871  

872  
    co_return {{}, total};
872  
    co_return {{}, total};
873  
}
873  
}
874  

874  

875  
template<ConstBufferSequence CB>
875  
template<ConstBufferSequence CB>
876  
io_task<std::size_t>
876  
io_task<std::size_t>
877  
any_write_sink::write_eof(CB buffers)
877  
any_write_sink::write_eof(CB buffers)
878  
{
878  
{
879  
    const_buffer_param<CB> bp(buffers);
879  
    const_buffer_param<CB> bp(buffers);
880  
    std::size_t total = 0;
880  
    std::size_t total = 0;
881  

881  

882  
    for(;;)
882  
    for(;;)
883  
    {
883  
    {
884  
        auto bufs = bp.data();
884  
        auto bufs = bp.data();
885  
        if(bufs.empty())
885  
        if(bufs.empty())
886  
        {
886  
        {
887  
            auto [ec] = co_await write_eof();
887  
            auto [ec] = co_await write_eof();
888  
            co_return {ec, total};
888  
            co_return {ec, total};
889  
        }
889  
        }
890  

890  

891  
        if(! bp.more())
891  
        if(! bp.more())
892  
        {
892  
        {
893  
            // Last window — send atomically with EOF
893  
            // Last window — send atomically with EOF
894  
            auto [ec, n] = co_await write_eof_buffers_(bufs);
894  
            auto [ec, n] = co_await write_eof_buffers_(bufs);
895  
            total += n;
895  
            total += n;
896  
            co_return {ec, total};
896  
            co_return {ec, total};
897  
        }
897  
        }
898  

898  

899  
        auto [ec, n] = co_await write_(bufs);
899  
        auto [ec, n] = co_await write_(bufs);
900  
        total += n;
900  
        total += n;
901  
        if(ec)
901  
        if(ec)
902  
            co_return {ec, total};
902  
            co_return {ec, total};
903  
        bp.consume(n);
903  
        bp.consume(n);
904  
    }
904  
    }
905  
}
905  
}
906  

906  

907  
} // namespace capy
907  
} // namespace capy
908  
} // namespace boost
908  
} // namespace boost
909  

909  

910  
#endif
910  
#endif