Skip to content

Commit a2a2bbe

Browse files
committed
add implementations for eventloop functions sock_*
1 parent 5747a3a commit a2a2bbe

File tree

2 files changed

+190
-14
lines changed

2 files changed

+190
-14
lines changed

include/boost/python/eventloop.hpp

+5-4
Original file line numberDiff line numberDiff line change
@@ -67,20 +67,21 @@ class event_loop
6767
}
6868

6969

70-
void sock_recv(object sock, int bytes);
70+
object sock_recv(object sock, size_t nbytes);
7171

72-
void sock_recv_into(object sock, object buffer);
72+
size_t sock_recv_into(object sock, object buffer);
7373

74-
void sock_sendall(object sock, object data);
74+
object sock_sendall(object sock, object data);
7575

7676
void sock_connect(object sock, object address);
7777

78-
void sock_accept(object sock);
78+
object sock_accept(object sock);
7979

8080
void sock_sendfile(object sock, object file, int offset = 0, int count = 0, bool fallback = true);
8181

8282
private:
8383
int64_t _timer_id = 0;
84+
object _pymod_socket = import("socket");
8485
boost::asio::io_context::strand _strand;
8586
std::unordered_map<int, std::unique_ptr<boost::asio::steady_timer>> _id_to_timer_map;
8687
// read: key = fd * 2 + 0, write: key = fd * 2 + 1

src/eventloop.cpp

+185-10
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,121 @@
66
// TODO:
77
// 1. posix::stream_descriptor need windows version
88
// 2. call_* need return async.Handle
9+
// 3. _ensure_fd_no_transport
10+
// 4. _ensure_resolve
911

1012
#include <boost/asio.hpp>
1113
#include <boost/bind.hpp>
1214
#include <boost/python.hpp>
1315
#include <boost/python/eventloop.hpp>
16+
#include <boost/mpl/vector.hpp>
17+
#include <Python.h>
1418

1519

1620
namespace boost { namespace python { namespace asio {
21+
namespace
22+
{
23+
24+
bool _hasattr(object o, const char* name)
25+
{
26+
return PyObject_HasAttrString(o.ptr(), name);
27+
}
28+
29+
void _sock_recv_handler(
30+
std::promise<std::vector<char>>& prom_data,
31+
std::promise<size_t>& prom_nbytes_read,
32+
size_t nbytes,
33+
int fd)
34+
{
35+
std::vector<char> buffer(nbytes);
36+
prom_nbytes_read.set_value(read(fd, buffer.data(), nbytes));
37+
prom_data.set_value(std::move(buffer));
38+
}
39+
40+
void _sock_send_handler(std::promise<size_t>& prom, int fd, const char *py_str, ssize_t len)
41+
{
42+
size_t nwrite = write(fd, py_str, len);
43+
prom.set_value(nwrite);
44+
}
45+
46+
void _sock_connect_cb(object pymod_socket, std::promise<void>& prom, std::future<void>& fut, object sock, object addr)
47+
{
48+
try
49+
{
50+
object err = sock.attr("getsockopt")(
51+
pymod_socket.attr("SOL_SOCKET"), pymod_socket.attr("SO_ERROR"));
52+
if (err != object(0)) {
53+
// TODO: print the address
54+
PyErr_SetString(PyExc_OSError, "Connect call failed {address}");
55+
}
56+
}
57+
catch (const error_already_set& e)
58+
{
59+
if (PyErr_ExceptionMatches(PyExc_BlockingIOError)
60+
|| PyErr_ExceptionMatches(PyExc_InterruptedError))
61+
{
62+
PyErr_Clear();
63+
// pass
64+
}
65+
else if (PyErr_ExceptionMatches(PyExc_SystemExit)
66+
|| PyErr_ExceptionMatches(PyExc_KeyboardInterrupt))
67+
{
68+
// raise
69+
}
70+
else if (PyErr_ExceptionMatches(PyExc_BaseException))
71+
{
72+
PyErr_Clear();
73+
prom.set_exception(std::current_exception());
74+
}
75+
else
76+
{
77+
PyErr_Clear();
78+
prom.set_value();
79+
}
80+
}
81+
}
82+
83+
void _sock_accept(event_loop& loop, std::promise<object>& prom, std::future<object>& fut, object sock)
84+
{
85+
int fd = extract<int>(sock.attr("fileno")());
86+
object conn;
87+
object address;
88+
try
89+
{
90+
object ret = sock.attr("accept")();
91+
conn = ret[0];
92+
address = ret[1];
93+
conn.attr("setblocking")(object(false));
94+
}
95+
catch (const error_already_set& e)
96+
{
97+
if (PyErr_ExceptionMatches(PyExc_BlockingIOError)
98+
|| PyErr_ExceptionMatches(PyExc_InterruptedError))
99+
{
100+
PyErr_Clear();
101+
loop.add_reader(fd, make_function(bind(
102+
_sock_accept, boost::ref(loop), boost::ref(prom), boost::ref(fut), sock),
103+
default_call_policies(), boost::mpl::vector<void, object>()));
104+
}
105+
else if (PyErr_ExceptionMatches(PyExc_SystemExit)
106+
|| PyErr_ExceptionMatches(PyExc_KeyboardInterrupt))
107+
{
108+
// raise
109+
}
110+
else if (PyErr_ExceptionMatches(PyExc_BaseException))
111+
{
112+
PyErr_Clear();
113+
prom.set_exception(std::current_exception());
114+
}
115+
else
116+
{
117+
PyErr_Clear();
118+
prom.set_value(make_tuple(conn, address));
119+
}
120+
}
121+
}
122+
123+
}
17124

18125
void event_loop::_add_reader_or_writer(int fd, object f, int key)
19126
{
@@ -76,34 +183,102 @@ void event_loop::call_at(double when, object f)
76183
return call_soon(f);
77184
}
78185

79-
void event_loop::sock_recv(object sock, int bytes)
186+
object event_loop::sock_recv(object sock, size_t nbytes)
80187
{
81-
188+
int fd = extract<int>(sock.attr("fileno")());
189+
std::promise<std::vector<char>> prom_data;
190+
std::future<std::vector<char>> fut_data = prom_data.get_future();
191+
std::promise<size_t> prom_nbytes_read;
192+
std::future<size_t> fut_nbytes_read = prom_nbytes_read.get_future();
193+
add_reader(fd, make_function(bind(_sock_recv_handler,
194+
boost::ref(prom_data), boost::ref(prom_nbytes_read), nbytes, fd),
195+
default_call_policies(), boost::mpl::vector<void, object>()));
196+
return object(handle<>(PyBytes_FromStringAndSize(fut_data.get().data(), nbytes)));
82197
}
83198

84-
void event_loop::sock_recv_into(object sock, object buffer)
199+
size_t event_loop::sock_recv_into(object sock, object buffer)
85200
{
86-
201+
int fd = extract<int>(sock.attr("fileno")());
202+
ssize_t nbytes = len(buffer);
203+
std::promise<std::vector<char>> prom_data;
204+
std::future<std::vector<char>> fut_data = prom_data.get_future();
205+
std::promise<size_t> prom_nbytes_read;
206+
std::future<size_t> fut_nbytes_read = prom_nbytes_read.get_future();
207+
add_reader(fd, make_function(bind(_sock_recv_handler,
208+
boost::ref(prom_data), boost::ref(prom_nbytes_read), nbytes, fd),
209+
default_call_policies(), boost::mpl::vector<void, object>()));
210+
buffer = object(handle<>(PyBytes_FromStringAndSize(fut_data.get().data(), nbytes)));
211+
return fut_nbytes_read.get();
87212
}
88213

89-
void event_loop::sock_sendall(object sock, object data)
214+
object event_loop::sock_sendall(object sock, object data)
90215
{
91-
216+
int fd = extract<int>(sock.attr("fileno")());
217+
char const* py_str = extract<char const*>(data.attr("decode")());
218+
ssize_t py_str_len = len(data);
219+
std::promise<size_t> prom;
220+
std::future<size_t> fut = prom.get_future();
221+
add_writer(fd, make_function(bind(_sock_send_handler, std::ref(prom), fd, py_str, py_str_len),
222+
default_call_policies(), boost::mpl::vector<void, object>()));
223+
fut.wait();
224+
return object();
92225
}
93226

94227
void event_loop::sock_connect(object sock, object address)
95228
{
96-
229+
230+
if (!_hasattr(_pymod_socket, "AF_UNIX") || sock.attr("family") != _pymod_socket.attr("AF_UNIX"))
231+
{
232+
// TODO: _ensure_resolve
233+
}
234+
std::promise<void> prom;
235+
std::future<void> fut = prom.get_future();
236+
int fd = extract<int>(sock.attr("fileno")());
237+
try
238+
{
239+
sock.attr("connect")(address);
240+
}
241+
catch (const error_already_set& e)
242+
{
243+
if (PyErr_ExceptionMatches(PyExc_BlockingIOError)
244+
|| PyErr_ExceptionMatches(PyExc_InterruptedError))
245+
{
246+
PyErr_Clear();
247+
add_writer(fd, make_function(bind(
248+
_sock_connect_cb, _pymod_socket, boost::ref(prom), boost::ref(fut), sock, address),
249+
default_call_policies(), boost::mpl::vector<void, object>()));
250+
}
251+
else if (PyErr_ExceptionMatches(PyExc_SystemExit)
252+
|| PyErr_ExceptionMatches(PyExc_KeyboardInterrupt))
253+
{
254+
// raise
255+
}
256+
else if (PyErr_ExceptionMatches(PyExc_BaseException))
257+
{
258+
PyErr_Clear();
259+
prom.set_exception(std::current_exception());
260+
}
261+
else
262+
{
263+
PyErr_Clear();
264+
prom.set_value();
265+
}
266+
}
267+
fut.wait();
97268
}
98269

99-
void event_loop::sock_accept(object sock)
270+
object event_loop::sock_accept(object sock)
100271
{
101-
272+
std::promise<object> prom;
273+
std::future<object> fut = prom.get_future();
274+
_sock_accept(*this, prom, fut, sock);
275+
return fut.get();
102276
}
103277

278+
// TODO: implement this
104279
void event_loop::sock_sendfile(object sock, object file, int offset, int count, bool fallback)
105280
{
106-
281+
PyErr_SetString(PyExc_NotImplementedError, "Not implemented!");
107282
}
108283

109284
}}}

0 commit comments

Comments
 (0)