Commit cecde21

fixing signal handling
1 parent 197b812 commit cecde21

File tree

6 files changed, +91 -22 lines changed


HISTORY.rst (+5)

@@ -3,6 +3,11 @@
 History
 -------
 
+v0.3.1 (2017-01-20)
+...................
+* fix main process signal handling so the worker shuts down when just the main process receives a signal
+* re-enqueue un-started jobs popped from the queue if the worker is about to exit
+
 v0.3.0 (2017-01-19)
 ...................
 * rename settings class to ``RedisSettings`` and simplify significantly
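Both changelog entries address the same scenario: a targeted kill of the main process (for example from a process manager) delivers a signal to the parent only, while the worker child never sees one. A minimal standalone sketch of that failure mode, not arq code, with illustrative names only:

# illustration only (not arq code): a targeted SIGTERM reaches the parent,
# while the worker child keeps running because it was never signalled itself
import os
import signal
import time
from multiprocessing import Process


def child():
    signal.signal(signal.SIGTERM, lambda *_: print('child got SIGTERM'))
    print('child running, pid', os.getpid())
    time.sleep(2)
    print('child exiting normally - it never saw a signal')


if __name__ == '__main__':
    # handler on the parent so the demo script survives the signal
    signal.signal(signal.SIGTERM, lambda *_: print('parent got SIGTERM'))
    p = Process(target=child)
    p.start()
    # `kill <parent_pid>` (or a supervisor stopping just the entrypoint)
    # delivers the signal to the parent alone; the child is untouched.
    # Ctrl-C, by contrast, signals the whole foreground process group.
    os.kill(os.getpid(), signal.SIGTERM)
    p.join()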

arq/version.py (+1 -1)

@@ -2,4 +2,4 @@
 
 __all__ = ['VERSION']
 
-VERSION = StrictVersion('0.3')
+VERSION = StrictVersion('0.3.1')

arq/worker.py (+32 -11)

@@ -35,6 +35,10 @@ class BadJob(Exception):
     pass
 
 
+# special signal sent by the main process in case the worker process hasn't received a signal (eg. SIGTERM or SIGINT)
+SIG_SUPERVISOR = signal.SIGRTMIN + 7
+
+
 class BaseWorker(RedisMixin):
     """
     Base class for Workers to inherit from.

@@ -80,6 +84,7 @@ def __init__(self, *,
         self.job_class = None  # type: type  # TODO
         signal.signal(signal.SIGINT, self.handle_sig)
         signal.signal(signal.SIGTERM, self.handle_sig)
+        signal.signal(SIG_SUPERVISOR, self.handle_supervisor_signal)
         super().__init__(**kwargs)  # type: ignore # TODO
         self._closing_lock = asyncio.Lock(loop=self.loop)

@@ -176,15 +181,15 @@ async def work(self):
             msg = await redis.blpop(*redis_queues, timeout=1)
             if msg is None:
                 continue
-            _queue, data = msg
-            if self._burst_mode and _queue == quit_queue:
+            raw_queue, data = msg
+            if self._burst_mode and raw_queue == quit_queue:
                 work_logger.debug('got job from the quit queue, stopping')
                 break
-            queue = queue_lookup[_queue]
-            work_logger.debug('scheduling job from queue %s', queue)
-            await self.schedule_job(queue, data)
+            queue = queue_lookup[raw_queue]
+            await self.schedule_job(data, queue, raw_queue)
 
-    async def schedule_job(self, queue, data):
+    async def schedule_job(self, data, queue, raw_queue):
+        work_logger.debug('scheduling job from queue %s', queue)
         job = self.job_class(queue, data)
 
         pt_cnt = len(self._pending_tasks)

@@ -193,6 +198,11 @@ async def schedule_job(self, queue, data):
             _, self._pending_tasks = await asyncio.wait(self._pending_tasks, loop=self.loop,
                                                         return_when=asyncio.FIRST_COMPLETED)
 
+        if not self.running:
+            work_logger.warning('job popped from queue, but exit is imminent, re-queueing the job')
+            async with await self.get_redis_conn() as redis:
+                await redis.lpush(raw_queue, data)
+            return
         task = self.loop.create_task(self.run_job(job))
         task.add_done_callback(self.job_callback)
         self.loop.call_later(self.timeout_seconds, self.cancel_job, task, job)

@@ -280,9 +290,19 @@ async def close(self):
         await super().close()
         self._closed = True
 
+    def handle_supervisor_signal(self, signum, frame):
+        self.running = False
+        work_logger.warning('pid=%d, got shutdown signal from main process, stopping...', os.getpid())
+        signal.signal(signal.SIGINT, self.handle_sig_force)
+        signal.signal(signal.SIGTERM, self.handle_sig_force)
+        signal.signal(signal.SIGALRM, self.handle_sig_force)
+        signal.alarm(self.shutdown_delay)
+        raise HandledExit()
+
     def handle_sig(self, signum, frame):
         self.running = False
         work_logger.warning('pid=%d, got signal: %s, stopping...', os.getpid(), Signals(signum).name)
+        signal.signal(SIG_SUPERVISOR, signal.SIG_IGN)
         signal.signal(signal.SIGINT, self.handle_sig_force)
         signal.signal(signal.SIGTERM, self.handle_sig_force)
         signal.signal(signal.SIGALRM, self.handle_sig_force)

@@ -365,17 +385,18 @@ def run_worker(self, worker_path, worker_class, burst):
             work_logger.critical('worker process %s exited badly with exit code %s',
                                  self.process.pid, self.process.exitcode)
             sys.exit(3)
-        # could restart worker here, but better to leave it up to the real manager
+        # could restart worker here, but better to leave it up to the real manager eg. docker restart: always
 
     def handle_sig(self, signum, frame):
         signal.signal(signal.SIGINT, self.handle_sig_force)
         signal.signal(signal.SIGTERM, self.handle_sig_force)
         work_logger.warning('got signal: %s, waiting for worker pid=%s to finish...', Signals(signum).name,
                             self.process and self.process.pid)
-        for i in range(100):  # pragma: no branch
-            if not self.process or not self.process.is_alive():
-                return
-            time.sleep(0.1)
+        # sleep to make sure handle_sig above has executed if it's going to and detached handle_supervisor_signal
+        time.sleep(0.01)
+        if self.process and self.process.is_alive():
+            work_logger.debug("sending custom shutdown signal to worker in case it didn't receive the signal")
+            os.kill(self.process.pid, SIG_SUPERVISOR)
 
     def handle_sig_force(self, signum, frame):
         work_logger.error('got signal: %s again, forcing exit', Signals(signum).name)
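Stripped of arq's logging, Redis and asyncio plumbing, the relay introduced in this file boils down to the pattern below. This is a condensed sketch with illustrative class names; only SIG_SUPERVISOR mirrors the real code:

# condensed sketch of the supervisor-signal relay above (illustrative names,
# no Redis/asyncio); only SIG_SUPERVISOR matches the diff
import os
import signal
import time

SIG_SUPERVISOR = signal.SIGRTMIN + 7  # an otherwise unused real-time signal


class WorkerSide:
    """Installed inside the worker (child) process."""

    def __init__(self):
        self.running = True
        signal.signal(signal.SIGINT, self.handle_sig)
        signal.signal(signal.SIGTERM, self.handle_sig)
        signal.signal(SIG_SUPERVISOR, self.handle_supervisor_signal)

    def handle_sig(self, signum, frame):
        # the worker received the signal directly, so a relayed
        # SIG_SUPERVISOR would be redundant - ignore it from now on
        self.running = False
        signal.signal(SIG_SUPERVISOR, signal.SIG_IGN)

    def handle_supervisor_signal(self, signum, frame):
        # only the main process was signalled; it relayed SIG_SUPERVISOR here
        self.running = False


class SupervisorSide:
    """Installed in the main process; `process` is the worker Process."""

    def __init__(self, process):
        self.process = process
        signal.signal(signal.SIGINT, self.handle_sig)
        signal.signal(signal.SIGTERM, self.handle_sig)

    def handle_sig(self, signum, frame):
        # brief pause: if the worker also got the signal, it will have
        # detached its SIG_SUPERVISOR handler by now
        time.sleep(0.01)
        if self.process.is_alive():
            os.kill(self.process.pid, SIG_SUPERVISOR)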

tests/example.py (+4 -4)

@@ -20,8 +20,8 @@ class WorkerSignalQuit(Worker):
     """
     max_concurrent_tasks = 1
 
-    async def schedule_job(self, queue, data):
-        await super().schedule_job(queue, data)
+    async def schedule_job(self, *args):
+        await super().schedule_job(*args)
         if self.jobs_complete >= 2:
             self.handle_sig(2, None)
 

@@ -30,7 +30,7 @@ class WorkerSignalTwiceQuit(Worker):
     """
     worker which simulates receiving sigint twice after 2 jobs
     """
-    async def schedule_job(self, queue, data):
-        await super().schedule_job(queue, data)
+    async def schedule_job(self, *args):
+        await super().schedule_job(*args)
         if self.jobs_complete >= 2:
             self.handle_sig_force(2, None)

tests/test_cli.py (+23 -5)

@@ -45,15 +45,34 @@ def test_worker_exited_badly(tmpworkdir, monkeypatch):
 def test_main_process_sigint(tmpworkdir, monkeypatch, caplog):
     monkeypatch.setattr(arq.worker.Process, 'start', MagicMock())
     monkeypatch.setattr(arq.worker.Process, 'join', MagicMock())
-    is_alive = MagicMock(side_effect=[True, True, False, False])
-    monkeypatch.setattr(arq.worker.Process, 'is_alive', is_alive)
+    monkeypatch.setattr(arq.worker.Process, 'is_alive', MagicMock(return_value=True))
     monkeypatch.setattr(arq.worker.Process, 'exitcode', 0)
     monkeypatch.setattr(arq.worker.Process, 'pid', 123)
+
+    os_kill = MagicMock()
+    monkeypatch.setattr(arq.worker.os, 'kill', os_kill)
+
     tmpworkdir.join('test.py').write(EXAMPLE_FILE)
     work_runner = arq.worker.RunWorkerProcess('test.py', 'Worker')
     work_runner.handle_sig(signal.SIGINT, None)
-    assert is_alive.call_count == 3
     assert 'got signal: SIGINT, waiting for worker pid=123 to finish...' in caplog
+    os_kill.assert_called_once_with(123, arq.worker.SIG_SUPERVISOR)
+
+
+def test_main_process_sigint_worker_stopped(tmpworkdir, monkeypatch, caplog):
+    monkeypatch.setattr(arq.worker.Process, 'start', MagicMock())
+    monkeypatch.setattr(arq.worker.Process, 'join', MagicMock())
+    monkeypatch.setattr(arq.worker.Process, 'is_alive', MagicMock(return_value=False))
+    monkeypatch.setattr(arq.worker.Process, 'exitcode', 0)
+    monkeypatch.setattr(arq.worker.Process, 'pid', 123)
+
+    os_kill = MagicMock()
+    monkeypatch.setattr(arq.worker.os, 'kill', os_kill)
+
+    tmpworkdir.join('test.py').write(EXAMPLE_FILE)
+    work_runner = arq.worker.RunWorkerProcess('test.py', 'Worker')
+    work_runner.handle_sig(signal.SIGINT, None)
+    assert os_kill.called is False
 
 
 def test_main_process_sigint_twice(tmpworkdir, monkeypatch, caplog):

@@ -88,6 +107,5 @@ def test_main_process_sigint_twice_worker_running(tmpworkdir, monkeypatch, caplo
     with pytest.raises(arq.worker.ImmediateExit):
         work_runner.handle_sig_force(signal.SIGINT, None)
     assert is_alive.call_count == 1
-    assert os_kill.called
-    assert os_kill.call_args == ((123, signal.SIGTERM),)
+    os_kill.assert_called_once_with(123, signal.SIGTERM)
     assert 'got signal: SIGINT again, forcing exit' in caplog
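The mocking pattern in these tests generalises to any code that calls os.kill: patch the reference with monkeypatch plus MagicMock, then assert on the recorded call instead of sending real signals. A self-contained pytest sketch; relay_shutdown is a hypothetical stand-in, not part of arq:

# self-contained pytest sketch of the mocking pattern used above;
# relay_shutdown is a hypothetical stand-in for RunWorkerProcess.handle_sig
import os
import signal
from unittest.mock import MagicMock

SIG_SUPERVISOR = signal.SIGRTMIN + 7


def relay_shutdown(pid, worker_alive):
    # hypothetical helper: forward the custom signal only if the worker is alive
    if worker_alive:
        os.kill(pid, SIG_SUPERVISOR)


def test_relay_when_worker_alive(monkeypatch):
    os_kill = MagicMock()
    monkeypatch.setattr(os, 'kill', os_kill)  # no real signal is ever sent
    relay_shutdown(123, worker_alive=True)
    os_kill.assert_called_once_with(123, SIG_SUPERVISOR)


def test_no_relay_when_worker_stopped(monkeypatch):
    os_kill = MagicMock()
    monkeypatch.setattr(os, 'kill', os_kill)
    relay_shutdown(123, worker_alive=False)
    assert os_kill.called is False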

tests/test_worker.py (+26 -1)

@@ -2,9 +2,11 @@
 import logging
 import re
 from multiprocessing import Process
+from unittest.mock import MagicMock
 
 import pytest
 
+import arq.worker
 from arq.testing import RaiseWorker
 from arq.worker import import_string, start_worker
 

@@ -117,12 +119,16 @@ async def test_run_quit(tmpworkdir, redis_conn, actor, caplog):
     await actor.save_slow(2, 0.1)
     await actor.save_slow(3, 0.1)
     await actor.save_slow(4, 0.1)
+    assert 4 == await redis_conn.llen(b'arq:q:dft')
 
     assert not tmpworkdir.join('save_slow').exists()
     worker = WorkerQuit(loop=actor.loop)
     await worker.run()
-    assert tmpworkdir.join('save_slow').read() == '3'
+    # the third job should be removed from the queue and re-added
+    assert tmpworkdir.join('save_slow').read() == '2'
     assert '1 pending tasks, waiting for one to finish before creating task for DemoActor.save_slow(2, 0.1)' in caplog
+    assert 'job popped from queue, but exit is imminent, re-queueing the job' in caplog
+    assert 2 == await redis_conn.llen(b'arq:q:dft')
 
 
 async def test_task_exc(redis_conn, actor, caplog):

@@ -171,6 +177,25 @@ def test_run_sigint_twice(tmpworkdir, redis_conn, loop, caplog):
     assert 'Worker exiting after an unhandled error: ImmediateExit' in caplog
 
 
+async def test_run_supervisor_signal(actor, monkeypatch):
+    mock_signal_signal = MagicMock()
+    monkeypatch.setattr(arq.worker.signal, 'signal', mock_signal_signal)
+    mock_signal_alarm = MagicMock()
+    monkeypatch.setattr(arq.worker.signal, 'alarm', mock_signal_alarm)
+
+    worker = Worker(burst=True, loop=actor.loop)
+    assert worker.running is True
+    assert mock_signal_signal.call_count == 3
+    assert mock_signal_alarm.call_count == 0
+
+    with pytest.raises(arq.worker.HandledExit):
+        worker.handle_supervisor_signal(arq.worker.SIG_SUPERVISOR, None)
+
+    assert worker.running is False
+    assert mock_signal_signal.call_count == 6
+    assert mock_signal_alarm.call_count == 1
+
+
 async def test_non_existent_function(redis_conn, actor, caplog):
     await actor.enqueue_job('doesnt_exist')
     worker = Worker(burst=True, loop=actor.loop)
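The new assertions in test_run_quit pin down the re-queue behaviour: a job popped just before shutdown must be pushed back onto the queue rather than dropped. A standalone sketch of that round-trip using the synchronous redis client (arq itself uses an async Redis connection; function names here are illustrative):

# standalone sketch of the pop-then-requeue round-trip checked above;
# uses the synchronous redis-py client, not arq's async connection
import redis

QUEUE = 'arq:q:dft'  # default arq queue name, as asserted in the test


def pop_or_requeue(r, running):
    msg = r.blpop(QUEUE, timeout=1)
    if msg is None:
        return None
    raw_queue, data = msg
    if not running:
        # exit is imminent: put the un-started job back at the head of the
        # queue so a restarted worker can pick it up
        r.lpush(raw_queue, data)
        return None
    return data


if __name__ == '__main__':
    r = redis.Redis()
    r.rpush(QUEUE, b'job-1')
    assert pop_or_requeue(r, running=False) is None
    assert r.llen(QUEUE) == 1  # the job survived the shutdown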
