TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
12 :
13 : #include <boost/corosio/detail/dispatch_coro.hpp>
14 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
15 : #include <boost/corosio/native/detail/make_err.hpp>
16 : #include <boost/corosio/io/io_object.hpp>
17 :
18 : #include <coroutine>
19 : #include <mutex>
20 : #include <utility>
21 :
22 : #include <netinet/in.h>
23 : #include <sys/socket.h>
24 : #include <unistd.h>
25 :
26 : namespace boost::corosio::detail {
27 :
28 : /** Complete a base read/write operation.
29 :
30 : Translates the recorded errno and cancellation state into
31 : an error_code, stores the byte count, then resumes the
32 : caller via symmetric transfer.
33 :
34 : @tparam Op The concrete operation type.
35 : @param op The operation to complete.
36 : */
37 : template<typename Op>
38 : void
39 HIT 88639 : complete_io_op(Op& op)
40 : {
41 88639 : op.stop_cb.reset();
42 88639 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
43 :
44 88639 : if (op.cancelled.load(std::memory_order_acquire))
45 307 : *op.ec_out = capy::error::canceled;
46 88332 : else if (op.errn != 0)
47 MIS 0 : *op.ec_out = make_err(op.errn);
48 HIT 88332 : else if (op.is_read_operation() && op.bytes_transferred == 0)
49 MIS 0 : *op.ec_out = capy::error::eof;
50 : else
51 HIT 88332 : *op.ec_out = {};
52 :
53 88639 : *op.bytes_out = op.bytes_transferred;
54 :
55 88639 : capy::executor_ref saved_ex(op.ex);
56 88639 : std::coroutine_handle<> saved_h(op.h);
57 88639 : auto prevent = std::move(op.impl_ptr);
58 88639 : dispatch_coro(saved_ex, saved_h).resume();
59 88639 : }
60 :
61 : /** Complete a connect operation with endpoint caching.
62 :
63 : On success, queries the local endpoint via getsockname and
64 : caches both endpoints in the socket impl. Then resumes the
65 : caller via symmetric transfer.
66 :
67 : @tparam Op The concrete connect operation type.
68 : @param op The operation to complete.
69 : */
70 : template<typename Op>
71 : void
72 7993 : complete_connect_op(Op& op)
73 : {
74 7993 : op.stop_cb.reset();
75 7993 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
76 :
77 7993 : bool success =
78 7993 : (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
79 :
80 7993 : if (success && op.socket_impl_)
81 : {
82 7989 : endpoint local_ep;
83 7989 : sockaddr_storage local_storage{};
84 7989 : socklen_t local_len = sizeof(local_storage);
85 7989 : if (::getsockname(
86 : op.fd, reinterpret_cast<sockaddr*>(&local_storage),
87 7989 : &local_len) == 0)
88 7989 : local_ep = from_sockaddr(local_storage);
89 7989 : op.socket_impl_->set_endpoints(local_ep, op.target_endpoint);
90 : }
91 :
92 7993 : if (op.cancelled.load(std::memory_order_acquire))
93 MIS 0 : *op.ec_out = capy::error::canceled;
94 HIT 7993 : else if (op.errn != 0)
95 4 : *op.ec_out = make_err(op.errn);
96 : else
97 7989 : *op.ec_out = {};
98 :
99 7993 : capy::executor_ref saved_ex(op.ex);
100 7993 : std::coroutine_handle<> saved_h(op.h);
101 7993 : auto prevent = std::move(op.impl_ptr);
102 7993 : dispatch_coro(saved_ex, saved_h).resume();
103 7993 : }
104 :
105 : /** Construct and register a peer socket from an accepted fd.
106 :
107 : Creates a new socket impl via the acceptor's associated
108 : socket service, registers it with the scheduler, and caches
109 : the local and remote endpoints.
110 :
111 : @tparam SocketImpl The concrete socket implementation type.
112 : @tparam AcceptorImpl The concrete acceptor implementation type.
113 : @param acceptor_impl The acceptor that accepted the connection.
114 : @param accepted_fd The accepted file descriptor (set to -1 on success).
115 : @param peer_storage The peer address from accept().
116 : @param impl_out Output pointer for the new socket impl.
117 : @param ec_out Output pointer for any error.
118 : @return True on success, false on failure.
119 : */
120 : template<typename SocketImpl, typename AcceptorImpl>
121 : bool
122 7979 : setup_accepted_socket(
123 : AcceptorImpl* acceptor_impl,
124 : int& accepted_fd,
125 : sockaddr_storage const& peer_storage,
126 : io_object::implementation** impl_out,
127 : std::error_code* ec_out)
128 : {
129 7979 : auto* socket_svc = acceptor_impl->service().tcp_service();
130 7979 : if (!socket_svc)
131 : {
132 MIS 0 : *ec_out = make_err(ENOENT);
133 0 : return false;
134 : }
135 :
136 HIT 7979 : auto& impl = static_cast<SocketImpl&>(*socket_svc->construct());
137 7979 : impl.set_socket(accepted_fd);
138 :
139 7979 : impl.desc_state_.fd = accepted_fd;
140 : {
141 7979 : std::lock_guard lock(impl.desc_state_.mutex);
142 7979 : impl.desc_state_.read_op = nullptr;
143 7979 : impl.desc_state_.write_op = nullptr;
144 7979 : impl.desc_state_.connect_op = nullptr;
145 7979 : }
146 7979 : socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
147 :
148 7979 : impl.set_endpoints(
149 : acceptor_impl->local_endpoint(), from_sockaddr(peer_storage));
150 :
151 7979 : if (impl_out)
152 7979 : *impl_out = &impl;
153 7979 : accepted_fd = -1;
154 7979 : return true;
155 : }
156 :
157 : /** Complete an accept operation.
158 :
159 : Sets up the peer socket on success, or closes the accepted
160 : fd on failure. Then resumes the caller via symmetric transfer.
161 :
162 : @tparam SocketImpl The concrete socket implementation type.
163 : @tparam Op The concrete accept operation type.
164 : @param op The operation to complete.
165 : */
166 : template<typename SocketImpl, typename Op>
167 : void
168 7991 : complete_accept_op(Op& op)
169 : {
170 7991 : op.stop_cb.reset();
171 7991 : op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget();
172 :
173 7991 : bool success =
174 7991 : (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
175 :
176 7991 : if (op.cancelled.load(std::memory_order_acquire))
177 12 : *op.ec_out = capy::error::canceled;
178 7979 : else if (op.errn != 0)
179 MIS 0 : *op.ec_out = make_err(op.errn);
180 : else
181 HIT 7979 : *op.ec_out = {};
182 :
183 7991 : if (success && op.accepted_fd >= 0 && op.acceptor_impl_)
184 : {
185 7979 : if (!setup_accepted_socket<SocketImpl>(
186 7979 : op.acceptor_impl_, op.accepted_fd, op.peer_storage, op.impl_out,
187 : op.ec_out))
188 MIS 0 : success = false;
189 : }
190 :
191 HIT 7991 : if (!success || !op.acceptor_impl_)
192 : {
193 12 : if (op.accepted_fd >= 0)
194 : {
195 MIS 0 : ::close(op.accepted_fd);
196 0 : op.accepted_fd = -1;
197 : }
198 HIT 12 : if (op.impl_out)
199 12 : *op.impl_out = nullptr;
200 : }
201 :
202 7991 : capy::executor_ref saved_ex(op.ex);
203 7991 : std::coroutine_handle<> saved_h(op.h);
204 7991 : auto prevent = std::move(op.impl_ptr);
205 7991 : dispatch_coro(saved_ex, saved_h).resume();
206 7991 : }
207 :
208 : /** Complete a datagram operation (send_to or recv_from).
209 :
210 : For recv_from operations, writes the source endpoint from the
211 : recorded sockaddr_storage into the caller's endpoint pointer.
212 : Then resumes the caller via symmetric transfer.
213 :
214 : @tparam Op The concrete datagram operation type.
215 : @param op The operation to complete.
216 : @param source_out Optional pointer to store source endpoint
217 : (non-null for recv_from, null for send_to).
218 : */
219 : template<typename Op>
220 : void
221 14 : complete_datagram_op(Op& op, endpoint* source_out)
222 : {
223 14 : op.stop_cb.reset();
224 14 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
225 :
226 14 : if (op.cancelled.load(std::memory_order_acquire))
227 6 : *op.ec_out = capy::error::canceled;
228 8 : else if (op.errn != 0)
229 MIS 0 : *op.ec_out = make_err(op.errn);
230 : else
231 HIT 8 : *op.ec_out = {};
232 :
233 14 : *op.bytes_out = op.bytes_transferred;
234 :
235 20 : if (source_out && !op.cancelled.load(std::memory_order_acquire) &&
236 6 : op.errn == 0)
237 6 : *source_out = from_sockaddr(op.source_storage);
238 :
239 14 : capy::executor_ref saved_ex(op.ex);
240 14 : std::coroutine_handle<> saved_h(op.h);
241 14 : auto prevent = std::move(op.impl_ptr);
242 14 : dispatch_coro(saved_ex, saved_h).resume();
243 14 : }
244 :
245 : } // namespace boost::corosio::detail
246 :
247 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
|