Skip to content

Commit aebe59e

Browse files
committed
loop: Add support for reuse_port in create_server/create_datagram
1 parent 1d5ae0b commit aebe59e

File tree

9 files changed

+57
-10
lines changed

9 files changed

+57
-10
lines changed

tests/test_tcp.py

+1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ async def start_server():
6666
srv = await asyncio.start_server(
6767
handle_client,
6868
addrs, 0,
69+
reuse_port=getattr(socket, 'SO_REUSEPORT', None),
6970
family=socket.AF_INET,
7071
loop=self.loop)
7172

tests/test_udp.py

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ def datagram_received(self, data, addr):
6868
coro = self.loop.create_datagram_endpoint(
6969
lambda: MyDatagramProto(loop=self.loop),
7070
family=socket.AF_INET,
71+
reuse_port=getattr(socket, 'SO_REUSEPORT', None),
7172
remote_addr=None if lc is None else (host, port))
7273
transport, client = self.loop.run_until_complete(coro)
7374

uvloop/handles/tcp.pxd

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ cdef class TCPServer(UVStreamServer):
44

55
@staticmethod
66
cdef TCPServer new(Loop loop, object protocol_factory, Server server,
7-
object ssl)
7+
object ssl, unsigned int flags)
88

99

1010
cdef class TCPTransport(UVStream):

uvloop/handles/tcp.pyx

+7-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
cdef __tcp_init_uv_handle(UVStream handle, Loop loop):
1+
cdef __tcp_init_uv_handle(UVStream handle, Loop loop, unsigned int flags):
22
cdef int err
33

44
handle._handle = <uv.uv_handle_t*> \
@@ -7,7 +7,9 @@ cdef __tcp_init_uv_handle(UVStream handle, Loop loop):
77
handle._abort_init()
88
raise MemoryError()
99

10-
err = uv.uv_tcp_init(handle._loop.uvloop, <uv.uv_tcp_t*>handle._handle)
10+
err = uv.uv_tcp_init_ex(handle._loop.uvloop,
11+
<uv.uv_tcp_t*>handle._handle,
12+
flags)
1113
if err < 0:
1214
handle._abort_init()
1315
raise convert_error(err)
@@ -56,12 +58,12 @@ cdef class TCPServer(UVStreamServer):
5658

5759
@staticmethod
5860
cdef TCPServer new(Loop loop, object protocol_factory, Server server,
59-
object ssl):
61+
object ssl, unsigned int flags):
6062

6163
cdef TCPServer handle
6264
handle = TCPServer.__new__(TCPServer)
6365
handle._init(loop, protocol_factory, server, ssl)
64-
__tcp_init_uv_handle(<UVStream>handle, loop)
66+
__tcp_init_uv_handle(<UVStream>handle, loop, flags)
6567
return handle
6668

6769
cdef _new_socket(self):
@@ -101,7 +103,7 @@ cdef class TCPTransport(UVStream):
101103
cdef TCPTransport handle
102104
handle = TCPTransport.__new__(TCPTransport)
103105
handle._init(loop, protocol, server, waiter)
104-
__tcp_init_uv_handle(<UVStream>handle, loop)
106+
__tcp_init_uv_handle(<UVStream>handle, loop, uv.AF_UNSPEC)
105107
handle.__peername_set = 0
106108
handle.__sockname_set = 0
107109
return handle

uvloop/includes/stdlib.pxi

+1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ cdef iter_chain = itertools.chain
4646
cdef int has_AF_INET6 = hasattr(socket, 'AF_INET6')
4747
cdef int has_SO_REUSEPORT = hasattr(socket, 'SO_REUSEPORT')
4848
cdef int has_IPPROTO_IPV6 = hasattr(socket, 'IPPROTO_IPV6')
49+
cdef int SO_REUSEPORT = getattr(socket, 'SO_REUSEPORT', 0)
4950

5051
cdef socket_gaierror = socket.gaierror
5152
cdef socket_error = socket.error

uvloop/includes/system.pxd

+3
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ cdef extern from "sys/socket.h" nogil:
4343

4444
ssize_t send(int sockfd, const void *buf, size_t len, int flags);
4545

46+
int setsockopt(int socket, int level, int option_name,
47+
const void *option_value, int option_len)
48+
4649

4750
cdef extern from "errno.h" nogil:
4851

uvloop/includes/uv.pxd

+1-1
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ cdef extern from "../vendor/libuv/include/uv.h" nogil:
319319

320320
# TCP
321321

322-
int uv_tcp_init(uv_loop_t*, uv_tcp_t* handle)
322+
int uv_tcp_init_ex(uv_loop_t*, uv_tcp_t* handle, unsigned int flags)
323323
int uv_tcp_nodelay(uv_tcp_t* handle, int enable)
324324
int uv_tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay)
325325
int uv_tcp_open(uv_tcp_t* handle, uv_os_sock_t sock)

uvloop/loop.pxd

+2
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ cdef class Loop:
159159
cdef _sock_connect(self, fut, sock, address)
160160
cdef _sock_connect_cb(self, fut, sock, address)
161161

162+
cdef _sock_set_reuseport(self, int fd)
163+
162164
cdef _set_coroutine_wrapper(self, bint enabled)
163165

164166

uvloop/loop.pyx

+40-3
Original file line numberDiff line numberDiff line change
@@ -689,6 +689,21 @@ cdef class Loop:
689689
else:
690690
fut.set_result(None)
691691

692+
cdef _sock_set_reuseport(self, int fd):
693+
cdef:
694+
int err
695+
int reuseport_flag = 1
696+
697+
err = system.setsockopt(
698+
fd,
699+
uv.SOL_SOCKET,
700+
SO_REUSEPORT,
701+
<char*>&reuseport_flag,
702+
sizeof(reuseport_flag))
703+
704+
if err < 0:
705+
raise convert_error(-errno.errno)
706+
692707
cdef _set_coroutine_wrapper(self, bint enabled):
693708
enabled = bool(enabled)
694709
if self._coroutine_wrapper_set == enabled:
@@ -1003,7 +1018,7 @@ cdef class Loop:
10031018
int backlog=100,
10041019
ssl=None,
10051020
reuse_address=None, # ignored, libuv sets it
1006-
reuse_port=None): # ignored
1021+
reuse_port=None):
10071022

10081023
cdef:
10091024
TCPServer tcp
@@ -1018,6 +1033,11 @@ cdef class Loop:
10181033
raise ValueError(
10191034
'host/port and sock can not be specified at the same time')
10201035

1036+
reuse_port = bool(reuse_port)
1037+
if reuse_port and not has_SO_REUSEPORT:
1038+
raise ValueError(
1039+
'reuse_port not supported by socket module')
1040+
10211041
if host == '':
10221042
hosts = [None]
10231043
elif (isinstance(host, str) or not isinstance(host, col_Iterable)):
@@ -1036,8 +1056,15 @@ cdef class Loop:
10361056
for info in infos:
10371057
addrinfo = (<AddrInfo>info).data
10381058
while addrinfo != NULL:
1059+
if addrinfo.ai_family == uv.AF_UNSPEC:
1060+
raise RuntimeError('AF_UNSPEC in DNS results')
1061+
10391062
tcp = TCPServer.new(
1040-
self, protocol_factory, server, ssl)
1063+
self, protocol_factory, server, ssl,
1064+
addrinfo.ai_family)
1065+
1066+
if reuse_port:
1067+
self._sock_set_reuseport(tcp._fileno())
10411068

10421069
try:
10431070
tcp.bind(addrinfo.ai_addr)
@@ -1057,7 +1084,8 @@ cdef class Loop:
10571084
else:
10581085
if sock is None:
10591086
raise ValueError('Neither host/port nor sock were specified')
1060-
tcp = TCPServer.new(self, protocol_factory, server, ssl)
1087+
tcp = TCPServer.new(self, protocol_factory, server, ssl,
1088+
uv.AF_UNSPEC)
10611089
fileno = os_dup(sock.fileno())
10621090
try:
10631091
tcp.open(fileno)
@@ -1684,6 +1712,10 @@ cdef class Loop:
16841712
udp._attach_fileobj(sock)
16851713
else:
16861714
reuse_address = bool(reuse_address)
1715+
reuse_port = bool(reuse_port)
1716+
if reuse_port and not has_SO_REUSEPORT:
1717+
raise ValueError(
1718+
'reuse_port not supported by socket module')
16871719

16881720
lads = None
16891721
if local_addr is not None:
@@ -1720,6 +1752,9 @@ cdef class Loop:
17201752
udp = UDPTransport.__new__(UDPTransport)
17211753
udp._init(self, family)
17221754

1755+
if reuse_port:
1756+
self._sock_set_reuseport(udp._fileno())
1757+
17231758
socket = udp._get_socket()
17241759
socket.bind(('0.0.0.0', 0))
17251760
else:
@@ -1728,6 +1763,8 @@ cdef class Loop:
17281763
try:
17291764
udp = UDPTransport.__new__(UDPTransport)
17301765
udp._init(self, lai.ai_family)
1766+
if reuse_port:
1767+
self._sock_set_reuseport(udp._fileno())
17311768
udp._bind(lai.ai_addr, reuse_address)
17321769
except Exception as ex:
17331770
lai = lai.ai_next

0 commit comments

Comments
 (0)