1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_ASYNC_MUTEX_HPP
11 : #define BOOST_CAPY_ASYNC_MUTEX_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/intrusive.hpp>
15 : #include <boost/capy/concept/executor.hpp>
16 : #include <boost/capy/error.hpp>
17 : #include <boost/capy/ex/io_env.hpp>
18 : #include <boost/capy/io_result.hpp>
19 :
20 : #include <stop_token>
21 :
22 : #include <atomic>
23 : #include <coroutine>
24 : #include <new>
25 : #include <utility>
26 :
27 : /* async_mutex implementation notes
28 : ================================
29 :
30 : Waiters form a doubly-linked intrusive list (fair FIFO). lock_awaiter
31 : inherits intrusive_list<lock_awaiter>::node; the list is owned by
32 : async_mutex::waiters_.
33 :
34 : Cancellation via stop_token
35 : ---------------------------
36 : A std::stop_callback is registered in await_suspend. Two actors can
37 : race to resume the suspended coroutine: unlock() and the stop callback.
38 : An atomic bool `claimed_` resolves the race -- whoever does
39 : claimed_.exchange(true) and reads false wins. The loser does nothing.
40 :
41 : The stop callback calls ex_.post(h_). The stop_callback is
42 : destroyed later in await_resume. cancel_fn touches no members
43 : after post returns (same pattern as delete-this).
44 :
45 : unlock() pops waiters from the front. If the popped waiter was
46 : already claimed by the stop callback, unlock() skips it and tries
47 : the next. await_resume removes the (still-linked) canceled waiter
48 : via waiters_.remove(this).
49 :
50 : The stop_callback lives in a union to suppress automatic
51 : construction/destruction. Placement new in await_suspend, explicit
52 : destructor call in await_resume and ~lock_awaiter.
53 :
54 : Member ordering constraint
55 : --------------------------
56 : The union containing stop_cb_ must be declared AFTER the members
57 : the callback accesses (h_, ex_, claimed_, canceled_). If the
58 : stop_cb_ destructor blocks waiting for a concurrent callback, those
59 : members must still be alive (C++ destroys in reverse declaration
60 : order).
61 :
62 : active_ flag
63 : ------------
64 : Tracks both list membership and stop_cb_ lifetime (they are always
65 : set and cleared together). Used by the destructor to clean up if the
66 : coroutine is destroyed while suspended (e.g. execution_context
67 : shutdown).
68 :
69 : Cancellation scope
70 : ------------------
71 : Cancellation only takes effect while the coroutine is suspended in
72 : the wait queue. If the mutex is unlocked, await_ready acquires it
73 : immediately without checking the stop token. This is intentional:
74 : the fast path has no token access and no overhead.
75 :
76 : Threading assumptions
77 : ---------------------
78 : - All list mutations happen on the executor thread (await_suspend,
79 : await_resume, unlock, ~lock_awaiter).
- The stop callback may fire from any thread. After winning the
  claimed_ race it writes canceled_ and calls ex_.post(h_); the
  post provides the happens-before needed for the resumed
  coroutine to observe canceled_. It never touches the list.
83 : - ~lock_awaiter must be called from the executor thread. This is
84 : guaranteed during normal shutdown but NOT if the coroutine frame
85 : is destroyed from another thread while a stop callback could
86 : fire (precondition violation, same as cppcoro/folly).
87 : */
88 :
89 : namespace boost {
90 : namespace capy {
91 :
92 : /** An asynchronous mutex for coroutines.
93 :
94 : This mutex provides mutual exclusion for coroutines without blocking.
95 : When a coroutine attempts to acquire a locked mutex, it suspends and
96 : is added to an intrusive wait queue. When the holder unlocks, the next
97 : waiter is resumed with the lock held.
98 :
99 : @par Cancellation
100 :
101 : When a coroutine is suspended waiting for the mutex and its stop
102 : token is triggered, the waiter completes with `error::canceled`
103 : instead of acquiring the lock.
104 :
105 : Cancellation only applies while the coroutine is suspended in the
106 : wait queue. If the mutex is unlocked when `lock()` is called, the
107 : lock is acquired immediately even if the stop token is already
108 : signaled.
109 :
110 : @par Zero Allocation
111 :
112 : No heap allocation occurs for lock operations.
113 :
114 : @par Thread Safety
115 :
116 : Distinct objects: Safe.@n
117 : Shared objects: Unsafe.
118 :
119 : The mutex operations are designed for single-threaded use on one
120 : executor. The stop callback may fire from any thread.
121 :
122 : This type is non-copyable and non-movable because suspended
123 : waiters hold intrusive pointers into the mutex's internal list.
124 :
125 : @par Example
126 : @code
127 : async_mutex cm;
128 :
129 : task<> protected_operation() {
130 : auto [ec] = co_await cm.lock();
131 : if(ec)
132 : co_return;
133 : // ... critical section ...
134 : cm.unlock();
135 : }
136 :
137 : // Or with RAII:
138 : task<> protected_operation() {
139 : auto [ec, guard] = co_await cm.scoped_lock();
140 : if(ec)
141 : co_return;
142 : // ... critical section ...
143 : // unlocks automatically
144 : }
145 : @endcode
146 : */
class async_mutex
{
public:
    class lock_awaiter;
    class lock_guard;
    class lock_guard_awaiter;

private:
    // True while some coroutine holds the lock.
    // Only mutated on the executor thread.
    bool locked_ = false;
    // FIFO queue of suspended waiters (fair wakeup order).
    // Nodes are the lock_awaiter objects themselves; zero allocation.
    detail::intrusive_list<lock_awaiter> waiters_;

public:
    /** Awaiter returned by lock().

        Doubles as the intrusive wait-queue node. See the
        implementation notes at the top of this file for the
        cancellation protocol and member-ordering constraints.
    */
    class lock_awaiter
        : public detail::intrusive_list<lock_awaiter>::node
    {
        friend class async_mutex;

        async_mutex* m_;                // owning mutex (never null)
        std::coroutine_handle<> h_;     // suspended coroutine, set in await_suspend
        executor_ref ex_;               // executor used to post the resumption

        // These members must be declared before stop_cb_
        // (see comment on the union below).
        std::atomic<bool> claimed_{false};  // resolves the unlock()/cancel race
        bool canceled_ = false;             // set by the winner when that winner is the stop callback
        bool active_ = false;               // true iff linked in waiters_ AND stop_cb_ is constructed

        // Callable registered with the stop_token. May run on any
        // thread; whoever flips claimed_ false->true owns the right
        // to resume the coroutine (the loser does nothing).
        struct cancel_fn
        {
            lock_awaiter* self_;

            void operator()() const noexcept
            {
                if(!self_->claimed_.exchange(
                    true, std::memory_order_acq_rel))
                {
                    self_->canceled_ = true;
                    // Touch no members of *self_ after post returns
                    // (the coroutine may resume and destroy us).
                    self_->ex_.post(self_->h_);
                }
            }
        };

        using stop_cb_t =
            std::stop_callback<cancel_fn>;

        // Aligned storage for stop_cb_t. Declared last:
        // its destructor may block while the callback
        // accesses the members above.
        BOOST_CAPY_MSVC_WARNING_PUSH
        BOOST_CAPY_MSVC_WARNING_DISABLE(4324) // padded due to alignas
        alignas(stop_cb_t)
        unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
        BOOST_CAPY_MSVC_WARNING_POP

        // Access the manually-managed stop_callback. Only valid
        // while active_ is true (i.e. after the placement new in
        // await_suspend and before the explicit destructor call).
        stop_cb_t& stop_cb_() noexcept
        {
            return *reinterpret_cast<stop_cb_t*>(
                stop_cb_buf_);
        }

    public:
        // Cleans up if the coroutine frame is destroyed while the
        // awaiter is still suspended in the queue (e.g. execution
        // context shutdown). Must run on the executor thread.
        ~lock_awaiter()
        {
            if(active_)
            {
                stop_cb_().~stop_cb_t();
                m_->waiters_.remove(this);
            }
        }

        explicit lock_awaiter(async_mutex* m) noexcept
            : m_(m)
        {
        }

        // NOTE(review): moving does not re-link the intrusive node
        // or re-target the stop callback's self pointer; presumably
        // awaiters are only moved before await_suspend (while being
        // returned from lock()), never while active_ — verify.
        lock_awaiter(lock_awaiter&& o) noexcept
            : m_(o.m_)
            , h_(o.h_)
            , ex_(o.ex_)
            , claimed_(o.claimed_.load(
                std::memory_order_relaxed))
            , canceled_(o.canceled_)
            , active_(std::exchange(o.active_, false))
        {
        }

        lock_awaiter(lock_awaiter const&) = delete;
        lock_awaiter& operator=(lock_awaiter const&) = delete;
        lock_awaiter& operator=(lock_awaiter&&) = delete;

        // Fast path: if unlocked, take the lock without suspending
        // (and without consulting the stop token — intentional).
        bool await_ready() const noexcept
        {
            if(!m_->locked_)
            {
                m_->locked_ = true;
                return true;
            }
            return false;
        }

        /** IoAwaitable protocol overload. */
        std::coroutine_handle<>
        await_suspend(
            std::coroutine_handle<> h,
            io_env const* env) noexcept
        {
            // Already-signaled token: resume immediately; the
            // canceled_ flag is reported by await_resume. Note
            // active_ stays false on this path.
            if(env->stop_token.stop_requested())
            {
                canceled_ = true;
                return h;
            }
            h_ = h;
            ex_ = env->executor;
            // Enqueue BEFORE constructing the stop_callback: the
            // callback may fire during construction, and its post
            // must find a fully initialized waiter.
            m_->waiters_.push_back(this);
            ::new(stop_cb_buf_) stop_cb_t(
                env->stop_token, cancel_fn{this});
            active_ = true;
            return std::noop_coroutine();
        }

        // Three resume paths:
        //   1. active_ && canceled_  : stop callback won the race
        //      while queued -> unlink and report error::canceled.
        //   2. active_ && !canceled_ : unlock() won and already
        //      popped us with the lock held -> success.
        //   3. !active_ && canceled_ : token was already signaled
        //      in await_suspend -> error::canceled.
        io_result<> await_resume() noexcept
        {
            if(active_)
            {
                // Destroy the callback first so no further firing
                // can race with the cleanup below.
                stop_cb_().~stop_cb_t();
                if(canceled_)
                {
                    // NOTE(review): if unlock() already popped this
                    // claimed waiter, remove() is invoked on an
                    // unlinked node — presumably intrusive_list
                    // tolerates that; confirm against
                    // detail/intrusive.hpp.
                    m_->waiters_.remove(this);
                    active_ = false;
                    return {make_error_code(
                        error::canceled)};
                }
                active_ = false;
            }
            if(canceled_)
                return {make_error_code(
                    error::canceled)};
            return {{}};
        }
    };

    /** RAII lock guard for async_mutex.

        Automatically unlocks the mutex when destroyed.
    */
    class [[nodiscard]] lock_guard
    {
        // Owned mutex, or nullptr when empty (default-constructed,
        // moved-from, or returned on the error path).
        async_mutex* m_;

    public:
        ~lock_guard()
        {
            if(m_)
                m_->unlock();
        }

        // Empty guard; owns nothing, unlocks nothing.
        lock_guard() noexcept
            : m_(nullptr)
        {
        }

        // Adopts an already-held lock on *m.
        explicit lock_guard(async_mutex* m) noexcept
            : m_(m)
        {
        }

        lock_guard(lock_guard&& o) noexcept
            : m_(std::exchange(o.m_, nullptr))
        {
        }

        // Releases any currently-held lock before adopting o's.
        lock_guard& operator=(lock_guard&& o) noexcept
        {
            if(this != &o)
            {
                if(m_)
                    m_->unlock();
                m_ = std::exchange(o.m_, nullptr);
            }
            return *this;
        }

        lock_guard(lock_guard const&) = delete;
        lock_guard& operator=(lock_guard const&) = delete;
    };

    /** Awaiter returned by scoped_lock() that returns a lock_guard on resume.

        Thin adapter over lock_awaiter: on success the acquired
        lock is wrapped in a lock_guard; on cancellation an empty
        guard is returned alongside the error.
    */
    class lock_guard_awaiter
    {
        async_mutex* m_;
        lock_awaiter inner_;

    public:
        explicit lock_guard_awaiter(async_mutex* m) noexcept
            : m_(m)
            , inner_(m)
        {
        }

        bool await_ready() const noexcept
        {
            return inner_.await_ready();
        }

        /** IoAwaitable protocol overload. */
        std::coroutine_handle<>
        await_suspend(
            std::coroutine_handle<> h,
            io_env const* env) noexcept
        {
            return inner_.await_suspend(h, env);
        }

        io_result<lock_guard> await_resume() noexcept
        {
            auto r = inner_.await_resume();
            if(r.ec)
                return {r.ec, {}};
            return {{}, lock_guard(m_)};
        }
    };

    /// Construct an unlocked mutex.
    async_mutex() = default;

    /// Copy constructor (deleted).
    async_mutex(async_mutex const&) = delete;

    /// Copy assignment (deleted).
    async_mutex& operator=(async_mutex const&) = delete;

    /// Move constructor (deleted).
    async_mutex(async_mutex&&) = delete;

    /// Move assignment (deleted).
    async_mutex& operator=(async_mutex&&) = delete;

    /** Returns an awaiter that acquires the mutex.

        @return An awaitable that await-returns `(error_code)`.
    */
    lock_awaiter lock() noexcept
    {
        return lock_awaiter{this};
    }

    /** Returns an awaiter that acquires the mutex with RAII.

        @return An awaitable that await-returns `(error_code,lock_guard)`.
    */
    lock_guard_awaiter scoped_lock() noexcept
    {
        return lock_guard_awaiter(this);
    }

    /** Releases the mutex.

        If waiters are queued, the next eligible waiter is
        resumed with the lock held. Canceled waiters are
        skipped. If no eligible waiter remains, the mutex
        becomes unlocked.

        Must be called on the executor thread while holding
        the lock.
    */
    void unlock() noexcept
    {
        for(;;)
        {
            auto* waiter = waiters_.pop_front();
            if(!waiter)
            {
                // Queue empty (or only canceled waiters remained):
                // the mutex actually becomes unlocked.
                locked_ = false;
                return;
            }
            if(!waiter->claimed_.exchange(
                true, std::memory_order_acq_rel))
            {
                // Won the race against the stop callback: hand the
                // lock directly to this waiter. locked_ stays true;
                // ownership transfers without ever being released.
                waiter->ex_.post(waiter->h_);
                return;
            }
            // Lost the race: the stop callback already claimed this
            // waiter (it will complete with error::canceled). Try
            // the next one.
        }
    }

    /** Returns true if the mutex is currently locked.
    */
    bool is_locked() const noexcept
    {
        return locked_;
    }
};
438 :
439 : } // namespace capy
440 : } // namespace boost
441 :
442 : #endif