Skip to content

Commit 625a9d7

Browse files
authored
Fix thread not waking up when there is still data to be sent (#2670)
1 parent 1784b1c commit 625a9d7

File tree

2 files changed

+59
-0
lines changed

2 files changed

+59
-0
lines changed

kafka/conn.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,6 +1075,13 @@ def send_pending_requests_v2(self):
10751075
total_bytes = self._send_bytes(self._send_buffer)
10761076
self._send_buffer = self._send_buffer[total_bytes:]
10771077

1078+
# If all data was sent, we need to get the new data from the protocol now, otherwise
1079+
# this function would return True, indicating that there are no more pending
1080+
# requests. This could cause the calling thread to wait indefinitely as it won't
1081+
# know that there is still buffered data to send.
1082+
if not self._send_buffer:
1083+
self._send_buffer = self._protocol.send_bytes()
1084+
10781085
if self._sensors:
10791086
self._sensors.bytes_sent.record(total_bytes)
10801087
# Return True iff send buffer is empty

test/test_conn.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212

1313
from kafka.conn import BrokerConnection, ConnectionStates
1414
from kafka.future import Future
15+
from kafka.conn import BrokerConnection, ConnectionStates, SSLWantWriteError
16+
from kafka.metrics.metrics import Metrics
17+
from kafka.metrics.stats.sensor import Sensor
1518
from kafka.protocol.api import RequestHeader
1619
from kafka.protocol.group import HeartbeatResponse
1720
from kafka.protocol.metadata import MetadataRequest
@@ -43,6 +46,15 @@ def _socket(mocker):
4346
mocker.patch('socket.socket', return_value=socket)
4447
return socket
4548

49+
def metrics(mocker):
50+
metrics = mocker.MagicMock(Metrics)
51+
metrics.mocked_sensors = {}
52+
def sensor(name, **kwargs):
53+
if name not in metrics.mocked_sensors:
54+
metrics.mocked_sensors[name] = mocker.MagicMock(Sensor)
55+
return metrics.mocked_sensors[name]
56+
metrics.sensor.side_effect = sensor
57+
return metrics
4658

4759
@pytest.fixture
4860
def conn(_socket, dns_lookup, mocker):
@@ -228,6 +240,46 @@ def test_send_response(_socket, conn):
228240
assert len(conn.in_flight_requests) == 1
229241

230242

243+
def test_send_async_request_while_other_request_is_already_in_buffer(_socket, conn, metrics):
244+
conn.connect()
245+
assert conn.state is ConnectionStates.CONNECTED
246+
assert 'node-0.bytes-sent' in metrics.mocked_sensors
247+
bytes_sent_sensor = metrics.mocked_sensors['node-0.bytes-sent']
248+
249+
req1 = MetadataRequest[0](topics='foo')
250+
header1 = RequestHeader(req1, client_id=conn.config['client_id'])
251+
payload_bytes1 = len(header1.encode()) + len(req1.encode())
252+
req2 = MetadataRequest[0]([])
253+
header2 = RequestHeader(req2, client_id=conn.config['client_id'])
254+
payload_bytes2 = len(header2.encode()) + len(req2.encode())
255+
256+
# The first call to the socket will raise a transient SSL exception. This will make the first
257+
# request to be kept in the internal buffer to be sent in the next call of
258+
# send_pending_requests_v2.
259+
_socket.send.side_effect = [SSLWantWriteError, 4 + payload_bytes1, 4 + payload_bytes2]
260+
261+
conn.send(req1, blocking=False)
262+
# This won't send any bytes because of the SSL exception and the request bytes will be kept in
263+
# the buffer.
264+
assert conn.send_pending_requests_v2() is False
265+
assert bytes_sent_sensor.record.call_args_list[0].args == (0,)
266+
267+
conn.send(req2, blocking=False)
268+
# This will send the remaining bytes in the buffer from the first request, but should notice
269+
# that the second request was queued, therefore it should return False.
270+
bytes_sent_sensor.record.reset_mock()
271+
assert conn.send_pending_requests_v2() is False
272+
bytes_sent_sensor.record.assert_called_once_with(4 + payload_bytes1)
273+
274+
bytes_sent_sensor.record.reset_mock()
275+
assert conn.send_pending_requests_v2() is True
276+
bytes_sent_sensor.record.assert_called_once_with(4 + payload_bytes2)
277+
278+
bytes_sent_sensor.record.reset_mock()
279+
assert conn.send_pending_requests_v2() is True
280+
bytes_sent_sensor.record.assert_called_once_with(0)
281+
282+
231283
def test_send_error(_socket, conn):
232284
conn.connect()
233285
assert conn.state is ConnectionStates.CONNECTED

0 commit comments

Comments
 (0)