include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp

83.9% Lines (115/137) 100.0% List of functions (9/9)
f(x) Functions (9)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21
22 #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
23 #include <boost/corosio/detail/timer_service.hpp>
24 #include <boost/corosio/native/detail/make_err.hpp>
25 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27
28 #include <boost/corosio/detail/except.hpp>
29
30 #include <atomic>
31 #include <chrono>
32 #include <cstdint>
33 #include <mutex>
34
35 #include <errno.h>
36 #include <sys/epoll.h>
37 #include <sys/eventfd.h>
38 #include <sys/timerfd.h>
39 #include <unistd.h>
40
41 namespace boost::corosio::detail {
42
43 struct epoll_op;
44 struct descriptor_state;
45
/** Linux scheduler using epoll for I/O multiplexing.

    This scheduler implements the scheduler interface using Linux epoll
    for efficient I/O event notification. It uses a single reactor model
    where one thread runs epoll_wait while other threads
    wait on a condition variable for handler work. This design provides:

    - Handler parallelism: N posted handlers can execute on N threads
    - No thundering herd: condition_variable wakes exactly one thread
    - IOCP parity: Behavior matches Windows I/O completion port semantics

    When threads call run(), they first try to execute queued handlers.
    If the queue is empty and no reactor is running, one thread becomes
    the reactor and runs epoll_wait. Other threads wait on a condition
    variable until handlers are available.

    @par Thread Safety
    All public member functions are thread-safe.
*/
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
{
public:
    /** Construct the scheduler.

        Creates an epoll instance, eventfd for reactor interruption,
        and timerfd for kernel-managed timer expiry.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).

        @throws system_error If the epoll instance, eventfd, or timerfd
        cannot be created or registered with epoll.
    */
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler, closing all owned file descriptors.
    ~epoll_scheduler() override;

    epoll_scheduler(epoll_scheduler const&) = delete;
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;

    /// Shut down the scheduler, draining pending operations.
    void shutdown() override;

    /** Return the epoll file descriptor.

        Used by socket services to register file descriptors
        for I/O event notification.

        @return The epoll file descriptor.
    */
    int epoll_fd() const noexcept
    {
        return epoll_fd_;
    }

    /** Register a descriptor for persistent monitoring.

        The fd is registered once and stays registered until explicitly
        deregistered. Events are dispatched via descriptor_state which
        tracks pending read/write/connect operations.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
    */
    void register_descriptor(int fd, descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

private:
    /// Reactor entry: runs epoll_wait and dispatches ready events.
    void
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;

    /// Wake a thread blocked in epoll_wait by writing to the eventfd.
    void interrupt_reactor() const override;

    /// Re-arm (or disarm) the timerfd from the timer service's nearest expiry.
    void update_timerfd() const;

    int epoll_fd_;  // epoll instance; -1 until opened by the constructor
    int event_fd_;  // eventfd used to interrupt epoll_wait
    int timer_fd_;  // timerfd that drives timer expiry

    // Edge-triggered eventfd state: true while a wakeup write is pending;
    // cleared by the reactor after it drains the eventfd.
    mutable std::atomic<bool> eventfd_armed_{false};

    // Set when the earliest timer changes; flushed before epoll_wait
    mutable std::atomic<bool> timerfd_stale_{false};
};
132
133 270x inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
134 270x : epoll_fd_(-1)
135 270x , event_fd_(-1)
136 270x , timer_fd_(-1)
137 {
138 270x epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
139 270x if (epoll_fd_ < 0)
140 detail::throw_system_error(make_err(errno), "epoll_create1");
141
142 270x event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
143 270x if (event_fd_ < 0)
144 {
145 int errn = errno;
146 ::close(epoll_fd_);
147 detail::throw_system_error(make_err(errn), "eventfd");
148 }
149
150 270x timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
151 270x if (timer_fd_ < 0)
152 {
153 int errn = errno;
154 ::close(event_fd_);
155 ::close(epoll_fd_);
156 detail::throw_system_error(make_err(errn), "timerfd_create");
157 }
158
159 270x epoll_event ev{};
160 270x ev.events = EPOLLIN | EPOLLET;
161 270x ev.data.ptr = nullptr;
162 270x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
163 {
164 int errn = errno;
165 ::close(timer_fd_);
166 ::close(event_fd_);
167 ::close(epoll_fd_);
168 detail::throw_system_error(make_err(errn), "epoll_ctl");
169 }
170
171 270x epoll_event timer_ev{};
172 270x timer_ev.events = EPOLLIN | EPOLLERR;
173 270x timer_ev.data.ptr = &timer_fd_;
174 270x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
175 {
176 int errn = errno;
177 ::close(timer_fd_);
178 ::close(event_fd_);
179 ::close(epoll_fd_);
180 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
181 }
182
183 270x timer_svc_ = &get_timer_service(ctx, *this);
184 270x timer_svc_->set_on_earliest_changed(
185 5058x timer_service::callback(this, [](void* p) {
186 4788x auto* self = static_cast<epoll_scheduler*>(p);
187 4788x self->timerfd_stale_.store(true, std::memory_order_release);
188 4788x self->interrupt_reactor();
189 4788x }));
190
191 270x get_resolver_service(ctx, *this);
192 270x get_signal_service(ctx, *this);
193
194 270x completed_ops_.push(&task_op_);
195 270x }
196
197 540x inline epoll_scheduler::~epoll_scheduler()
198 {
199 270x if (timer_fd_ >= 0)
200 270x ::close(timer_fd_);
201 270x if (event_fd_ >= 0)
202 270x ::close(event_fd_);
203 270x if (epoll_fd_ >= 0)
204 270x ::close(epoll_fd_);
205 540x }
206
207 inline void
208 270x epoll_scheduler::shutdown()
209 {
210 270x shutdown_drain();
211
212 270x if (event_fd_ >= 0)
213 270x interrupt_reactor();
214 270x }
215
216 inline void
217 9247x epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
218 {
219 9247x epoll_event ev{};
220 9247x ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
221 9247x ev.data.ptr = desc;
222
223 9247x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
224 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
225
226 9247x desc->registered_events = ev.events;
227 9247x desc->fd = fd;
228 9247x desc->scheduler_ = this;
229 9247x desc->ready_events_.store(0, std::memory_order_relaxed);
230
231 9247x std::lock_guard lock(desc->mutex);
232 9247x desc->impl_ref_.reset();
233 9247x desc->read_ready = false;
234 9247x desc->write_ready = false;
235 9247x }
236
inline void
epoll_scheduler::deregister_descriptor(int fd) const
{
    // Best effort: the return value is deliberately ignored. Failure here
    // (e.g. the fd was already closed, which removes it from the epoll
    // interest list once no other references remain) leaves nothing useful
    // to do on this path.
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}
242
243 inline void
244 5286x epoll_scheduler::interrupt_reactor() const
245 {
246 5286x bool expected = false;
247 5286x if (eventfd_armed_.compare_exchange_strong(
248 expected, true, std::memory_order_release,
249 std::memory_order_relaxed))
250 {
251 5103x std::uint64_t val = 1;
252 5103x [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
253 }
254 5286x }
255
256 inline void
257 9538x epoll_scheduler::update_timerfd() const
258 {
259 9538x auto nearest = timer_svc_->nearest_expiry();
260
261 9538x itimerspec ts{};
262 9538x int flags = 0;
263
264 9538x if (nearest == timer_service::time_point::max())
265 {
266 // No timers — disarm by setting to 0 (relative)
267 }
268 else
269 {
270 9485x auto now = std::chrono::steady_clock::now();
271 9485x if (nearest <= now)
272 {
273 // Use 1ns instead of 0 — zero disarms the timerfd
274 185x ts.it_value.tv_nsec = 1;
275 }
276 else
277 {
278 9300x auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
279 9300x nearest - now)
280 9300x .count();
281 9300x ts.it_value.tv_sec = nsec / 1000000000;
282 9300x ts.it_value.tv_nsec = nsec % 1000000000;
283 9300x if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
284 ts.it_value.tv_nsec = 1;
285 }
286 }
287
288 9538x if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
289 detail::throw_system_error(make_err(errno), "timerfd_settime");
290 9538x }
291
inline void
epoll_scheduler::run_task(std::unique_lock<std::mutex>& lock, context_type* ctx)
{
    // Poll (timeout 0) when another thread has interrupted the reactor;
    // otherwise block indefinitely until an event arrives.
    int timeout_ms = task_interrupted_ ? 0 : -1;

    // epoll_wait must not run under the scheduler mutex.
    if (lock.owns_lock())
        lock.unlock();

    // Re-acquires/cleans up reactor state on every exit path, including
    // the throws below.
    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    // EINTR is benign: fall through with nfds < 0 so the loop is skipped.
    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    for (int i = 0; i < nfds; ++i)
    {
        // data.ptr == nullptr marks the wakeup eventfd: drain it and
        // disarm so the next interrupt_reactor() performs a fresh write.
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        // data.ptr == &timer_fd_ marks the timerfd: consume the expiration
        // count and defer timer processing until after the event loop.
        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r =
                ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // I/O event: accumulate the ready mask on the descriptor and
        // enqueue it at most once (the CAS guards against re-enqueue while
        // it is already pending in a queue).
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(
                expected, true, std::memory_order_release,
                std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Fire expired timers and re-arm the timerfd for the next expiry.
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    // Publish the batch of ready descriptors under the scheduler mutex.
    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
359
360 } // namespace boost::corosio::detail
361
362 #endif // BOOST_COROSIO_HAS_EPOLL
363
364 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
365