include/boost/capy/ex/async_event.hpp

100.0% Lines (68/68) 100.0% List of functions (13/13) 94.4% Branches (17/18)
f(x) Functions (13)
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_ASYNC_EVENT_HPP
11 #define BOOST_CAPY_ASYNC_EVENT_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/intrusive.hpp>
15 #include <boost/capy/concept/executor.hpp>
16 #include <boost/capy/error.hpp>
17 #include <boost/capy/ex/io_env.hpp>
18 #include <boost/capy/io_result.hpp>
19
20 #include <stop_token>
21
22 #include <atomic>
23 #include <coroutine>
24 #include <new>
25 #include <utility>
26
27 /* async_event implementation notes
28 =================================
29
30 Same cancellation pattern as async_mutex (see that file for the
31 full discussion on claimed_, stop_cb lifetime, member ordering,
32 and threading assumptions).
33
34 Key difference: set() wakes ALL waiters (broadcast), not one.
35 It pops every waiter from the list and posts the ones it
36 claims. Waiters already claimed by a stop callback are skipped.
37
38 Because set() pops all waiters, a canceled waiter may have been
39 removed from the list by set() before its await_resume runs.
40 This requires a separate in_list_ flag (unlike async_mutex where
41 active_ served double duty). await_resume only calls remove()
42 when in_list_ is true.
43 */
44
45 namespace boost {
46 namespace capy {
47
48 /** An asynchronous event for coroutines.
49
50 This event provides a way to notify multiple coroutines that some
51 condition has occurred. When a coroutine awaits an unset event, it
52 suspends and is added to a wait queue. When the event is set, all
53 waiting coroutines are resumed.
54
55 @par Cancellation
56
57 When a coroutine is suspended waiting for the event and its stop
58 token is triggered, the waiter completes with `error::canceled`
59 instead of waiting for `set()`.
60
61 Cancellation is only observed while the event is unset: a wait on an
62 already-set event completes immediately even if the stop token is
63 already signaled. If the event is unset and the stop token is already
64 signaled, the wait completes with `error::canceled` without being queued.
65
66 @par Zero Allocation
67
68 No heap allocation occurs for wait operations.
69
70 @par Thread Safety
71
72 Distinct objects: Safe.@n
73 Shared objects: Unsafe.
74
75 The event operations are designed for single-threaded use on one
76 executor. The stop callback may fire from any thread.
77
78 This type is non-copyable and non-movable because suspended
79 waiters hold intrusive pointers into the event's internal list.
80
81 @par Example
82 @code
83 async_event event;
84
85 task<> waiter() {
86 auto [ec] = co_await event.wait();
87 if(ec)
88 co_return;
89 // ... event was set ...
90 }
91
92 task<> notifier() {
93 // ... do some work ...
94 event.set(); // Wake all waiters
95 }
96 @endcode
97 */
98 class async_event
99 {
100 public:
101 class wait_awaiter;
102
103 private:
104 bool set_ = false;
105 detail::intrusive_list<wait_awaiter> waiters_;
106
107 public:
108 /** Awaiter returned by wait().
109 */
110 class wait_awaiter
111 : public detail::intrusive_list<wait_awaiter>::node
112 {
113 friend class async_event;
114
115 async_event* e_;
116 std::coroutine_handle<> h_;
117 executor_ref ex_;
118
119 // Declared before stop_cb_buf_: the callback
120 // accesses these members, so they must still be
121 // alive if the stop_cb_ destructor blocks.
122 std::atomic<bool> claimed_{false};
123 bool canceled_ = false;
124 bool active_ = false;
125 bool in_list_ = false;
126
127 struct cancel_fn
128 {
129 wait_awaiter* self_;
130
131 21x void operator()() const noexcept
132 {
133
2/2
✓ Branch 1 taken 20 times.
✓ Branch 2 taken 1 time.
21x if(!self_->claimed_.exchange(
134 true, std::memory_order_acq_rel))
135 {
136 20x self_->canceled_ = true;
137 20x self_->ex_.post(self_->h_);
138 }
139 21x }
140 };
141
142 using stop_cb_t =
143 std::stop_callback<cancel_fn>;
144
145 // Raw aligned storage for stop_cb_t, constructed in await_suspend
146 // and destroyed manually. Declared last: the stop_cb_t destructor
147 // may block while the callback accesses the members above.
148 BOOST_CAPY_MSVC_WARNING_PUSH
149 BOOST_CAPY_MSVC_WARNING_DISABLE(4324) // padded due to alignas
150 alignas(stop_cb_t)
151 unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
152 BOOST_CAPY_MSVC_WARNING_POP
153
154 37x stop_cb_t& stop_cb_() noexcept
155 {
156 return *reinterpret_cast<stop_cb_t*>(
157 37x stop_cb_buf_);
158 }
159
160 public:
161 251x ~wait_awaiter()
162 {
163
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 250 times.
251x if(active_)
164 1x stop_cb_().~stop_cb_t();
165
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 250 times.
251x if(in_list_)
166 1x e_->waiters_.remove(this);
167 251x }
168
169 57x explicit wait_awaiter(async_event* e) noexcept
170 57x : e_(e)
171 {
172 57x }
173
174 194x wait_awaiter(wait_awaiter&& o) noexcept
175 194x : e_(o.e_)
176 194x , h_(o.h_)
177 194x , ex_(o.ex_)
178 194x , claimed_(o.claimed_.load(
179 std::memory_order_relaxed))
180 194x , canceled_(o.canceled_)
181 194x , active_(std::exchange(o.active_, false))
182 194x , in_list_(std::exchange(o.in_list_, false))
183 {
184 194x }
185
186 wait_awaiter(wait_awaiter const&) = delete;
187 wait_awaiter& operator=(wait_awaiter const&) = delete;
188 wait_awaiter& operator=(wait_awaiter&&) = delete;
189
190 57x bool await_ready() const noexcept
191 {
192 57x return e_->set_;
193 }
194
195 /** IoAwaitable protocol overload. */
196 std::coroutine_handle<>
197 47x await_suspend(
198 std::coroutine_handle<> h,
199 io_env const* env) noexcept
200 {
201
2/2
✓ Branch 1 taken 10 times.
✓ Branch 2 taken 37 times.
47x if(env->stop_token.stop_requested())
202 {
203 10x canceled_ = true;
204 10x return h;
205 }
206 37x h_ = h;
207 37x ex_ = env->executor;
208 37x e_->waiters_.push_back(this);
209 37x in_list_ = true;
210 111x ::new(stop_cb_buf_) stop_cb_t(
211 37x env->stop_token, cancel_fn{this});
212 37x active_ = true;
213 37x return std::noop_coroutine();
214 }
215
216 54x io_result<> await_resume() noexcept
217 {
218
2/2
✓ Branch 0 taken 36 times.
✓ Branch 1 taken 18 times.
54x if(active_)
219 {
220 36x stop_cb_().~stop_cb_t();
221 36x active_ = false;
222 }
223
2/2
✓ Branch 0 taken 30 times.
✓ Branch 1 taken 24 times.
54x if(canceled_)
224 {
225
2/2
✓ Branch 0 taken 20 times.
✓ Branch 1 taken 10 times.
30x if(in_list_)
226 {
227 20x e_->waiters_.remove(this);
228 20x in_list_ = false;
229 }
230 30x return {make_error_code(
231 30x error::canceled)};
232 }
233 24x return {{}};
234 }
235 };
236
237 /// Construct an unset event.
238 20x async_event() = default;
239
240 /// Copy constructor (deleted).
241 async_event(async_event const&) = delete;
242
243 /// Copy assignment (deleted).
244 async_event& operator=(async_event const&) = delete;
245
246 /// Move constructor (deleted).
247 async_event(async_event&&) = delete;
248
249 /// Move assignment (deleted).
250 async_event& operator=(async_event&&) = delete;
251
252 /** Returns an awaiter that waits until the event is set.
253
254 If the event is already set, completes immediately.
255
256 @return An awaitable that await-returns `(error_code)`.
257 */
258 57x wait_awaiter wait() noexcept
259 {
260 57x return wait_awaiter{this};
261 }
262
263 /** Sets the event.
264
265 All waiting coroutines are posted to their executors for resumption.
266 Waiters already claimed by cancellation are skipped. Subsequent
267 calls to wait() complete immediately until clear() is called.
268 */
269 23x void set()
270 {
271 23x set_ = true;
272 for(;;)
273 {
274 39x auto* w = waiters_.pop_front();
275
2/2
✓ Branch 0 taken 23 times.
✓ Branch 1 taken 16 times.
39x if(!w)
276 23x break;
277 16x w->in_list_ = false;
278
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16x if(!w->claimed_.exchange(
279 true, std::memory_order_acq_rel))
280 {
281 16x w->ex_.post(w->h_);
282 }
283 16x }
284 23x }
285
286 /** Clears the event.
287
288 Subsequent calls to wait() will suspend until
289 set() is called again.
290 */
291 2x void clear() noexcept
292 {
293 2x set_ = false;
294 2x }
295
296 /** Returns true if the event is currently set.
297 */
298 9x bool is_set() const noexcept
299 {
300 9x return set_;
301 }
302 };
303
304 } // namespace capy
305 } // namespace boost
306
307 #endif
308