1 -
//
 
2 -
// Copyright (c) 2026 Steve Gerbino
 
3 -
//
 
4 -
// Distributed under the Boost Software License, Version 1.0. (See accompanying
 
5 -
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 
6 -
//
 
7 -
// Official repository: https://github.com/cppalliance/corosio
 
8 -
//
 
9 -

 
10 -
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
 
11 -
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
 
12 -

 
13 -
#include <boost/corosio/detail/platform.hpp>
 
14 -

 
15 -
#if BOOST_COROSIO_HAS_SELECT
 
16 -

 
17 -
#include <boost/corosio/detail/config.hpp>
 
18 -
#include <boost/capy/ex/execution_context.hpp>
 
19 -
#include <boost/corosio/detail/socket_service.hpp>
 
20 -

 
21 -
#include <boost/corosio/native/detail/select/select_socket.hpp>
 
22 -
#include <boost/corosio/native/detail/select/select_scheduler.hpp>
 
23 -
#include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
 
24 -

 
25 -
#include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
 
26 -

 
27 -
#include <coroutine>
 
28 -
#include <mutex>
 
29 -
#include <utility>
 
30 -

 
31 -
#include <errno.h>
 
32 -
#include <fcntl.h>
 
33 -
#include <netinet/in.h>
 
34 -
#include <netinet/tcp.h>
 
35 -
#include <sys/select.h>
 
36 -
#include <sys/socket.h>
 
37 -
#include <unistd.h>
 
38 -

 
39 -
/*
 
40 -
    Each I/O op tries the syscall speculatively; only registers with
 
41 -
    the reactor on EAGAIN. Fd is registered once at open time and
 
42 -
    stays registered until close. The reactor only marks ready_events_;
 
43 -
    actual I/O happens in invoke_deferred_io(). cancel() captures
 
44 -
    shared_from_this() into op.impl_ptr to keep the impl alive.
 
45 -
*/
 
46 -

 
47 -
namespace boost::corosio::detail {
 
48 -

 
49 -
/// State for select socket service.
 
50 -
using select_socket_state =
 
51 -
    reactor_service_state<select_scheduler, select_socket>;
 
52 -

 
53 -
/** select socket service implementation.
 
54 -

 
55 -
    Inherits from socket_service to enable runtime polymorphism.
 
56 -
    Uses key_type = socket_service for service lookup.
 
57 -
*/
 
58 -
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
 
59 -
{
 
60 -
public:
 
61 -
    explicit select_socket_service(capy::execution_context& ctx);
 
62 -
    ~select_socket_service() override;
 
63 -

 
64 -
    select_socket_service(select_socket_service const&)            = delete;
 
65 -
    select_socket_service& operator=(select_socket_service const&) = delete;
 
66 -

 
67 -
    void shutdown() override;
 
68 -

 
69 -
    io_object::implementation* construct() override;
 
70 -
    void destroy(io_object::implementation*) override;
 
71 -
    void close(io_object::handle&) override;
 
72 -
    std::error_code open_socket(
 
73 -
        tcp_socket::implementation& impl,
 
74 -
        int family,
 
75 -
        int type,
 
76 -
        int protocol) override;
 
77 -

 
78 -
    select_scheduler& scheduler() const noexcept
 
79 -
    {
 
80 -
        return state_->sched_;
 
81 -
    }
 
82 -
    void post(scheduler_op* op);
 
83 -
    void work_started() noexcept;
 
84 -
    void work_finished() noexcept;
 
85 -

 
86 -
private:
 
87 -
    std::unique_ptr<select_socket_state> state_;
 
88 -
};
 
89 -

 
90 -
inline void
 
91 -
select_connect_op::cancel() noexcept
 
92 -
{
 
93 -
    if (socket_impl_)
 
94 -
        socket_impl_->cancel_single_op(*this);
 
95 -
    else
 
96 -
        request_cancel();
 
97 -
}
 
98 -

 
99 -
inline void
 
100 -
select_read_op::cancel() noexcept
 
101 -
{
 
102 -
    if (socket_impl_)
 
103 -
        socket_impl_->cancel_single_op(*this);
 
104 -
    else
 
105 -
        request_cancel();
 
106 -
}
 
107 -

 
108 -
inline void
 
109 -
select_write_op::cancel() noexcept
 
110 -
{
 
111 -
    if (socket_impl_)
 
112 -
        socket_impl_->cancel_single_op(*this);
 
113 -
    else
 
114 -
        request_cancel();
 
115 -
}
 
116 -

 
117 -
inline void
 
118 -
select_op::operator()()
 
119 -
{
 
120 -
    complete_io_op(*this);
 
121 -
}
 
122 -

 
123 -
inline void
 
124 -
select_connect_op::operator()()
 
125 -
{
 
126 -
    complete_connect_op(*this);
 
127 -
}
 
128 -

 
129 -
inline select_socket::select_socket(select_socket_service& svc) noexcept
 
130 -
    : reactor_socket(svc)
 
131 -
{
 
132 -
}
 
133 -

 
134 -
inline select_socket::~select_socket() = default;
 
135 -

 
136 -
inline std::coroutine_handle<>
 
137 -
select_socket::connect(
 
138 -
    std::coroutine_handle<> h,
 
139 -
    capy::executor_ref ex,
 
140 -
    endpoint ep,
 
141 -
    std::stop_token token,
 
142 -
    std::error_code* ec)
 
143 -
{
 
144 -
    auto result = do_connect(h, ex, ep, token, ec);
 
145 -
    // Rebuild fd_sets so select() watches for writability
 
146 -
    if (result == std::noop_coroutine())
 
147 -
        svc_.scheduler().notify_reactor();
 
148 -
    return result;
 
149 -
}
 
150 -

 
151 -
inline std::coroutine_handle<>
 
152 -
select_socket::read_some(
 
153 -
    std::coroutine_handle<> h,
 
154 -
    capy::executor_ref ex,
 
155 -
    buffer_param param,
 
156 -
    std::stop_token token,
 
157 -
    std::error_code* ec,
 
158 -
    std::size_t* bytes_out)
 
159 -
{
 
160 -
    return do_read_some(h, ex, param, token, ec, bytes_out);
 
161 -
}
 
162 -

 
163 -
inline std::coroutine_handle<>
 
164 -
select_socket::write_some(
 
165 -
    std::coroutine_handle<> h,
 
166 -
    capy::executor_ref ex,
 
167 -
    buffer_param param,
 
168 -
    std::stop_token token,
 
169 -
    std::error_code* ec,
 
170 -
    std::size_t* bytes_out)
 
171 -
{
 
172 -
    auto result = do_write_some(h, ex, param, token, ec, bytes_out);
 
173 -
    // Rebuild fd_sets so select() watches for writability
 
174 -
    if (result == std::noop_coroutine())
 
175 -
        svc_.scheduler().notify_reactor();
 
176 -
    return result;
 
177 -
}
 
178 -

 
179 -
inline void
 
180 -
select_socket::cancel() noexcept
 
181 -
{
 
182 -
    do_cancel();
 
183 -
}
 
184 -

 
185 -
inline void
 
186 -
select_socket::close_socket() noexcept
 
187 -
{
 
188 -
    do_close_socket();
 
189 -
}
 
190 -

 
191 -
inline select_socket_service::select_socket_service(
 
192 -
    capy::execution_context& ctx)
 
193 -
    : state_(
 
194 -
          std::make_unique<select_socket_state>(
 
195 -
              ctx.use_service<select_scheduler>()))
 
196 -
{
 
197 -
}
 
198 -

 
199 -
inline select_socket_service::~select_socket_service() {}
 
200 -

 
201 -
inline void
 
202 -
select_socket_service::shutdown()
 
203 -
{
 
204 -
    std::lock_guard lock(state_->mutex_);
 
205 -

 
206 -
    while (auto* impl = state_->impl_list_.pop_front())
 
207 -
        impl->close_socket();
 
208 -

 
209 -
    // Don't clear impl_ptrs_ here. The scheduler shuts down after us and
 
210 -
    // drains completed_ops_, calling destroy() on each queued op. Letting
 
211 -
    // ~state_ release the ptrs (during service destruction, after scheduler
 
212 -
    // shutdown) keeps every impl alive until all ops have been drained.
 
213 -
}
 
214 -

 
215 -
inline io_object::implementation*
 
216 -
select_socket_service::construct()
 
217 -
{
 
218 -
    auto impl = std::make_shared<select_socket>(*this);
 
219 -
    auto* raw = impl.get();
 
220 -

 
221 -
    {
 
222 -
        std::lock_guard lock(state_->mutex_);
 
223 -
        state_->impl_ptrs_.emplace(raw, std::move(impl));
 
224 -
        state_->impl_list_.push_back(raw);
 
225 -
    }
 
226 -

 
227 -
    return raw;
 
228 -
}
 
229 -

 
230 -
inline void
 
231 -
select_socket_service::destroy(io_object::implementation* impl)
 
232 -
{
 
233 -
    auto* select_impl = static_cast<select_socket*>(impl);
 
234 -
    select_impl->close_socket();
 
235 -
    std::lock_guard lock(state_->mutex_);
 
236 -
    state_->impl_list_.remove(select_impl);
 
237 -
    state_->impl_ptrs_.erase(select_impl);
 
238 -
}
 
239 -

 
240 -
inline std::error_code
 
241 -
select_socket_service::open_socket(
 
242 -
    tcp_socket::implementation& impl, int family, int type, int protocol)
 
243 -
{
 
244 -
    auto* select_impl = static_cast<select_socket*>(&impl);
 
245 -
    select_impl->close_socket();
 
246 -

 
247 -
    int fd = ::socket(family, type, protocol);
 
248 -
    if (fd < 0)
 
249 -
        return make_err(errno);
 
250 -

 
251 -
    if (family == AF_INET6)
 
252 -
    {
 
253 -
        int one = 1;
 
254 -
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
 
255 -
    }
 
256 -

 
257 -
    int flags = ::fcntl(fd, F_GETFL, 0);
 
258 -
    if (flags == -1)
 
259 -
    {
 
260 -
        int errn = errno;
 
261 -
        ::close(fd);
 
262 -
        return make_err(errn);
 
263 -
    }
 
264 -
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
 
265 -
    {
 
266 -
        int errn = errno;
 
267 -
        ::close(fd);
 
268 -
        return make_err(errn);
 
269 -
    }
 
270 -
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
 
271 -
    {
 
272 -
        int errn = errno;
 
273 -
        ::close(fd);
 
274 -
        return make_err(errn);
 
275 -
    }
 
276 -

 
277 -
    if (fd >= FD_SETSIZE)
 
278 -
    {
 
279 -
        ::close(fd);
 
280 -
        return make_err(EMFILE);
 
281 -
    }
 
282 -

 
283 -
#ifdef SO_NOSIGPIPE
 
284 -
    {
 
285 -
        int one = 1;
 
286 -
        ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
 
287 -
    }
 
288 -
#endif
 
289 -

 
290 -
    select_impl->fd_ = fd;
 
291 -

 
292 -
    select_impl->desc_state_.fd = fd;
 
293 -
    {
 
294 -
        std::lock_guard lock(select_impl->desc_state_.mutex);
 
295 -
        select_impl->desc_state_.read_op    = nullptr;
 
296 -
        select_impl->desc_state_.write_op   = nullptr;
 
297 -
        select_impl->desc_state_.connect_op = nullptr;
 
298 -
    }
 
299 -
    scheduler().register_descriptor(fd, &select_impl->desc_state_);
 
300 -

 
301 -
    return {};
 
302 -
}
 
303 -

 
304 -
inline void
 
305 -
select_socket_service::close(io_object::handle& h)
 
306 -
{
 
307 -
    static_cast<select_socket*>(h.get())->close_socket();
 
308 -
}
 
309 -

 
310 -
inline void
 
311 -
select_socket_service::post(scheduler_op* op)
 
312 -
{
 
313 -
    state_->sched_.post(op);
 
314 -
}
 
315 -

 
316 -
inline void
 
317 -
select_socket_service::work_started() noexcept
 
318 -
{
 
319 -
    state_->sched_.work_started();
 
320 -
}
 
321 -

 
322 -
inline void
 
323 -
select_socket_service::work_finished() noexcept
 
324 -
{
 
325 -
    state_->sched_.work_finished();
 
326 -
}
 
327 -

 
328 -
} // namespace boost::corosio::detail
 
329 -

 
330 -
#endif // BOOST_COROSIO_HAS_SELECT
 
331 -

 
332 -
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP