Skip to content

Commit ee57e3b

Browse files
committed
improved preventing new jobs starting when the worker is stopping
1 parent cecde21 commit ee57e3b

8 files changed

+36
-31
lines changed

HISTORY.rst

+4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
History
44
-------
55

6+
v0.3.2 (TBD)
7+
...................
8+
* improved solution for preventing new jobs starting when the worker is about to stop
9+
610
v0.3.1 (2017-01-20)
711
...................
812
* fix main process signal handling so the worker shuts down when just the main process receives a signal

arq/version.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22

33
__all__ = ['VERSION']
44

5-
VERSION = StrictVersion('0.3.1')
5+
VERSION = StrictVersion('0.3.2a1')

arq/worker.py

+21-21
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class BadJob(Exception):
3636

3737

3838
# special signal sent by the main process in case the worker process hasn't received a signal (eg. SIGTERM or SIGINT)
39-
SIG_SUPERVISOR = signal.SIGRTMIN + 7
39+
SIG_PROXY = signal.SIGRTMIN + 7
4040

4141

4242
class BaseWorker(RedisMixin):
@@ -84,7 +84,7 @@ def __init__(self, *,
8484
self.job_class = None # type: type # TODO
8585
signal.signal(signal.SIGINT, self.handle_sig)
8686
signal.signal(signal.SIGTERM, self.handle_sig)
87-
signal.signal(SIG_SUPERVISOR, self.handle_supervisor_signal)
87+
signal.signal(SIG_PROXY, self.handle_proxy_signal)
8888
super().__init__(**kwargs) # type: ignore # TODO
8989
self._closing_lock = asyncio.Lock(loop=self.loop)
9090

@@ -160,7 +160,7 @@ async def run(self, reuse=False):
160160
finally:
161161
if reuse:
162162
work_logger.info('waiting for %d jobs to finish', len(self._pending_tasks))
163-
await asyncio.gather(*self._pending_tasks, loop=self.loop)
163+
await asyncio.wait(self._pending_tasks, loop=self.loop)
164164
else:
165165
await self.close()
166166
if self._task_exception:
@@ -186,28 +186,28 @@ async def work(self):
186186
work_logger.debug('got job from the quit queue, stopping')
187187
break
188188
queue = queue_lookup[raw_queue]
189-
await self.schedule_job(data, queue, raw_queue)
189+
self.schedule_job(data, queue)
190+
await self.below_concurrency_limit()
190191

191-
async def schedule_job(self, data, queue, raw_queue):
192+
def schedule_job(self, data, queue):
192193
work_logger.debug('scheduling job from queue %s', queue)
193194
job = self.job_class(queue, data)
194195

195-
pt_cnt = len(self._pending_tasks)
196-
if pt_cnt >= self.max_concurrent_tasks:
197-
work_logger.debug('%d pending tasks, waiting for one to finish before creating task for %s', pt_cnt, job)
198-
_, self._pending_tasks = await asyncio.wait(self._pending_tasks, loop=self.loop,
199-
return_when=asyncio.FIRST_COMPLETED)
200-
201-
if not self.running:
202-
work_logger.warning('job popped from queue, but exit is imminent, re-queueing the job')
203-
async with await self.get_redis_conn() as redis:
204-
await redis.lpush(raw_queue, data)
205-
return
206196
task = self.loop.create_task(self.run_job(job))
207197
task.add_done_callback(self.job_callback)
208198
self.loop.call_later(self.timeout_seconds, self.cancel_job, task, job)
209199
self._pending_tasks.add(task)
210200

201+
async def below_concurrency_limit(self):
202+
pt_cnt = len(self._pending_tasks)
203+
while True:
204+
if pt_cnt < self.max_concurrent_tasks:
205+
return
206+
work_logger.debug('%d pending tasks, waiting for one to finish', pt_cnt)
207+
_, self._pending_tasks = await asyncio.wait(self._pending_tasks, loop=self.loop,
208+
return_when=asyncio.FIRST_COMPLETED)
209+
pt_cnt = len(self._pending_tasks)
210+
211211
def cancel_job(self, task, job):
212212
if not task.cancel():
213213
return
@@ -290,9 +290,9 @@ async def close(self):
290290
await super().close()
291291
self._closed = True
292292

293-
def handle_supervisor_signal(self, signum, frame):
293+
def handle_proxy_signal(self, signum, frame):
294294
self.running = False
295-
work_logger.warning('pid=%d, got shutdown signal from main process, stopping...', os.getpid())
295+
work_logger.warning('pid=%d, got signal proxied from main process, stopping...', os.getpid())
296296
signal.signal(signal.SIGINT, self.handle_sig_force)
297297
signal.signal(signal.SIGTERM, self.handle_sig_force)
298298
signal.signal(signal.SIGALRM, self.handle_sig_force)
@@ -302,7 +302,7 @@ def handle_supervisor_signal(self, signum, frame):
302302
def handle_sig(self, signum, frame):
303303
self.running = False
304304
work_logger.warning('pid=%d, got signal: %s, stopping...', os.getpid(), Signals(signum).name)
305-
signal.signal(SIG_SUPERVISOR, signal.SIG_IGN)
305+
signal.signal(SIG_PROXY, signal.SIG_IGN)
306306
signal.signal(signal.SIGINT, self.handle_sig_force)
307307
signal.signal(signal.SIGTERM, self.handle_sig_force)
308308
signal.signal(signal.SIGALRM, self.handle_sig_force)
@@ -392,11 +392,11 @@ def handle_sig(self, signum, frame):
392392
signal.signal(signal.SIGTERM, self.handle_sig_force)
393393
work_logger.warning('got signal: %s, waiting for worker pid=%s to finish...', Signals(signum).name,
394394
self.process and self.process.pid)
395-
# sleep to make sure handle_sig above has executed if it's going to and detached handle_supervisor_signal
395+
# sleep to make sure worker.handle_sig above has executed if it's going to and detached handle_proxy_signal
396396
time.sleep(0.01)
397397
if self.process and self.process.is_alive():
398398
work_logger.debug("sending custom shutdown signal to worker in case it didn't receive the signal")
399-
os.kill(self.process.pid, SIG_SUPERVISOR)
399+
os.kill(self.process.pid, SIG_PROXY)
400400

401401
def handle_sig_force(self, signum, frame):
402402
work_logger.error('got signal: %s again, forcing exit', Signals(signum).name)

setup.cfg

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[tool:pytest]
22
testpaths = tests
33
addopts = --isort
4+
timeout = 5
45

56
[flake8]
67
max-complexity = 10

tests/example.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ class WorkerSignalQuit(Worker):
2020
"""
2121
max_concurrent_tasks = 1
2222

23-
async def schedule_job(self, *args):
24-
await super().schedule_job(*args)
23+
def schedule_job(self, *args):
24+
super().schedule_job(*args)
2525
if self.jobs_complete >= 2:
2626
self.handle_sig(2, None)
2727

@@ -30,7 +30,7 @@ class WorkerSignalTwiceQuit(Worker):
3030
"""
3131
worker which simulates receiving sigint twice after 2 jobs
3232
"""
33-
async def schedule_job(self, *args):
34-
await super().schedule_job(*args)
33+
def schedule_job(self, *args):
34+
super().schedule_job(*args)
3535
if self.jobs_complete >= 2:
3636
self.handle_sig_force(2, None)

tests/requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pytest-cov==2.4.0
1111
pytest-isort==0.1.0
1212
pytest-mock==1.5.0
1313
pytest-sugar==0.8.0
14+
pytest-timeout==1.2.0
1415
pytest-toolbox==0.1
1516
pytz==2016.10
1617
Sphinx==1.5.1

tests/test_cli.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def test_main_process_sigint(tmpworkdir, monkeypatch, caplog):
5656
work_runner = arq.worker.RunWorkerProcess('test.py', 'Worker')
5757
work_runner.handle_sig(signal.SIGINT, None)
5858
assert 'got signal: SIGINT, waiting for worker pid=123 to finish...' in caplog
59-
os_kill.assert_called_once_with(123, arq.worker.SIG_SUPERVISOR)
59+
os_kill.assert_called_once_with(123, arq.worker.SIG_PROXY)
6060

6161

6262
def test_main_process_sigint_worker_stopped(tmpworkdir, monkeypatch, caplog):

tests/test_worker.py

+3-4
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,7 @@ async def test_run_quit(tmpworkdir, redis_conn, actor, caplog):
126126
await worker.run()
127127
# the third job should be removed from the queue and re-added
128128
assert tmpworkdir.join('save_slow').read() == '2'
129-
assert '1 pending tasks, waiting for one to finish before creating task for DemoActor.save_slow(2, 0.1)' in caplog
130-
assert 'job popped from queue, but exit is imminent, re-queueing the job' in caplog
129+
assert '1 pending tasks, waiting for one to finish' in caplog
131130
assert 2 == await redis_conn.llen(b'arq:q:dft')
132131

133132

@@ -177,7 +176,7 @@ def test_run_sigint_twice(tmpworkdir, redis_conn, loop, caplog):
177176
assert 'Worker exiting after an unhandled error: ImmediateExit' in caplog
178177

179178

180-
async def test_run_supervisor_signal(actor, monkeypatch):
179+
async def test_run_proxy_signal(actor, monkeypatch):
181180
mock_signal_signal = MagicMock()
182181
monkeypatch.setattr(arq.worker.signal, 'signal', mock_signal_signal)
183182
mock_signal_alarm = MagicMock()
@@ -189,7 +188,7 @@ async def test_run_supervisor_signal(actor, monkeypatch):
189188
assert mock_signal_alarm.call_count == 0
190189

191190
with pytest.raises(arq.worker.HandledExit):
192-
worker.handle_supervisor_signal(arq.worker.SIG_SUPERVISOR, None)
191+
worker.handle_proxy_signal(arq.worker.SIG_PROXY, None)
193192

194193
assert worker.running is False
195194
assert mock_signal_signal.call_count == 6

0 commit comments

Comments
 (0)