1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
10  
#ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
11  
#define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
11  
#define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
12  

12  

13  
#include <boost/corosio/detail/config.hpp>
13  
#include <boost/corosio/detail/config.hpp>
14  
#include <boost/corosio/detail/intrusive.hpp>
14  
#include <boost/corosio/detail/intrusive.hpp>
15  
#include <boost/capy/ex/execution_context.hpp>
15  
#include <boost/capy/ex/execution_context.hpp>
16  

16  

17  
#include <condition_variable>
17  
#include <condition_variable>
18  
#include <mutex>
18  
#include <mutex>
19  
#include <stdexcept>
19  
#include <stdexcept>
20  
#include <thread>
20  
#include <thread>
21  
#include <vector>
21  
#include <vector>
22  

22  

23  
namespace boost::corosio::detail {
23  
namespace boost::corosio::detail {
24  

24  

25  
/** Base class for thread pool work items.
25  
/** Base class for thread pool work items.
26  

26  

27  
    Derive from this to create work that can be posted to a
27  
    Derive from this to create work that can be posted to a
28  
    @ref thread_pool. Uses static function pointer dispatch,
28  
    @ref thread_pool. Uses static function pointer dispatch,
29  
    consistent with the IOCP `op` pattern.
29  
    consistent with the IOCP `op` pattern.
30  

30  

31  
    @par Example
31  
    @par Example
32  
    @code
32  
    @code
33  
    struct my_work : pool_work_item
33  
    struct my_work : pool_work_item
34  
    {
34  
    {
35  
        int* result;
35  
        int* result;
36  
        static void execute( pool_work_item* w ) noexcept
36  
        static void execute( pool_work_item* w ) noexcept
37  
        {
37  
        {
38  
            auto* self = static_cast<my_work*>( w );
38  
            auto* self = static_cast<my_work*>( w );
39  
            *self->result = 42;
39  
            *self->result = 42;
40  
        }
40  
        }
41  
    };
41  
    };
42  

42  

43  
    my_work w;
43  
    my_work w;
44  
    w.func_ = &my_work::execute;
44  
    w.func_ = &my_work::execute;
45  
    w.result = &r;
45  
    w.result = &r;
46  
    pool.post( &w );
46  
    pool.post( &w );
47  
    @endcode
47  
    @endcode
48  
*/
48  
*/
49  
struct pool_work_item : intrusive_queue<pool_work_item>::node
49  
struct pool_work_item : intrusive_queue<pool_work_item>::node
50  
{
50  
{
51  
    /// Static dispatch function signature.
51  
    /// Static dispatch function signature.
52  
    using func_type = void (*)(pool_work_item*) noexcept;
52  
    using func_type = void (*)(pool_work_item*) noexcept;
53  

53  

54  
    /// Completion handler invoked by the worker thread.
54  
    /// Completion handler invoked by the worker thread.
55  
    func_type func_ = nullptr;
55  
    func_type func_ = nullptr;
56  
};
56  
};
57  

57  

58  
/** Shared thread pool for dispatching blocking operations.
58  
/** Shared thread pool for dispatching blocking operations.
59  

59  

60  
    Provides a fixed pool of reusable worker threads for operations
60  
    Provides a fixed pool of reusable worker threads for operations
61  
    that cannot be integrated with async I/O (e.g. blocking DNS
61  
    that cannot be integrated with async I/O (e.g. blocking DNS
62  
    calls). Registered as an `execution_context::service` so it
62  
    calls). Registered as an `execution_context::service` so it
63  
    is a singleton per io_context.
63  
    is a singleton per io_context.
64  

64  

65  
    Threads are created eagerly in the constructor. The default
65  
    Threads are created eagerly in the constructor. The default
66  
    thread count is 1.
66  
    thread count is 1.
67  

67  

68  
    @par Thread Safety
68  
    @par Thread Safety
69  
    All public member functions are thread-safe.
69  
    All public member functions are thread-safe.
70  

70  

71  
    @par Shutdown
71  
    @par Shutdown
72  
    Sets a shutdown flag, notifies all threads, and joins them.
72  
    Sets a shutdown flag, notifies all threads, and joins them.
73  
    In-flight blocking calls complete naturally before the thread
73  
    In-flight blocking calls complete naturally before the thread
74  
    exits.
74  
    exits.
75  
*/
75  
*/
76 -
class thread_pool final
76 +
class thread_pool final : public capy::execution_context::service
77 -
    : public capy::execution_context::service
 
78  
{
77  
{
79  
    std::mutex mutex_;
78  
    std::mutex mutex_;
80  
    std::condition_variable cv_;
79  
    std::condition_variable cv_;
81  
    intrusive_queue<pool_work_item> work_queue_;
80  
    intrusive_queue<pool_work_item> work_queue_;
82  
    std::vector<std::thread> threads_;
81  
    std::vector<std::thread> threads_;
83  
    bool shutdown_ = false;
82  
    bool shutdown_ = false;
84  

83  

85  
    void worker_loop();
84  
    void worker_loop();
86  

85  

87  
public:
86  
public:
88  
    using key_type = thread_pool;
87  
    using key_type = thread_pool;
89  

88  

90  
    /** Construct the thread pool service.
89  
    /** Construct the thread pool service.
91  

90  

92  
        Eagerly creates all worker threads.
91  
        Eagerly creates all worker threads.
93  

92  

94  
        @par Exception Safety
93  
        @par Exception Safety
95  
        Strong guarantee. If thread creation fails, all
94  
        Strong guarantee. If thread creation fails, all
96  
        already-created threads are shut down and joined
95  
        already-created threads are shut down and joined
97  
        before the exception propagates.
96  
        before the exception propagates.
98  

97  

99  
        @param ctx Reference to the owning execution_context.
98  
        @param ctx Reference to the owning execution_context.
100  
        @param num_threads Number of worker threads. Must be
99  
        @param num_threads Number of worker threads. Must be
101  
               at least 1.
100  
               at least 1.
102  

101  

103  
        @throws std::logic_error If `num_threads` is 0.
102  
        @throws std::logic_error If `num_threads` is 0.
104  
    */
103  
    */
105 -
    explicit thread_pool(
104 +
    explicit thread_pool(capy::execution_context& ctx, unsigned num_threads = 1)
106 -
        capy::execution_context& ctx,
 
107 -
        unsigned num_threads = 1)
 
108  
    {
105  
    {
109  
        (void)ctx;
106  
        (void)ctx;
110  
        if (!num_threads)
107  
        if (!num_threads)
111 -
            throw std::logic_error(
108 +
            throw std::logic_error("thread_pool requires at least 1 thread");
112 -
                "thread_pool requires at least 1 thread");
 
113  
        threads_.reserve(num_threads);
109  
        threads_.reserve(num_threads);
114  
        try
110  
        try
115  
        {
111  
        {
116  
            for (unsigned i = 0; i < num_threads; ++i)
112  
            for (unsigned i = 0; i < num_threads; ++i)
117  
                threads_.emplace_back([this] { worker_loop(); });
113  
                threads_.emplace_back([this] { worker_loop(); });
118  
        }
114  
        }
119  
        catch (...)
115  
        catch (...)
120  
        {
116  
        {
121  
            shutdown();
117  
            shutdown();
122  
            throw;
118  
            throw;
123  
        }
119  
        }
124  
    }
120  
    }
125  

121  

126  
    ~thread_pool() override = default;
122  
    ~thread_pool() override = default;
127  

123  

128 -
    thread_pool(thread_pool const&) = delete;
124 +
    thread_pool(thread_pool const&)            = delete;
129  
    thread_pool& operator=(thread_pool const&) = delete;
125  
    thread_pool& operator=(thread_pool const&) = delete;
130  

126  

131  
    /** Enqueue a work item for execution on the thread pool.
127  
    /** Enqueue a work item for execution on the thread pool.
132  

128  

133  
        Zero-allocation: the caller owns the work item's storage.
129  
        Zero-allocation: the caller owns the work item's storage.
134  

130  

135  
        @param w The work item to execute. Must remain valid until
131  
        @param w The work item to execute. Must remain valid until
136  
                 its `func_` has been called.
132  
                 its `func_` has been called.
137  

133  

138  
        @return `true` if the item was enqueued, `false` if the
134  
        @return `true` if the item was enqueued, `false` if the
139  
                pool has already shut down.
135  
                pool has already shut down.
140  
    */
136  
    */
141  
    bool post(pool_work_item* w) noexcept;
137  
    bool post(pool_work_item* w) noexcept;
142  

138  

143  
    /** Shut down the thread pool.
139  
    /** Shut down the thread pool.
144  

140  

145  
        Signals all threads to exit after draining any
141  
        Signals all threads to exit after draining any
146  
        remaining queued work, then joins them.
142  
        remaining queued work, then joins them.
147  
    */
143  
    */
148  
    void shutdown() override;
144  
    void shutdown() override;
149  
};
145  
};
150  

146  

151  
inline void
147  
inline void
152  
thread_pool::worker_loop()
148  
thread_pool::worker_loop()
153  
{
149  
{
154  
    for (;;)
150  
    for (;;)
155  
    {
151  
    {
156  
        pool_work_item* w;
152  
        pool_work_item* w;
157  
        {
153  
        {
158  
            std::unique_lock<std::mutex> lock(mutex_);
154  
            std::unique_lock<std::mutex> lock(mutex_);
159 -
            cv_.wait(lock, [this] {
155 +
            cv_.wait(
160 -
                return shutdown_ || !work_queue_.empty();
156 +
                lock, [this] { return shutdown_ || !work_queue_.empty(); });
161 -
            });
 
162  

157  

163  
            w = work_queue_.pop();
158  
            w = work_queue_.pop();
164  
            if (!w)
159  
            if (!w)
165  
            {
160  
            {
166  
                if (shutdown_)
161  
                if (shutdown_)
167  
                    return;
162  
                    return;
168  
                continue;
163  
                continue;
169  
            }
164  
            }
170  
        }
165  
        }
171  
        w->func_(w);
166  
        w->func_(w);
172  
    }
167  
    }
173  
}
168  
}
174  

169  

175  
inline bool
170  
inline bool
176  
thread_pool::post(pool_work_item* w) noexcept
171  
thread_pool::post(pool_work_item* w) noexcept
177  
{
172  
{
178  
    {
173  
    {
179  
        std::lock_guard<std::mutex> lock(mutex_);
174  
        std::lock_guard<std::mutex> lock(mutex_);
180  
        if (shutdown_)
175  
        if (shutdown_)
181  
            return false;
176  
            return false;
182  
        work_queue_.push(w);
177  
        work_queue_.push(w);
183  
    }
178  
    }
184  
    cv_.notify_one();
179  
    cv_.notify_one();
185  
    return true;
180  
    return true;
186  
}
181  
}
187  

182  

188  
inline void
183  
inline void
189  
thread_pool::shutdown()
184  
thread_pool::shutdown()
190  
{
185  
{
191  
    {
186  
    {
192  
        std::lock_guard<std::mutex> lock(mutex_);
187  
        std::lock_guard<std::mutex> lock(mutex_);
193  
        shutdown_ = true;
188  
        shutdown_ = true;
194  
    }
189  
    }
195  
    cv_.notify_all();
190  
    cv_.notify_all();
196  

191  

197  
    for (auto& t : threads_)
192  
    for (auto& t : threads_)
198  
    {
193  
    {
199  
        if (t.joinable())
194  
        if (t.joinable())
200  
            t.join();
195  
            t.join();
201  
    }
196  
    }
202  
    threads_.clear();
197  
    threads_.clear();
203  

198  

204  
    {
199  
    {
205  
        std::lock_guard<std::mutex> lock(mutex_);
200  
        std::lock_guard<std::mutex> lock(mutex_);
206  
        while (work_queue_.pop())
201  
        while (work_queue_.pop())
207  
            ;
202  
            ;
208  
    }
203  
    }
209  
}
204  
}
210  

205  

211  
} // namespace boost::corosio::detail
206  
} // namespace boost::corosio::detail
212  

207  

213  
#endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
208  
#endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP