1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_WHEN_ALL_HPP
10  
#ifndef BOOST_CAPY_WHEN_ALL_HPP
11  
#define BOOST_CAPY_WHEN_ALL_HPP
11  
#define BOOST_CAPY_WHEN_ALL_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/void_to_monostate.hpp>
14  
#include <boost/capy/detail/void_to_monostate.hpp>
15  
#include <boost/capy/concept/executor.hpp>
15  
#include <boost/capy/concept/executor.hpp>
16  
#include <boost/capy/concept/io_awaitable.hpp>
16  
#include <boost/capy/concept/io_awaitable.hpp>
17  
#include <coroutine>
17  
#include <coroutine>
18  
#include <boost/capy/ex/io_env.hpp>
18  
#include <boost/capy/ex/io_env.hpp>
19  
#include <boost/capy/ex/frame_allocator.hpp>
19  
#include <boost/capy/ex/frame_allocator.hpp>
20  
#include <boost/capy/task.hpp>
20  
#include <boost/capy/task.hpp>
21  

21  

22  
#include <array>
22  
#include <array>
23  
#include <atomic>
23  
#include <atomic>
24  
#include <exception>
24  
#include <exception>
25 -
#include <ranges>
 
26 -
#include <stdexcept>
 
27  
#include <optional>
25  
#include <optional>
28  
#include <stop_token>
26  
#include <stop_token>
29  
#include <tuple>
27  
#include <tuple>
30  
#include <type_traits>
28  
#include <type_traits>
31 -
#include <vector>
 
32  
#include <utility>
29  
#include <utility>
33  

30  

34  
namespace boost {
31  
namespace boost {
35  
namespace capy {
32  
namespace capy {
36  

33  

37  
namespace detail {
34  
namespace detail {
38  

35  

39  
/** Holds the result of a single task within when_all.

    The value lives in a `std::optional` so the holder can be
    default-constructed before the task has produced a result.

    @tparam T The (non-void) result type of the task.
*/
template<typename T>
struct result_holder
{
    std::optional<T> value_;

    /** Store the task's result, replacing any previous value. */
    void set(T v)
    {
        value_ = std::move(v);
    }

    /** Move the stored result out of the holder.

        @pre A value has been stored with `set`.
    */
    T get() &&
    {
        return std::move(*value_);
    }
};

/** Specialization for void tasks - returns monostate to preserve index mapping.

    A void task produces no value, so nothing is stored. `get` yields
    `std::monostate` so the result tuple still has one element per task.
*/
template<>
struct result_holder<void>
{
    std::monostate get() && { return {}; }
};
64  

61  

65 -
/** Core shared state for when_all operations.
62 +
/** Shared state for when_all operation.
66 -

 
67 -
    Contains all members and methods common to both heterogeneous (variadic)
 
68 -
    and homogeneous (range) when_all implementations. State classes embed
 
69 -
    this via composition to avoid CRTP destructor ordering issues.
 
70  

63  

71 -
    @par Thread Safety
64 +
    @tparam Ts The result types of the tasks.
72 -
    Atomic operations protect exception capture and completion count.
 
73  
*/
65  
*/
74 -
struct when_all_core
66 +
template<typename... Ts>
 
67 +
struct when_all_state
75  
{
68  
{
 
69 +
    static constexpr std::size_t task_count = sizeof...(Ts);
 
70 +

 
71 +
    // Completion tracking - when_all waits for all children
76  
    std::atomic<std::size_t> remaining_count_;
72  
    std::atomic<std::size_t> remaining_count_;
77  

73  

 
74 +
    // Result storage in input order
 
75 +
    std::tuple<result_holder<Ts>...> results_;
 
76 +

 
77 +
    // Runner handles - destroyed in await_resume while allocator is valid
 
78 +
    std::array<std::coroutine_handle<>, task_count> runner_handles_{};
 
79 +

78  
    // Exception storage - first error wins, others discarded
80  
    // Exception storage - first error wins, others discarded
79  
    std::atomic<bool> has_exception_{false};
81  
    std::atomic<bool> has_exception_{false};
80  
    std::exception_ptr first_exception_;
82  
    std::exception_ptr first_exception_;
81  

83  

 
84 +
    // Stop propagation - on error, request stop for siblings
82  
    std::stop_source stop_source_;
85  
    std::stop_source stop_source_;
83  

86  

84 -
    // Bridges parent's stop token to our stop_source
87 +
    // Connects parent's stop_token to our stop_source
85  
    struct stop_callback_fn
88  
    struct stop_callback_fn
86  
    {
89  
    {
87  
        std::stop_source* source_;
90  
        std::stop_source* source_;
88  
        void operator()() const { source_->request_stop(); }
91  
        void operator()() const { source_->request_stop(); }
89  
    };
92  
    };
90  
    using stop_callback_t = std::stop_callback<stop_callback_fn>;
93  
    using stop_callback_t = std::stop_callback<stop_callback_fn>;
91  
    std::optional<stop_callback_t> parent_stop_callback_;
94  
    std::optional<stop_callback_t> parent_stop_callback_;
92  

95  

 
96 +
    // Parent resumption
93  
    std::coroutine_handle<> continuation_;
97  
    std::coroutine_handle<> continuation_;
94  
    io_env const* caller_env_ = nullptr;
98  
    io_env const* caller_env_ = nullptr;
95  

99  

96 -
    explicit when_all_core(std::size_t count) noexcept
100 +
    when_all_state()
97 -
        : remaining_count_(count)
101 +
        : remaining_count_(task_count)
98  
    {
102  
    {
99  
    }
103  
    }
100  

104  

101 -
    /** Capture an exception (first one wins). */
105 +
    // Runners self-destruct in final_suspend. No destruction needed here.
 
106 +

 
107 +
    /** Capture an exception (first one wins).
 
108 +
    */
102  
    void capture_exception(std::exception_ptr ep)
109  
    void capture_exception(std::exception_ptr ep)
103  
    {
110  
    {
104  
        bool expected = false;
111  
        bool expected = false;
105  
        if(has_exception_.compare_exchange_strong(
112  
        if(has_exception_.compare_exchange_strong(
106  
            expected, true, std::memory_order_relaxed))
113  
            expected, true, std::memory_order_relaxed))
107  
            first_exception_ = ep;
114  
            first_exception_ = ep;
108 -
};
 
109 -

 
110 -
/** Shared state for heterogeneous when_all (variadic overload).
 
111 -

 
112 -
    @tparam Ts The result types of the tasks.
 
113 -
*/
 
114 -
template<typename... Ts>
 
115 -
struct when_all_state
 
116 -
{
 
117 -
    static constexpr std::size_t task_count = sizeof...(Ts);
 
118 -

 
119 -
    when_all_core core_;
 
120 -
    std::tuple<result_holder<Ts>...> results_;
 
121 -
    std::array<std::coroutine_handle<>, task_count> runner_handles_{};
 
122 -

 
123 -
    when_all_state()
 
124 -
        : core_(task_count)
 
125 -
    {
 
126 -
    }
 
127 -
};
 
128 -

 
129 -
/** Shared state for homogeneous when_all (range overload).
 
130 -

 
131 -
    Stores all results in a vector indexed by task position.
 
132 -

 
133 -
    @tparam T The common result type of all tasks.
 
134 -
*/
 
135 -
template<typename T>
 
136 -
struct when_all_homogeneous_state
 
137 -
{
 
138 -
    when_all_core core_;
 
139 -
    std::vector<std::optional<T>> results_;
 
140 -
    std::vector<std::coroutine_handle<>> runner_handles_;
 
141 -

 
142 -
    explicit when_all_homogeneous_state(std::size_t count)
 
143 -
        : core_(count)
 
144 -
        , results_(count)
 
145 -
        , runner_handles_(count)
 
146 -
    {
 
147 -
    }
 
148  
    }
115  
    }
149 -
    void set_result(std::size_t index, T value)
 
150 -
    {
 
151 -
        results_[index].emplace(std::move(value));
 
152 -
    }
 
153  

116  

154  
};
117  
};
155  

118  

156 -
/** Specialization for void tasks (no result storage). */
119 +
/** Wrapper coroutine that intercepts task completion.
157 -
template<>
 
158 -
struct when_all_homogeneous_state<void>
 
159 -
{
 
160 -
    when_all_core core_;
 
161 -
    std::vector<std::coroutine_handle<>> runner_handles_;
 
162 -

 
163 -
    explicit when_all_homogeneous_state(std::size_t count)
 
164 -
        : core_(count)
 
165 -
        , runner_handles_(count)
 
166 -
    {
 
167 -
    }
 
168 -
};
 
169 -

 
170 -
/** Wrapper coroutine that intercepts task completion for when_all.
 
171 -

 
172 -
    Parameterized on StateType to work with both heterogeneous (variadic)
 
173 -
    and homogeneous (range) state types. All state types expose their
 
174 -
    shared members through a `core_` member of type when_all_core.
 
175  

120  

176 -
    @tparam StateType The state type (when_all_state or when_all_homogeneous_state).
121 +
    This runner awaits its assigned task and stores the result in
 
122 +
    the shared state, or captures the exception and requests stop.
177  
*/
123  
*/
178 -
template<typename StateType>
124 +
template<typename T, typename... Ts>
179  
struct when_all_runner
125  
struct when_all_runner
180  
{
126  
{
181 -
    struct promise_type
127 +
    struct promise_type // : frame_allocating_base  // DISABLED FOR TESTING
182  
    {
128  
    {
183 -
        StateType* state_ = nullptr;
129 +
        when_all_state<Ts...>* state_ = nullptr;
184 -
        std::size_t index_ = 0;
 
185  
        io_env env_;
130  
        io_env env_;
186  

131  

187 -
        when_all_runner get_return_object() noexcept
132 +
        when_all_runner get_return_object()
188  
        {
133  
        {
189 -
            return when_all_runner(
134 +
            return when_all_runner(std::coroutine_handle<promise_type>::from_promise(*this));
190 -
                std::coroutine_handle<promise_type>::from_promise(*this));
 
191  
        }
135  
        }
192  

136  

193  
        std::suspend_always initial_suspend() noexcept
137  
        std::suspend_always initial_suspend() noexcept
194  
        {
138  
        {
195  
            return {};
139  
            return {};
196  
        }
140  
        }
197  

141  

198  
        auto final_suspend() noexcept
142  
        auto final_suspend() noexcept
199  
        {
143  
        {
200  
            struct awaiter
144  
            struct awaiter
201  
            {
145  
            {
202  
                promise_type* p_;
146  
                promise_type* p_;
203 -
                bool await_ready() const noexcept { return false; }
147 +

 
148 +
                bool await_ready() const noexcept
 
149 +
                {
 
150 +
                    return false;
 
151 +
                }
 
152 +

204  
                auto await_suspend(std::coroutine_handle<> h) noexcept
153  
                auto await_suspend(std::coroutine_handle<> h) noexcept
205  
                {
154  
                {
206 -
                    auto& core = p_->state_->core_;
155 +
                    // Extract everything needed before self-destruction.
207 -
                    auto* counter = &core.remaining_count_;
156 +
                    auto* state = p_->state_;
208 -
                    auto* caller_env = core.caller_env_;
157 +
                    auto* counter = &state->remaining_count_;
209 -
                    auto cont = core.continuation_;
158 +
                    auto* caller_env = state->caller_env_;
 
159 +
                    auto cont = state->continuation_;
210  

160  

211  
                    h.destroy();
161  
                    h.destroy();
212  

162  

 
163 +
                    // If last runner, dispatch parent for symmetric transfer.
213  
                    auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
164  
                    auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
214  
                    if(remaining == 1)
165  
                    if(remaining == 1)
215  
                        return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
166  
                        return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
216  
                    return detail::symmetric_transfer(std::noop_coroutine());
167  
                    return detail::symmetric_transfer(std::noop_coroutine());
217  
                }
168  
                }
218 -
                void await_resume() const noexcept {}
169 +

 
170 +
                void await_resume() const noexcept
 
171 +
                {
 
172 +
                }
219  
            };
173  
            };
220  
            return awaiter{this};
174  
            return awaiter{this};
221  
        }
175  
        }
222  

176  

223 -
        void return_void() noexcept {}
177 +
        void return_void()
 
178 +
        {
 
179 +
        }
224  

180  

225  
        void unhandled_exception()
181  
        void unhandled_exception()
226  
        {
182  
        {
227 -
            state_->core_.capture_exception(std::current_exception());
183 +
            state_->capture_exception(std::current_exception());
228 -
            state_->core_.stop_source_.request_stop();
184 +
            // Request stop for sibling tasks
 
185 +
            state_->stop_source_.request_stop();
229  
        }
186  
        }
230  

187  

231  
        template<class Awaitable>
188  
        template<class Awaitable>
232  
        struct transform_awaiter
189  
        struct transform_awaiter
233  
        {
190  
        {
234  
            std::decay_t<Awaitable> a_;
191  
            std::decay_t<Awaitable> a_;
235  
            promise_type* p_;
192  
            promise_type* p_;
236  

193  

237 -
            bool await_ready() { return a_.await_ready(); }
194 +
            bool await_ready()
238 -
            decltype(auto) await_resume() { return a_.await_resume(); }
195 +
            {
 
196 +
                return a_.await_ready();
 
197 +
            }
 
198 +

 
199 +
            decltype(auto) await_resume()
 
200 +
            {
 
201 +
                return a_.await_resume();
 
202 +
            }
239  

203  

240  
            template<class Promise>
204  
            template<class Promise>
241  
            auto await_suspend(std::coroutine_handle<Promise> h)
205  
            auto await_suspend(std::coroutine_handle<Promise> h)
242  
            {
206  
            {
243  
                using R = decltype(a_.await_suspend(h, &p_->env_));
207  
                using R = decltype(a_.await_suspend(h, &p_->env_));
244  
                if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
208  
                if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
245  
                    return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
209  
                    return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
246  
                else
210  
                else
247  
                    return a_.await_suspend(h, &p_->env_);
211  
                    return a_.await_suspend(h, &p_->env_);
248  
            }
212  
            }
249  
        };
213  
        };
250  

214  

251  
        template<class Awaitable>
215  
        template<class Awaitable>
252  
        auto await_transform(Awaitable&& a)
216  
        auto await_transform(Awaitable&& a)
253  
        {
217  
        {
254  
            using A = std::decay_t<Awaitable>;
218  
            using A = std::decay_t<Awaitable>;
255  
            if constexpr (IoAwaitable<A>)
219  
            if constexpr (IoAwaitable<A>)
256  
            {
220  
            {
257  
                return transform_awaiter<Awaitable>{
221  
                return transform_awaiter<Awaitable>{
258  
                    std::forward<Awaitable>(a), this};
222  
                    std::forward<Awaitable>(a), this};
259  
            }
223  
            }
260  
            else
224  
            else
261  
            {
225  
            {
262  
                static_assert(sizeof(A) == 0, "requires IoAwaitable");
226  
                static_assert(sizeof(A) == 0, "requires IoAwaitable");
263  
            }
227  
            }
264  
        }
228  
        }
265  
    };
229  
    };
266  

230  

267  
    std::coroutine_handle<promise_type> h_;
231  
    std::coroutine_handle<promise_type> h_;
268  

232  

269 -
    explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept
233 +
    explicit when_all_runner(std::coroutine_handle<promise_type> h)
270  
        : h_(h)
234  
        : h_(h)
271  
    {
235  
    {
272  
    }
236  
    }
273  

237  

274  
    // Enable move for all clang versions - some versions need it
238  
    // Enable move for all clang versions - some versions need it
275 -
    when_all_runner(when_all_runner&& other) noexcept
239 +
    when_all_runner(when_all_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
276 -
        : h_(std::exchange(other.h_, nullptr))
 
277 -
    {
 
278 -
    }
 
279  

240  

 
241 +
    // Non-copyable
280  
    when_all_runner(when_all_runner const&) = delete;
242  
    when_all_runner(when_all_runner const&) = delete;
281  
    when_all_runner& operator=(when_all_runner const&) = delete;
243  
    when_all_runner& operator=(when_all_runner const&) = delete;
282  
    when_all_runner& operator=(when_all_runner&&) = delete;
244  
    when_all_runner& operator=(when_all_runner&&) = delete;
283  

245  

284  
    auto release() noexcept
246  
    auto release() noexcept
285  
    {
247  
    {
286  
        return std::exchange(h_, nullptr);
248  
        return std::exchange(h_, nullptr);
287  
    }
249  
    }
288  
};
250  
};
289  

251  

290 -
/** Create a runner coroutine for a single awaitable (variadic path).
252 +
/** Create a runner coroutine for a single awaitable.
291  

253  

292 -
    Uses compile-time index for tuple-based result storage.
254 +
    Awaitable is passed directly to ensure proper coroutine frame storage.
293  
*/
255  
*/
294  
template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
256  
template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
295 -
when_all_runner<when_all_state<Ts...>>
257 +
when_all_runner<awaitable_result_t<Awaitable>, Ts...>
296  
make_when_all_runner(Awaitable inner, when_all_state<Ts...>* state)
258  
make_when_all_runner(Awaitable inner, when_all_state<Ts...>* state)
297  
{
259  
{
298  
    using T = awaitable_result_t<Awaitable>;
260  
    using T = awaitable_result_t<Awaitable>;
299  
    if constexpr (std::is_void_v<T>)
261  
    if constexpr (std::is_void_v<T>)
300  
    {
262  
    {
301  
        co_await std::move(inner);
263  
        co_await std::move(inner);
302  
    }
264  
    }
303  
    else
265  
    else
304  
    {
266  
    {
305  
        std::get<Index>(state->results_).set(co_await std::move(inner));
267  
        std::get<Index>(state->results_).set(co_await std::move(inner));
306  
    }
268  
    }
307  
}
269  
}
308  

270  

309 -
/** Create a runner coroutine for a single awaitable (range path).
271 +
/** Internal awaitable that launches all runner coroutines and waits.
310 -

 
311 -
    Uses runtime index for vector-based result storage.
 
312 -
*/
 
313 -
template<IoAwaitable Awaitable, typename StateType>
 
314 -
when_all_runner<StateType>
 
315 -
make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index)
 
316 -
{
 
317 -
    using T = awaitable_result_t<Awaitable>;
 
318 -
    if constexpr (std::is_void_v<T>)
 
319 -
    {
 
320 -
        co_await std::move(inner);
 
321 -
    }
 
322 -
    else
 
323 -
    {
 
324 -
        state->set_result(index, co_await std::move(inner));
 
325 -
    }
 
326 -
}
 
327 -

 
328 -
/** Internal awaitable that launches all variadic runner coroutines.
 
329  

272  

330 -
    CRITICAL: If the last task finishes synchronously then the parent
273 +
    This awaitable is used inside the when_all coroutine to handle
331 -
    coroutine resumes, destroying its frame, and destroying this object
274 +
    the concurrent execution of child awaitables.
332 -
    prior to the completion of await_suspend. Therefore, await_suspend
 
333 -
    must ensure `this` cannot be referenced after calling `launch_one`
 
334 -
    for the last time.
 
335  
*/
275  
*/
336  
template<IoAwaitable... Awaitables>
276  
template<IoAwaitable... Awaitables>
337  
class when_all_launcher
277  
class when_all_launcher
338  
{
278  
{
339  
    using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
279  
    using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
340  

280  

341  
    std::tuple<Awaitables...>* awaitables_;
281  
    std::tuple<Awaitables...>* awaitables_;
342  
    state_type* state_;
282  
    state_type* state_;
343  

283  

344  
public:
284  
public:
345  
    when_all_launcher(
285  
    when_all_launcher(
346  
        std::tuple<Awaitables...>* awaitables,
286  
        std::tuple<Awaitables...>* awaitables,
347  
        state_type* state)
287  
        state_type* state)
348  
        : awaitables_(awaitables)
288  
        : awaitables_(awaitables)
349  
        , state_(state)
289  
        , state_(state)
350  
    {
290  
    {
351  
    }
291  
    }
352  

292  

353  
    bool await_ready() const noexcept
293  
    bool await_ready() const noexcept
354  
    {
294  
    {
355  
        return sizeof...(Awaitables) == 0;
295  
        return sizeof...(Awaitables) == 0;
356  
    }
296  
    }
357  

297  

358  
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
298  
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
359  
    {
299  
    {
360 -
        state_->core_.continuation_ = continuation;
300 +
        state_->continuation_ = continuation;
361 -
        state_->core_.caller_env_ = caller_env;
301 +
        state_->caller_env_ = caller_env;
362  

302  

 
303 +
        // Forward parent's stop requests to children
363  
        if(caller_env->stop_token.stop_possible())
304  
        if(caller_env->stop_token.stop_possible())
364  
        {
305  
        {
365 -
            state_->core_.parent_stop_callback_.emplace(
306 +
            state_->parent_stop_callback_.emplace(
366  
                caller_env->stop_token,
307  
                caller_env->stop_token,
367 -
                when_all_core::stop_callback_fn{&state_->core_.stop_source_});
308 +
                typename state_type::stop_callback_fn{&state_->stop_source_});
368  

309  

369  
            if(caller_env->stop_token.stop_requested())
310  
            if(caller_env->stop_token.stop_requested())
370 -
                state_->core_.stop_source_.request_stop();
311 +
                state_->stop_source_.request_stop();
371  
        }
312  
        }
372  

313  

373 -
        auto token = state_->core_.stop_source_.get_token();
314 +
        // CRITICAL: If the last task finishes synchronously then the parent
 
315 +
        // coroutine resumes, destroying its frame, and destroying this object
 
316 +
        // prior to the completion of await_suspend. Therefore, await_suspend
 
317 +
        // must ensure `this` cannot be referenced after calling `launch_one`
 
318 +
        // for the last time.
 
319 +
        auto token = state_->stop_source_.get_token();
374  
        [&]<std::size_t... Is>(std::index_sequence<Is...>) {
320  
        [&]<std::size_t... Is>(std::index_sequence<Is...>) {
375  
            (..., launch_one<Is>(caller_env->executor, token));
321  
            (..., launch_one<Is>(caller_env->executor, token));
376  
        }(std::index_sequence_for<Awaitables...>{});
322  
        }(std::index_sequence_for<Awaitables...>{});
377  

323  

 
324 +
        // Let signal_completion() handle resumption
378  
        return std::noop_coroutine();
325  
        return std::noop_coroutine();
379  
    }
326  
    }
380  

327  

381  
    void await_resume() const noexcept
328  
    void await_resume() const noexcept
382  
    {
329  
    {
 
330 +
        // Results are extracted by the when_all coroutine from state
383  
    }
331  
    }
384  

332  

385  
private:
333  
private:
386  
    template<std::size_t I>
334  
    template<std::size_t I>
387  
    void launch_one(executor_ref caller_ex, std::stop_token token)
335  
    void launch_one(executor_ref caller_ex, std::stop_token token)
388  
    {
336  
    {
389  
        auto runner = make_when_all_runner<I>(
337  
        auto runner = make_when_all_runner<I>(
390  
            std::move(std::get<I>(*awaitables_)), state_);
338  
            std::move(std::get<I>(*awaitables_)), state_);
391  

339  

392  
        auto h = runner.release();
340  
        auto h = runner.release();
393  
        h.promise().state_ = state_;
341  
        h.promise().state_ = state_;
394 -
        h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->frame_allocator};
342 +
        h.promise().env_ = io_env{caller_ex, token, state_->caller_env_->frame_allocator};
395  

343  

396  
        std::coroutine_handle<> ch{h};
344  
        std::coroutine_handle<> ch{h};
397  
        state_->runner_handles_[I] = ch;
345  
        state_->runner_handles_[I] = ch;
398 -
        state_->core_.caller_env_->executor.post(ch);
346 +
        state_->caller_env_->executor.post(ch);
399  
    }
347  
    }
400  
};
348  
};
401  

349  

402  
/** Helper to extract a single result from state.
350  
/** Helper to extract a single result from state.
403  
    This is a separate function to work around a GCC-11 ICE that occurs
351  
    This is a separate function to work around a GCC-11 ICE that occurs
404  
    when using nested immediately-invoked lambdas with pack expansion.
352  
    when using nested immediately-invoked lambdas with pack expansion.
405  
*/
353  
*/
406  
template<std::size_t I, typename... Ts>
354  
template<std::size_t I, typename... Ts>
407  
auto extract_single_result(when_all_state<Ts...>& state)
355  
auto extract_single_result(when_all_state<Ts...>& state)
408  
{
356  
{
409  
    return std::move(std::get<I>(state.results_)).get();
357  
    return std::move(std::get<I>(state.results_)).get();
410  
}
358  
}
411  

359  

412  
/** Extract all results from state as a tuple.
360  
/** Extract all results from state as a tuple.
413  
*/
361  
*/
414  
template<typename... Ts>
362  
template<typename... Ts>
415  
auto extract_results(when_all_state<Ts...>& state)
363  
auto extract_results(when_all_state<Ts...>& state)
416  
{
364  
{
417  
    return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
365  
    return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
418  
        return std::tuple(extract_single_result<Is>(state)...);
366  
        return std::tuple(extract_single_result<Is>(state)...);
419  
    }(std::index_sequence_for<Ts...>{});
367  
    }(std::index_sequence_for<Ts...>{});
420  
}
368  
}
421 -
/** Launches all homogeneous runners concurrently.
 
422 -

 
423 -
    Two-phase approach: create all runners first, then post all.
 
424 -
    This avoids lifetime issues if a task completes synchronously.
 
425 -
*/
 
426 -
template<typename Range>
 
427 -
class when_all_homogeneous_launcher
 
428 -
{
 
429 -
    using Awaitable = std::ranges::range_value_t<Range>;
 
430 -
    using T = awaitable_result_t<Awaitable>;
 
431 -

 
432 -
    Range* range_;
 
433 -
    when_all_homogeneous_state<T>* state_;
 
434 -

 
435 -
public:
 
436 -
    when_all_homogeneous_launcher(
 
437 -
        Range* range,
 
438 -
        when_all_homogeneous_state<T>* state)
 
439 -
        : range_(range)
 
440 -
        , state_(state)
 
441 -
    {
 
442 -
    }
 
443 -

 
444 -
    bool await_ready() const noexcept
 
445 -
    {
 
446 -
        return std::ranges::empty(*range_);
 
447 -
    }
 
448 -

 
449 -
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
 
450 -
    {
 
451 -
        state_->core_.continuation_ = continuation;
 
452 -
        state_->core_.caller_env_ = caller_env;
 
453 -

 
454 -
        if(caller_env->stop_token.stop_possible())
 
455 -
        {
 
456 -
            state_->core_.parent_stop_callback_.emplace(
 
457 -
                caller_env->stop_token,
 
458 -
                when_all_core::stop_callback_fn{&state_->core_.stop_source_});
 
459 -

 
460 -
            if(caller_env->stop_token.stop_requested())
 
461 -
                state_->core_.stop_source_.request_stop();
 
462 -
        }
 
463 -

 
464 -
        auto token = state_->core_.stop_source_.get_token();
 
465 -

 
466 -
        // Phase 1: Create all runners without dispatching.
 
467 -
        std::size_t index = 0;
 
468 -
        for(auto&& a : *range_)
 
469 -
        {
 
470 -
            auto runner = make_when_all_homogeneous_runner(
 
471 -
                std::move(a), state_, index);
 
472 -

 
473 -
            auto h = runner.release();
 
474 -
            h.promise().state_ = state_;
 
475 -
            h.promise().index_ = index;
 
476 -
            h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
 
477 -

 
478 -
            state_->runner_handles_[index] = std::coroutine_handle<>{h};
 
479 -
            ++index;
 
480 -
        }
 
481 -

 
482 -
        // Phase 2: Post all runners. Any may complete synchronously.
 
483 -
        // After last post, state_ and this may be destroyed.
 
484 -
        std::coroutine_handle<>* handles = state_->runner_handles_.data();
 
485 -
        std::size_t count = state_->runner_handles_.size();
 
486 -
        for(std::size_t i = 0; i < count; ++i)
 
487 -
            caller_env->executor.post(handles[i]);
 
488 -

 
489 -
        return std::noop_coroutine();
 
490 -
    }
 
491 -

 
492 -
    void await_resume() const noexcept
 
493 -
    {
 
494 -
    }
 
495 -
};
 
496 -

 
497  

369  

498  
} // namespace detail
370  
} // namespace detail
499  

371  

500  
/** Compute the when_all result tuple type.
372  
/** Compute the when_all result tuple type.
501  

373  

502  
    Void-returning tasks contribute std::monostate to preserve the
374  
    Void-returning tasks contribute std::monostate to preserve the
503  
    task-index-to-result-index mapping, matching when_any's approach.
375  
    task-index-to-result-index mapping, matching when_any's approach.
504  

376  

505  
    Example: when_all_result_t<int, void, string> = std::tuple<int, std::monostate, string>
377  
    Example: when_all_result_t<int, void, string> = std::tuple<int, std::monostate, string>
506  
    Example: when_all_result_t<void, void> = std::tuple<std::monostate, std::monostate>
378  
    Example: when_all_result_t<void, void> = std::tuple<std::monostate, std::monostate>
507  
*/
379  
*/
508  
template<typename... Ts>
380  
template<typename... Ts>
509  
using when_all_result_t = std::tuple<void_to_monostate_t<Ts>...>;
381  
using when_all_result_t = std::tuple<void_to_monostate_t<Ts>...>;
510  

382  

511  
/** Execute multiple awaitables concurrently and collect their results.
383  
/** Execute multiple awaitables concurrently and collect their results.
512  

384  

513  
    Launches all awaitables simultaneously and waits for all to complete
385  
    Launches all awaitables simultaneously and waits for all to complete
514  
    before returning. Results are collected in input order. If any
386  
    before returning. Results are collected in input order. If any
515  
    awaitable throws, cancellation is requested for siblings and the first
387  
    awaitable throws, cancellation is requested for siblings and the first
516  
    exception is rethrown after all awaitables complete.
388  
    exception is rethrown after all awaitables complete.
517  

389  

518  
    @li All child awaitables run concurrently on the caller's executor
390  
    @li All child awaitables run concurrently on the caller's executor
519  
    @li Results are returned as a tuple in input order
391  
    @li Results are returned as a tuple in input order
520  
    @li Void-returning awaitables contribute std::monostate to the
392  
    @li Void-returning awaitables contribute std::monostate to the
521  
        result tuple, preserving the task-index-to-result-index mapping
393  
        result tuple, preserving the task-index-to-result-index mapping
522  
    @li First exception wins; subsequent exceptions are discarded
394  
    @li First exception wins; subsequent exceptions are discarded
523  
    @li Stop is requested for siblings on first error
395  
    @li Stop is requested for siblings on first error
524  
    @li Completes only after all children have finished
396  
    @li Completes only after all children have finished
525  

397  

526  
    @par Thread Safety
398  
    @par Thread Safety
527  
    The returned task must be awaited from a single execution context.
399  
    The returned task must be awaited from a single execution context.
528  
    Child awaitables execute concurrently but complete through the caller's
400  
    Child awaitables execute concurrently but complete through the caller's
529  
    executor.
401  
    executor.
530  

402  

531  
    @param awaitables The awaitables to execute concurrently. Each must
403  
    @param awaitables The awaitables to execute concurrently. Each must
532  
        satisfy @ref IoAwaitable and is consumed (moved-from) when
404  
        satisfy @ref IoAwaitable and is consumed (moved-from) when
533  
        `when_all` is awaited.
405  
        `when_all` is awaited.
534  

406  

535  
    @return A task yielding a tuple of results in input order. Void tasks
407  
    @return A task yielding a tuple of results in input order. Void tasks
536  
        contribute std::monostate to preserve index correspondence.
408  
        contribute std::monostate to preserve index correspondence.
537  

409  

538  
    @par Example
410  
    @par Example
539  

411  

540  
    @code
412  
    @code
541  
    task<> example()
413  
    task<> example()
542  
    {
414  
    {
543  
        // Concurrent fetch, results collected in order
415  
        // Concurrent fetch, results collected in order
544  
        auto [user, posts] = co_await when_all(
416  
        auto [user, posts] = co_await when_all(
545  
            fetch_user( id ),      // task<User>
417  
            fetch_user( id ),      // task<User>
546  
            fetch_posts( id )      // task<std::vector<Post>>
418  
            fetch_posts( id )      // task<std::vector<Post>>
547  
        );
419  
        );
548  

420  

549  
        // Void awaitables contribute monostate
421  
        // Void awaitables contribute monostate
550  
        auto [a, _, b] = co_await when_all(
422  
        auto [a, _, b] = co_await when_all(
551  
            fetch_int(),           // task<int>
423  
            fetch_int(),           // task<int>
552  
            log_event( "start" ),  // task<void>  → monostate
424  
            log_event( "start" ),  // task<void>  → monostate
553  
            fetch_str()            // task<string>
425  
            fetch_str()            // task<string>
554  
        );
426  
        );
555  
        // a is int, _ is monostate, b is string
427  
        // a is int, _ is monostate, b is string
556  
    }
428  
    }
557  
    @endcode
429  
    @endcode
558  

430  

559  
    @see IoAwaitable, task
431  
    @see IoAwaitable, task
560  
*/
432  
*/
561  
template<IoAwaitable... As>
433  
template<IoAwaitable... As>
562  
[[nodiscard]] auto when_all(As... awaitables)
434  
[[nodiscard]] auto when_all(As... awaitables)
563  
    -> task<when_all_result_t<awaitable_result_t<As>...>>
435  
    -> task<when_all_result_t<awaitable_result_t<As>...>>
564  
{
436  
{
565  
    // State is stored in the coroutine frame, using the frame allocator
437  
    // State is stored in the coroutine frame, using the frame allocator
566  
    detail::when_all_state<awaitable_result_t<As>...> state;
438  
    detail::when_all_state<awaitable_result_t<As>...> state;
567  

439  

568  
    // Store awaitables in the frame
440  
    // Store awaitables in the frame
569  
    std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
441  
    std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
570  

442  

571  
    // Launch all awaitables and wait for completion
443  
    // Launch all awaitables and wait for completion
572  
    co_await detail::when_all_launcher<As...>(&awaitable_tuple, &state);
444  
    co_await detail::when_all_launcher<As...>(&awaitable_tuple, &state);
573  

445  

574  
    // Propagate first exception if any.
446  
    // Propagate first exception if any.
575  
    // Safe without explicit acquire: capture_exception() is sequenced-before
447  
    // Safe without explicit acquire: capture_exception() is sequenced-before
576  
    // signal_completion()'s acq_rel fetch_sub, which synchronizes-with the
448  
    // signal_completion()'s acq_rel fetch_sub, which synchronizes-with the
577  
    // last task's decrement that resumes this coroutine.
449  
    // last task's decrement that resumes this coroutine.
578 -
    if(state.core_.first_exception_)
450 +
    if(state.first_exception_)
579 -
        std::rethrow_exception(state.core_.first_exception_);
451 +
        std::rethrow_exception(state.first_exception_);
580  

452  

581 -
}
 
582 -

 
583 -
/** Execute a range of awaitables concurrently and collect their results.
 
584 -

 
585 -
    Launches all awaitables in the range simultaneously and waits for all
 
586 -
    to complete. Results are collected in a vector preserving input order.
 
587 -
    If any awaitable throws, cancellation is requested for siblings and
 
588 -
    the first exception is rethrown after all awaitables complete.
 
589 -

 
590 -
    @li All child awaitables run concurrently on the caller's executor
 
591 -
    @li Results are returned as a vector in input order
 
592 -
    @li First exception wins; subsequent exceptions are discarded
 
593 -
    @li Stop is requested for siblings on first error
 
594 -
    @li Completes only after all children have finished
 
595 -

 
596 -
    @par Thread Safety
 
597 -
    The returned task must be awaited from a single execution context.
 
598 -
    Child awaitables execute concurrently but complete through the caller's
 
599 -
    executor.
 
600 -

 
601 -
    @param awaitables Range of awaitables to execute concurrently (must
 
602 -
        not be empty). Each element must satisfy @ref IoAwaitable and is
 
603 -
        consumed (moved-from) when `when_all` is awaited.
 
604 -

 
605 -
    @return A task yielding a vector where each element is the result of
 
606 -
        the corresponding awaitable, in input order.
 
607 -

 
608 -
    @throws std::invalid_argument if range is empty (thrown before
 
609 -
        coroutine suspends).
 
610 -
    @throws Rethrows the first child exception after all children
 
611 -
        complete.
 
612 -

 
613 -
    @par Example
 
614 -
    @code
 
615 -
    task<void> example()
 
616 -
    {
 
617 -
        std::vector<task<Response>> requests;
 
618 -
        for (auto const& url : urls)
 
619 -
            requests.push_back(fetch(url));
 
620 -

 
621 -
        auto responses = co_await when_all(std::move(requests));
 
622 -
    }
 
623 -
    @endcode
 
624 -

 
625 -
    @see IoAwaitableRange, when_all
 
626 -
*/
 
627 -
template<IoAwaitableRange R>
 
628 -
    requires (!std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>)
 
629 -
[[nodiscard]] auto when_all(R&& awaitables)
 
630 -
    -> task<std::vector<awaitable_result_t<std::ranges::range_value_t<R>>>>
 
631 -
{
 
632 -
    using Awaitable = std::ranges::range_value_t<R>;
 
633 -
    using T = awaitable_result_t<Awaitable>;
 
634 -
    using OwnedRange = std::remove_cvref_t<R>;
 
635 -

 
636 -
    auto count = std::ranges::size(awaitables);
 
637 -
    if(count == 0)
 
638 -
        throw std::invalid_argument("when_all requires at least one awaitable");
 
639 -

 
640 -
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
 
641 -

 
642 -
    detail::when_all_homogeneous_state<T> state(count);
 
643 -

 
644 -
    co_await detail::when_all_homogeneous_launcher<OwnedRange>(
 
645 -
        &owned_awaitables, &state);
 
646 -

 
647 -
    if(state.core_.first_exception_)
 
648 -
        std::rethrow_exception(state.core_.first_exception_);
 
649 -

 
650 -
    std::vector<T> results;
 
651 -
    results.reserve(count);
 
652 -
    for(auto& opt : state.results_)
 
653 -
        results.push_back(std::move(*opt));
 
654 -

 
655 -
    co_return results;
 
656 -
}
 
657 -

 
658 -
/** Execute a range of void awaitables concurrently.
 
659 -

 
660 -
    Launches all awaitables in the range simultaneously and waits for all
 
661 -
    to complete. Since all awaitables return void, no results are collected.
 
662 -
    If any awaitable throws, cancellation is requested for siblings and
 
663 -
    the first exception is rethrown after all awaitables complete.
 
664 -

 
665 -
    @li All child awaitables run concurrently on the caller's executor
 
666 -
    @li First exception wins; subsequent exceptions are discarded
 
667 -
    @li Stop is requested for siblings on first error
 
668 -
    @li Completes only after all children have finished
 
669 -

 
670 -
    @par Thread Safety
 
671 -
    The returned task must be awaited from a single execution context.
 
672 -
    Child awaitables execute concurrently but complete through the caller's
 
673 -
    executor.
 
674 -

 
675 -
    @param awaitables Range of void awaitables to execute concurrently
 
676 -
        (must not be empty).
 
677 -

 
678 -
    @throws std::invalid_argument if range is empty (thrown before
 
679 -
        coroutine suspends).
 
680 -
    @throws Rethrows the first child exception after all children
 
681 -
        complete.
 
682 -

 
683 -
    @par Example
 
684 -
    @code
 
685 -
    task<void> example()
 
686 -
    {
 
687 -
        std::vector<task<void>> jobs;
 
688 -
        for (int i = 0; i < n; ++i)
 
689 -
            jobs.push_back(process(i));
 
690 -

 
691 -
        co_await when_all(std::move(jobs));
 
692 -
    }
 
693 -
    @endcode
 
694 -

 
695 -
    @see IoAwaitableRange, when_all
 
696 -
*/
 
697 -
template<IoAwaitableRange R>
 
698 -
    requires std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>
 
699 -
[[nodiscard]] auto when_all(R&& awaitables) -> task<void>
 
700 -
{
 
701 -
    using OwnedRange = std::remove_cvref_t<R>;
 
702 -

 
703 -
    auto count = std::ranges::size(awaitables);
 
704 -
    if(count == 0)
 
705 -
        throw std::invalid_argument("when_all requires at least one awaitable");
 
706 -

 
707 -
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
 
708 -

 
709 -
    detail::when_all_homogeneous_state<void> state(count);
 
710 -

 
711 -
    co_await detail::when_all_homogeneous_launcher<OwnedRange>(
 
712 -
        &owned_awaitables, &state);
 
713 -

 
714 -
    if(state.core_.first_exception_)
 
715 -
        std::rethrow_exception(state.core_.first_exception_);
 
716  
    co_return detail::extract_results(state);
453  
    co_return detail::extract_results(state);
717  
}
454  
}
718  

455  

719  
} // namespace capy
456  
} // namespace capy
720  
} // namespace boost
457  
} // namespace boost
721  

458  

722  
#endif
459  
#endif