LCOV - code coverage report
Current view: top level - /jenkins/workspace/boost-root/libs/capy/src/ex/detail - strand_service.cpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 97.8 % 91 89 2
Test Date: 2026-03-12 20:08:21 Functions: 91.3 % 23 21 2

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #include "src/ex/detail/strand_queue.hpp"
      11                 : #include <boost/capy/ex/detail/strand_service.hpp>
      12                 : #include <atomic>
      13                 : #include <coroutine>
      14                 : #include <mutex>
      15                 : #include <thread>
      16                 : #include <utility>
      17                 : 
      18                 : namespace boost {
      19                 : namespace capy {
      20                 : namespace detail {
      21                 : 
      22                 : //----------------------------------------------------------
      23                 : 
      24                 : /** Implementation state for a strand (struct strand_impl, declared below).
      25                 : 
      26                 :     Each strand_impl serializes the coroutines dispatched
      27                 :     through the strands that share it.
      28                 : */
      29                 : // kCacheClosed: sentinel that shutdown() stores in strand_impl::cached_frame_ so that an
      30                 : // invoker frame released afterwards is deleted instead of being put back in the cache slot.
      31                 : inline void* const kCacheClosed = reinterpret_cast<void*>(1); // marker value, not a real allocation
      32                 : 
      33                 : struct strand_impl
      34                 : {
      35                 :     std::mutex mutex_; // held for push/take_all/empty on pending_ and for every access to locked_
      36                 :     strand_queue pending_; // coroutine handles queued on this strand
      37                 :     bool locked_ = false; // set by the enqueue that starts an invoker, cleared by try_unlock, set by shutdown
      38                 :     std::atomic<std::thread::id> dispatch_thread_{}; // id of the thread running the invoker loop, default id otherwise
      39                 :     std::atomic<void*> cached_frame_{nullptr}; // nullptr, a spare invoker frame block, or kCacheClosed
      40                 : };
      41                 : 
      42                 : //----------------------------------------------------------
      43                 : 
      44                 : /** Invoker coroutine for strand dispatch.
      45                 : 
      46                 :     operator new/delete recycle the coroutine frame via strand_impl::cached_frame_;
      47                 :     a strand_impl* trailer placed after the frame lets delete find the owning impl.
      48                 : */
      49                 : struct strand_invoker
      50                 : {
      51                 :     struct promise_type
      52                 :     {
      53 HIT          12 :         void* operator new(std::size_t n, strand_impl& impl) // n = frame size, impl = the strand_impl& coroutine argument
      54                 :         {
      55              12 :             constexpr auto A = alignof(strand_impl*);
      56              12 :             std::size_t padded = (n + A - 1) & ~(A - 1); // n rounded up to a multiple of A
      57              12 :             std::size_t total = padded + sizeof(strand_impl*); // frame plus the trailer pointer
      58                 : // A cached block is reused as-is; assumes every frame allocated here has the same n - TODO confirm
      59              12 :             void* p = impl.cached_frame_.exchange( // take whatever is in the cache slot
      60                 :                 nullptr, std::memory_order_acquire); // NOTE(review): also replaces kCacheClosed with nullptr - confirm intended
      61              12 :             if(!p || p == kCacheClosed) // nothing reusable: slot was empty or closed
      62              10 :                 p = ::operator new(total);
      63                 : 
      64                 :             // Store &impl just past the padded frame so operator delete can recover it
      65              12 :             *reinterpret_cast<strand_impl**>(
      66              12 :                 static_cast<char*>(p) + padded) = &impl;
      67              12 :             return p;
      68                 :         }
      69                 : 
      70              12 :         void operator delete(void* p, std::size_t n) noexcept
      71                 :         {
      72              12 :             constexpr auto A = alignof(strand_impl*);
      73              12 :             std::size_t padded = (n + A - 1) & ~(A - 1); // same trailer offset that operator new computed
      74                 : // Read the owning strand_impl* back from the trailer
      75              12 :             auto* impl = *reinterpret_cast<strand_impl**>(
      76                 :                 static_cast<char*>(p) + padded);
      77                 : 
      78              12 :             void* expected = nullptr;
      79              12 :             if(!impl->cached_frame_.compare_exchange_strong( // park p in the slot only if the slot is empty
      80                 :                 expected, p, std::memory_order_release))
      81 MIS           0 :                 ::operator delete(p); // slot already holds a block or kCacheClosed: free this one
      82 HIT          12 :         }
      83                 : 
      84              12 :         strand_invoker get_return_object() noexcept
      85              12 :         { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }
      86                 : 
      87              12 :         std::suspend_always initial_suspend() noexcept { return {}; } // created suspended; the body runs once h_ is resumed
      88              12 :         std::suspend_never final_suspend() noexcept { return {}; } // does not suspend when the body finishes
      89              12 :         void return_void() noexcept {}
      90 MIS           0 :         void unhandled_exception() { std::terminate(); } // an exception escaping the body ends the program
      91                 :     };
      92                 : 
      93                 :     std::coroutine_handle<promise_type> h_; // handle of the invoker coroutine
      94                 : };
      95                 : 
      96                 : //----------------------------------------------------------
      97                 : 
      98                 : /** Concrete implementation of strand_service.
      99                 : 
     100                 :     Owns a fixed pool of strand_impl objects, hands them out round-robin, and
     101                 :     provides the queue/invoker helpers used by strand_service::dispatch and post. */
     102                 : class strand_service_impl : public strand_service
     103                 : {
     104                 :     static constexpr std::size_t num_impls = 211; // number of strand_impl objects in the pool
     105                 : 
     106                 :     strand_impl impls_[num_impls];
     107                 :     std::size_t salt_ = 0; // running counter used to pick the next pool entry
     108                 :     std::mutex mutex_; // held while salt_ is read and incremented
     109                 : 
     110                 : public:
     111                 :     explicit
     112 HIT          22 :     strand_service_impl(execution_context&)
     113            4664 :     {
     114              22 :     }
     115                 : 
     116                 :     strand_impl*
     117              26 :     get_implementation() override
     118                 :     { // Returns pool entries in round-robin order, so entries are shared once more than num_impls are requested.
     119              26 :         std::lock_guard<std::mutex> lock(mutex_);
     120              26 :         std::size_t index = salt_++;
     121              26 :         index = index % num_impls;
     122              26 :         return &impls_[index];
     123              26 :     }
     124                 : 
     125                 : protected:
     126                 :     void
     127              22 :     shutdown() override
     128                 :     { // Marks every pool entry locked and swaps its cached frame block for kCacheClosed, freeing the block.
     129            4664 :         for(std::size_t i = 0; i < num_impls; ++i)
     130                 :         {
     131            4642 :             std::lock_guard<std::mutex> lock(impls_[i].mutex_);
     132            4642 :             impls_[i].locked_ = true; // while set, enqueue() returns false so no new invoker is started
     133                 : 
     134            4642 :             void* p = impls_[i].cached_frame_.exchange(
     135                 :                 kCacheClosed, std::memory_order_acquire);
     136            4642 :             if(p) // NOTE(review): also true for kCacheClosed if shutdown ran twice - confirm it runs at most once
     137              10 :                 ::operator delete(p);
     138            4642 :         }
     139              22 :     }
     140                 : 
     141                 : private:
     142                 :     static bool
     143             329 :     enqueue(strand_impl& impl, std::coroutine_handle<> h)
     144                 :     { // Queues h; returns true only when this call flipped locked_ from false to true.
     145             329 :         std::lock_guard<std::mutex> lock(impl.mutex_);
     146             329 :         impl.pending_.push(h);
     147             329 :         if(!impl.locked_)
     148                 :         {
     149              12 :             impl.locked_ = true;
     150              12 :             return true; // callers in this file respond by posting a new invoker
     151                 :         }
     152             317 :         return false; // already locked: h stays queued for whoever drains pending_
     153             329 :     }
     154                 : 
     155                 :     static void
     156              28 :     dispatch_pending(strand_impl& impl)
     157                 :     { // Takes the whole queue under the mutex, then hands that batch to dispatch_batch with the mutex released.
     158              28 :         strand_queue::taken_batch batch;
     159                 :         {
     160              28 :             std::lock_guard<std::mutex> lock(impl.mutex_);
     161              28 :             batch = impl.pending_.take_all();
     162              28 :         }
     163              28 :         impl.pending_.dispatch_batch(batch); // presumably resumes each taken handle - confirm in strand_queue
     164              28 :     }
     165                 : 
     166                 :     static bool
     167              28 :     try_unlock(strand_impl& impl)
     168                 :     { // Clears locked_ and returns true if nothing is pending; otherwise leaves it set and returns false.
     169              28 :         std::lock_guard<std::mutex> lock(impl.mutex_);
     170              28 :         if(impl.pending_.empty())
     171                 :         {
     172              12 :             impl.locked_ = false;
     173              12 :             return true;
     174                 :         }
     175              16 :         return false;
     176              28 :     }
     177                 : 
     178                 :     static void
     179              28 :     set_dispatch_thread(strand_impl& impl) noexcept
     180                 :     { // Records the calling thread as the one dispatching for impl.
     181              28 :         impl.dispatch_thread_.store(std::this_thread::get_id());
     182              28 :     }
     183                 : 
     184                 :     static void
     185              12 :     clear_dispatch_thread(strand_impl& impl) noexcept
     186                 :     { // Resets the recorded dispatching thread to the default (no thread) id.
     187              12 :         impl.dispatch_thread_.store(std::thread::id{});
     188              12 :     }
     189                 : 
     190                 :     // Invoker coroutine: drains batches until the queue is empty (aggressive). Alternative: per-batch
     191                 :     // fairness (repost after each batch to let other work run) - explore if starvation observed.
     192                 :     static strand_invoker
     193              12 :     make_invoker(strand_impl& impl)
     194                 :     {
     195                 :         strand_impl* p = &impl;
     196                 :         for(;;)
     197                 :         {
     198                 :             set_dispatch_thread(*p); // lets running_in_this_thread() report true on this thread
     199                 :             dispatch_pending(*p);
     200                 :             if(try_unlock(*p)) // true: queue was empty and locked_ has been cleared
     201                 :             {
     202                 :                 clear_dispatch_thread(*p); // NOTE(review): runs after the unlock, so it may overwrite an id stored by a newer invoker - confirm benign
     203                 :                 co_return;
     204                 :             }
     205                 :         }
     206              24 :     }
     207                 : 
     208                 :     friend class strand_service;
     209                 : };
     210                 : 
     211                 : //----------------------------------------------------------
     212                 : // Constructor: only value-initializes the service base; the body is empty.
     213              22 : strand_service::
     214              22 : strand_service()
     215              22 :     : service()
     216                 : {
     217              22 : }
     218                 : // Defaulted destructor, defined out of line in this file.
     219              22 : strand_service::
     220                 : ~strand_service() = default;
     221                 : // Returns true when impl.dispatch_thread_ currently holds the id of the calling thread.
     222                 : bool
     223               8 : strand_service::
     224                 : running_in_this_thread(strand_impl& impl) noexcept
     225                 : {
     226               8 :     return impl.dispatch_thread_.load() == std::this_thread::get_id();
     227                 : }
     228                 : // Returns h unchanged when the calling thread is the one recorded for impl; otherwise queues h and returns a no-op handle.
     229                 : std::coroutine_handle<>
     230               6 : strand_service::
     231                 : dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
     232                 : {
     233               6 :     if(running_in_this_thread(impl))
     234               2 :         return h; // presumably resumed directly by the caller - verify against callers
     235                 : 
     236               4 :     if(strand_service_impl::enqueue(impl, h))
     237               4 :         ex.post(strand_service_impl::make_invoker(impl).h_); // enqueue set locked_: post a fresh invoker to ex
     238               4 :     return std::noop_coroutine(); // h is queued on the strand, nothing to resume here
     239                 : }
     240                 : // Always queues h on impl (no same-thread shortcut); posts a new invoker to ex when enqueue reports that it set locked_.
     241                 : void
     242             325 : strand_service::
     243                 : post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
     244                 : {
     245             325 :     if(strand_service_impl::enqueue(impl, h))
     246               8 :         ex.post(strand_service_impl::make_invoker(impl).h_);
     247             325 : }
     248                 : // Returns the strand_service_impl that ctx.use_service provides, typed as its strand_service base.
     249                 : strand_service&
     250              26 : get_strand_service(execution_context& ctx)
     251                 : {
     252              26 :     return ctx.use_service<strand_service_impl>();
     253                 : }
     254                 : 
     255                 : } // namespace detail
     256                 : } // namespace capy
     257                 : } // namespace boost
        

Generated by: LCOV version 2.3