Skip to content

Commit 2f752e2

Browse files
authored
Fix race condition on task retry (python-arq#487)
1 parent 3914e48 commit 2f752e2

File tree

2 files changed

+37
-1
lines changed

2 files changed

+37
-1
lines changed

arq/worker.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -447,8 +447,10 @@ async def start_jobs(self, job_ids: List[bytes]) -> None:
447447
await pipe.watch(in_progress_key)
448448
ongoing_exists = await pipe.exists(in_progress_key)
449449
score = await pipe.zscore(self.queue_name, job_id)
450-
if ongoing_exists or not score:
450+
if ongoing_exists or not score or score > timestamp_ms():
451451
# job already started elsewhere, or already finished and removed from queue
452+
# if score > ts_now,
453+
# it means probably the job was re-enqueued with a delay in another worker
452454
self.job_counter = self.job_counter - 1
453455
self.sem.release()
454456
logger.debug('job %s already running elsewhere', job_id)

tests/test_worker.py

+34
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,40 @@ async def test_job_successful(arq_redis: ArqRedis, worker, caplog):
160160
assert 'X.XXs → testing:foobar()\n X.XXs ← testing:foobar ● 42' in log
161161

162162

163+
async def test_job_retry_race_condition(arq_redis: ArqRedis, worker):
164+
async def retry_job(ctx):
165+
if ctx['job_try'] == 1:
166+
raise Retry(defer=10)
167+
168+
job_id = 'testing'
169+
await arq_redis.enqueue_job('retry_job', _job_id=job_id)
170+
171+
worker_one: Worker = worker(functions=[func(retry_job, name='retry_job')])
172+
worker_two: Worker = worker(functions=[func(retry_job, name='retry_job')])
173+
174+
assert worker_one.jobs_complete == 0
175+
assert worker_one.jobs_failed == 0
176+
assert worker_one.jobs_retried == 0
177+
178+
assert worker_two.jobs_complete == 0
179+
assert worker_two.jobs_failed == 0
180+
assert worker_two.jobs_retried == 0
181+
182+
await worker_one.start_jobs([job_id.encode()])
183+
await asyncio.gather(*worker_one.tasks.values())
184+
185+
await worker_two.start_jobs([job_id.encode()])
186+
await asyncio.gather(*worker_two.tasks.values())
187+
188+
assert worker_one.jobs_complete == 0
189+
assert worker_one.jobs_failed == 0
190+
assert worker_one.jobs_retried == 1
191+
192+
assert worker_two.jobs_complete == 0
193+
assert worker_two.jobs_failed == 0
194+
assert worker_two.jobs_retried == 0
195+
196+
163197
async def test_job_successful_no_result_logging(arq_redis: ArqRedis, worker, caplog):
164198
caplog.set_level(logging.INFO)
165199
await arq_redis.enqueue_job('foobar', _job_id='testing')

0 commit comments

Comments
 (0)