Skip to content

Commit 1612c88

Browse files
authored
fix(taskworker) Improve shutdown of taskworkers (#94885)
`atexit` doesn't handle SIGTERM, which we need to have clean shutdown in unprivileged container contexts. I also added names to child processes and threads to help with debugging.
1 parent 27ea5c9 commit 1612c88

File tree

2 files changed

+37
-25
lines changed

2 files changed

+37
-25
lines changed

src/sentry/taskworker/worker.py

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
from __future__ import annotations
22

3-
import atexit
43
import logging
54
import multiprocessing
65
import queue
6+
import signal
77
import threading
88
import time
99
from concurrent.futures import ThreadPoolExecutor
@@ -81,9 +81,6 @@ def __init__(
8181

8282
self._processing_pool_name: str = processing_pool_name or "unknown"
8383

84-
def __del__(self) -> None:
85-
self.shutdown()
86-
8784
def do_imports(self) -> None:
8885
for module in settings.TASKWORKER_IMPORTS:
8986
__import__(module)
@@ -99,10 +96,20 @@ def start(self) -> int:
9996
self.start_result_thread()
10097
self.start_spawn_children_thread()
10198

102-
atexit.register(self.shutdown)
99+
# Convert signals into KeyboardInterrupt.
100+
# Running shutdown() within the signal handler can lead to deadlocks
101+
def signal_handler(*args: Any) -> None:
102+
raise KeyboardInterrupt()
103103

104-
while True:
105-
self.run_once()
104+
signal.signal(signal.SIGINT, signal_handler)
105+
signal.signal(signal.SIGTERM, signal_handler)
106+
107+
try:
108+
while True:
109+
self.run_once()
110+
except KeyboardInterrupt:
111+
self.shutdown()
112+
raise
106113

107114
def run_once(self) -> None:
108115
"""Access point for tests to run a single worker loop"""
@@ -113,30 +120,33 @@ def shutdown(self) -> None:
113120
Shutdown cleanly
114121
Activate the shutdown event and drain results before terminating children.
115122
"""
116-
if self._shutdown_event.is_set():
117-
return
118-
119-
logger.info("taskworker.worker.shutdown")
123+
logger.info("taskworker.worker.shutdown.start")
120124
self._shutdown_event.set()
121125

126+
logger.info("taskworker.worker.shutdown.spawn_children")
127+
if self._spawn_children_thread:
128+
self._spawn_children_thread.join()
129+
130+
logger.info("taskworker.worker.shutdown.children")
122131
for child in self._children:
123132
child.terminate()
133+
for child in self._children:
124134
child.join()
125135

136+
logger.info("taskworker.worker.shutdown.result")
126137
if self._result_thread:
127-
self._result_thread.join()
138+
# Use a timeout as sometimes this thread can deadlock on the Event.
139+
self._result_thread.join(timeout=5)
128140

129-
# Drain remaining results synchronously, as the thread will have terminated
130-
# when shutdown_event was set.
141+
# Drain any remaining results synchronously
131142
while True:
132143
try:
133144
result = self._processed_tasks.get_nowait()
134145
self._send_result(result, fetch=False)
135146
except queue.Empty:
136147
break
137148

138-
if self._spawn_children_thread:
139-
self._spawn_children_thread.join()
149+
logger.info("taskworker.worker.shutdown.complete")
140150

141151
def _add_task(self) -> bool:
142152
"""
@@ -179,7 +189,7 @@ def start_result_thread(self) -> None:
179189
"""
180190

181191
def result_thread() -> None:
182-
logger.debug("taskworker.worker.result_thread_started")
192+
logger.debug("taskworker.worker.result_thread.started")
183193
iopool = ThreadPoolExecutor(max_workers=self._concurrency)
184194
with iopool as executor:
185195
while not self._shutdown_event.is_set():
@@ -193,7 +203,9 @@ def result_thread() -> None:
193203
)
194204
continue
195205

196-
self._result_thread = threading.Thread(target=result_thread)
206+
self._result_thread = threading.Thread(
207+
name="send-result", target=result_thread, daemon=True
208+
)
197209
self._result_thread.start()
198210

199211
def _send_result(self, result: ProcessingResult, fetch: bool = True) -> bool:
@@ -253,6 +265,7 @@ def _send_update_task(
253265
)
254266
# Use the shutdown_event as a sleep mechanism
255267
self._shutdown_event.wait(self._setstatus_backoff_seconds)
268+
256269
try:
257270
next_task = self.client.update_task(result, fetch_next)
258271
self._setstatus_backoff_seconds = 0
@@ -276,14 +289,15 @@ def _send_update_task(
276289

277290
def start_spawn_children_thread(self) -> None:
278291
def spawn_children_thread() -> None:
279-
logger.debug("taskworker.worker.spawn_children_thread_started")
292+
logger.debug("taskworker.worker.spawn_children_thread.started")
280293
while not self._shutdown_event.is_set():
281294
self._children = [child for child in self._children if child.is_alive()]
282295
if len(self._children) >= self._concurrency:
283296
time.sleep(0.1)
284297
continue
285298
for i in range(self._concurrency - len(self._children)):
286299
process = self.mp_context.Process(
300+
name=f"taskworker-child-{i}",
287301
target=child_process,
288302
args=(
289303
self._child_tasks,
@@ -301,7 +315,9 @@ def spawn_children_thread() -> None:
301315
extra={"pid": process.pid, "processing_pool": self._processing_pool_name},
302316
)
303317

304-
self._spawn_children_thread = threading.Thread(target=spawn_children_thread)
318+
self._spawn_children_thread = threading.Thread(
319+
name="spawn-children", target=spawn_children_thread, daemon=True
320+
)
305321
self._spawn_children_thread.start()
306322

307323
def fetch_task(self) -> InflightTaskActivation | None:

src/sentry/taskworker/workerchild.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
160160
f"execution deadline of {deadline} seconds exceeded by {taskname}"
161161
)
162162

163-
while True:
163+
while not shutdown_event.is_set():
164164
if max_task_count and processed_task_count >= max_task_count:
165165
metrics.incr(
166166
"taskworker.worker.max_task_count_reached",
@@ -171,10 +171,6 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
171171
)
172172
break
173173

174-
if shutdown_event.is_set():
175-
logger.info("taskworker.worker.shutdown_event")
176-
break
177-
178174
child_tasks_get_start = time.monotonic()
179175
try:
180176
# If the queue is empty, this could block for a second.

0 commit comments

Comments
 (0)