TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #include "src/ex/detail/strand_queue.hpp"
11 : #include <boost/capy/ex/detail/strand_service.hpp>
12 : #include <atomic>
13 : #include <coroutine>
14 : #include <mutex>
15 : #include <thread>
16 : #include <utility>
17 :
18 : namespace boost {
19 : namespace capy {
20 : namespace detail {
21 :
22 : //----------------------------------------------------------
23 :
24 : /** Implementation state for a strand.
25 :
26 : Each strand_impl provides serialization for coroutines
27 : dispatched through strands that share it.
28 : */
29 : // Sentinel stored in cached_frame_ after shutdown to prevent
30 : // in-flight invokers from repopulating a freed cache slot.
31 : inline void* const kCacheClosed = reinterpret_cast<void*>(1);
32 :
33 : struct strand_impl
34 : {
35 : std::mutex mutex_;
36 : strand_queue pending_;
37 : bool locked_ = false;
38 : std::atomic<std::thread::id> dispatch_thread_{};
39 : std::atomic<void*> cached_frame_{nullptr};
40 : };
41 :
42 : //----------------------------------------------------------
43 :
44 : /** Invoker coroutine for strand dispatch.
45 :
46 : Uses custom allocator to recycle frame - one allocation
47 : per strand_impl lifetime, stored in trailer for recovery.
48 : */
49 : struct strand_invoker
50 : {
51 : struct promise_type
52 : {
53 HIT 12 : void* operator new(std::size_t n, strand_impl& impl)
54 : {
55 12 : constexpr auto A = alignof(strand_impl*);
56 12 : std::size_t padded = (n + A - 1) & ~(A - 1);
57 12 : std::size_t total = padded + sizeof(strand_impl*);
58 :
59 12 : void* p = impl.cached_frame_.exchange(
60 : nullptr, std::memory_order_acquire);
61 12 : if(!p || p == kCacheClosed)
62 10 : p = ::operator new(total);
63 :
64 : // Trailer lets delete recover impl
65 12 : *reinterpret_cast<strand_impl**>(
66 12 : static_cast<char*>(p) + padded) = &impl;
67 12 : return p;
68 : }
69 :
70 12 : void operator delete(void* p, std::size_t n) noexcept
71 : {
72 12 : constexpr auto A = alignof(strand_impl*);
73 12 : std::size_t padded = (n + A - 1) & ~(A - 1);
74 :
75 12 : auto* impl = *reinterpret_cast<strand_impl**>(
76 : static_cast<char*>(p) + padded);
77 :
78 12 : void* expected = nullptr;
79 12 : if(!impl->cached_frame_.compare_exchange_strong(
80 : expected, p, std::memory_order_release))
81 MIS 0 : ::operator delete(p);
82 HIT 12 : }
83 :
84 12 : strand_invoker get_return_object() noexcept
85 12 : { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }
86 :
87 12 : std::suspend_always initial_suspend() noexcept { return {}; }
88 12 : std::suspend_never final_suspend() noexcept { return {}; }
89 12 : void return_void() noexcept {}
90 MIS 0 : void unhandled_exception() { std::terminate(); }
91 : };
92 :
93 : std::coroutine_handle<promise_type> h_;
94 : };
95 :
96 : //----------------------------------------------------------
97 :
98 : /** Concrete implementation of strand_service.
99 :
100 : Holds the fixed pool of strand_impl objects.
101 : */
102 : class strand_service_impl : public strand_service
103 : {
104 : static constexpr std::size_t num_impls = 211;
105 :
106 : strand_impl impls_[num_impls];
107 : std::size_t salt_ = 0;
108 : std::mutex mutex_;
109 :
110 : public:
111 : explicit
112 HIT 22 : strand_service_impl(execution_context&)
113 4664 : {
114 22 : }
115 :
116 : strand_impl*
117 26 : get_implementation() override
118 : {
119 26 : std::lock_guard<std::mutex> lock(mutex_);
120 26 : std::size_t index = salt_++;
121 26 : index = index % num_impls;
122 26 : return &impls_[index];
123 26 : }
124 :
125 : protected:
126 : void
127 22 : shutdown() override
128 : {
129 4664 : for(std::size_t i = 0; i < num_impls; ++i)
130 : {
131 4642 : std::lock_guard<std::mutex> lock(impls_[i].mutex_);
132 4642 : impls_[i].locked_ = true;
133 :
134 4642 : void* p = impls_[i].cached_frame_.exchange(
135 : kCacheClosed, std::memory_order_acquire);
136 4642 : if(p)
137 10 : ::operator delete(p);
138 4642 : }
139 22 : }
140 :
141 : private:
142 : static bool
143 329 : enqueue(strand_impl& impl, std::coroutine_handle<> h)
144 : {
145 329 : std::lock_guard<std::mutex> lock(impl.mutex_);
146 329 : impl.pending_.push(h);
147 329 : if(!impl.locked_)
148 : {
149 12 : impl.locked_ = true;
150 12 : return true;
151 : }
152 317 : return false;
153 329 : }
154 :
155 : static void
156 28 : dispatch_pending(strand_impl& impl)
157 : {
158 28 : strand_queue::taken_batch batch;
159 : {
160 28 : std::lock_guard<std::mutex> lock(impl.mutex_);
161 28 : batch = impl.pending_.take_all();
162 28 : }
163 28 : impl.pending_.dispatch_batch(batch);
164 28 : }
165 :
166 : static bool
167 28 : try_unlock(strand_impl& impl)
168 : {
169 28 : std::lock_guard<std::mutex> lock(impl.mutex_);
170 28 : if(impl.pending_.empty())
171 : {
172 12 : impl.locked_ = false;
173 12 : return true;
174 : }
175 16 : return false;
176 28 : }
177 :
178 : static void
179 28 : set_dispatch_thread(strand_impl& impl) noexcept
180 : {
181 28 : impl.dispatch_thread_.store(std::this_thread::get_id());
182 28 : }
183 :
184 : static void
185 12 : clear_dispatch_thread(strand_impl& impl) noexcept
186 : {
187 12 : impl.dispatch_thread_.store(std::thread::id{});
188 12 : }
189 :
190 : // Loops until queue empty (aggressive). Alternative: per-batch fairness
191 : // (repost after each batch to let other work run) - explore if starvation observed.
192 : static strand_invoker
193 12 : make_invoker(strand_impl& impl)
194 : {
195 : strand_impl* p = &impl;
196 : for(;;)
197 : {
198 : set_dispatch_thread(*p);
199 : dispatch_pending(*p);
200 : if(try_unlock(*p))
201 : {
202 : clear_dispatch_thread(*p);
203 : co_return;
204 : }
205 : }
206 24 : }
207 :
208 : friend class strand_service;
209 : };
210 :
211 : //----------------------------------------------------------
212 :
213 22 : strand_service::
214 22 : strand_service()
215 22 : : service()
216 : {
217 22 : }
218 :
219 22 : strand_service::
220 : ~strand_service() = default;
221 :
222 : bool
223 8 : strand_service::
224 : running_in_this_thread(strand_impl& impl) noexcept
225 : {
226 8 : return impl.dispatch_thread_.load() == std::this_thread::get_id();
227 : }
228 :
229 : std::coroutine_handle<>
230 6 : strand_service::
231 : dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
232 : {
233 6 : if(running_in_this_thread(impl))
234 2 : return h;
235 :
236 4 : if(strand_service_impl::enqueue(impl, h))
237 4 : ex.post(strand_service_impl::make_invoker(impl).h_);
238 4 : return std::noop_coroutine();
239 : }
240 :
241 : void
242 325 : strand_service::
243 : post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
244 : {
245 325 : if(strand_service_impl::enqueue(impl, h))
246 8 : ex.post(strand_service_impl::make_invoker(impl).h_);
247 325 : }
248 :
249 : strand_service&
250 26 : get_strand_service(execution_context& ctx)
251 : {
252 26 : return ctx.use_service<strand_service_impl>();
253 : }
254 :
255 : } // namespace detail
256 : } // namespace capy
257 : } // namespace boost
|