Skip to content

Commit 863edb5

Browse files
committed
fix(core): Adjust connection timeout and retry logic
Do not use the session timeout as connection timeout. This value is too large and results in a bad non-responsive server holding up the client long enough for the session to timeout. Use the KazooRetry object to use an increasing backoff timeout and cycle over all servers quickly, working around bad servers with minimal impact.
1 parent 4b13135 commit 863edb5

File tree

3 files changed

+115
-3
lines changed

3 files changed

+115
-3
lines changed

kazoo/protocol/connection.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,9 @@ def _connect_attempt(self, host, hostip, port, retry):
614614

615615
try:
616616
self._xid = 0
617-
read_timeout, connect_timeout = self._connect(host, hostip, port)
617+
read_timeout, connect_timeout = self._connect(
618+
host, hostip, port, timeout=retry.cur_delay
619+
)
618620
read_timeout = read_timeout / 1000.0
619621
connect_timeout = connect_timeout / 1000.0
620622
retry.reset()
@@ -685,7 +687,7 @@ def _connect_attempt(self, host, hostip, port, retry):
685687
if self._socket is not None:
686688
self._socket.close()
687689

688-
def _connect(self, host, hostip, port):
690+
def _connect(self, host, hostip, port, timeout):
689691
client = self.client
690692
self.logger.info(
691693
"Connecting to %s(%s):%s, use_ssl: %r",
@@ -705,7 +707,7 @@ def _connect(self, host, hostip, port):
705707
with self._socket_error_handling():
706708
self._socket = self.handler.create_connection(
707709
address=(hostip, port),
708-
timeout=client._session_timeout / 1000.0,
710+
timeout=timeout,
709711
use_ssl=self.client.use_ssl,
710712
keyfile=self.client.keyfile,
711713
certfile=self.client.certfile,

kazoo/retry.py

+4
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ def copy(self):
109109
obj.retry_exceptions = self.retry_exceptions
110110
return obj
111111

112+
@property
113+
def cur_delay(self):
114+
return self._cur_delay
115+
112116
def __call__(self, func, *args, **kwargs):
113117
"""Call a function with arguments until it completes without
114118
throwing a Kazoo exception

kazoo/tests/test_connection.py

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
from unittest import mock
2+
3+
import pytest
4+
5+
from kazoo import retry
6+
from kazoo.handlers import threading
7+
from kazoo.protocol import connection
8+
from kazoo.protocol import states
9+
10+
11+
@mock.patch("kazoo.protocol.connection.ConnectionHandler._expand_client_hosts")
12+
def test_retry_logic(mock_expand):
13+
mock_client = mock.Mock()
14+
mock_client._state = states.KeeperState.CLOSED
15+
mock_client._session_id = None
16+
mock_client._session_passwd = b"\x00" * 16
17+
mock_client._stopped.is_set.return_value = False
18+
mock_client.handler.timeout_exception = threading.KazooTimeoutError
19+
mock_client.handler.create_connection.side_effect = (
20+
threading.KazooTimeoutError()
21+
)
22+
test_retry = retry.KazooRetry(
23+
max_tries=6,
24+
delay=1.0,
25+
backoff=2,
26+
max_delay=30.0,
27+
max_jitter=0.0,
28+
sleep_func=lambda _x: None,
29+
)
30+
test_cnx = connection.ConnectionHandler(
31+
client=mock_client,
32+
retry_sleeper=test_retry,
33+
)
34+
mock_expand.return_value = [
35+
("a", "1.1.1.1", 2181),
36+
("b", "2.2.2.2", 2181),
37+
("c", "3.3.3.3", 2181),
38+
]
39+
40+
with pytest.raises(retry.RetryFailedError):
41+
test_retry(test_cnx._connect_loop, test_retry)
42+
43+
assert mock_client.handler.create_connection.call_args_list[:3] == [
44+
mock.call(
45+
address=("1.1.1.1", 2181),
46+
timeout=1.0,
47+
use_ssl=mock.ANY,
48+
keyfile=mock.ANY,
49+
certfile=mock.ANY,
50+
ca=mock.ANY,
51+
keyfile_password=mock.ANY,
52+
verify_certs=mock.ANY,
53+
),
54+
mock.call(
55+
address=("2.2.2.2", 2181),
56+
timeout=1.0,
57+
use_ssl=mock.ANY,
58+
keyfile=mock.ANY,
59+
certfile=mock.ANY,
60+
ca=mock.ANY,
61+
keyfile_password=mock.ANY,
62+
verify_certs=mock.ANY,
63+
),
64+
mock.call(
65+
address=("3.3.3.3", 2181),
66+
timeout=1.0,
67+
use_ssl=mock.ANY,
68+
keyfile=mock.ANY,
69+
certfile=mock.ANY,
70+
ca=mock.ANY,
71+
keyfile_password=mock.ANY,
72+
verify_certs=mock.ANY,
73+
),
74+
], "All hosts are first tried with the lowest timeout value"
75+
assert mock_client.handler.create_connection.call_args_list[-3:] == [
76+
mock.call(
77+
address=("1.1.1.1", 2181),
78+
timeout=30.0,
79+
use_ssl=mock.ANY,
80+
keyfile=mock.ANY,
81+
certfile=mock.ANY,
82+
ca=mock.ANY,
83+
keyfile_password=mock.ANY,
84+
verify_certs=mock.ANY,
85+
),
86+
mock.call(
87+
address=("2.2.2.2", 2181),
88+
timeout=30.0,
89+
use_ssl=mock.ANY,
90+
keyfile=mock.ANY,
91+
certfile=mock.ANY,
92+
ca=mock.ANY,
93+
keyfile_password=mock.ANY,
94+
verify_certs=mock.ANY,
95+
),
96+
mock.call(
97+
address=("3.3.3.3", 2181),
98+
timeout=30.0,
99+
use_ssl=mock.ANY,
100+
keyfile=mock.ANY,
101+
certfile=mock.ANY,
102+
ca=mock.ANY,
103+
keyfile_password=mock.ANY,
104+
verify_certs=mock.ANY,
105+
),
106+
], "All hosts are last tried with the highest timeout value"

0 commit comments

Comments
 (0)