From 71a97edbee5e738c86ab9a3f4fe865c9d44a83b2 Mon Sep 17 00:00:00 2001 From: Matvey Kukuy Date: Wed, 11 Sep 2024 14:22:33 +0800 Subject: [PATCH 1/3] Fix negative expire_ms --- arq/connections.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/arq/connections.py b/arq/connections.py index c1058890..e843ac47 100644 --- a/arq/connections.py +++ b/arq/connections.py @@ -167,7 +167,9 @@ async def enqueue_job( else: score = enqueue_time_ms - expires_ms = expires_ms or score - enqueue_time_ms + self.expires_extra_ms + expires_ms = expires_ms or ( + 1 if score - enqueue_time_ms < 1 else score - enqueue_time_ms + self.expires_extra_ms + ) job = serialize_job(function, args, kwargs, _job_try, enqueue_time_ms, serializer=self.job_serializer) pipe.multi() From 8d03584c5aa9b7a483512a021cf65bcd74f9986b Mon Sep 17 00:00:00 2001 From: Matvey Kukuy Date: Wed, 11 Sep 2024 15:01:26 +0800 Subject: [PATCH 2/3] The other fix --- arq/connections.py | 4 +--- arq/worker.py | 3 ++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/arq/connections.py b/arq/connections.py index e843ac47..c1058890 100644 --- a/arq/connections.py +++ b/arq/connections.py @@ -167,9 +167,7 @@ async def enqueue_job( else: score = enqueue_time_ms - expires_ms = expires_ms or ( - 1 if score - enqueue_time_ms < 1 else score - enqueue_time_ms + self.expires_extra_ms - ) + expires_ms = expires_ms or score - enqueue_time_ms + self.expires_extra_ms job = serialize_job(function, args, kwargs, _job_try, enqueue_time_ms, serializer=self.job_serializer) pipe.multi() diff --git a/arq/worker.py b/arq/worker.py index 4c33b677..545ccf1c 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -751,7 +751,8 @@ async def run_cron(self, n: datetime, delay: float, num_windows: int = 2) -> Non job_id = f'{cron_job.name}:{to_unix_ms(cron_job.next_run)}' if cron_job.unique else None job_futures.add( self.pool.enqueue_job( - cron_job.name, _job_id=job_id, _queue_name=self.queue_name, _defer_until=cron_job.next_run + cron_job.name, _job_id=job_id, _queue_name=self.queue_name, + _defer_until=(cron_job.next_run if cron_job.next_run > datetime.now(tz=self.timezone) else None), ) ) cron_job.calculate_next(cron_job.next_run) From fd1b57628f98c708979754f7910d1b134abc20f5 Mon Sep 17 00:00:00 2001 From: Matvey Kukuy Date: Wed, 11 Sep 2024 15:04:01 +0800 Subject: [PATCH 3/3] Lint --- arq/worker.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/arq/worker.py b/arq/worker.py index 545ccf1c..4bba4d61 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -751,8 +751,12 @@ async def run_cron(self, n: datetime, delay: float, num_windows: int = 2) -> Non job_id = f'{cron_job.name}:{to_unix_ms(cron_job.next_run)}' if cron_job.unique else None job_futures.add( self.pool.enqueue_job( - cron_job.name, _job_id=job_id, _queue_name=self.queue_name, - _defer_until=(cron_job.next_run if cron_job.next_run > datetime.now(tz=self.timezone) else None), + cron_job.name, + _job_id=job_id, + _queue_name=self.queue_name, + _defer_until=( + cron_job.next_run if cron_job.next_run > datetime.now(tz=self.timezone) else None + ), ) ) cron_job.calculate_next(cron_job.next_run)