Skip to content

Commit a537c80

Browse files
twangboydwoz
authored andcommitted
REQ/ZMQ/TCP cleanup: teardown helpers, minion cruft removal, balancer race guard
Drop unused timestamp assignments left in SaltClientError / generic reconnect handlers in MinionManager._connect_minion and the matching Syndic path. Refactor functional zeromq RequestClient tests to prefer close_future-backed cleanup with IOLoop-friendly polling and a synchronous fallback (_close_zmq_only / _mark_teardown_finished) when graceful shutdown stalls. Always set RequestClient._closing after the fallback so Transport.__del__ does not emit Unclosed transport warnings after tests stop the IOLoop. Harden TCP LoadBalancerServer accept loop: if close() clears the socket while accept() fails, exit the loop instead of calling getsockname() on None (fixes PytestUnhandledThreadExceptionWarning in test_tcp_load_balancer_server).
1 parent 0f538ac commit a537c80

3 files changed

Lines changed: 92 additions & 46 deletions

File tree

salt/minion.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,7 +1177,6 @@ def _connect_minion(self, minion):
11771177
minion.opts["master"],
11781178
exc,
11791179
)
1180-
last = time.time()
11811180
if auth_wait < self.max_auth_wait:
11821181
auth_wait += self.auth_wait
11831182
yield salt.ext.tornado.gen.sleep(auth_wait) # TODO: log?
@@ -1202,7 +1201,6 @@ def _connect_minion(self, minion):
12021201
# Match SaltClientError path: without a delay, connect_master can be
12031202
# retried in a tight loop and create zmq contexts faster than they are
12041203
# torn down (libzMQ pthread / EMFILE failures on some hosts).
1205-
last = time.time()
12061204
if auth_wait < self.max_auth_wait:
12071205
auth_wait += self.auth_wait
12081206
yield salt.ext.tornado.gen.sleep(auth_wait)
@@ -4557,7 +4555,6 @@ def _connect_syndic(self, opts):
45574555
"master at %s responding?",
45584556
opts["master"],
45594557
)
4560-
last = time.time()
45614558
if auth_wait < self.max_auth_wait:
45624559
auth_wait += self.auth_wait
45634560
yield salt.ext.tornado.gen.sleep(auth_wait) # TODO: log?

salt/transport/tcp.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,10 @@ def run(self):
183183
# ECONNABORTED indicates that there was a connection
184184
# but it was closed while still in the accept queue.
185185
# (observed on FreeBSD).
186-
name = self._socket.getsockname()
186+
sock = self._socket
187+
if sock is None:
188+
break
189+
name = sock.getsockname()
187190
if isinstance(name, tuple):
188191
name = name[0]
189192
if salt.ext.tornado.util.errno_from_exception(e) == errno.ECONNABORTED:

tests/pytests/functional/transport/zeromq/test_request_client.py

Lines changed: 88 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -12,47 +12,109 @@
1212

1313
log = logging.getLogger(__name__)
1414

15-
16-
_REQ_DRAIN_POLLS = 300
17-
_REQ_DRAIN_SLEEP_S = 0.01
1815
_REQ_DRAIN_TIMEOUT_S = 10
19-
16+
_REQ_POLL_S = 0.01
2017

2118
pytestmark = [
2219
pytest.mark.windows_whitelisted,
2320
]
2421

2522

26-
def _sync_finalize_req_client(cli, io_loop):
23+
def _blocking_teardown_req_message_client(cli):
24+
"""
25+
When ``close_future`` cannot finish, release ZMQ resources.
26+
27+
``AsyncReqMessageClient.close()`` is a no-op if ``close_future()`` already
28+
started teardown but ``finalize()`` never ran (stuck loop, half-finished
29+
``_send_recv``, etc.). ``_close_zmq_only`` matches the resource release in
30+
``finalize`` without re-entering ``_initiate_async_req_close``.
31+
"""
32+
try:
33+
mc = cli.message_client
34+
if getattr(mc, "socket", None) is not None:
35+
try:
36+
mc.close()
37+
except Exception: # pylint: disable=broad-except
38+
log.debug(
39+
"REQ MessageClient synchronous close failed during cleanup",
40+
exc_info=True,
41+
)
42+
if getattr(mc, "socket", None) is not None:
43+
mc._close_zmq_only()
44+
mc._mark_teardown_finished()
45+
except Exception: # pylint: disable=broad-except
46+
log.debug(
47+
"REQ MessageClient synchronous teardown fallback failed during cleanup",
48+
exc_info=True,
49+
)
50+
finally:
51+
# ``Transport.__del__`` warns unless ``_closing`` is set; when the owning
52+
# ``IOLoop`` is stopped we never yield ``cli.close_future()`` (#68637).
53+
setattr(cli, "_closing", True)
54+
55+
56+
def _sync_yield_req_client_close_future(cli, io_loop):
2757
"""
28-
Explicit REQ close plus bounded wait for AsyncReqMessageClient teardown.
58+
Block until zeromq RequestClient asynchronous teardown completes (#68637).
2959
30-
When close() is deferred onto the loop (#68637), dropping the REQ client immediately
31-
can leave Zeromq sockets until GC and trigger noisy ``Socket.__del__`` tracebacks at
32-
process exit unless we drain ``message_client.socket is None``.
60+
Uses ``RequestClient.close_future()``—same completion contract as
61+
``AsyncReqChannel.close_async`` / ``Minion.destroy_async``—rather than polling
62+
``message_client.socket is None``. If the loop is stopped or ``run_sync`` fails,
63+
fall back to synchronous ``AsyncReqMessageClient.close()`` (covers tests that stop
64+
the ``IOLoop`` before teardown).
3365
"""
3466

3567
@salt.ext.tornado.gen.coroutine
36-
def _runner():
37-
cli.close()
38-
for _ in range(_REQ_DRAIN_POLLS):
39-
if cli.message_client.socket is None:
40-
break
41-
yield salt.ext.tornado.gen.sleep(_REQ_DRAIN_SLEEP_S)
68+
def _wait():
69+
fut = cli.close_future()
70+
yield fut
4271

72+
ok = False
4373
try:
44-
io_loop.run_sync(_runner, timeout=_REQ_DRAIN_TIMEOUT_S)
74+
if getattr(io_loop, "_running", False):
75+
io_loop.run_sync(_wait, timeout=_REQ_DRAIN_TIMEOUT_S)
76+
ok = True
4577
except Exception: # pylint: disable=broad-except
46-
log.debug("REQ client teardown drain aborted during cleanup", exc_info=True)
78+
log.debug(
79+
"REQ client close_future waiter aborted during cleanup", exc_info=True
80+
)
81+
if not ok or getattr(cli.message_client, "socket", None) is not None:
82+
_blocking_teardown_req_message_client(cli)
83+
84+
85+
def _sync_finalize_req_client(cli, io_loop):
86+
"""Explicit teardown for fixtures (wait on ``close_future``)."""
87+
_sync_yield_req_client_close_future(cli, io_loop)
4788

4889

4990
async def async_finalize_req_client(cli):
50-
"""Async equivalent of :func:`_sync_finalize_req_client`."""
51-
cli.close()
52-
for _ in range(_REQ_DRAIN_POLLS):
53-
if cli.message_client.socket is None:
54-
break
55-
await salt.ext.tornado.gen.sleep(_REQ_DRAIN_SLEEP_S)
91+
"""
92+
Async cleanup: spin the I/O loop via ``gen.sleep`` until ``close_future`` completes.
93+
94+
Plain ``await`` on a Tornado ``Future`` does not drive ``cli.io_loop`` in this
95+
test harness (#68637).
96+
"""
97+
await _async_wait_close_future(cli, "fixture REQ close_future did not finish")
98+
99+
100+
async def _await_req_teardown_after_close(cli):
101+
"""After ``RequestClient.close()`` in-test, wait for deferred teardown."""
102+
await _async_wait_close_future(
103+
cli, "REQ message client did not finish teardown after RequestClient.close()"
104+
)
105+
106+
107+
async def _async_wait_close_future(cli, fail_msg):
108+
fut = cli.close_future()
109+
n = max(1, int(_REQ_DRAIN_TIMEOUT_S / _REQ_POLL_S))
110+
for _ in range(n):
111+
if fut.done():
112+
fut.result()
113+
return
114+
await salt.ext.tornado.gen.sleep(_REQ_POLL_S)
115+
_blocking_teardown_req_message_client(cli)
116+
if getattr(cli.message_client, "socket", None) is not None:
117+
pytest.fail(fail_msg)
56118

57119

58120
def _zmq_teardown_rep(stream=None, rep_socket=None, ctx=None):
@@ -96,7 +158,7 @@ async def test_request_channel_issue_64627(io_loop, request_client, minion_opts,
96158
Validate socket is preserved until request channel is explicitly closed.
97159
98160
When ``AsyncReqMessageClient.close()`` runs on an active ``IOLoop``, teardown is
99-
scheduled on the loop (#68637); yield until ``socket`` is cleared before asserting.
161+
scheduled on the loop (#68637); yield ``close_future`` before asserting the socket is gone.
100162
"""
101163
minion_opts["master_uri"] = f"tcp://127.0.0.1:{port}"
102164

@@ -117,15 +179,7 @@ def req_handler(stream, msg):
117179
rep = await request_client.send(b"foo")
118180
assert req_socket is request_client.message_client.socket
119181
request_client.close()
120-
for _ in range(300):
121-
if request_client.message_client.socket is None:
122-
break
123-
await salt.ext.tornado.gen.sleep(0.01)
124-
else:
125-
pytest.fail(
126-
"REQ message client socket not cleared after RequestClient.close() "
127-
"(deferred teardown on running IOLoop; see #68637)"
128-
)
182+
await _await_req_teardown_after_close(request_client)
129183
assert request_client.message_client.socket is None
130184

131185
finally:
@@ -221,15 +275,7 @@ async def test_request_client_send_recv_socket_closed(
221275

222276
with caplog.at_level(logging.TRACE):
223277
request_client.close()
224-
for _ in range(300):
225-
if request_client.message_client.socket is None:
226-
break
227-
await salt.ext.tornado.gen.sleep(0.01)
228-
else:
229-
pytest.fail(
230-
"REQ message client socket not cleared after RequestClient.close() "
231-
"(deferred teardown on running IOLoop; see #68637)"
232-
)
278+
await _await_req_teardown_after_close(request_client)
233279

234280
assert any(
235281
"Send and receive coroutine ending" in msg and "closed" in msg

0 commit comments

Comments
 (0)