LCOV - code coverage report
Current view: top level - capy - when_any.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 99.3 % 147 146 1
Test Date: 2026-03-12 20:08:21 Functions: 91.2 % 668 609 59

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Michael Vandeberg
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
      11                 : #define BOOST_CAPY_WHEN_ANY_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/detail/void_to_monostate.hpp>
      15                 : #include <boost/capy/concept/executor.hpp>
      16                 : #include <boost/capy/concept/io_awaitable.hpp>
      17                 : #include <coroutine>
      18                 : #include <boost/capy/ex/executor_ref.hpp>
      19                 : #include <boost/capy/ex/frame_allocator.hpp>
      20                 : #include <boost/capy/ex/io_env.hpp>
      21                 : #include <boost/capy/task.hpp>
      22                 : 
      23                 : #include <array>
      24                 : #include <atomic>
      25                 : #include <exception>
      26                 : #include <optional>
      27                 : #include <ranges>
      28                 : #include <stdexcept>
      29                 : #include <stop_token>
      30                 : #include <tuple>
      31                 : #include <type_traits>
      32                 : #include <utility>
      33                 : #include <variant>
      34                 : #include <vector>
      35                 : 
      36                 : /*
      37                 :    when_any - Race multiple tasks, return first completion
      38                 :    ========================================================
      39                 : 
      40                 :    OVERVIEW:
      41                 :    ---------
      42                 :    when_any launches N tasks concurrently and completes when the FIRST task
      43                 :    finishes (success or failure). It then requests stop for all siblings and
      44                 :    waits for them to acknowledge before returning.
      45                 : 
      46                 :    ARCHITECTURE:
      47                 :    -------------
      48                 :    The design mirrors when_all but with inverted completion semantics:
      49                 : 
      50                 :      when_all:  complete when remaining_count reaches 0 (all done)
      51                 :      when_any:  complete when has_winner becomes true (first done)
      52                 :                 BUT still wait for remaining_count to reach 0 for cleanup
      53                 : 
      54                 :    Key components:
      55                 :      - when_any_state:    Shared state tracking winner and completion
      56                 :      - when_any_runner:   Wrapper coroutine for each child task
      57                 :      - when_any_launcher: Awaitable that starts all runners concurrently
      58                 : 
      59                 :    CRITICAL INVARIANTS:
      60                 :    --------------------
      61                 :    1. Exactly one task becomes the winner (via atomic compare_exchange)
      62                 :    2. All tasks must complete before parent resumes (cleanup safety)
      63                 :    3. Stop is requested immediately when winner is determined
      64                 :    4. Only the winner's result/exception is stored
      65                 : 
      66                 :    POSITIONAL VARIANT:
      67                 :    -------------------
      68                 :    The variadic overload returns a std::variant with one alternative per
      69                 :    input task, preserving positional correspondence. Use .index() on
      70                 :    the variant to identify which task won.
      71                 : 
      72                 :    Example: when_any(task<int>, task<string>, task<int>)
      73                 :      - Raw types after void->monostate: int, string, int
      74                 :      - Result variant: std::variant<int, string, int>
      75                 :      - variant.index() tells you which task won (0, 1, or 2)
      76                 : 
      77                 :    VOID HANDLING:
      78                 :    --------------
      79                 :    void tasks contribute std::monostate to the variant.
      80                 :    All-void tasks result in: variant<monostate, monostate, monostate>
      81                 : 
      82                 :    MEMORY MODEL:
      83                 :    -------------
      84                 :    Synchronization chain from winner's write to parent's read:
      85                 : 
      86                 :    1. Winner thread writes result_/winner_exception_ (non-atomic)
      87                 :    2. Winner thread calls signal_completion() → fetch_sub(acq_rel) on remaining_count_
      88                 :    3. Last task thread (may be winner or non-winner) calls signal_completion()
      89                 :       → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
      90                 :    4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
      91                 :    5. Parent coroutine resumes and reads result_/winner_exception_
      92                 : 
      93                 :    Synchronization analysis:
      94                 :    - All fetch_sub operations on remaining_count_ form a release sequence
      95                 :    - Winner's fetch_sub releases; subsequent fetch_sub operations participate
      96                 :      in the modification order of remaining_count_
      97                 :    - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
      98                 :      modification order, establishing happens-before from winner's writes
      99                 :    - Executor dispatch() is expected to provide queue-based synchronization
     100                 :      (release-on-post, acquire-on-execute) completing the chain to parent
     101                 :    - Even inline executors work (same thread = sequenced-before)
     102                 : 
     103                 :    Alternative considered: Adding winner_ready_ atomic (set with release after
     104                 :    storing winner data, acquired before reading) would make synchronization
     105                 :    self-contained and not rely on executor implementation details. Current
     106                 :    approach is correct but requires careful reasoning about release sequences
     107                 :    and executor behavior.
     108                 : 
     109                 :    EXCEPTION SEMANTICS:
     110                 :    --------------------
     111                 :    Unlike when_all (which captures first exception, discards others), when_any
     112                 :    treats exceptions as valid completions. If the winning task threw, that
     113                 :    exception is rethrown. Exceptions from non-winners are silently discarded.
     114                 : */
     115                 : 
     116                 : namespace boost {
     117                 : namespace capy {
     118                 : 
     119                 : namespace detail {
     120                 : 
     121                 : /** Core shared state for when_any operations.
     122                 : 
     123                 :     Contains all members and methods common to both heterogeneous (variadic)
     124                 :     and homogeneous (range) when_any implementations. State classes embed
     125                 :     this via composition to avoid CRTP destructor ordering issues.
     126                 : 
     127                 :     @par Thread Safety
     128                 :     Atomic operations protect winner selection and completion count.
     129                 : */
     130                 : struct when_any_core
     131                 : {
     132                 :     std::atomic<std::size_t> remaining_count_;
     133                 :     std::size_t winner_index_{0};
     134                 :     std::exception_ptr winner_exception_;
     135                 :     std::stop_source stop_source_;
     136                 : 
     137                 :     // Bridges parent's stop token to our stop_source
     138                 :     struct stop_callback_fn
     139                 :     {
     140                 :         std::stop_source* source_;
     141 HIT           9 :         void operator()() const noexcept { source_->request_stop(); }
     142                 :     };
     143                 :     using stop_callback_t = std::stop_callback<stop_callback_fn>;
     144                 :     std::optional<stop_callback_t> parent_stop_callback_;
     145                 : 
     146                 :     std::coroutine_handle<> continuation_;
     147                 :     io_env const* caller_env_ = nullptr;
     148                 : 
     149                 :     // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
     150                 :     std::atomic<bool> has_winner_{false};
     151                 : 
     152              73 :     explicit when_any_core(std::size_t count) noexcept
     153              73 :         : remaining_count_(count)
     154                 :     {
     155              73 :     }
     156                 : 
     157                 :     /** Atomically claim winner status; exactly one task succeeds. */
     158             206 :     bool try_win(std::size_t index) noexcept
     159                 :     {
     160             206 :         bool expected = false;
     161             206 :         if(has_winner_.compare_exchange_strong(
     162                 :             expected, true, std::memory_order_acq_rel))
     163                 :         {
     164              73 :             winner_index_ = index;
     165              73 :             stop_source_.request_stop();
     166              73 :             return true;
     167                 :         }
     168             133 :         return false;
     169                 :     }
     170                 : 
     171                 :     /** @pre try_win() returned true. */
     172               9 :     void set_winner_exception(std::exception_ptr ep) noexcept
     173                 :     {
     174               9 :         winner_exception_ = ep;
     175               9 :     }
     176                 : 
     177                 :     // Runners signal completion directly via final_suspend; no member function needed.
     178                 : };
     179                 : 
     180                 : /** Shared state for heterogeneous when_any operation.
     181                 : 
     182                 :     Coordinates winner selection, result storage, and completion tracking
     183                 :     for all child tasks in a when_any operation. Uses composition with
     184                 :     when_any_core for shared functionality.
     185                 : 
     186                 :     @par Lifetime
     187                 :     Allocated on the parent coroutine's frame, outlives all runners.
     188                 : 
     189                 :     @tparam Ts Task result types.
     190                 : */
     191                 : template<typename... Ts>
     192                 : struct when_any_state
     193                 : {
     194                 :     static constexpr std::size_t task_count = sizeof...(Ts);
     195                 :     using variant_type = std::variant<void_to_monostate_t<Ts>...>;
     196                 : 
     197                 :     when_any_core core_;
     198                 :     std::optional<variant_type> result_;
     199                 :     std::array<std::coroutine_handle<>, task_count> runner_handles_{};
     200                 : 
     201              51 :     when_any_state()
     202              51 :         : core_(task_count)
     203                 :     {
     204              51 :     }
     205                 : 
     206                 :     // Runners self-destruct in final_suspend. No destruction needed here.
     207                 : 
     208                 :     /** @pre core_.try_win() returned true.
     209                 :         @note Uses in_place_index (not type) for positional variant access.
     210                 :     */
     211                 :     template<std::size_t I, typename T>
     212              37 :     void set_winner_result(T value)
     213                 :         noexcept(std::is_nothrow_move_constructible_v<T>)
     214                 :     {
     215              37 :         result_.emplace(std::in_place_index<I>, std::move(value));
     216              37 :     }
     217                 : 
     218                 :     /** @pre core_.try_win() returned true. */
     219                 :     template<std::size_t I>
     220               8 :     void set_winner_void() noexcept
     221                 :     {
     222               8 :         result_.emplace(std::in_place_index<I>, std::monostate{});
     223               8 :     }
     224                 : };
     225                 : 
     226                 : /** Wrapper coroutine that runs a single child task for when_any.
     227                 : 
     228                 :     Propagates executor/stop_token to the child, attempts to claim winner
     229                 :     status on completion, and signals completion for cleanup coordination.
     230                 : 
     231                 :     @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
     232                 : */
     233                 : template<typename StateType>
     234                 : struct when_any_runner
     235                 : {
     236                 :     struct promise_type // : frame_allocating_base  // DISABLED FOR TESTING
     237                 :     {
     238                 :         StateType* state_ = nullptr;
     239                 :         std::size_t index_ = 0;
     240                 :         io_env env_;
     241                 : 
     242             206 :         when_any_runner get_return_object() noexcept
     243                 :         {
     244             206 :             return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
     245                 :         }
     246                 : 
     247                 :         // Starts suspended; launcher sets up state/ex/token then resumes
     248             206 :         std::suspend_always initial_suspend() noexcept
     249                 :         {
     250             206 :             return {};
     251                 :         }
     252                 : 
     253             206 :         auto final_suspend() noexcept
     254                 :         {
     255                 :             struct awaiter
     256                 :             {
     257                 :                 promise_type* p_;
     258             206 :                 bool await_ready() const noexcept { return false; }
     259             206 :                 auto await_suspend(std::coroutine_handle<> h) noexcept
     260                 :                 {
     261                 :                     // Extract everything needed before self-destruction.
     262             206 :                     auto& core = p_->state_->core_;
     263             206 :                     auto* counter = &core.remaining_count_;
     264             206 :                     auto* caller_env = core.caller_env_;
     265             206 :                     auto cont = core.continuation_;
     266                 : 
     267             206 :                     h.destroy();
     268                 : 
     269                 :                     // If last runner, dispatch parent for symmetric transfer.
     270             206 :                     auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
     271             206 :                     if(remaining == 1)
     272              73 :                         return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
     273             133 :                     return detail::symmetric_transfer(std::noop_coroutine());
     274                 :                 }
     275 MIS           0 :                 void await_resume() const noexcept {}
     276                 :             };
     277 HIT         206 :             return awaiter{this};
     278                 :         }
     279                 : 
     280             193 :         void return_void() noexcept {}
     281                 : 
     282                 :         // Exceptions are valid completions in when_any (unlike when_all)
     283              13 :         void unhandled_exception()
     284                 :         {
     285              13 :             if(state_->core_.try_win(index_))
     286               9 :                 state_->core_.set_winner_exception(std::current_exception());
     287              13 :         }
     288                 : 
     289                 :         /** Injects executor and stop token into child awaitables. */
     290                 :         template<class Awaitable>
     291                 :         struct transform_awaiter
     292                 :         {
     293                 :             std::decay_t<Awaitable> a_;
     294                 :             promise_type* p_;
     295                 : 
     296             206 :             bool await_ready() { return a_.await_ready(); }
     297             206 :             auto await_resume() { return a_.await_resume(); }
     298                 : 
     299                 :             template<class Promise>
     300             200 :             auto await_suspend(std::coroutine_handle<Promise> h)
     301                 :             {
     302                 :                 using R = decltype(a_.await_suspend(h, &p_->env_));
     303                 :                 if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
     304             200 :                     return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
     305                 :                 else
     306                 :                     return a_.await_suspend(h, &p_->env_);
     307                 :             }
     308                 :         };
     309                 : 
     310                 :         template<class Awaitable>
     311             206 :         auto await_transform(Awaitable&& a)
     312                 :         {
     313                 :             using A = std::decay_t<Awaitable>;
     314                 :             if constexpr (IoAwaitable<A>)
     315                 :             {
     316                 :                 return transform_awaiter<Awaitable>{
     317             412 :                     std::forward<Awaitable>(a), this};
     318                 :             }
     319                 :             else
     320                 :             {
     321                 :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");
     322                 :             }
     323             206 :         }
     324                 :     };
     325                 : 
     326                 :     std::coroutine_handle<promise_type> h_;
     327                 : 
     328             206 :     explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
     329             206 :         : h_(h)
     330                 :     {
     331             206 :     }
     332                 : 
     333                 :     // Enable move for all clang versions - some versions need it
     334                 :     when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
     335                 : 
     336                 :     // Non-copyable
     337                 :     when_any_runner(when_any_runner const&) = delete;
     338                 :     when_any_runner& operator=(when_any_runner const&) = delete;
     339                 :     when_any_runner& operator=(when_any_runner&&) = delete;
     340                 : 
     341             206 :     auto release() noexcept
     342                 :     {
     343             206 :         return std::exchange(h_, nullptr);
     344                 :     }
     345                 : };
     346                 : 
     347                 : /** Indexed overload for heterogeneous when_any (compile-time index).
     348                 : 
     349                 :     Uses compile-time index I for variant construction via in_place_index.
     350                 :     Called from when_any_launcher::launch_one<I>().
     351                 : */
     352                 : template<std::size_t I, IoAwaitable Awaitable, typename StateType>
     353                 : when_any_runner<StateType>
     354             121 : make_when_any_runner(Awaitable inner, StateType* state)
     355                 : {
     356                 :     using T = awaitable_result_t<Awaitable>;
     357                 :     if constexpr (std::is_void_v<T>)
     358                 :     {
     359                 :         co_await std::move(inner);
     360                 :         if(state->core_.try_win(I))
     361                 :             state->template set_winner_void<I>();
     362                 :     }
     363                 :     else
     364                 :     {
     365                 :         auto result = co_await std::move(inner);
     366                 :         if(state->core_.try_win(I))
     367                 :         {
     368                 :             try
     369                 :             {
     370                 :                 state->template set_winner_result<I>(std::move(result));
     371                 :             }
     372                 :             catch(...)
     373                 :             {
     374                 :                 state->core_.set_winner_exception(std::current_exception());
     375                 :             }
     376                 :         }
     377                 :     }
     378             242 : }
     379                 : 
     380                 : /** Runtime-index overload for homogeneous when_any (range path).
     381                 : 
     382                 :     Uses requires-expressions to detect state capabilities:
     383                 :     - set_winner_void(): for heterogeneous void tasks (stores monostate)
     384                 :     - set_winner_result(): for non-void tasks
     385                 :     - Neither: for homogeneous void tasks (no result storage)
     386                 : */
     387                 : template<IoAwaitable Awaitable, typename StateType>
     388                 : when_any_runner<StateType>
     389              85 : make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
     390                 : {
     391                 :     using T = awaitable_result_t<Awaitable>;
     392                 :     if constexpr (std::is_void_v<T>)
     393                 :     {
     394                 :         co_await std::move(inner);
     395                 :         if(state->core_.try_win(index))
     396                 :         {
     397                 :             if constexpr (requires { state->set_winner_void(); })
     398                 :                 state->set_winner_void();
     399                 :         }
     400                 :     }
     401                 :     else
     402                 :     {
     403                 :         auto result = co_await std::move(inner);
     404                 :         if(state->core_.try_win(index))
     405                 :         {
     406                 :             try
     407                 :             {
     408                 :                 state->set_winner_result(std::move(result));
     409                 :             }
     410                 :             catch(...)
     411                 :             {
     412                 :                 state->core_.set_winner_exception(std::current_exception());
     413                 :             }
     414                 :         }
     415                 :     }
     416             170 : }
     417                 : 
     418                 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
     419                 : template<IoAwaitable... Awaitables>
     420                 : class when_any_launcher
     421                 : {
     422                 :     using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
     423                 : 
     424                 :     std::tuple<Awaitables...>* tasks_;
     425                 :     state_type* state_;
     426                 : 
     427                 : public:
     428              51 :     when_any_launcher(
     429                 :         std::tuple<Awaitables...>* tasks,
     430                 :         state_type* state)
     431              51 :         : tasks_(tasks)
     432              51 :         , state_(state)
     433                 :     {
     434              51 :     }
     435                 : 
     436              51 :     bool await_ready() const noexcept
     437                 :     {
     438              51 :         return sizeof...(Awaitables) == 0;
     439                 :     }
     440                 : 
     441                 :     /** CRITICAL: If the last task finishes synchronously, parent resumes and
     442                 :         destroys this object before await_suspend returns. Must not reference
     443                 :         `this` after the final launch_one call.
     444                 :     */
     445              51 :     std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
     446                 :     {
     447              51 :         state_->core_.continuation_ = continuation;
     448              51 :         state_->core_.caller_env_ = caller_env;
     449                 : 
     450              51 :         if(caller_env->stop_token.stop_possible())
     451                 :         {
     452              18 :             state_->core_.parent_stop_callback_.emplace(
     453               9 :                 caller_env->stop_token,
     454               9 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     455                 : 
     456               9 :             if(caller_env->stop_token.stop_requested())
     457               3 :                 state_->core_.stop_source_.request_stop();
     458                 :         }
     459                 : 
     460              51 :         auto token = state_->core_.stop_source_.get_token();
     461             102 :         [&]<std::size_t... Is>(std::index_sequence<Is...>) {
     462              51 :             (..., launch_one<Is>(caller_env->executor, token));
     463              51 :         }(std::index_sequence_for<Awaitables...>{});
     464                 : 
     465             102 :         return std::noop_coroutine();
     466              51 :     }
     467                 : 
     468              51 :     void await_resume() const noexcept
     469                 :     {
     470              51 :     }
     471                 : 
     472                 : private:
     473                 :     /** @pre Ex::dispatch() and std::coroutine_handle<>::resume() must not throw (handle may leak). */
     474                 :     template<std::size_t I>
     475             121 :     void launch_one(executor_ref caller_ex, std::stop_token token)
     476                 :     {
     477             121 :         auto runner = make_when_any_runner<I>(
     478             121 :             std::move(std::get<I>(*tasks_)), state_);
     479                 : 
     480             121 :         auto h = runner.release();
     481             121 :         h.promise().state_ = state_;
     482             121 :         h.promise().index_ = I;
     483             121 :         h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->frame_allocator};
     484                 : 
     485             121 :         std::coroutine_handle<> ch{h};
     486             121 :         state_->runner_handles_[I] = ch;
     487             121 :         caller_ex.post(ch);
     488             242 :     }
     489                 : };
     490                 : 
     491                 : } // namespace detail
     492                 : 
     493                 : /** Wait for the first awaitable to complete.
     494                 : 
     495                 :     Races multiple heterogeneous awaitables concurrently and returns when the
     496                 :     first one completes. The result is a variant with one alternative per
     497                 :     input task, preserving positional correspondence.
     498                 : 
     499                 :     @par Suspends
     500                 :     The calling coroutine suspends when co_await is invoked. All awaitables
     501                 :     are launched concurrently and execute in parallel. The coroutine resumes
     502                 :     only after all awaitables have completed, even though the winner is
     503                 :     determined by the first to finish.
     504                 : 
     505                 :     @par Completion Conditions
     506                 :     @li Winner is determined when the first awaitable completes (success or exception)
     507                 :     @li Only one task can claim winner status via atomic compare-exchange
     508                 :     @li Once a winner exists, stop is requested for all remaining siblings
     509                 :     @li Parent coroutine resumes only after all siblings acknowledge completion
     510                 :     @li The winner's result is returned; if the winner threw, the exception is rethrown
     511                 : 
     512                 :     @par Cancellation Semantics
     513                 :     Cancellation is supported via stop_token propagated through the
     514                 :     IoAwaitable protocol:
     515                 :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     516                 :     @li When the parent's stop token is activated, the stop is forwarded to all children
     517                 :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     518                 :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     519                 :     @li Stop requests are cooperative; tasks must check and respond to them
     520                 : 
     521                 :     @par Concurrency/Overlap
     522                 :     All awaitables are launched concurrently before any can complete.
     523                 :     The launcher iterates through the arguments, starting each task on the
     524                 :     caller's executor. Tasks may execute in parallel on multi-threaded
     525                 :     executors or interleave on single-threaded executors. There is no
     526                 :     guaranteed ordering of task completion.
     527                 : 
     528                 :     @par Notable Error Conditions
     529                 :     @li Winner exception: if the winning task threw, that exception is rethrown
     530                 :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     531                 :     @li Cancellation: tasks may complete via cancellation without throwing
     532                 : 
     533                 :     @par Example
     534                 :     @code
     535                 :     task<void> example() {
     536                 :         auto result = co_await when_any(
     537                 :             fetch_int(),      // task<int>
     538                 :             fetch_string()    // task<std::string>
     539                 :         );
     540                 :         // result.index() is 0 or 1
     541                 :         if (result.index() == 0)
     542                 :             std::cout << "Got int: " << std::get<0>(result) << "\n";
     543                 :         else
     544                 :             std::cout << "Got string: " << std::get<1>(result) << "\n";
     545                 :     }
     546                 :     @endcode
     547                 : 
     548                 :     @param as Awaitables to race concurrently (at least one required; each
     549                 :         must satisfy IoAwaitable).
     550                 :     @return A task yielding a std::variant with one alternative per awaitable.
     551                 :         Use .index() to identify the winner. Void awaitables contribute
     552                 :         std::monostate.
     553                 : 
     554                 :     @throws Rethrows the winner's exception if the winning task threw an exception.
     555                 : 
     556                 :     @par Remarks
     557                 :     Awaitables are moved into the coroutine frame; original objects become
     558                 :     empty after the call. The variant preserves one alternative per input
     559                 :     task. Use .index() to determine which awaitable completed first.
     560                 :     Void awaitables contribute std::monostate to the variant.
     561                 : 
     562                 :     @see when_all, IoAwaitable
     563                 : */
     564                 : template<IoAwaitable... As>
     565                 :     requires (sizeof...(As) > 0)
     566              51 : [[nodiscard]] auto when_any(As... as)
     567                 :     -> task<std::variant<void_to_monostate_t<awaitable_result_t<As>>...>>
     568                 : {
     569                 :     detail::when_any_state<awaitable_result_t<As>...> state;
     570                 :     std::tuple<As...> awaitable_tuple(std::move(as)...);
     571                 : 
     572                 :     co_await detail::when_any_launcher<As...>(&awaitable_tuple, &state);
     573                 : 
     574                 :     if(state.core_.winner_exception_)
     575                 :         std::rethrow_exception(state.core_.winner_exception_);
     576                 : 
     577                 :     co_return std::move(*state.result_);
     578             102 : }
     579                 : 
     580                 : /** Concept for ranges of full I/O awaitables.
     581                 : 
     582                 :     A range satisfies `IoAwaitableRange` if it is a sized input range
     583                 :     whose value type satisfies @ref IoAwaitable. This enables when_any
     584                 :     to accept any container or view of awaitables, not just std::vector.
     585                 : 
     586                 :     @tparam R The range type.
     587                 : 
     588                 :     @par Requirements
     589                 :     @li `R` must satisfy `std::ranges::input_range`
     590                 :     @li `R` must satisfy `std::ranges::sized_range`
     591                 :     @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
     592                 : 
     593                 :     @par Syntactic Requirements
     594                 :     Given `r` of type `R`:
     595                 :     @li `std::ranges::begin(r)` is valid
     596                 :     @li `std::ranges::end(r)` is valid
     597                 :     @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
     598                 :     @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable
     599                 : 
     600                 :     @par Example
     601                 :     @code
     602                 :     template<IoAwaitableRange R>
     603                 :     task<void> race_all(R&& awaitables) {
     604                 :         auto winner = co_await when_any(std::forward<R>(awaitables));
     605                 :         // Process winner...
     606                 :     }
     607                 :     @endcode
     608                 : 
     609                 :     @see when_any, IoAwaitable
     610                 : */
     611                 : template<typename R>
     612                 : concept IoAwaitableRange =
     613                 :     std::ranges::input_range<R> &&
     614                 :     std::ranges::sized_range<R> &&
     615                 :     IoAwaitable<std::ranges::range_value_t<R>>;
     616                 : 
     617                 : namespace detail {
     618                 : 
     619                 : /** Shared state for homogeneous when_any (range overload).
     620                 : 
     621                 :     Uses composition with when_any_core for shared functionality.
     622                 :     Simpler than heterogeneous: optional<T> instead of variant, vector
     623                 :     instead of array for runner handles.
     624                 : */
     625                 : template<typename T>
     626                 : struct when_any_homogeneous_state
     627                 : {
     628                 :     when_any_core core_;
     629                 :     std::optional<T> result_;
     630                 :     std::vector<std::coroutine_handle<>> runner_handles_;
     631                 : 
     632              19 :     explicit when_any_homogeneous_state(std::size_t count)
     633              19 :         : core_(count)
     634              38 :         , runner_handles_(count)
     635                 :     {
     636              19 :     }
     637                 : 
     638                 :     // Runners self-destruct in final_suspend. No destruction needed here.
     639                 : 
     640                 :     /** @pre core_.try_win() returned true. */
     641              17 :     void set_winner_result(T value)
     642                 :         noexcept(std::is_nothrow_move_constructible_v<T>)
     643                 :     {
     644              17 :         result_.emplace(std::move(value));
     645              17 :     }
     646                 : };
     647                 : 
     648                 : /** Specialization for void tasks (no result storage needed). */
     649                 : template<>
     650                 : struct when_any_homogeneous_state<void>
     651                 : {
     652                 :     when_any_core core_;
     653                 :     std::vector<std::coroutine_handle<>> runner_handles_;
     654                 : 
     655               3 :     explicit when_any_homogeneous_state(std::size_t count)
     656               3 :         : core_(count)
     657               6 :         , runner_handles_(count)
     658                 :     {
     659               3 :     }
     660                 : 
     661                 :     // Runners self-destruct in final_suspend. No destruction needed here.
     662                 : 
     663                 :     // No set_winner_result - void tasks have no result to store
     664                 : };
     665                 : 
     666                 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
     667                 : template<IoAwaitableRange Range>
     668                 : class when_any_homogeneous_launcher
     669                 : {
     670                 :     using Awaitable = std::ranges::range_value_t<Range>;
     671                 :     using T = awaitable_result_t<Awaitable>;
     672                 : 
     673                 :     Range* range_;
     674                 :     when_any_homogeneous_state<T>* state_;
     675                 : 
     676                 : public:
     677              22 :     when_any_homogeneous_launcher(
     678                 :         Range* range,
     679                 :         when_any_homogeneous_state<T>* state)
     680              22 :         : range_(range)
     681              22 :         , state_(state)
     682                 :     {
     683              22 :     }
     684                 : 
     685              22 :     bool await_ready() const noexcept
     686                 :     {
     687              22 :         return std::ranges::empty(*range_);
     688                 :     }
     689                 : 
     690                 :     /** CRITICAL: If the last task finishes synchronously, parent resumes and
     691                 :         destroys this object before await_suspend returns. Must not reference
     692                 :         `this` after dispatching begins.
     693                 : 
     694                 :         Two-phase approach:
     695                 :         1. Create all runners (safe - no dispatch yet)
     696                 :         2. Dispatch all runners (any may complete synchronously)
     697                 :     */
     698              22 :     std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
     699                 :     {
     700              22 :         state_->core_.continuation_ = continuation;
     701              22 :         state_->core_.caller_env_ = caller_env;
     702                 : 
     703              22 :         if(caller_env->stop_token.stop_possible())
     704                 :         {
     705              14 :             state_->core_.parent_stop_callback_.emplace(
     706               7 :                 caller_env->stop_token,
     707               7 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     708                 : 
     709               7 :             if(caller_env->stop_token.stop_requested())
     710               4 :                 state_->core_.stop_source_.request_stop();
     711                 :         }
     712                 : 
     713              22 :         auto token = state_->core_.stop_source_.get_token();
     714                 : 
     715                 :         // Phase 1: Create all runners without dispatching.
     716                 :         // This iterates over *range_ safely because no runners execute yet.
     717              22 :         std::size_t index = 0;
     718             107 :         for(auto&& a : *range_)
     719                 :         {
     720              85 :             auto runner = make_when_any_runner(
     721              85 :                 std::move(a), state_, index);
     722                 : 
     723              85 :             auto h = runner.release();
     724              85 :             h.promise().state_ = state_;
     725              85 :             h.promise().index_ = index;
     726              85 :             h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
     727                 : 
     728              85 :             state_->runner_handles_[index] = std::coroutine_handle<>{h};
     729              85 :             ++index;
     730                 :         }
     731                 : 
     732                 :         // Phase 2: Post all runners. Any may complete synchronously.
     733                 :         // After last post, state_ and this may be destroyed.
     734                 :         // Use raw pointer/count captured before posting.
     735              22 :         std::coroutine_handle<>* handles = state_->runner_handles_.data();
     736              22 :         std::size_t count = state_->runner_handles_.size();
     737             107 :         for(std::size_t i = 0; i < count; ++i)
     738              85 :             caller_env->executor.post(handles[i]);
     739                 : 
     740              44 :         return std::noop_coroutine();
     741             107 :     }
     742                 : 
     743              22 :     void await_resume() const noexcept
     744                 :     {
     745              22 :     }
     746                 : };
     747                 : 
     748                 : } // namespace detail
     749                 : 
     750                 : /** Wait for the first awaitable to complete (range overload).
     751                 : 
     752                 :     Races a range of awaitables with the same result type. Accepts any
     753                 :     sized input range of IoAwaitable types, enabling use with arrays,
     754                 :     spans, or custom containers.
     755                 : 
     756                 :     @par Suspends
     757                 :     The calling coroutine suspends when co_await is invoked. All awaitables
     758                 :     in the range are launched concurrently and execute in parallel. The
     759                 :     coroutine resumes only after all awaitables have completed, even though
     760                 :     the winner is determined by the first to finish.
     761                 : 
     762                 :     @par Completion Conditions
     763                 :     @li Winner is determined when the first awaitable completes (success or exception)
     764                 :     @li Only one task can claim winner status via atomic compare-exchange
     765                 :     @li Once a winner exists, stop is requested for all remaining siblings
     766                 :     @li Parent coroutine resumes only after all siblings acknowledge completion
     767                 :     @li The winner's index and result are returned; if the winner threw, the exception is rethrown
     768                 : 
     769                 :     @par Cancellation Semantics
     770                 :     Cancellation is supported via stop_token propagated through the
     771                 :     IoAwaitable protocol:
     772                 :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     773                 :     @li When the parent's stop token is activated, the stop is forwarded to all children
     774                 :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     775                 :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     776                 :     @li Stop requests are cooperative; tasks must check and respond to them
     777                 : 
     778                 :     @par Concurrency/Overlap
     779                 :     All awaitables are launched concurrently before any can complete.
     780                 :     The launcher iterates through the range, starting each task on the
     781                 :     caller's executor. Tasks may execute in parallel on multi-threaded
     782                 :     executors or interleave on single-threaded executors. There is no
     783                 :     guaranteed ordering of task completion.
     784                 : 
     785                 :     @par Notable Error Conditions
     786                 :     @li Empty range: throws std::invalid_argument immediately (not via co_return)
     787                 :     @li Winner exception: if the winning task threw, that exception is rethrown
     788                 :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     789                 :     @li Cancellation: tasks may complete via cancellation without throwing
     790                 : 
     791                 :     @par Example
     792                 :     @code
     793                 :     task<void> example() {
     794                 :         std::array<task<Response>, 3> requests = {
     795                 :             fetch_from_server(0),
     796                 :             fetch_from_server(1),
     797                 :             fetch_from_server(2)
     798                 :         };
     799                 : 
     800                 :         auto [index, response] = co_await when_any(std::move(requests));
     801                 :     }
     802                 :     @endcode
     803                 : 
     804                 :     @par Example with Vector
     805                 :     @code
     806                 :     task<Response> fetch_fastest(std::vector<Server> const& servers) {
     807                 :         std::vector<task<Response>> requests;
     808                 :         for (auto const& server : servers)
     809                 :             requests.push_back(fetch_from(server));
     810                 : 
     811                 :         auto [index, response] = co_await when_any(std::move(requests));
     812                 :         co_return response;
     813                 :     }
     814                 :     @endcode
     815                 : 
     816                 :     @tparam R Range type satisfying IoAwaitableRange.
     817                 :     @param awaitables Range of awaitables to race concurrently (must not be empty).
     818                 :     @return A task yielding a pair of (winner_index, result).
     819                 : 
     820                 :     @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
     821                 :     @throws Rethrows the winner's exception if the winning task threw an exception.
     822                 : 
     823                 :     @par Remarks
     824                 :     Elements are moved from the range; for lvalue ranges, the original
     825                 :     container will have moved-from elements after this call. The range
     826                 :     is moved onto the coroutine frame to ensure lifetime safety. Unlike
     827                 :     the variadic overload, no variant wrapper is needed since all tasks
     828                 :     share the same return type.
     829                 : 
     830                 :     @see when_any, IoAwaitableRange
     831                 : */
     832                 : template<IoAwaitableRange R>
     833                 :     requires (!std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>)
     834              21 : [[nodiscard]] auto when_any(R&& awaitables)
     835                 :     -> task<std::pair<std::size_t, awaitable_result_t<std::ranges::range_value_t<R>>>>
     836                 : {
     837                 :     using Awaitable = std::ranges::range_value_t<R>;
     838                 :     using T = awaitable_result_t<Awaitable>;
     839                 :     using result_type = std::pair<std::size_t, T>;
     840                 :     using OwnedRange = std::remove_cvref_t<R>;
     841                 : 
     842                 :     auto count = std::ranges::size(awaitables);
     843                 :     if(count == 0)
     844                 :         throw std::invalid_argument("when_any requires at least one awaitable");
     845                 : 
     846                 :     // Move/copy range onto coroutine frame to ensure lifetime
     847                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     848                 : 
     849                 :     detail::when_any_homogeneous_state<T> state(count);
     850                 : 
     851                 :     co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
     852                 : 
     853                 :     if(state.core_.winner_exception_)
     854                 :         std::rethrow_exception(state.core_.winner_exception_);
     855                 : 
     856                 :     co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
     857              42 : }
     858                 : 
     859                 : /** Wait for the first awaitable to complete (void range overload).
     860                 : 
     861                 :     Races a range of void-returning awaitables. Since void awaitables have
     862                 :     no result value, only the winner's index is returned.
     863                 : 
     864                 :     @par Suspends
     865                 :     The calling coroutine suspends when co_await is invoked. All awaitables
     866                 :     in the range are launched concurrently and execute in parallel. The
     867                 :     coroutine resumes only after all awaitables have completed, even though
     868                 :     the winner is determined by the first to finish.
     869                 : 
     870                 :     @par Completion Conditions
     871                 :     @li Winner is determined when the first awaitable completes (success or exception)
     872                 :     @li Only one task can claim winner status via atomic compare-exchange
     873                 :     @li Once a winner exists, stop is requested for all remaining siblings
     874                 :     @li Parent coroutine resumes only after all siblings acknowledge completion
     875                 :     @li The winner's index is returned; if the winner threw, the exception is rethrown
     876                 : 
     877                 :     @par Cancellation Semantics
     878                 :     Cancellation is supported via stop_token propagated through the
     879                 :     IoAwaitable protocol:
     880                 :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     881                 :     @li When the parent's stop token is activated, the stop is forwarded to all children
     882                 :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     883                 :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     884                 :     @li Stop requests are cooperative; tasks must check and respond to them
     885                 : 
     886                 :     @par Concurrency/Overlap
     887                 :     All awaitables are launched concurrently before any can complete.
     888                 :     The launcher iterates through the range, starting each task on the
     889                 :     caller's executor. Tasks may execute in parallel on multi-threaded
     890                 :     executors or interleave on single-threaded executors. There is no
     891                 :     guaranteed ordering of task completion.
     892                 : 
     893                 :     @par Notable Error Conditions
     894                 :     @li Empty range: throws std::invalid_argument immediately (not via co_return)
     895                 :     @li Winner exception: if the winning task threw, that exception is rethrown
     896                 :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     897                 :     @li Cancellation: tasks may complete via cancellation without throwing
     898                 : 
     899                 :     @par Example
     900                 :     @code
     901                 :     task<void> example() {
     902                 :         std::vector<task<void>> tasks;
     903                 :         for (int i = 0; i < 5; ++i)
     904                 :             tasks.push_back(background_work(i));
     905                 : 
     906                 :         std::size_t winner = co_await when_any(std::move(tasks));
     907                 :         // winner is the index of the first task to complete
     908                 :     }
     909                 :     @endcode
     910                 : 
     911                 :     @par Example with Timeout
     912                 :     @code
     913                 :     task<void> with_timeout() {
     914                 :         std::vector<task<void>> tasks;
     915                 :         tasks.push_back(long_running_operation());
     916                 :         tasks.push_back(delay(std::chrono::seconds(5)));
     917                 : 
     918                 :         std::size_t winner = co_await when_any(std::move(tasks));
     919                 :         if (winner == 1) {
     920                 :             // Timeout occurred
     921                 :         }
     922                 :     }
     923                 :     @endcode
     924                 : 
     925                 :     @tparam R Range type satisfying IoAwaitableRange with void result.
     926                 :     @param awaitables Range of void awaitables to race concurrently (must not be empty).
     927                 :     @return A task yielding the winner's index (zero-based).
     928                 : 
     929                 :     @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
     930                 :     @throws Rethrows the winner's exception if the winning task threw an exception.
     931                 : 
     932                 :     @par Remarks
     933                 :     Elements are moved from the range; for lvalue ranges, the original
     934                 :     container will have moved-from elements after this call. The range
     935                 :     is moved onto the coroutine frame to ensure lifetime safety. Unlike
     936                 :     the non-void overload, no result storage is needed since void tasks
     937                 :     produce no value.
     938                 : 
     939                 :     @see when_any, IoAwaitableRange
     940                 : */
     941                 : template<IoAwaitableRange R>
     942                 :     requires std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>
     943               3 : [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
     944                 : {
     945                 :     using OwnedRange = std::remove_cvref_t<R>;
     946                 : 
     947                 :     auto count = std::ranges::size(awaitables);
     948                 :     if(count == 0)
     949                 :         throw std::invalid_argument("when_any requires at least one awaitable");
     950                 : 
     951                 :     // Move/copy range onto coroutine frame to ensure lifetime
     952                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     953                 : 
     954                 :     detail::when_any_homogeneous_state<void> state(count);
     955                 : 
     956                 :     co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
     957                 : 
     958                 :     if(state.core_.winner_exception_)
     959                 :         std::rethrow_exception(state.core_.winner_exception_);
     960                 : 
     961                 :     co_return state.core_.winner_index_;
     962               6 : }
     963                 : 
     964                 : } // namespace capy
     965                 : } // namespace boost
     966                 : 
     967                 : #endif
        

Generated by: LCOV version 2.3