include/boost/corosio/native/detail/epoll/epoll_tcp_service.hpp

79.3% Lines (46/58) 85.7% List of functions (12/14)
f(x) Functions (14)
Function Calls Lines Blocks
boost::corosio::detail::epoll_tcp_service::epoll_tcp_service(boost::capy::execution_context&) :99 270x 100.0% 100.0% boost::corosio::detail::epoll_connect_op::cancel() :112 0 0.0% 0.0% boost::corosio::detail::epoll_read_op::cancel() :121 97x 80.0% 75.0% boost::corosio::detail::epoll_write_op::cancel() :130 0 0.0% 0.0% boost::corosio::detail::epoll_op::operator()() :139 47317x 100.0% 100.0% boost::corosio::detail::epoll_connect_op::operator()() :145 4561x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::epoll_tcp_socket(boost::corosio::detail::epoll_tcp_service&) :150 13740x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::~epoll_tcp_socket() :155 13740x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::endpoint, std::stop_token, std::error_code*) :158 4561x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :169 118034x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :181 117884x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::cancel() :193 95x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::close_socket() :199 41191x 100.0% 100.0% boost::corosio::detail::epoll_tcp_service::open_socket(boost::corosio::tcp_socket::implementation&, int, int, int) :205 4576x 94.4% 94.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/detail/tcp_service.hpp>
19
20 #include <boost/corosio/native/detail/epoll/epoll_tcp_socket.hpp>
21 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
22 #include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
23
24 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
25
26 #include <coroutine>
27
28 #include <errno.h>
29 #include <netinet/in.h>
30 #include <netinet/tcp.h>
31 #include <sys/epoll.h>
32 #include <sys/socket.h>
33 #include <unistd.h>
34
35 /*
36 epoll Socket Implementation
37 ===========================
38
39 Each I/O operation follows the same pattern:
40 1. Try the syscall immediately (non-blocking socket)
41 2. If it succeeds or fails with a real error, post to completion queue
42 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
43
44 This "try first" approach avoids unnecessary epoll round-trips for
45 operations that can complete immediately (common for small reads/writes
46 on fast local connections).
47
48 One-Shot Registration
49 ---------------------
50 We use one-shot epoll registration: each operation registers, waits for
51 one event, then unregisters. This simplifies the state machine since we
52 don't need to track whether an fd is currently registered or handle
53 re-arming. The tradeoff is slightly more epoll_ctl calls, but the
54 simplicity is worth it.
55
56 Cancellation
57 ------------
58 See op.hpp for the completion/cancellation race handling via the
59 `registered` atomic. cancel() must complete pending operations (post
60 them with cancelled flag) so coroutines waiting on them can resume.
61 close_socket() calls cancel() first to ensure this.
62
63 Impl Lifetime with shared_ptr
64 -----------------------------
65 Socket impls use enable_shared_from_this. The service owns impls via
66 shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and
67 removal. When a user calls close(), we call cancel() which posts pending
68 ops to the scheduler.
69
70 CRITICAL: The posted ops must keep the impl alive until they complete.
71 Otherwise the scheduler would process a freed op (use-after-free). The
72 cancel() method captures shared_from_this() into op.impl_ptr before
73 posting. When the op completes, impl_ptr is cleared, allowing the impl
74 to be destroyed if no other references exist.
75
76 Service Ownership
77 -----------------
78 epoll_tcp_service owns all socket impls. destroy_impl() removes the
79 shared_ptr from the map, but the impl may survive if ops still hold
80 impl_ptr refs. shutdown() closes all sockets and clears the map; any
81 in-flight ops will complete and release their refs.
82 */
83
84 namespace boost::corosio::detail {
85
86 /** epoll TCP service implementation.
87
88 Inherits from tcp_service to enable runtime polymorphism.
89 Uses key_type = tcp_service for service lookup.
90 */
91 class BOOST_COROSIO_DECL epoll_tcp_service final
92 : public reactor_socket_service<
93 epoll_tcp_service,
94 tcp_service,
95 epoll_scheduler,
96 epoll_tcp_socket>
97 {
98 public:
99 270x explicit epoll_tcp_service(capy::execution_context& ctx)
100 270x : reactor_socket_service(ctx)
101 {
102 270x }
103
104 std::error_code open_socket(
105 tcp_socket::implementation& impl,
106 int family,
107 int type,
108 int protocol) override;
109 };
110
111 inline void
112 epoll_connect_op::cancel() noexcept
113 {
114 if (socket_impl_)
115 socket_impl_->cancel_single_op(*this);
116 else
117 request_cancel();
118 }
119
120 inline void
121 97x epoll_read_op::cancel() noexcept
122 {
123 97x if (socket_impl_)
124 97x socket_impl_->cancel_single_op(*this);
125 else
126 request_cancel();
127 97x }
128
129 inline void
130 epoll_write_op::cancel() noexcept
131 {
132 if (socket_impl_)
133 socket_impl_->cancel_single_op(*this);
134 else
135 request_cancel();
136 }
137
138 inline void
139 47317x epoll_op::operator()()
140 {
141 47317x complete_io_op(*this);
142 47317x }
143
144 inline void
145 4561x epoll_connect_op::operator()()
146 {
147 4561x complete_connect_op(*this);
148 4561x }
149
150 13740x inline epoll_tcp_socket::epoll_tcp_socket(epoll_tcp_service& svc) noexcept
151 13740x : reactor_stream_socket(svc)
152 {
153 13740x }
154
155 13740x inline epoll_tcp_socket::~epoll_tcp_socket() = default;
156
157 inline std::coroutine_handle<>
158 4561x epoll_tcp_socket::connect(
159 std::coroutine_handle<> h,
160 capy::executor_ref ex,
161 endpoint ep,
162 std::stop_token token,
163 std::error_code* ec)
164 {
165 4561x return do_connect(h, ex, ep, token, ec);
166 }
167
168 inline std::coroutine_handle<>
169 118034x epoll_tcp_socket::read_some(
170 std::coroutine_handle<> h,
171 capy::executor_ref ex,
172 buffer_param param,
173 std::stop_token token,
174 std::error_code* ec,
175 std::size_t* bytes_out)
176 {
177 118034x return do_read_some(h, ex, param, token, ec, bytes_out);
178 }
179
180 inline std::coroutine_handle<>
181 117884x epoll_tcp_socket::write_some(
182 std::coroutine_handle<> h,
183 capy::executor_ref ex,
184 buffer_param param,
185 std::stop_token token,
186 std::error_code* ec,
187 std::size_t* bytes_out)
188 {
189 117884x return do_write_some(h, ex, param, token, ec, bytes_out);
190 }
191
192 inline void
193 95x epoll_tcp_socket::cancel() noexcept
194 {
195 95x do_cancel();
196 95x }
197
198 inline void
199 41191x epoll_tcp_socket::close_socket() noexcept
200 {
201 41191x do_close_socket();
202 41191x }
203
204 inline std::error_code
205 4576x epoll_tcp_service::open_socket(
206 tcp_socket::implementation& impl, int family, int type, int protocol)
207 {
208 4576x auto* epoll_impl = static_cast<epoll_tcp_socket*>(&impl);
209 4576x epoll_impl->close_socket();
210
211 4576x int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
212 4576x if (fd < 0)
213 return make_err(errno);
214
215 4576x if (family == AF_INET6)
216 {
217 5x int one = 1;
218 5x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
219 }
220
221 4576x epoll_impl->fd_ = fd;
222
223 // Register fd with epoll (edge-triggered mode)
224 4576x epoll_impl->desc_state_.fd = fd;
225 {
226 4576x std::lock_guard lock(epoll_impl->desc_state_.mutex);
227 4576x epoll_impl->desc_state_.read_op = nullptr;
228 4576x epoll_impl->desc_state_.write_op = nullptr;
229 4576x epoll_impl->desc_state_.connect_op = nullptr;
230 4576x }
231 4576x scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
232
233 4576x return {};
234 }
235
236 } // namespace boost::corosio::detail
237
238 #endif // BOOST_COROSIO_HAS_EPOLL
239
240 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
241