1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_PULL_FROM_HPP
10  
#ifndef BOOST_CAPY_IO_PULL_FROM_HPP
11  
#define BOOST_CAPY_IO_PULL_FROM_HPP
11  
#define BOOST_CAPY_IO_PULL_FROM_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/buffers.hpp>
14  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/cond.hpp>
15  
#include <boost/capy/cond.hpp>
16  
#include <boost/capy/concept/buffer_sink.hpp>
16  
#include <boost/capy/concept/buffer_sink.hpp>
17  
#include <boost/capy/concept/read_source.hpp>
17  
#include <boost/capy/concept/read_source.hpp>
18  
#include <boost/capy/concept/read_stream.hpp>
18  
#include <boost/capy/concept/read_stream.hpp>
19  
#include <boost/capy/io_task.hpp>
19  
#include <boost/capy/io_task.hpp>
20  

20  

21  
#include <cstddef>
21  
#include <cstddef>
22  
#include <span>
22  
#include <span>
23  

23  

24  
namespace boost {
24  
namespace boost {
25  
namespace capy {
25  
namespace capy {
26  

26  

27  
/** Transfer data from a ReadSource to a BufferSink.
27  
/** Transfer data from a ReadSource to a BufferSink.
28  

28  

29  
    This function reads data from the source directly into the sink's
29  
    This function reads data from the source directly into the sink's
30  
    internal buffers using the callee-owns-buffers model. The sink
30  
    internal buffers using the callee-owns-buffers model. The sink
31  
    provides writable buffers via `prepare()`, the source reads into
31  
    provides writable buffers via `prepare()`, the source reads into
32  
    them, and the sink commits the data. When the source signals EOF,
32  
    them, and the sink commits the data. When the source signals EOF,
33  
    `commit_eof()` is called on the sink to finalize the transfer.
33  
    `commit_eof()` is called on the sink to finalize the transfer.
34  

34  

35  
    @tparam Src The source type, must satisfy @ref ReadSource.
35  
    @tparam Src The source type, must satisfy @ref ReadSource.
36  
    @tparam Sink The sink type, must satisfy @ref BufferSink.
36  
    @tparam Sink The sink type, must satisfy @ref BufferSink.
37  

37  

38  
    @param source The source to read data from.
38  
    @param source The source to read data from.
39  
    @param sink The sink to write data to.
39  
    @param sink The sink to write data to.
40  

40  

41  
    @return A task that yields `(std::error_code, std::size_t)`.
41  
    @return A task that yields `(std::error_code, std::size_t)`.
42  
        On success, `ec` is default-constructed (no error) and `n` is
42  
        On success, `ec` is default-constructed (no error) and `n` is
43  
        the total number of bytes transferred. On error, `ec` contains
43  
        the total number of bytes transferred. On error, `ec` contains
44  
        the error code and `n` is the total number of bytes transferred
44  
        the error code and `n` is the total number of bytes transferred
45  
        before the error.
45  
        before the error.
46  

46  

47  
    @par Example
47  
    @par Example
48  
    @code
48  
    @code
49  
    task<void> transfer_body(ReadSource auto& source, BufferSink auto& sink)
49  
    task<void> transfer_body(ReadSource auto& source, BufferSink auto& sink)
50  
    {
50  
    {
51  
        auto [ec, n] = co_await pull_from(source, sink);
51  
        auto [ec, n] = co_await pull_from(source, sink);
52  
        if (ec)
52  
        if (ec)
53  
        {
53  
        {
54  
            // Handle error
54  
            // Handle error
55  
        }
55  
        }
56  
        // n bytes were transferred
56  
        // n bytes were transferred
57  
    }
57  
    }
58  
    @endcode
58  
    @endcode
59  

59  

60  
    @see ReadSource, BufferSink, push_to
60  
    @see ReadSource, BufferSink, push_to
61  
*/
61  
*/
62  
template<ReadSource Src, BufferSink Sink>
62  
template<ReadSource Src, BufferSink Sink>
63  
io_task<std::size_t>
63  
io_task<std::size_t>
64  
pull_from(Src& source, Sink& sink)
64  
pull_from(Src& source, Sink& sink)
65  
{
65  
{
66  
    mutable_buffer dst_arr[detail::max_iovec_];
66  
    mutable_buffer dst_arr[detail::max_iovec_];
67  
    std::size_t total = 0;
67  
    std::size_t total = 0;
68  

68  

69  
    for(;;)
69  
    for(;;)
70  
    {
70  
    {
71  
        auto dst_bufs = sink.prepare(dst_arr);
71  
        auto dst_bufs = sink.prepare(dst_arr);
72  
        if(dst_bufs.empty())
72  
        if(dst_bufs.empty())
73  
        {
73  
        {
74  
            // No buffer space available; commit nothing to flush
74  
            // No buffer space available; commit nothing to flush
75  
            auto [flush_ec] = co_await sink.commit(0);
75  
            auto [flush_ec] = co_await sink.commit(0);
76  
            if(flush_ec)
76  
            if(flush_ec)
77  
                co_return {flush_ec, total};
77  
                co_return {flush_ec, total};
78  
            continue;
78  
            continue;
79  
        }
79  
        }
80  

80  

81  
        auto [ec, n] = co_await source.read(
81  
        auto [ec, n] = co_await source.read(
82  
            std::span<mutable_buffer const>(dst_bufs));
82  
            std::span<mutable_buffer const>(dst_bufs));
83  

83  

84  
        auto [commit_ec] = co_await sink.commit(n);
84  
        auto [commit_ec] = co_await sink.commit(n);
85  
        total += n;
85  
        total += n;
86  

86  

87  
        if(commit_ec)
87  
        if(commit_ec)
88  
            co_return {commit_ec, total};
88  
            co_return {commit_ec, total};
89  

89  

90  
        if(ec == cond::eof)
90  
        if(ec == cond::eof)
91  
        {
91  
        {
92  
            auto [eof_ec] = co_await sink.commit_eof(0);
92  
            auto [eof_ec] = co_await sink.commit_eof(0);
93  
            co_return {eof_ec, total};
93  
            co_return {eof_ec, total};
94  
        }
94  
        }
95  

95  

96  
        if(ec)
96  
        if(ec)
97  
            co_return {ec, total};
97  
            co_return {ec, total};
98  
    }
98  
    }
99  
}
99  
}
100  

100  

101  
/** Transfer data from a ReadStream to a BufferSink.
101  
/** Transfer data from a ReadStream to a BufferSink.
102  

102  

103  
    This function reads data from the stream directly into the sink's
103  
    This function reads data from the stream directly into the sink's
104  
    internal buffers using the callee-owns-buffers model. The sink
104  
    internal buffers using the callee-owns-buffers model. The sink
105  
    provides writable buffers via `prepare()`, the stream reads into
105  
    provides writable buffers via `prepare()`, the stream reads into
106  
    them using `read_some()`, and the sink commits the data. When the
106  
    them using `read_some()`, and the sink commits the data. When the
107  
    stream signals EOF, `commit_eof()` is called on the sink to
107  
    stream signals EOF, `commit_eof()` is called on the sink to
108  
    finalize the transfer.
108  
    finalize the transfer.
109  

109  

110  
    This overload handles partial reads from the stream, committing
110  
    This overload handles partial reads from the stream, committing
111  
    data incrementally as it arrives. It loops until EOF is encountered
111  
    data incrementally as it arrives. It loops until EOF is encountered
112  
    or an error occurs.
112  
    or an error occurs.
113  

113  

114  
    @tparam Src The source type, must satisfy @ref ReadStream.
114  
    @tparam Src The source type, must satisfy @ref ReadStream.
115  
    @tparam Sink The sink type, must satisfy @ref BufferSink.
115  
    @tparam Sink The sink type, must satisfy @ref BufferSink.
116  

116  

117  
    @param source The stream to read data from.
117  
    @param source The stream to read data from.
118  
    @param sink The sink to write data to.
118  
    @param sink The sink to write data to.
119  

119  

120  
    @return A task that yields `(std::error_code, std::size_t)`.
120  
    @return A task that yields `(std::error_code, std::size_t)`.
121  
        On success, `ec` is default-constructed (no error) and `n` is
121  
        On success, `ec` is default-constructed (no error) and `n` is
122  
        the total number of bytes transferred. On error, `ec` contains
122  
        the total number of bytes transferred. On error, `ec` contains
123  
        the error code and `n` is the total number of bytes transferred
123  
        the error code and `n` is the total number of bytes transferred
124  
        before the error.
124  
        before the error.
125  

125  

126  
    @par Example
126  
    @par Example
127  
    @code
127  
    @code
128  
    task<void> transfer_body(ReadStream auto& stream, BufferSink auto& sink)
128  
    task<void> transfer_body(ReadStream auto& stream, BufferSink auto& sink)
129  
    {
129  
    {
130  
        auto [ec, n] = co_await pull_from(stream, sink);
130  
        auto [ec, n] = co_await pull_from(stream, sink);
131  
        if (ec)
131  
        if (ec)
132  
        {
132  
        {
133  
            // Handle error
133  
            // Handle error
134  
        }
134  
        }
135  
        // n bytes were transferred
135  
        // n bytes were transferred
136  
    }
136  
    }
137  
    @endcode
137  
    @endcode
138  

138  

139  
    @see ReadStream, BufferSink, push_to
139  
    @see ReadStream, BufferSink, push_to
140  
*/
140  
*/
141  
template<ReadStream Src, BufferSink Sink>
141  
template<ReadStream Src, BufferSink Sink>
142  
    requires (!ReadSource<Src>)
142  
    requires (!ReadSource<Src>)
143  
io_task<std::size_t>
143  
io_task<std::size_t>
144  
pull_from(Src& source, Sink& sink)
144  
pull_from(Src& source, Sink& sink)
145  
{
145  
{
146  
    mutable_buffer dst_arr[detail::max_iovec_];
146  
    mutable_buffer dst_arr[detail::max_iovec_];
147  
    std::size_t total = 0;
147  
    std::size_t total = 0;
148  

148  

149  
    for(;;)
149  
    for(;;)
150  
    {
150  
    {
151  
        // Prepare destination buffers from the sink
151  
        // Prepare destination buffers from the sink
152  
        auto dst_bufs = sink.prepare(dst_arr);
152  
        auto dst_bufs = sink.prepare(dst_arr);
153  
        if(dst_bufs.empty())
153  
        if(dst_bufs.empty())
154  
        {
154  
        {
155  
            // No buffer space available; commit nothing to flush
155  
            // No buffer space available; commit nothing to flush
156  
            auto [flush_ec] = co_await sink.commit(0);
156  
            auto [flush_ec] = co_await sink.commit(0);
157  
            if(flush_ec)
157  
            if(flush_ec)
158  
                co_return {flush_ec, total};
158  
                co_return {flush_ec, total};
159  
            continue;
159  
            continue;
160  
        }
160  
        }
161  

161  

162  
        // Read data from the stream into the sink's buffers
162  
        // Read data from the stream into the sink's buffers
163  
        auto [ec, n] = co_await source.read_some(
163  
        auto [ec, n] = co_await source.read_some(
164  
            std::span<mutable_buffer const>(dst_bufs));
164  
            std::span<mutable_buffer const>(dst_bufs));
165  

165  

166  
        auto [commit_ec] = co_await sink.commit(n);
166  
        auto [commit_ec] = co_await sink.commit(n);
167  
        total += n;
167  
        total += n;
168  

168  

169  
        if(commit_ec)
169  
        if(commit_ec)
170  
            co_return {commit_ec, total};
170  
            co_return {commit_ec, total};
171  

171  

172  
        if(ec == cond::eof)
172  
        if(ec == cond::eof)
173  
        {
173  
        {
174  
            auto [eof_ec] = co_await sink.commit_eof(0);
174  
            auto [eof_ec] = co_await sink.commit_eof(0);
175  
            co_return {eof_ec, total};
175  
            co_return {eof_ec, total};
176  
        }
176  
        }
177  

177  

178  
        // Check for other errors
178  
        // Check for other errors
179  
        if(ec)
179  
        if(ec)
180  
            co_return {ec, total};
180  
            co_return {ec, total};
181  
    }
181  
    }
182  
}
182  
}
183  

183  

184  
} // namespace capy
184  
} // namespace capy
185  
} // namespace boost
185  
} // namespace boost
186  

186  

187  
#endif
187  
#endif