-
-
Notifications
You must be signed in to change notification settings - Fork 4.4k
fix(taskworker) Improve shutdown of taskworkers #94885
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
`atexit` doesn't handle SIGTERM, which we need to have clean shutdown in unprivileged container contexts. I also added names to child proceses and threads to help with debugging.
the docs say that daemonic processes cannot create child processes, and I have to assume that somewhere we create child processes in worker tasks, and that we're better off with non-daemonic child processes.
@@ -193,7 +202,9 @@ def result_thread() -> None: | |||
) | |||
continue | |||
|
|||
self._result_thread = threading.Thread(target=result_thread) | |||
self._result_thread = threading.Thread( | |||
name="send-result", target=result_thread, daemon=True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The threads have daemon=true now as we don't need/want to block the worker shutdown on them.
These timeouts won't halt the thread and we should wait longer for them. I've switched to using a shutdown property to avoid deadlocks on the shutdown_event
Feels more correct to stop spawning children before terminating them all.
Codecov ReportAttention: Patch coverage is ✅ All tests successful. No failed tests found.
Additional details and impacted files@@ Coverage Diff @@
## master #94885 +/- ##
==========================================
+ Coverage 87.88% 87.90% +0.01%
==========================================
Files 10447 10446 -1
Lines 604270 603901 -369
Branches 23540 23468 -72
==========================================
- Hits 531082 530869 -213
+ Misses 72824 72670 -154
+ Partials 364 362 -2 |
Go back to using the Event.is_set() instead of the attribute. While using the Event can sometimes deadlock in the `result_thread`, having a timeout on the join allow progress to be made in spite of a lock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Signal-Interrupted Shutdown Causes Inconsistent State
The shutdown()
method is no longer idempotent due to the removal of the _shutdown_event.is_set()
early return check. This allows concurrent calls to shutdown()
, such as from signal handlers and manual calls. If a signal arrives while shutdown()
is already executing, the resulting KeyboardInterrupt
can interrupt critical cleanup operations (e.g., joining child processes or threads). This leads to race conditions, potentially leaving child processes running, causing multiple termination attempts, or resulting in an inconsistent worker state (e.g., issues with queue draining).
src/sentry/taskworker/worker.py#L117-L149
sentry/src/sentry/taskworker/worker.py
Lines 117 to 149 in 4ca7024
def shutdown(self) -> None: | |
""" | |
Shutdown cleanly | |
Activate the shutdown event and drain results before terminating children. | |
""" | |
logger.info("taskworker.worker.shutdown.start") | |
self._shutdown_event.set() | |
logger.info("taskworker.worker.shutdown.spawn_children") | |
if self._spawn_children_thread: | |
self._spawn_children_thread.join() | |
logger.info("taskworker.worker.shutdown.children") | |
for child in self._children: | |
child.terminate() | |
for child in self._children: | |
child.join() | |
logger.info("taskworker.worker.shutdown.result") | |
if self._result_thread: | |
# Use a timeout as sometimes this thread can deadlock on the Event. | |
self._result_thread.join(timeout=5) | |
# Drain any remaining results synchronously | |
while True: | |
try: | |
result = self._processed_tasks.get_nowait() | |
self._send_result(result, fetch=False) | |
except queue.Empty: | |
break | |
logger.info("taskworker.worker.shutdown.complete") |
Was this report helpful? Give feedback by reacting with 👍 or 👎
atexit
doesn't handle SIGTERM, which we need to have clean shutdown in unprivileged container contexts. I also added names to child processes and threads to help with debugging.