@@ -41,6 +41,8 @@
 )
 
 if TYPE_CHECKING:
+    from redis.asyncio.client import Pipeline
+
     from .typing import SecondsTimedelta, StartupShutdown, WorkerCoroutine, WorkerSettingsType  # noqa F401
 
 logger = logging.getLogger('arq.worker')
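The `Pipeline` import above is guarded by `TYPE_CHECKING`, so it is only evaluated by a static type checker and the annotations added in the hunks below are written as string literals. A minimal sketch of the same pattern outside arq, assuming redis-py >= 4.2; the `queued_job_ids` helper and the queue key are illustrative, not part of arq:

```python
from typing import TYPE_CHECKING, List

from redis.asyncio import Redis

if TYPE_CHECKING:
    # resolved by mypy only, never imported at runtime
    from redis.asyncio.client import Pipeline


async def queued_job_ids(redis: Redis) -> List[bytes]:
    # string annotation: 'Pipeline' only needs to exist during type checking
    pipe: 'Pipeline[bytes]'
    async with redis.pipeline(transaction=True) as pipe:
        # commands are buffered on the pipeline rather than awaited one by one
        pipe.zrange('arq:queue', start=0, end=-1)
        job_ids, = await pipe.execute()
    return job_ids
```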
@@ -404,11 +406,10 @@ async def _cancel_aborted_jobs(self) -> None:
         """
         Go through job_ids in the abort_jobs_ss sorted set and cancel those tasks.
         """
+        pipe: 'Pipeline[bytes]'
         async with self.pool.pipeline(transaction=True) as pipe:
-            pipe.zrange(abort_jobs_ss, start=0, end=-1)  # type: ignore[unused-coroutine]
-            pipe.zremrangebyscore(  # type: ignore[unused-coroutine]
-                abort_jobs_ss, min=timestamp_ms() + abort_job_max_age, max=float('inf')
-            )
+            pipe.zrange(abort_jobs_ss, start=0, end=-1)
+            pipe.zremrangebyscore(abort_jobs_ss, min=timestamp_ms() + abort_job_max_age, max=float('inf'))
             abort_job_ids, _ = await pipe.execute()
 
         aborted: Set[str] = set()
@@ -445,6 +446,7 @@ async def start_jobs(self, job_ids: List[bytes]) -> None:
 
             job_id = job_id_b.decode()
             in_progress_key = in_progress_key_prefix + job_id
+            pipe: 'Pipeline[bytes]'
             async with self.pool.pipeline(transaction=True) as pipe:
                 await pipe.watch(in_progress_key)
                 ongoing_exists = await pipe.exists(in_progress_key)
@@ -457,9 +459,7 @@ async def start_jobs(self, job_ids: List[bytes]) -> None:
                     continue
 
                 pipe.multi()
-                pipe.psetex(  # type: ignore[no-untyped-call]
-                    in_progress_key, int(self.in_progress_timeout_s * 1000), b'1'
-                )
+                pipe.psetex(in_progress_key, int(self.in_progress_timeout_s * 1000), b'1')
                 try:
                     await pipe.execute()
                 except (ResponseError, WatchError):
@@ -474,12 +474,13 @@ async def start_jobs(self, job_ids: List[bytes]) -> None:
 
     async def run_job(self, job_id: str, score: int) -> None:  # noqa: C901
         start_ms = timestamp_ms()
+        pipe: 'Pipeline[bytes]'
         async with self.pool.pipeline(transaction=True) as pipe:
-            pipe.get(job_key_prefix + job_id)  # type: ignore[unused-coroutine]
-            pipe.incr(retry_key_prefix + job_id)  # type: ignore[unused-coroutine]
-            pipe.expire(retry_key_prefix + job_id, 88400)  # type: ignore[unused-coroutine]
+            pipe.get(job_key_prefix + job_id)
+            pipe.incr(retry_key_prefix + job_id)
+            pipe.expire(retry_key_prefix + job_id, 88400)
             if self.allow_abort_jobs:
-                pipe.zrem(abort_jobs_ss, job_id)  # type: ignore[unused-coroutine]
+                pipe.zrem(abort_jobs_ss, job_id)
                 v, job_try, _, abort_job = await pipe.execute()
             else:
                 v, job_try, _ = await pipe.execute()
@@ -686,41 +687,44 @@ async def finish_job(
         incr_score: Optional[int],
         keep_in_progress: Optional[float],
     ) -> None:
+
+        tr: 'Pipeline[bytes]'
         async with self.pool.pipeline(transaction=True) as tr:
             delete_keys = []
             in_progress_key = in_progress_key_prefix + job_id
             if keep_in_progress is None:
                 delete_keys += [in_progress_key]
             else:
-                tr.pexpire(in_progress_key, to_ms(keep_in_progress))  # type: ignore[unused-coroutine]
+                tr.pexpire(in_progress_key, to_ms(keep_in_progress))
 
             if finish:
                 if result_data:
                     expire = None if keep_result_forever else result_timeout_s
-                    tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire))  # type: ignore[unused-coroutine]
+                    tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire))
                 delete_keys += [retry_key_prefix + job_id, job_key_prefix + job_id]
-                tr.zrem(abort_jobs_ss, job_id)  # type: ignore[unused-coroutine]
-                tr.zrem(self.queue_name, job_id)  # type: ignore[unused-coroutine]
+                tr.zrem(abort_jobs_ss, job_id)
+                tr.zrem(self.queue_name, job_id)
             elif incr_score:
-                tr.zincrby(self.queue_name, incr_score, job_id)  # type: ignore[unused-coroutine]
+                tr.zincrby(self.queue_name, incr_score, job_id)
             if delete_keys:
-                tr.delete(*delete_keys)  # type: ignore[unused-coroutine]
+                tr.delete(*delete_keys)
             await tr.execute()
 
     async def finish_failed_job(self, job_id: str, result_data: Optional[bytes]) -> None:
+        tr: 'Pipeline[bytes]'
         async with self.pool.pipeline(transaction=True) as tr:
-            tr.delete(  # type: ignore[unused-coroutine]
+            tr.delete(
                 retry_key_prefix + job_id,
                 in_progress_key_prefix + job_id,
                 job_key_prefix + job_id,
             )
-            tr.zrem(abort_jobs_ss, job_id)  # type: ignore[unused-coroutine]
-            tr.zrem(self.queue_name, job_id)  # type: ignore[unused-coroutine]
+            tr.zrem(abort_jobs_ss, job_id)
+            tr.zrem(self.queue_name, job_id)
             # result_data would only be None if serializing the result fails
             keep_result = self.keep_result_forever or self.keep_result_s > 0
             if result_data is not None and keep_result:  # pragma: no branch
                 expire = 0 if self.keep_result_forever else self.keep_result_s
-                tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire))  # type: ignore[unused-coroutine]
+                tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire))
             await tr.execute()
 
     async def heart_beat(self) -> None:
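The dropped `# type: ignore[unused-coroutine]` comments follow from the new annotations: with `pipe`/`tr` explicitly typed as redis-py's async `Pipeline`, mypy presumably no longer flags the buffered command calls as un-awaited coroutines. The unpacking of `await pipe.execute()` in `run_job` and `finish_job` relies on the pipeline returning one result per queued command, in queue order. A standalone sketch of that behaviour, assuming redis-py >= 4.2, a Redis server on localhost, and illustrative key names:

```python
import asyncio
from typing import TYPE_CHECKING

from redis.asyncio import Redis

if TYPE_CHECKING:
    from redis.asyncio.client import Pipeline


async def main() -> None:
    redis = Redis()  # assumes a Redis server on localhost:6379
    pipe: 'Pipeline[bytes]'
    async with redis.pipeline(transaction=True) as pipe:
        # each call buffers a command; nothing is sent until execute()
        pipe.get('arq:job:demo')
        pipe.incr('arq:retry:demo')
        pipe.expire('arq:retry:demo', 86400)
        # execute() sends the MULTI/EXEC block and returns results in queue order
        job_data, job_try, _ = await pipe.execute()
    print(job_data, job_try)
    await redis.close()


if __name__ == '__main__':
    asyncio.run(main())
```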