include/boost/capy/ex/async_mutex.hpp

Summary: Lines 98.9% (93/94) · Functions 100.0% (20/20) · Branches 90.9% (20/22)

Functions (20) — Function : line | Calls | Lines | Branches
boost::capy::async_mutex::lock_awaiter::cancel_fn::operator()() const            :180 | 0 | 100.0% |  50.0%
boost::capy::async_mutex::lock_awaiter::stop_cb_()                               :203 | 0 | 100.0% |
boost::capy::async_mutex::lock_awaiter::~lock_awaiter()                          :210 | 0 | 100.0% | 100.0%
boost::capy::async_mutex::lock_awaiter::lock_awaiter(boost::capy::async_mutex*)  :219 | 0 | 100.0% |
boost::capy::async_mutex::lock_awaiter::lock_awaiter(boost::capy::async_mutex::lock_awaiter&&) :224 | 0 | 100.0% |
boost::capy::async_mutex::lock_awaiter::await_ready() const                      :239 | 0 | 100.0% | 100.0%
boost::capy::async_mutex::lock_awaiter::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :251 | 0 | 100.0% | 100.0%
boost::capy::async_mutex::lock_awaiter::await_resume()                           :269 | 0 | 100.0% | 100.0%
boost::capy::async_mutex::lock_guard::~lock_guard()                              :299 | 0 | 100.0% | 100.0%
boost::capy::async_mutex::lock_guard::lock_guard()                               :305 | 0 | 100.0% |
boost::capy::async_mutex::lock_guard::lock_guard(boost::capy::async_mutex*)      :310 | 0 | 100.0% |
boost::capy::async_mutex::lock_guard::lock_guard(boost::capy::async_mutex::lock_guard&&) :315 | 0 | 100.0% |
boost::capy::async_mutex::lock_guard_awaiter::lock_guard_awaiter(boost::capy::async_mutex*) :343 | 0 | 100.0% |
boost::capy::async_mutex::lock_guard_awaiter::await_ready() const                :349 | 0 | 100.0% |
boost::capy::async_mutex::lock_guard_awaiter::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :356 | 0 | 100.0% |
boost::capy::async_mutex::lock_guard_awaiter::await_resume()                     :363 | 0 | 100.0% | 100.0%
boost::capy::async_mutex::lock()                                                 :391 | 0 | 100.0% |
boost::capy::async_mutex::scoped_lock()                                          :400 | 0 | 100.0% |
boost::capy::async_mutex::unlock()                                               :412 | 0 |  88.9% |  75.0%
boost::capy::async_mutex::is_locked() const                                      :433 | 0 | 100.0% |
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_ASYNC_MUTEX_HPP
11 #define BOOST_CAPY_ASYNC_MUTEX_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/intrusive.hpp>
15 #include <boost/capy/concept/executor.hpp>
16 #include <boost/capy/error.hpp>
17 #include <boost/capy/ex/io_env.hpp>
18 #include <boost/capy/io_result.hpp>
19
20 #include <stop_token>
21
22 #include <atomic>
23 #include <coroutine>
24 #include <new>
25 #include <utility>
26
27 /* async_mutex implementation notes
28 ================================
29
30 Waiters form a doubly-linked intrusive list (fair FIFO). lock_awaiter
31 inherits intrusive_list<lock_awaiter>::node; the list is owned by
32 async_mutex::waiters_.
33
34 Cancellation via stop_token
35 ---------------------------
36 A std::stop_callback is registered in await_suspend. Two actors can
37 race to resume the suspended coroutine: unlock() and the stop callback.
38 An atomic bool `claimed_` resolves the race -- whoever does
39 claimed_.exchange(true) and reads false wins. The loser does nothing.
40
41 The stop callback calls ex_.post(h_). The stop_callback is
42 destroyed later in await_resume. cancel_fn touches no members
43 after post returns (same pattern as delete-this).
44
45 unlock() pops waiters from the front. If the popped waiter was
46 already claimed by the stop callback, unlock() skips it and tries
47 the next. await_resume removes the (still-linked) canceled waiter
48 via waiters_.remove(this).
49
50 The stop_callback lives in a union to suppress automatic
51 construction/destruction. Placement new in await_suspend, explicit
52 destructor call in await_resume and ~lock_awaiter.
53
54 Member ordering constraint
55 --------------------------
56 The union containing stop_cb_ must be declared AFTER the members
57 the callback accesses (h_, ex_, claimed_, canceled_). If the
58 stop_cb_ destructor blocks waiting for a concurrent callback, those
59 members must still be alive (C++ destroys in reverse declaration
60 order).
61
62 active_ flag
63 ------------
64 Tracks both list membership and stop_cb_ lifetime (they are always
65 set and cleared together). Used by the destructor to clean up if the
66 coroutine is destroyed while suspended (e.g. execution_context
67 shutdown).
68
69 Cancellation scope
70 ------------------
71 Cancellation only takes effect while the coroutine is suspended in
72 the wait queue. If the mutex is unlocked, await_ready acquires it
73 immediately without checking the stop token. This is intentional:
74 the fast path has no token access and no overhead.
75
76 Threading assumptions
77 ---------------------
78 - All list mutations happen on the executor thread (await_suspend,
79 await_resume, unlock, ~lock_awaiter).
80 - The stop callback may fire from any thread, but only touches
81 claimed_ (atomic) and then calls post. It never touches the
82 list.
83 - ~lock_awaiter must be called from the executor thread. This is
84 guaranteed during normal shutdown but NOT if the coroutine frame
85 is destroyed from another thread while a stop callback could
86 fire (precondition violation, same as cppcoro/folly).
87 */
88
89 namespace boost {
90 namespace capy {
91
92 /** An asynchronous mutex for coroutines.
93
94 This mutex provides mutual exclusion for coroutines without blocking.
95 When a coroutine attempts to acquire a locked mutex, it suspends and
96 is added to an intrusive wait queue. When the holder unlocks, the next
97 waiter is resumed with the lock held.
98
99 @par Cancellation
100
101 When a coroutine is suspended waiting for the mutex and its stop
102 token is triggered, the waiter completes with `error::canceled`
103 instead of acquiring the lock.
104
105 Cancellation only applies while the coroutine is suspended in the
106 wait queue. If the mutex is unlocked when `lock()` is called, the
107 lock is acquired immediately even if the stop token is already
108 signaled.
109
110 @par Zero Allocation
111
112 No heap allocation occurs for lock operations.
113
114 @par Thread Safety
115
116 Distinct objects: Safe.@n
117 Shared objects: Unsafe.
118
119 The mutex operations are designed for single-threaded use on one
120 executor. The stop callback may fire from any thread.
121
122 This type is non-copyable and non-movable because suspended
123 waiters hold intrusive pointers into the mutex's internal list.
124
125 @par Example
126 @code
127 async_mutex cm;
128
129 task<> protected_operation() {
130 auto [ec] = co_await cm.lock();
131 if(ec)
132 co_return;
133 // ... critical section ...
134 cm.unlock();
135 }
136
137 // Or with RAII:
138 task<> protected_operation() {
139 auto [ec, guard] = co_await cm.scoped_lock();
140 if(ec)
141 co_return;
142 // ... critical section ...
143 // unlocks automatically
144 }
145 @endcode
146 */
147 class async_mutex
148 {
149 public:
150 class lock_awaiter;
151 class lock_guard;
152 class lock_guard_awaiter;
153
154 private:
155 bool locked_ = false;
156 detail::intrusive_list<lock_awaiter> waiters_;
157
158 public:
159 /** Awaiter returned by lock().
160 */
161 class lock_awaiter
162 : public detail::intrusive_list<lock_awaiter>::node
163 {
164 friend class async_mutex;
165
166 async_mutex* m_;
167 std::coroutine_handle<> h_;
168 executor_ref ex_;
169
170 // These members must be declared before stop_cb_
171 // (see comment on the union below).
172 std::atomic<bool> claimed_{false};
173 bool canceled_ = false;
174 bool active_ = false;
175
176 struct cancel_fn
177 {
178 lock_awaiter* self_;
179
180 6x void operator()() const noexcept
181 {
182
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6x if(!self_->claimed_.exchange(
183 true, std::memory_order_acq_rel))
184 {
185 6x self_->canceled_ = true;
186 6x self_->ex_.post(self_->h_);
187 }
188 6x }
189 };
190
191 using stop_cb_t =
192 std::stop_callback<cancel_fn>;
193
194 // Aligned storage for stop_cb_t. Declared last:
195 // its destructor may block while the callback
196 // accesses the members above.
197 BOOST_CAPY_MSVC_WARNING_PUSH
198 BOOST_CAPY_MSVC_WARNING_DISABLE(4324) // padded due to alignas
199 alignas(stop_cb_t)
200 unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
201 BOOST_CAPY_MSVC_WARNING_POP
202
203 17x stop_cb_t& stop_cb_() noexcept
204 {
205 return *reinterpret_cast<stop_cb_t*>(
206 17x stop_cb_buf_);
207 }
208
209 public:
210 70x ~lock_awaiter()
211 {
212
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 67 times.
70x if(active_)
213 {
214 3x stop_cb_().~stop_cb_t();
215 3x m_->waiters_.remove(this);
216 }
217 70x }
218
219 35x explicit lock_awaiter(async_mutex* m) noexcept
220 35x : m_(m)
221 {
222 35x }
223
224 35x lock_awaiter(lock_awaiter&& o) noexcept
225 35x : m_(o.m_)
226 35x , h_(o.h_)
227 35x , ex_(o.ex_)
228 35x , claimed_(o.claimed_.load(
229 std::memory_order_relaxed))
230 35x , canceled_(o.canceled_)
231 35x , active_(std::exchange(o.active_, false))
232 {
233 35x }
234
235 lock_awaiter(lock_awaiter const&) = delete;
236 lock_awaiter& operator=(lock_awaiter const&) = delete;
237 lock_awaiter& operator=(lock_awaiter&&) = delete;
238
239 35x bool await_ready() const noexcept
240 {
241
2/2
✓ Branch 0 taken 16 times.
✓ Branch 1 taken 19 times.
35x if(!m_->locked_)
242 {
243 16x m_->locked_ = true;
244 16x return true;
245 }
246 19x return false;
247 }
248
249 /** IoAwaitable protocol overload. */
250 std::coroutine_handle<>
251 19x await_suspend(
252 std::coroutine_handle<> h,
253 io_env const* env) noexcept
254 {
255
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 17 times.
19x if(env->stop_token.stop_requested())
256 {
257 2x canceled_ = true;
258 2x return h;
259 }
260 17x h_ = h;
261 17x ex_ = env->executor;
262 17x m_->waiters_.push_back(this);
263 51x ::new(stop_cb_buf_) stop_cb_t(
264 17x env->stop_token, cancel_fn{this});
265 17x active_ = true;
266 17x return std::noop_coroutine();
267 }
268
269 32x io_result<> await_resume() noexcept
270 {
271
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 18 times.
32x if(active_)
272 {
273 14x stop_cb_().~stop_cb_t();
274
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 8 times.
14x if(canceled_)
275 {
276 6x m_->waiters_.remove(this);
277 6x active_ = false;
278 6x return {make_error_code(
279 6x error::canceled)};
280 }
281 8x active_ = false;
282 }
283
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 24 times.
26x if(canceled_)
284 2x return {make_error_code(
285 2x error::canceled)};
286 24x return {{}};
287 }
288 };
289
290 /** RAII lock guard for async_mutex.
291
292 Automatically unlocks the mutex when destroyed.
293 */
294 class [[nodiscard]] lock_guard
295 {
296 async_mutex* m_;
297
298 public:
299 5x ~lock_guard()
300 {
301
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 3 times.
5x if(m_)
302 2x m_->unlock();
303 5x }
304
305 2x lock_guard() noexcept
306 2x : m_(nullptr)
307 {
308 2x }
309
310 2x explicit lock_guard(async_mutex* m) noexcept
311 2x : m_(m)
312 {
313 2x }
314
315 1x lock_guard(lock_guard&& o) noexcept
316 1x : m_(std::exchange(o.m_, nullptr))
317 {
318 1x }
319
320 lock_guard& operator=(lock_guard&& o) noexcept
321 {
322 if(this != &o)
323 {
324 if(m_)
325 m_->unlock();
326 m_ = std::exchange(o.m_, nullptr);
327 }
328 return *this;
329 }
330
331 lock_guard(lock_guard const&) = delete;
332 lock_guard& operator=(lock_guard const&) = delete;
333 };
334
335 /** Awaiter returned by scoped_lock() that returns a lock_guard on resume.
336 */
337 class lock_guard_awaiter
338 {
339 async_mutex* m_;
340 lock_awaiter inner_;
341
342 public:
343 4x explicit lock_guard_awaiter(async_mutex* m) noexcept
344 4x : m_(m)
345 4x , inner_(m)
346 {
347 4x }
348
349 4x bool await_ready() const noexcept
350 {
351 4x return inner_.await_ready();
352 }
353
354 /** IoAwaitable protocol overload. */
355 std::coroutine_handle<>
356 2x await_suspend(
357 std::coroutine_handle<> h,
358 io_env const* env) noexcept
359 {
360 2x return inner_.await_suspend(h, env);
361 }
362
363 4x io_result<lock_guard> await_resume() noexcept
364 {
365 4x auto r = inner_.await_resume();
366
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
4x if(r.ec)
367 2x return {r.ec, {}};
368 2x return {{}, lock_guard(m_)};
369 4x }
370 };
371
372 /// Construct an unlocked mutex.
373 async_mutex() = default;
374
375 /// Copy constructor (deleted).
376 async_mutex(async_mutex const&) = delete;
377
378 /// Copy assignment (deleted).
379 async_mutex& operator=(async_mutex const&) = delete;
380
381 /// Move constructor (deleted).
382 async_mutex(async_mutex&&) = delete;
383
384 /// Move assignment (deleted).
385 async_mutex& operator=(async_mutex&&) = delete;
386
387 /** Returns an awaiter that acquires the mutex.
388
389 @return An awaitable that await-returns `(error_code)`.
390 */
391 31x lock_awaiter lock() noexcept
392 {
393 31x return lock_awaiter{this};
394 }
395
396 /** Returns an awaiter that acquires the mutex with RAII.
397
398 @return An awaitable that await-returns `(error_code,lock_guard)`.
399 */
400 4x lock_guard_awaiter scoped_lock() noexcept
401 {
402 4x return lock_guard_awaiter(this);
403 }
404
405 /** Releases the mutex.
406
407 If waiters are queued, the next eligible waiter is
408 resumed with the lock held. Canceled waiters are
409 skipped. If no eligible waiter remains, the mutex
410 becomes unlocked.
411 */
412 24x void unlock() noexcept
413 {
414 for(;;)
415 {
416 24x auto* waiter = waiters_.pop_front();
417
2/2
✓ Branch 0 taken 16 times.
✓ Branch 1 taken 8 times.
24x if(!waiter)
418 {
419 16x locked_ = false;
420 16x return;
421 }
422
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8x if(!waiter->claimed_.exchange(
423 true, std::memory_order_acq_rel))
424 {
425 8x waiter->ex_.post(waiter->h_);
426 8x return;
427 }
428 }
429 }
430
431 /** Returns true if the mutex is currently locked.
432 */
433 26x bool is_locked() const noexcept
434 {
435 26x return locked_;
436 }
437 };
438
439 } // namespace capy
440 } // namespace boost
441
442 #endif
443