@@ -22,8 +22,10 @@ use common::tasks::{
2222 DoubleParams , DoubleTask , FailingParams , FailingTask , SleepParams , SleepingTask ,
2323} ;
2424use durable:: { Durable , MIGRATOR , RetryStrategy , SpawnOptions , WorkerOptions } ;
25- use sqlx:: { AssertSqlSafe , PgPool } ;
26- use std:: time:: Duration ;
25+ use sqlx:: postgres:: { PgConnectOptions , PgConnection } ;
26+ use sqlx:: { AssertSqlSafe , Connection , PgPool } ;
27+ use std:: time:: { Duration , Instant } ;
28+ use uuid:: Uuid ;
2729
2830async fn create_client ( pool : PgPool , queue_name : & str ) -> Durable {
2931 Durable :: builder ( )
@@ -363,3 +365,134 @@ async fn test_concurrent_emit_and_cancel(pool: PgPool) -> sqlx::Result<()> {
363365
364366 Ok ( ( ) )
365367}
368+
369+ /// Regression test: claim_task uses SKIP LOCKED to avoid deadlock with emit_event.
370+ ///
371+ /// emit_event locks tasks first (locked_tasks CTE with FOR UPDATE), then updates runs.
372+ /// claim_task joins runs+tasks with FOR UPDATE SKIP LOCKED.
373+ ///
374+ /// This test verifies that when a task is locked (simulating emit_event holding the lock),
375+ /// claim_task skips that task instead of blocking (which would cause deadlock).
376+ ///
377+ /// We make the test deterministic by:
378+ /// - Creating a task and making it claimable (pending state)
379+ /// - Holding a FOR UPDATE lock on the task row in a separate connection
380+ /// - Calling claim_task - it should complete immediately with 0 results (not block)
381+ #[ sqlx:: test( migrator = "MIGRATOR" ) ]
382+ async fn test_claim_task_skips_locked_tasks_no_deadlock ( pool : PgPool ) -> sqlx:: Result < ( ) > {
383+ let queue = "skip_locked_test" ;
384+
385+ // Setup: Create queue and spawn a task
386+ sqlx:: query ( "SELECT durable.create_queue($1)" )
387+ . bind ( queue)
388+ . execute ( & pool)
389+ . await ?;
390+
391+ let ( task_id, run_id) : ( Uuid , Uuid ) =
392+ sqlx:: query_as ( "SELECT task_id, run_id FROM durable.spawn_task($1, $2, $3, $4)" )
393+ . bind ( queue)
394+ . bind ( "test-task" )
395+ . bind ( serde_json:: json!( { } ) )
396+ . bind ( serde_json:: json!( { } ) )
397+ . fetch_one ( & pool)
398+ . await ?;
399+
400+ // Verify task is pending and claimable
401+ let state = get_task_state ( & pool, queue, task_id) . await ?;
402+ assert_eq ! ( state, Some ( "pending" . to_string( ) ) ) ;
403+
404+ // Get connect options from pool for creating separate connections
405+ let connect_opts: PgConnectOptions = ( * pool. connect_options ( ) ) . clone ( ) ;
406+
407+ // Open lock connection and hold FOR UPDATE lock on the task row
408+ // This simulates emit_event's locked_tasks CTE holding the lock mid-transaction
409+ let lock_opts = connect_opts. clone ( ) . application_name ( "durable-task-locker" ) ;
410+ let mut lock_conn = PgConnection :: connect_with ( & lock_opts) . await ?;
411+
412+ sqlx:: query ( "BEGIN" ) . execute ( & mut lock_conn) . await ?;
413+ sqlx:: query ( AssertSqlSafe ( format ! (
414+ "SELECT 1 FROM durable.t_{} WHERE task_id = $1 FOR UPDATE" ,
415+ queue
416+ ) ) )
417+ . bind ( task_id)
418+ . execute ( & mut lock_conn)
419+ . await ?;
420+
421+ // Wait until the lock is confirmed held by checking pg_stat_activity
422+ let deadline = Instant :: now ( ) + Duration :: from_secs ( 5 ) ;
423+ loop {
424+ let row: Option < ( String , ) > = sqlx:: query_as (
425+ "SELECT state FROM pg_stat_activity WHERE application_name = $1" ,
426+ )
427+ . bind ( "durable-task-locker" )
428+ . fetch_optional ( & pool)
429+ . await ?;
430+
431+ if let Some ( ( ref state, ) ) = row
432+ && state == "idle in transaction"
433+ {
434+ break ;
435+ }
436+ assert ! (
437+ Instant :: now( ) < deadline,
438+ "Lock connection did not reach expected state"
439+ ) ;
440+ tokio:: time:: sleep ( Duration :: from_millis ( 10 ) ) . await ;
441+ }
442+
443+ // Now call claim_task from another connection
444+ // If SKIP LOCKED works correctly, it should complete immediately with 0 results
445+ // If SKIP LOCKED didn't apply to the task table, it would block and timeout
446+ let claim_opts = connect_opts. clone ( ) . application_name ( "durable-claimer" ) ;
447+ let mut claim_conn = PgConnection :: connect_with ( & claim_opts) . await ?;
448+
449+ // Set a short statement timeout - if claim_task blocks, it will fail
450+ sqlx:: query ( "SET statement_timeout = '500ms'" )
451+ . execute ( & mut claim_conn)
452+ . await ?;
453+
454+ let claim_result: Vec < ( Uuid , ) > =
455+ sqlx:: query_as ( "SELECT run_id FROM durable.claim_task($1, $2, $3, $4)" )
456+ . bind ( queue)
457+ . bind ( "worker" )
458+ . bind ( 60 )
459+ . bind ( 1 )
460+ . fetch_all ( & mut claim_conn)
461+ . await ?;
462+
463+ // claim_task should have completed (not timed out) and returned 0 results
464+ // because the task was locked and SKIP LOCKED caused it to be skipped
465+ assert ! (
466+ claim_result. is_empty( ) ,
467+ "claim_task should skip locked task, but got {} results" ,
468+ claim_result. len( )
469+ ) ;
470+
471+ // Reset statement timeout
472+ sqlx:: query ( "SET statement_timeout = 0" )
473+ . execute ( & mut claim_conn)
474+ . await ?;
475+
476+ // Release the lock
477+ sqlx:: query ( "ROLLBACK" ) . execute ( & mut lock_conn) . await ?;
478+ drop ( lock_conn) ;
479+
480+ // Now claim_task should be able to claim the task
481+ let claim_result2: Vec < ( Uuid , ) > =
482+ sqlx:: query_as ( "SELECT run_id FROM durable.claim_task($1, $2, $3, $4)" )
483+ . bind ( queue)
484+ . bind ( "worker" )
485+ . bind ( 60 )
486+ . bind ( 1 )
487+ . fetch_all ( & mut claim_conn)
488+ . await ?;
489+
490+ assert_eq ! (
491+ claim_result2. len( ) ,
492+ 1 ,
493+ "claim_task should claim the task after lock is released"
494+ ) ;
495+ assert_eq ! ( claim_result2[ 0 ] . 0 , run_id) ;
496+
497+ Ok ( ( ) )
498+ }
0 commit comments