1 -
//
 
2 -
// Copyright (c) 2026 Steve Gerbino
 
3 -
//
 
4 -
// Distributed under the Boost Software License, Version 1.0. (See accompanying
 
5 -
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 
6 -
//
 
7 -
// Official repository: https://github.com/cppalliance/corosio
 
8 -
//
 
9 -

 
10 -
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
 
11 -
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
 
12 -

 
13 -
#include <boost/corosio/detail/platform.hpp>
 
14 -

 
15 -
#if BOOST_COROSIO_HAS_EPOLL
 
16 -

 
17 -
#include <boost/corosio/detail/config.hpp>
 
18 -
#include <boost/capy/ex/execution_context.hpp>
 
19 -
#include <boost/corosio/detail/socket_service.hpp>
 
20 -

 
21 -
#include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
 
22 -
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
 
23 -
#include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
 
24 -

 
25 -
#include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
 
26 -

 
27 -
#include <coroutine>
 
28 -
#include <mutex>
 
29 -
#include <utility>
 
30 -

 
31 -
#include <errno.h>
 
32 -
#include <netinet/in.h>
 
33 -
#include <netinet/tcp.h>
 
34 -
#include <sys/epoll.h>
 
35 -
#include <sys/socket.h>
 
36 -
#include <unistd.h>
 
37 -

 
38 -
/*
 
39 -
    epoll Socket Implementation
 
40 -
    ===========================
 
41 -

 
42 -
    Each I/O operation follows the same pattern:
 
43 -
      1. Try the syscall immediately (non-blocking socket)
 
44 -
      2. If it succeeds or fails with a real error, post to completion queue
 
45 -
      3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
 
46 -

 
47 -
    This "try first" approach avoids unnecessary epoll round-trips for
 
48 -
    operations that can complete immediately (common for small reads/writes
 
49 -
    on fast local connections).
 
50 -

 
51 -
    One-Shot Registration
 
52 -
    ---------------------
 
53 -
    We use one-shot epoll registration: each operation registers, waits for
 
54 -
    one event, then unregisters. This simplifies the state machine since we
 
55 -
    don't need to track whether an fd is currently registered or handle
 
56 -
    re-arming. The tradeoff is slightly more epoll_ctl calls, but the
 
57 -
    simplicity is worth it.
 
58 -

 
59 -
    Cancellation
 
60 -
    ------------
 
61 -
    See op.hpp for the completion/cancellation race handling via the
 
62 -
    `registered` atomic. cancel() must complete pending operations (post
 
63 -
    them with cancelled flag) so coroutines waiting on them can resume.
 
64 -
    close_socket() calls cancel() first to ensure this.
 
65 -

 
66 -
    Impl Lifetime with shared_ptr
 
67 -
    -----------------------------
 
68 -
    Socket impls use enable_shared_from_this. The service owns impls via
 
69 -
    shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and
 
70 -
    removal. When a user calls close(), we call cancel() which posts pending
 
71 -
    ops to the scheduler.
 
72 -

 
73 -
    CRITICAL: The posted ops must keep the impl alive until they complete.
 
74 -
    Otherwise the scheduler would process a freed op (use-after-free). The
 
75 -
    cancel() method captures shared_from_this() into op.impl_ptr before
 
76 -
    posting. When the op completes, impl_ptr is cleared, allowing the impl
 
77 -
    to be destroyed if no other references exist.
 
78 -

 
79 -
    Service Ownership
 
80 -
    -----------------
 
81 -
    epoll_socket_service owns all socket impls. destroy_impl() removes the
 
82 -
    shared_ptr from the map, but the impl may survive if ops still hold
 
83 -
    impl_ptr refs. shutdown() closes all sockets but deliberately does not
    clear the map; the remaining shared_ptrs are released when the service
    state is destroyed, after the scheduler has drained any queued ops.
 
85 -
*/
 
86 -

 
87 -
namespace boost::corosio::detail {
 
88 -

 
89 -
/// State for epoll socket service.
 
90 -
using epoll_socket_state = reactor_service_state<epoll_scheduler, epoll_socket>;
 
91 -

 
92 -
/** epoll socket service implementation.
 
93 -

 
94 -
    Inherits from socket_service to enable runtime polymorphism.
 
95 -
    Uses key_type = socket_service for service lookup.
 
96 -
*/
 
97 -
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
 
98 -
{
 
99 -
public:
 
100 -
    explicit epoll_socket_service(capy::execution_context& ctx);
 
101 -
    ~epoll_socket_service() override;
 
102 -

 
103 -
    epoll_socket_service(epoll_socket_service const&)            = delete;
 
104 -
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;
 
105 -

 
106 -
    void shutdown() override;
 
107 -

 
108 -
    io_object::implementation* construct() override;
 
109 -
    void destroy(io_object::implementation*) override;
 
110 -
    void close(io_object::handle&) override;
 
111 -
    std::error_code open_socket(
 
112 -
        tcp_socket::implementation& impl,
 
113 -
        int family,
 
114 -
        int type,
 
115 -
        int protocol) override;
 
116 -

 
117 -
    epoll_scheduler& scheduler() const noexcept
 
118 -
    {
 
119 -
        return state_->sched_;
 
120 -
    }
 
121 -
    void post(scheduler_op* op);
 
122 -
    void work_started() noexcept;
 
123 -
    void work_finished() noexcept;
 
124 -

 
125 -
private:
 
126 -
    std::unique_ptr<epoll_socket_state> state_;
 
127 -
};
 
128 -

 
129 -
inline void
 
130 -
epoll_connect_op::cancel() noexcept
 
131 -
{
 
132 -
    if (socket_impl_)
 
133 -
        socket_impl_->cancel_single_op(*this);
 
134 -
    else
 
135 -
        request_cancel();
 
136 -
}
 
137 -

 
138 -
inline void
 
139 -
epoll_read_op::cancel() noexcept
 
140 -
{
 
141 -
    if (socket_impl_)
 
142 -
        socket_impl_->cancel_single_op(*this);
 
143 -
    else
 
144 -
        request_cancel();
 
145 -
}
 
146 -

 
147 -
inline void
 
148 -
epoll_write_op::cancel() noexcept
 
149 -
{
 
150 -
    if (socket_impl_)
 
151 -
        socket_impl_->cancel_single_op(*this);
 
152 -
    else
 
153 -
        request_cancel();
 
154 -
}
 
155 -

 
156 -
inline void
 
157 -
epoll_op::operator()()
 
158 -
{
 
159 -
    complete_io_op(*this);
 
160 -
}
 
161 -

 
162 -
inline void
 
163 -
epoll_connect_op::operator()()
 
164 -
{
 
165 -
    complete_connect_op(*this);
 
166 -
}
 
167 -

 
168 -
/// Construct a socket implementation bound to the service `svc`.
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
    : reactor_socket(svc)
{}
 
172 -

 
173 -
/// Defaulted destructor, defined out of line in this header.
inline epoll_socket::~epoll_socket() = default;
 
174 -

 
175 -
/// Start a connect to `ep`.
///
/// Thin forwarder: every argument is handed unchanged to do_connect(),
/// and the handle it returns is passed back to the caller.
inline std::coroutine_handle<>
epoll_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto next = do_connect(h, ex, ep, token, ec);
    return next;
}
 
185 -

 
186 -
/// Start a read into the buffers described by `param`.
///
/// Thin forwarder: every argument is handed unchanged to do_read_some(),
/// and the handle it returns is passed back to the caller.
inline std::coroutine_handle<>
epoll_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto next = do_read_some(h, ex, param, token, ec, bytes_out);
    return next;
}
 
197 -

 
198 -
/// Start a write from the buffers described by `param`.
///
/// Thin forwarder: every argument is handed unchanged to do_write_some(),
/// and the handle it returns is passed back to the caller.
inline std::coroutine_handle<>
epoll_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto next = do_write_some(h, ex, param, token, ec, bytes_out);
    return next;
}
 
209 -

 
210 -
inline void
 
211 -
epoll_socket::cancel() noexcept
 
212 -
{
 
213 -
    do_cancel();
 
214 -
}
 
215 -

 
216 -
inline void
 
217 -
epoll_socket::close_socket() noexcept
 
218 -
{
 
219 -
    do_close_socket();
 
220 -
}
 
221 -

 
222 -
/// Construct the service for `ctx`.
///
/// The epoll_scheduler is obtained from the context with use_service()
/// and passed to the newly allocated service state.
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
    : state_(std::make_unique<epoll_socket_state>(
          ctx.use_service<epoll_scheduler>()))
{}
 
228 -

 
229 -
/// Destroy the service; `state_` is released by the unique_ptr member.
inline epoll_socket_service::~epoll_socket_service() = default;
 
230 -

 
231 -
inline void
 
232 -
epoll_socket_service::shutdown()
 
233 -
{
 
234 -
    std::lock_guard lock(state_->mutex_);
 
235 -

 
236 -
    while (auto* impl = state_->impl_list_.pop_front())
 
237 -
        impl->close_socket();
 
238 -

 
239 -
    // Don't clear impl_ptrs_ here. The scheduler shuts down after us and
 
240 -
    // drains completed_ops_, calling destroy() on each queued op. If we
 
241 -
    // released our shared_ptrs now, an epoll_op::destroy() could free the
 
242 -
    // last ref to an impl whose embedded descriptor_state is still linked
 
243 -
    // in the queue — use-after-free on the next pop(). Letting ~state_
 
244 -
    // release the ptrs (during service destruction, after scheduler
 
245 -
    // shutdown) keeps every impl alive until all ops have been drained.
 
246 -
}
 
247 -

 
248 -
/// Create a new epoll_socket and register it with the service.
///
/// Ownership stays with the service: the shared_ptr is stored in
/// impl_ptrs_ keyed by the raw pointer, and the raw pointer is appended
/// to impl_list_. Both containers are updated under the service mutex.
///
/// @return Raw pointer to the newly created implementation.
inline io_object::implementation*
epoll_socket_service::construct()
{
    auto owner = std::make_shared<epoll_socket>(*this);
    auto* sock = owner.get();

    {
        std::lock_guard guard(state_->mutex_);
        state_->impl_ptrs_.emplace(sock, std::move(owner));
        state_->impl_list_.push_back(sock);
    }

    return sock;
}
 
262 -

 
263 -
inline void
 
264 -
epoll_socket_service::destroy(io_object::implementation* impl)
 
265 -
{
 
266 -
    auto* epoll_impl = static_cast<epoll_socket*>(impl);
 
267 -
    epoll_impl->close_socket();
 
268 -
    std::lock_guard lock(state_->mutex_);
 
269 -
    state_->impl_list_.remove(epoll_impl);
 
270 -
    state_->impl_ptrs_.erase(epoll_impl);
 
271 -
}
 
272 -

 
273 -
inline std::error_code
 
274 -
epoll_socket_service::open_socket(
 
275 -
    tcp_socket::implementation& impl, int family, int type, int protocol)
 
276 -
{
 
277 -
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
 
278 -
    epoll_impl->close_socket();
 
279 -

 
280 -
    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
 
281 -
    if (fd < 0)
 
282 -
        return make_err(errno);
 
283 -

 
284 -
    if (family == AF_INET6)
 
285 -
    {
 
286 -
        int one = 1;
 
287 -
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
 
288 -
    }
 
289 -

 
290 -
    epoll_impl->fd_ = fd;
 
291 -

 
292 -
    // Register fd with epoll (edge-triggered mode)
 
293 -
    epoll_impl->desc_state_.fd = fd;
 
294 -
    {
 
295 -
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
 
296 -
        epoll_impl->desc_state_.read_op    = nullptr;
 
297 -
        epoll_impl->desc_state_.write_op   = nullptr;
 
298 -
        epoll_impl->desc_state_.connect_op = nullptr;
 
299 -
    }
 
300 -
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
 
301 -

 
302 -
    return {};
 
303 -
}
 
304 -

 
305 -
inline void
 
306 -
epoll_socket_service::close(io_object::handle& h)
 
307 -
{
 
308 -
    static_cast<epoll_socket*>(h.get())->close_socket();
 
309 -
}
 
310 -

 
311 -
inline void
 
312 -
epoll_socket_service::post(scheduler_op* op)
 
313 -
{
 
314 -
    state_->sched_.post(op);
 
315 -
}
 
316 -

 
317 -
inline void
 
318 -
epoll_socket_service::work_started() noexcept
 
319 -
{
 
320 -
    state_->sched_.work_started();
 
321 -
}
 
322 -

 
323 -
inline void
 
324 -
epoll_socket_service::work_finished() noexcept
 
325 -
{
 
326 -
    state_->sched_.work_finished();
 
327 -
}
 
328 -

 
329 -
} // namespace boost::corosio::detail
 
330 -

 
331 -
#endif // BOOST_COROSIO_HAS_EPOLL
 
332 -

 
333 -
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP