src/ex/detail/strand_service.cpp
97.8% Lines (89/91)
95.5% List of functions (21/22)
90.6% Branches (29/32)
Functions (22)
Function
Calls
Lines
Branches
Blocks
boost::capy::detail::strand_invoker::promise_type::operator new(unsigned long, boost::capy::detail::strand_impl&)
:53
0
100.0%
75.0%
–
boost::capy::detail::strand_invoker::promise_type::operator delete(void*, unsigned long)
:70
0
87.5%
50.0%
–
boost::capy::detail::strand_invoker::promise_type::get_return_object()
:84
0
100.0%
–
–
boost::capy::detail::strand_invoker::promise_type::initial_suspend()
:87
0
100.0%
–
–
boost::capy::detail::strand_invoker::promise_type::final_suspend()
:88
0
100.0%
–
–
boost::capy::detail::strand_invoker::promise_type::return_void()
:89
0
100.0%
–
–
boost::capy::detail::strand_invoker::promise_type::unhandled_exception()
:90
0
0.0%
–
–
boost::capy::detail::strand_service_impl::strand_service_impl(boost::capy::execution_context&)
:112
0
100.0%
–
–
boost::capy::detail::strand_service_impl::get_implementation()
:117
0
100.0%
100.0%
–
boost::capy::detail::strand_service_impl::shutdown()
:127
0
100.0%
100.0%
–
boost::capy::detail::strand_service_impl::enqueue(boost::capy::detail::strand_impl&, std::__n4861::coroutine_handle<void>)
:143
0
100.0%
100.0%
–
boost::capy::detail::strand_service_impl::dispatch_pending(boost::capy::detail::strand_impl&)
:156
0
100.0%
100.0%
–
boost::capy::detail::strand_service_impl::try_unlock(boost::capy::detail::strand_impl&)
:167
0
100.0%
100.0%
–
boost::capy::detail::strand_service_impl::set_dispatch_thread(boost::capy::detail::strand_impl&)
:179
0
100.0%
–
–
boost::capy::detail::strand_service_impl::clear_dispatch_thread(boost::capy::detail::strand_impl&)
:185
0
100.0%
–
–
boost::capy::detail::strand_service_impl::make_invoker(boost::capy::detail::strand_impl&)
:193
0
100.0%
100.0%
–
boost::capy::detail::strand_service::strand_service()
:213
0
100.0%
–
–
boost::capy::detail::strand_service::~strand_service()
:219
0
100.0%
–
–
boost::capy::detail::strand_service::running_in_this_thread(boost::capy::detail::strand_impl&)
:223
0
100.0%
–
–
boost::capy::detail::strand_service::dispatch(boost::capy::detail::strand_impl&, boost::capy::executor_ref, std::__n4861::coroutine_handle<void>)
:230
0
100.0%
83.3%
–
boost::capy::detail::strand_service::post(boost::capy::detail::strand_impl&, boost::capy::executor_ref, std::__n4861::coroutine_handle<void>)
:242
0
100.0%
100.0%
–
boost::capy::detail::get_strand_service(boost::capy::execution_context&)
:250
0
100.0%
–
–
| Line | Branch | TLA | Hits | Source Code |
|---|---|---|---|---|
| 1 | // | |||
| 2 | // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) | |||
| 3 | // | |||
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |||
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |||
| 6 | // | |||
| 7 | // Official repository: https://github.com/cppalliance/capy | |||
| 8 | // | |||
| 9 | ||||
| 10 | #include "src/ex/detail/strand_queue.hpp" | |||
| 11 | #include <boost/capy/ex/detail/strand_service.hpp> | |||
| 12 | #include <atomic> | |||
| 13 | #include <coroutine> | |||
| 14 | #include <mutex> | |||
| 15 | #include <thread> | |||
| 16 | #include <utility> | |||
| 17 | ||||
| 18 | namespace boost { | |||
| 19 | namespace capy { | |||
| 20 | namespace detail { | |||
| 21 | ||||
| 22 | //---------------------------------------------------------- | |||
| 23 | ||||
| 24 | /** Implementation state for a strand. | |||
| 25 | ||||
| 26 | Each strand_impl provides serialization for coroutines | |||
| 27 | dispatched through strands that share it. | |||
| 28 | */ | |||
| 29 | // Sentinel stored in cached_frame_ by shutdown() so that invokers still | |||
| 30 | // in flight free their frame instead of repopulating the emptied cache slot. | |||
| 31 | inline void* const kCacheClosed = reinterpret_cast<void*>(1); | |||
| 32 | ||||
| 33 | struct strand_impl | |||
| 34 | { | |||
| 35 | std::mutex mutex_; | |||
| 36 | strand_queue pending_; | |||
| 37 | bool locked_ = false; | |||
| 38 | std::atomic<std::thread::id> dispatch_thread_{}; | |||
| 39 | std::atomic<void*> cached_frame_{nullptr}; | |||
| 40 | }; | |||
| 41 | ||||
| 42 | //---------------------------------------------------------- | |||
| 43 | ||||
| 44 | /** Invoker coroutine for strand dispatch. | |||
| 45 | ||||
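| 46 | Uses a custom allocator to recycle the frame through cached_frame_ - normally one allocation | |||
| 47 | per strand_impl lifetime; the owning strand_impl pointer is stored in a trailer after the frame so operator delete can recover it. | |||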
| 48 | */ | |||
| 49 | struct strand_invoker | |||
| 50 | { | |||
| 51 | struct promise_type | |||
| 52 | { | |||
| 53 | 12x | void* operator new(std::size_t n, strand_impl& impl) | ||
| 54 | { | |||
| 55 | 12x | constexpr auto A = alignof(strand_impl*); | ||
| 56 | 12x | std::size_t padded = (n + A - 1) & ~(A - 1); | ||
| 57 | 12x | std::size_t total = padded + sizeof(strand_impl*); | ||
| 58 | ||||
| 59 | 12x | void* p = impl.cached_frame_.exchange( | ||
| 60 | nullptr, std::memory_order_acquire); | |||
| 61 |
3/4✓ Branch 0 taken 2 times.
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
|
12x | if(!p || p == kCacheClosed) | |
| 62 | 10x | p = ::operator new(total); | ||
| 63 | ||||
| 64 | // Trailer lets delete recover impl | |||
| 65 | 12x | *reinterpret_cast<strand_impl**>( | ||
| 66 | 12x | static_cast<char*>(p) + padded) = &impl; | ||
| 67 | 12x | return p; | ||
| 68 | } | |||
| 69 | ||||
| 70 | 12x | void operator delete(void* p, std::size_t n) noexcept | ||
| 71 | { | |||
| 72 | 12x | constexpr auto A = alignof(strand_impl*); | ||
| 73 | 12x | std::size_t padded = (n + A - 1) & ~(A - 1); | ||
| 74 | ||||
| 75 | 12x | auto* impl = *reinterpret_cast<strand_impl**>( | ||
| 76 | static_cast<char*>(p) + padded); | |||
| 77 | ||||
| 78 | 12x | void* expected = nullptr; | ||
| 79 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 12 times.
|
12x | if(!impl->cached_frame_.compare_exchange_strong( | |
| 80 | expected, p, std::memory_order_release)) | |||
| 81 | ✗ | ::operator delete(p); | ||
| 82 | 12x | } | ||
| 83 | ||||
| 84 | 12x | strand_invoker get_return_object() noexcept | ||
| 85 | 12x | { return {std::coroutine_handle<promise_type>::from_promise(*this)}; } | ||
| 86 | ||||
| 87 | 12x | std::suspend_always initial_suspend() noexcept { return {}; } | ||
| 88 | 12x | std::suspend_never final_suspend() noexcept { return {}; } | ||
| 89 | 12x | void return_void() noexcept {} | ||
| 90 | ✗ | void unhandled_exception() { std::terminate(); } | ||
| 91 | }; | |||
| 92 | ||||
| 93 | std::coroutine_handle<promise_type> h_; | |||
| 94 | }; | |||
| 95 | ||||
| 96 | //---------------------------------------------------------- | |||
| 97 | ||||
| 98 | /** Concrete implementation of strand_service. | |||
| 99 | ||||
| 100 | Holds the fixed pool of strand_impl objects. | |||
| 101 | */ | |||
| 102 | class strand_service_impl : public strand_service | |||
| 103 | { | |||
| 104 | static constexpr std::size_t num_impls = 211; | |||
| 105 | ||||
| 106 | strand_impl impls_[num_impls]; | |||
| 107 | std::size_t salt_ = 0; | |||
| 108 | std::mutex mutex_; | |||
| 109 | ||||
| 110 | public: | |||
| 111 | explicit | |||
| 112 | 22x | strand_service_impl(execution_context&) | ||
| 113 | 4664x | { | ||
| 114 | 22x | } | ||
| 115 | ||||
| 116 | strand_impl* | |||
| 117 | 26x | get_implementation() override | ||
| 118 | { | |||
| 119 |
1/1✓ Branch 1 taken 26 times.
|
26x | std::lock_guard<std::mutex> lock(mutex_); | |
| 120 | 26x | std::size_t index = salt_++; | ||
| 121 | 26x | index = index % num_impls; | ||
| 122 | 26x | return &impls_[index]; | ||
| 123 | 26x | } | ||
| 124 | ||||
| 125 | protected: | |||
| 126 | void | |||
| 127 | 22x | shutdown() override | ||
| 128 | { | |||
| 129 |
2/2✓ Branch 0 taken 4642 times.
✓ Branch 1 taken 22 times.
|
4664x | for(std::size_t i = 0; i < num_impls; ++i) | |
| 130 | { | |||
| 131 |
1/1✓ Branch 1 taken 4642 times.
|
4642x | std::lock_guard<std::mutex> lock(impls_[i].mutex_); | |
| 132 | 4642x | impls_[i].locked_ = true; | ||
| 133 | ||||
| 134 | 4642x | void* p = impls_[i].cached_frame_.exchange( | ||
| 135 | kCacheClosed, std::memory_order_acquire); | |||
| 136 |
2/2✓ Branch 0 taken 10 times.
✓ Branch 1 taken 4632 times.
|
4642x | if(p) | |
| 137 | 10x | ::operator delete(p); | ||
| 138 | 4642x | } | ||
| 139 | 22x | } | ||
| 140 | ||||
| 141 | private: | |||
| 142 | static bool | |||
| 143 | 329x | enqueue(strand_impl& impl, std::coroutine_handle<> h) | ||
| 144 | { | |||
| 145 |
1/1✓ Branch 1 taken 329 times.
|
329x | std::lock_guard<std::mutex> lock(impl.mutex_); | |
| 146 |
1/1✓ Branch 1 taken 329 times.
|
329x | impl.pending_.push(h); | |
| 147 |
2/2✓ Branch 0 taken 12 times.
✓ Branch 1 taken 317 times.
|
329x | if(!impl.locked_) | |
| 148 | { | |||
| 149 | 12x | impl.locked_ = true; | ||
| 150 | 12x | return true; | ||
| 151 | } | |||
| 152 | 317x | return false; | ||
| 153 | 329x | } | ||
| 154 | ||||
| 155 | static void | |||
| 156 | 28x | dispatch_pending(strand_impl& impl) | ||
| 157 | { | |||
| 158 | 28x | strand_queue::taken_batch batch; | ||
| 159 | { | |||
| 160 |
1/1✓ Branch 1 taken 28 times.
|
28x | std::lock_guard<std::mutex> lock(impl.mutex_); | |
| 161 | 28x | batch = impl.pending_.take_all(); | ||
| 162 | 28x | } | ||
| 163 |
1/1✓ Branch 1 taken 28 times.
|
28x | impl.pending_.dispatch_batch(batch); | |
| 164 | 28x | } | ||
| 165 | ||||
| 166 | static bool | |||
| 167 | 28x | try_unlock(strand_impl& impl) | ||
| 168 | { | |||
| 169 |
1/1✓ Branch 1 taken 28 times.
|
28x | std::lock_guard<std::mutex> lock(impl.mutex_); | |
| 170 |
2/2✓ Branch 1 taken 12 times.
✓ Branch 2 taken 16 times.
|
28x | if(impl.pending_.empty()) | |
| 171 | { | |||
| 172 | 12x | impl.locked_ = false; | ||
| 173 | 12x | return true; | ||
| 174 | } | |||
| 175 | 16x | return false; | ||
| 176 | 28x | } | ||
| 177 | ||||
| 178 | static void | |||
| 179 | 28x | set_dispatch_thread(strand_impl& impl) noexcept | ||
| 180 | { | |||
| 181 | 28x | impl.dispatch_thread_.store(std::this_thread::get_id()); | ||
| 182 | 28x | } | ||
| 183 | ||||
| 184 | static void | |||
| 185 | 12x | clear_dispatch_thread(strand_impl& impl) noexcept | ||
| 186 | { | |||
| 187 | 12x | impl.dispatch_thread_.store(std::thread::id{}); | ||
| 188 | 12x | } | ||
| 189 | ||||
| 190 | // Loops until the queue is empty (aggressive). Alternative: per-batch fairness | |||
| 191 | // (repost after each batch to let other work run) - explore if starvation is observed. | |||
| 192 | static strand_invoker | |||
| 193 |
1/1✓ Branch 1 taken 12 times.
|
12x | make_invoker(strand_impl& impl) | |
| 194 | { | |||
| 195 | strand_impl* p = &impl; | |||
| 196 | for(;;) | |||
| 197 | { | |||
| 198 | set_dispatch_thread(*p); | |||
| 199 | dispatch_pending(*p); | |||
| 200 | if(try_unlock(*p)) | |||
| 201 | { | |||
| 202 | clear_dispatch_thread(*p); | |||
| 203 | co_return; | |||
| 204 | } | |||
| 205 | } | |||
| 206 | 24x | } | ||
| 207 | ||||
| 208 | friend class strand_service; | |||
| 209 | }; | |||
| 210 | ||||
| 211 | //---------------------------------------------------------- | |||
| 212 | ||||
| 213 | 22x | strand_service:: | ||
| 214 | 22x | strand_service() | ||
| 215 | 22x | : service() | ||
| 216 | { | |||
| 217 | 22x | } | ||
| 218 | ||||
| 219 | 22x | strand_service:: | ||
| 220 | ~strand_service() = default; | |||
| 221 | ||||
| 222 | bool | |||
| 223 | 8x | strand_service:: | ||
| 224 | running_in_this_thread(strand_impl& impl) noexcept | |||
| 225 | { | |||
| 226 | 8x | return impl.dispatch_thread_.load() == std::this_thread::get_id(); | ||
| 227 | } | |||
| 228 | ||||
| 229 | std::coroutine_handle<> | |||
| 230 | 6x | strand_service:: | ||
| 231 | dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h) | |||
| 232 | { | |||
| 233 |
2/2✓ Branch 1 taken 2 times.
✓ Branch 2 taken 4 times.
|
6x | if(running_in_this_thread(impl)) | |
| 234 | 2x | return h; | ||
| 235 | ||||
| 236 |
1/2✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
|
4x | if(strand_service_impl::enqueue(impl, h)) | |
| 237 |
2/2✓ Branch 1 taken 4 times.
✓ Branch 5 taken 4 times.
|
4x | ex.post(strand_service_impl::make_invoker(impl).h_); | |
| 238 | 4x | return std::noop_coroutine(); | ||
| 239 | } | |||
| 240 | ||||
| 241 | void | |||
| 242 | 325x | strand_service:: | ||
| 243 | post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h) | |||
| 244 | { | |||
| 245 |
2/2✓ Branch 1 taken 8 times.
✓ Branch 2 taken 317 times.
|
325x | if(strand_service_impl::enqueue(impl, h)) | |
| 246 |
2/2✓ Branch 1 taken 8 times.
✓ Branch 5 taken 8 times.
|
8x | ex.post(strand_service_impl::make_invoker(impl).h_); | |
| 247 | 325x | } | ||
| 248 | ||||
| 249 | strand_service& | |||
| 250 | 26x | get_strand_service(execution_context& ctx) | ||
| 251 | { | |||
| 252 | 26x | return ctx.use_service<strand_service_impl>(); | ||
| 253 | } | |||
| 254 | ||||
| 255 | } // namespace detail | |||
| 256 | } // namespace capy | |||
| 257 | } // namespace boost | |||
| 258 |