From b21d128618c35aa431cb744349e0bc2d7d70f529 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Sun, 15 Mar 2026 21:19:49 +0100 Subject: [PATCH 1/3] feat(job): implement robust cancel_job with cross-node NOTIFY and force-abort MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a full cooperative cancellation system (Oban model) with: Phase 1 — Schema: new migration adding cancelled_at column to job_executions and a trigger that fires pg_notify('job_cancel') when the column transitions from NULL to non-NULL. New ExecutionCancelled and JobCancelled event variants. Poll query now filters out cancelled pending rows. Phase 2 — Local infrastructure: RunningJobRegistry (DashMap-backed) tracks running jobs with CancellationToken per job. The monitor task watches for cancel signals alongside shutdown, with configurable cancel_timeout (default 5s) for cooperative exit before force-abort. CurrentJob exposes cancellation_requested() / is_cancellation_requested() for runners. JobCompletion::Cancelled variant for cooperative cancel. Phase 3 — Cross-node signaling: PgListener subscribes to job_cancel channel. On notification, looks up RunningJobRegistry and cancels the token. Keep-alive handler polls for cancelled_at IS NOT NULL as fallback for missed NOTIFY (connection drops). Phase 4 — Public API: Jobs::cancel_job() with idempotent CancelResult enum (CancelledWhilePending, CancelledWhileRunning, AlreadyCompleted, AlreadyCancelled, NotFound). Pending jobs are immediately deleted and events recorded. Running jobs get cancelled_at set + NOTIFY fired. Co-Authored-By: Claude Opus 4.6 --- .gitignore | 1 + Cargo.lock | 35 ++++ Cargo.toml | 5 + .../20250905000000_add_cancelled_at.sql | 20 +++ src/config.rs | 9 + src/current.rs | 37 ++++ src/dispatcher.rs | 40 ++++- src/entity.rs | 27 ++- src/error.rs | 4 - src/lib.rs | 124 ++++++++++--- src/poller.rs | 166 +++++++++++++++--- src/runner.rs | 2 + src/running_registry.rs | 47 +++++ tests/job.rs | 32 ++-- 14 files changed, 476 insertions(+), 73 deletions(-) create mode 100644 migrations/20250905000000_add_cancelled_at.sql create mode 100644 src/running_registry.rs diff --git a/.gitignore b/.gitignore index 7fb613a..656bd25 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ target/ .env .bacon-locations .claude/settings.local.json +.mcp.json diff --git a/Cargo.lock b/Cargo.lock index 9f99069..e55ff81 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -352,6 +352,20 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "der" version = "0.7.10" @@ -706,6 +720,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.5" @@ -960,6 +980,7 @@ dependencies = [ "anyhow", "async-trait", "chrono", + "dashmap", "derive_builder", "es-entity", "futures", @@ -971,6 +992,7 @@ dependencies = [ "sqlx", "thiserror", "tokio", + "tokio-util", "tracing", "uuid", ] @@ -2199,6 +2221,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-util" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "tracing" version = "0.1.44" diff --git a/Cargo.toml b/Cargo.toml index 9b4cd1c..29b79c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,9 @@ tokio = { workspace = true } uuid = { workspace = true } rand = { workspace = true } +dashmap = { workspace = true } +tokio-util = { workspace = true } + schemars = { workspace = true, optional = true } [dev-dependencies] @@ -58,4 +61,6 @@ tokio = { version = "1.50", features = ["rt-multi-thread", "macros"] } uuid = { version = "1.22", features = ["serde", "v7"] } futures = "0.3" rand = "0.10" +dashmap = "6" +tokio-util = "0.7" schemars = { version = "1.0", features = ["derive", "chrono04", "rust_decimal1"] } diff --git a/migrations/20250905000000_add_cancelled_at.sql b/migrations/20250905000000_add_cancelled_at.sql new file mode 100644 index 0000000..852cc81 --- /dev/null +++ b/migrations/20250905000000_add_cancelled_at.sql @@ -0,0 +1,20 @@ +-- Add cancelled_at column to support cooperative job cancellation. +-- When set to a non-NULL value, a running job is marked for cancellation. + +ALTER TABLE job_executions ADD COLUMN cancelled_at TIMESTAMPTZ; + +-- When cancelled_at transitions from NULL to non-NULL, fire a NOTIFY so +-- the poller instance running the job can cancel it promptly. +CREATE OR REPLACE FUNCTION notify_job_cancel() RETURNS TRIGGER AS $$ +BEGIN + IF OLD.cancelled_at IS NULL AND NEW.cancelled_at IS NOT NULL THEN + PERFORM pg_notify('job_cancel', NEW.id::text); + END IF; + RETURN NULL; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER job_executions_notify_cancel_trigger +AFTER UPDATE ON job_executions +FOR EACH ROW +EXECUTE FUNCTION notify_job_cancel(); diff --git a/src/config.rs b/src/config.rs index a20c0e5..0e5a36f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -24,6 +24,10 @@ pub struct JobPollerConfig { #[serde(default = "default_shutdown_timeout")] /// How long to wait for jobs to complete gracefully during shutdown before rescheduling them. pub shutdown_timeout: Duration, + #[serde_as(as = "serde_with::DurationSeconds")] + #[serde(default = "default_cancel_timeout")] + /// How long to wait for a cancelled job to finish cooperatively before force-aborting it. + pub cancel_timeout: Duration, } impl Default for JobPollerConfig { @@ -33,6 +37,7 @@ impl Default for JobPollerConfig { max_jobs_per_process: default_max_jobs_per_process(), min_jobs_per_process: default_min_jobs_per_process(), shutdown_timeout: default_shutdown_timeout(), + cancel_timeout: default_cancel_timeout(), } } } @@ -161,3 +166,7 @@ fn default_min_jobs_per_process() -> usize { fn default_shutdown_timeout() -> Duration { Duration::from_secs(5) } + +fn default_cancel_timeout() -> Duration { + Duration::from_secs(5) +} diff --git a/src/current.rs b/src/current.rs index 422c32b..b37e768 100644 --- a/src/current.rs +++ b/src/current.rs @@ -3,6 +3,7 @@ use es_entity::clock::ClockHandle; use serde::{Serialize, de::DeserializeOwned}; use sqlx::PgPool; +use tokio_util::sync::CancellationToken; use super::{JobId, error::JobError}; @@ -15,6 +16,7 @@ pub struct CurrentJob { shutdown_rx: tokio::sync::broadcast::Receiver< tokio::sync::mpsc::Sender>, >, + cancel_token: CancellationToken, clock: ClockHandle, } @@ -27,6 +29,7 @@ impl CurrentJob { shutdown_rx: tokio::sync::broadcast::Receiver< tokio::sync::mpsc::Sender>, >, + cancel_token: CancellationToken, clock: ClockHandle, ) -> Self { Self { @@ -35,6 +38,7 @@ impl CurrentJob { pool, execution_state_json: execution_state, shutdown_rx, + cancel_token, clock, } } @@ -155,4 +159,37 @@ impl CurrentJob { pub fn is_shutdown_requested(&mut self) -> bool { self.shutdown_rx.try_recv().is_ok() } + + /// Wait for a cancellation signal. Resolves when the job has been cancelled. + /// + /// Unlike [`shutdown_requested`](Self::shutdown_requested), this is specific + /// to an individual job being cancelled (e.g. via [`Jobs::cancel_job`](crate::Jobs::cancel_job)). + /// + /// # Example + /// + /// ```no_run + /// # use job::CurrentJob; + /// # async fn example(current_job: CurrentJob) { + /// tokio::select! { + /// _ = current_job.cancellation_requested() => { + /// // Cancellation requested, clean up and exit + /// return; + /// } + /// result = do_work() => { + /// // Normal work completion + /// } + /// } + /// # } + /// # async fn do_work() {} + /// ``` + pub async fn cancellation_requested(&self) { + self.cancel_token.cancelled().await; + } + + /// Non-blocking check if cancellation has been requested for this job. + /// + /// Returns `true` if the job's cancellation token has been triggered. + pub fn is_cancellation_requested(&self) -> bool { + self.cancel_token.is_cancelled() + } } diff --git a/src/dispatcher.rs b/src/dispatcher.rs index ced1fcf..5b258ed 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -3,13 +3,14 @@ use es_entity::AtomicOperation; use es_entity::clock::ClockHandle; use futures::FutureExt; use serde_json::Value as JsonValue; +use tokio_util::sync::CancellationToken; use tracing::{Span, instrument}; use std::{panic::AssertUnwindSafe, sync::Arc}; use super::{ JobId, current::CurrentJob, entity::RetryPolicy, error::JobError, repo::JobRepo, runner::*, - tracker::JobTracker, + running_registry::RunningJobRegistry, tracker::JobTracker, }; #[derive(Debug)] @@ -24,18 +25,24 @@ pub(crate) struct JobDispatcher { retry_settings: RetrySettings, runner: Option>, tracker: Arc, + running_registry: RunningJobRegistry, + cancel_token: CancellationToken, + job_id: JobId, rescheduled: bool, instance_id: uuid::Uuid, clock: ClockHandle, } impl JobDispatcher { + #[allow(clippy::too_many_arguments)] pub fn new( repo: Arc, tracker: Arc, retry_settings: RetrySettings, - _id: JobId, + id: JobId, runner: Box, instance_id: uuid::Uuid, + running_registry: RunningJobRegistry, + cancel_token: CancellationToken, clock: ClockHandle, ) -> Self { Self { @@ -43,6 +50,9 @@ impl JobDispatcher { retry_settings, runner: Some(runner), tracker, + running_registry, + cancel_token, + job_id: id, rescheduled: false, instance_id, clock, @@ -88,6 +98,7 @@ impl JobDispatcher { self.repo.pool().clone(), polled_job.data_json, shutdown_rx, + self.cancel_token.clone(), self.clock.clone(), ); self.tracker.dispatch_job(); @@ -96,6 +107,10 @@ impl JobDispatcher { span.record("conclusion", "Error"); self.fail_job(job.id, e, polled_job.attempt).await? } + Ok(JobCompletion::Cancelled) => { + span.record("conclusion", "Cancelled"); + self.cancel_and_complete_job(job.id).await?; + } Ok(JobCompletion::Complete) => { span.record("conclusion", "Complete"); let mut op = self.repo.begin_op_with_clock(&self.clock).await?; @@ -331,6 +346,26 @@ impl JobDispatcher { Ok(()) } + #[instrument(name = "job.cancel_and_complete_job", skip(self), fields(id = %id))] + async fn cancel_and_complete_job(&mut self, id: JobId) -> Result<(), JobError> { + let mut op = self.repo.begin_op_with_clock(&self.clock).await?; + let mut job = self.repo.find_by_id(&id).await?; + sqlx::query!( + r#" + DELETE FROM job_executions + WHERE id = $1 AND poller_instance_id = $2 + "#, + id as JobId, + self.instance_id + ) + .execute(op.as_executor()) + .await?; + job.cancel_job(); + self.repo.update_in_op(&mut op, &mut job).await?; + op.commit().await?; + Ok(()) + } + #[instrument(name = "job.reschedule_job", skip(self, op), fields(id = %id, reschedule_at = %reschedule_at, attempt = 1))] async fn reschedule_job( &mut self, @@ -360,6 +395,7 @@ impl JobDispatcher { impl Drop for JobDispatcher { fn drop(&mut self) { + self.running_registry.remove(&self.job_id); self.tracker.job_completed(self.rescheduled) } } diff --git a/src/entity.rs b/src/entity.rs index aa9e43e..543f6bd 100644 --- a/src/entity.rs +++ b/src/entity.rs @@ -68,8 +68,9 @@ pub enum JobEvent { ExecutionErrored { error: String, }, + ExecutionCancelled, JobCompleted, - Cancelled, + JobCancelled, AttemptCounterReset, } @@ -177,20 +178,25 @@ impl Job { serde_json::from_value(self.config.clone()) } - /// Returns `true` once the job has emitted a `JobCompleted` or `Cancelled` event. + /// Returns `true` once the job has emitted a `JobCompleted` or `JobCancelled` event. pub fn completed(&self) -> bool { self.events .iter_all() .rev() - .any(|event| matches!(event, JobEvent::JobCompleted | JobEvent::Cancelled)) + .any(|event| matches!(event, JobEvent::JobCompleted | JobEvent::JobCancelled)) } - /// Returns `true` if the job was cancelled. + /// Returns `true` once the job has emitted a `JobCancelled` event. pub fn cancelled(&self) -> bool { self.events .iter_all() .rev() - .any(|event| matches!(event, JobEvent::Cancelled)) + .any(|event| matches!(event, JobEvent::JobCancelled)) + } + + pub(super) fn cancel_job(&mut self) { + self.events.push(JobEvent::ExecutionCancelled); + self.events.push(JobEvent::JobCancelled); } pub(crate) fn inject_tracing_parent(&self) { @@ -236,14 +242,6 @@ impl Job { self.events.push(JobEvent::JobCompleted); } - pub(crate) fn cancel(&mut self) -> es_entity::Idempotent<()> { - if self.completed() { - return es_entity::Idempotent::AlreadyApplied; - } - self.events.push(JobEvent::Cancelled); - es_entity::Idempotent::Executed(()) - } - pub(super) fn schedule_retry( &mut self, error: String, @@ -332,8 +330,9 @@ impl TryFromEvents for Job { JobEvent::ExecutionCompleted => {} JobEvent::ExecutionAborted { .. } => {} JobEvent::ExecutionErrored { .. } => {} + JobEvent::ExecutionCancelled => {} JobEvent::JobCompleted => {} - JobEvent::Cancelled => {} + JobEvent::JobCancelled => {} JobEvent::AttemptCounterReset => {} } } diff --git a/src/error.rs b/src/error.rs index ae50ecd..8dd30a7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -36,10 +36,6 @@ pub enum JobError { DuplicateId(Option), #[error("JobError - DuplicateUniqueJobType: {0:?}")] DuplicateUniqueJobType(Option), - #[error( - "JobError - CannotCancelJob: job is not in pending state (may be running or already completed)" - )] - CannotCancelJob, #[error("JobError - Config: {0}")] Config(String), #[error("JobError - Migration: {0}")] diff --git a/src/lib.rs b/src/lib.rs index 4086013..b5219aa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -215,12 +215,12 @@ mod poller; mod registry; mod repo; mod runner; +mod running_registry; mod spawner; mod tracker; pub mod error; -use es_entity::AtomicOperation; use tracing::instrument; use std::sync::{Arc, Mutex}; @@ -239,9 +239,33 @@ pub use spawner::*; use error::*; use poller::*; use repo::*; +use running_registry::RunningJobRegistry; + +#[derive(sqlx::FromRow)] +struct CancelRunningRow { + poller_instance_id: Option, +} es_entity::entity_id! { JobId } +/// Outcome of a [`Jobs::cancel_job`] request. +#[derive(Debug, Clone)] +pub enum CancelResult { + /// The job was still pending and has been immediately cancelled. + CancelledWhilePending, + /// The job was running; a cancel signal has been sent via `cancelled_at` / NOTIFY. + CancelledWhileRunning { + /// The poller instance that owns the running job. + poller_instance_id: uuid::Uuid, + }, + /// The job had already finished (successfully or with an error). + AlreadyCompleted, + /// The job was already cancelled. + AlreadyCancelled, + /// No job with this identifier exists. + NotFound, +} + #[derive(Clone)] /// Primary entry point for interacting with the Job crate. Provides APIs to register job /// handlers, manage configuration, and control scheduling and execution. @@ -249,6 +273,7 @@ pub struct Jobs { config: JobSvcConfig, repo: Arc, registry: Arc>>, + running_registry: RunningJobRegistry, poller_handle: Option>, clock: ClockHandle, } @@ -278,11 +303,13 @@ impl Jobs { let repo = Arc::new(JobRepo::new(&pool)); let registry = Arc::new(Mutex::new(Some(JobRegistry::new()))); + let running_registry = RunningJobRegistry::new(); let clock = config.clock.clone(); Ok(Self { repo, config, registry, + running_registry, poller_handle: None, clock, }) @@ -444,6 +471,7 @@ impl Jobs { self.config.poller_config.clone(), Arc::clone(&self.repo), registry, + self.running_registry.clone(), self.clock.clone(), ) .start() @@ -481,33 +509,87 @@ impl Jobs { Ok(self.repo.find_by_id(id).await?) } - /// Cancel a pending job, removing it from the execution queue. - /// - /// This operation is idempotent — calling it on an already cancelled or - /// completed job is a no-op. If the job exists but is currently running - /// (not pending), returns [`JobError::CannotCancelJob`]. - #[instrument(name = "job.cancel_job", skip(self))] - pub async fn cancel_job(&self, id: JobId) -> Result<(), JobError> { - let mut op = self.repo.begin_op_with_clock(&self.clock).await?; - let mut job = self.repo.find_by_id(id).await?; + /// Cancel a running or pending job. + /// + /// The method is idempotent: calling it on an already-cancelled or + /// already-completed job returns the appropriate variant without error. + /// + /// For **pending** jobs the execution row is deleted immediately and + /// cancellation events are recorded. + /// + /// For **running** jobs the `cancelled_at` column is set, which fires a + /// Postgres NOTIFY picked up by the owning poller. The poller then + /// cooperatively cancels the job (with a force-abort fallback). + #[instrument(name = "job.cancel_job", skip(self), fields(id = %id), err)] + pub async fn cancel_job(&self, id: JobId) -> Result { + // 1. Check if the job entity exists + let job = match self.repo.find_by_id(id).await { + Ok(j) => j, + Err(e) if e.was_not_found() => return Ok(CancelResult::NotFound), + Err(e) => return Err(e.into()), + }; - if job.cancel().did_execute() { - let result = sqlx::query!( - r#"DELETE FROM job_executions WHERE id = $1 AND state = 'pending'"#, - id as JobId, - ) - .execute(op.as_executor()) - .await?; + // 2. Already cancelled? + if job.cancelled() { + return Ok(CancelResult::AlreadyCancelled); + } - if result.rows_affected() == 0 { - return Err(JobError::CannotCancelJob); - } + // 3. Already completed? + if job.completed() { + return Ok(CancelResult::AlreadyCompleted); + } + + // 4. Try to cancel a pending job — DELETE from job_executions where state = 'pending' + let pending_result = sqlx::query( + r#" + DELETE FROM job_executions + WHERE id = $1 AND state = 'pending' + "#, + ) + .bind(id) + .execute(self.repo.pool()) + .await?; + if pending_result.rows_affected() > 0 { + // Record events + let mut op = self.repo.begin_op_with_clock(&self.clock).await?; + let mut job = self.repo.find_by_id(id).await?; + job.cancel_job(); self.repo.update_in_op(&mut op, &mut job).await?; op.commit().await?; + return Ok(CancelResult::CancelledWhilePending); } - Ok(()) + // 5. Try to cancel a running job — SET cancelled_at + let running_row = sqlx::query_as::<_, CancelRunningRow>( + r#" + UPDATE job_executions + SET cancelled_at = COALESCE(cancelled_at, NOW()) + WHERE id = $1 AND state = 'running' + RETURNING poller_instance_id + "#, + ) + .bind(id) + .fetch_optional(self.repo.pool()) + .await?; + + if let Some(row) = running_row { + let poller_instance_id = row.poller_instance_id.unwrap_or(uuid::Uuid::nil()); + // Also try local cancel in case we're on the same node + self.running_registry.cancel(&id); + return Ok(CancelResult::CancelledWhileRunning { poller_instance_id }); + } + + // 6. Neither pending nor running matched — re-check state + let job = self.repo.find_by_id(id).await?; + if job.cancelled() { + Ok(CancelResult::AlreadyCancelled) + } else if job.completed() { + Ok(CancelResult::AlreadyCompleted) + } else { + // Edge case: job might have been re-scheduled between our checks + Ok(CancelResult::NotFound) + } } /// Returns a reference to the clock used by this job service. diff --git a/src/poller.rs b/src/poller.rs index f735d25..d76814b 100644 --- a/src/poller.rs +++ b/src/poller.rs @@ -14,7 +14,8 @@ use std::{ use super::{ JobId, config::JobPollerConfig, dispatcher::*, error::JobError, handle::OwnedTaskHandle, - registry::JobRegistry, repo::JobRepo, tracker::JobTracker, + registry::JobRegistry, repo::JobRepo, running_registry::RunningJobRegistry, + tracker::JobTracker, }; /// Helper macro to spawn tasks with optional names based on the tokio-task-names feature @@ -40,6 +41,7 @@ pub(crate) struct JobPoller { config: JobPollerConfig, repo: Arc, registry: JobRegistry, + running_registry: RunningJobRegistry, tracker: Arc, instance_id: uuid::Uuid, shutdown_tx: tokio::sync::broadcast::Sender< @@ -60,6 +62,8 @@ pub(crate) struct JobPollerHandle { shutdown_timeout: Duration, max_jobs_per_process: usize, repo: Arc, + #[allow(dead_code)] + pub(crate) running_registry: RunningJobRegistry, instance_id: uuid::Uuid, clock: ClockHandle, } @@ -71,6 +75,7 @@ impl JobPoller { config: JobPollerConfig, repo: Arc, registry: JobRegistry, + running_registry: RunningJobRegistry, clock: ClockHandle, ) -> Self { let (shutdown_tx, _) = tokio::sync::broadcast::channel::< @@ -84,6 +89,7 @@ impl JobPoller { repo, config, registry, + running_registry, instance_id: uuid::Uuid::now_v7(), shutdown_tx, clock, @@ -96,6 +102,7 @@ impl JobPoller { let keep_alive_handle = self.start_keep_alive_handler(); let shutdown_tx = self.shutdown_tx.clone(); let repo = Arc::clone(&self.repo); + let running_registry = self.running_registry.clone(); let instance_id = self.instance_id; let shutdown_timeout = self.config.shutdown_timeout; let max_jobs_per_process = self.config.max_jobs_per_process; @@ -116,6 +123,7 @@ impl JobPoller { shutdown_tx, shutdown_called: Arc::new(AtomicBool::new(false)), repo, + running_registry, instance_id, shutdown_timeout, max_jobs_per_process, @@ -205,7 +213,9 @@ impl JobPoller { async fn start_listener(&self) -> Result { let mut listener = PgListener::connect_with(self.repo.pool()).await?; listener.listen("job_execution").await?; + listener.listen("job_cancel").await?; let tracker = self.tracker.clone(); + let running_registry = self.running_registry.clone(); let supported_job_types = self.registry.registered_job_types(); Ok(OwnedTaskHandle::new(spawn_named_task!( "job-poller-listener", @@ -213,10 +223,27 @@ impl JobPoller { loop { match listener.recv().await { Ok(notification) => { - let job_type = notification.payload(); - // Only wake the tracker if this is a job type we support - if supported_job_types.iter().any(|jt| jt.as_str() == job_type) { - tracker.job_execution_inserted(); + let channel = notification.channel(); + let payload = notification.payload(); + match channel { + "job_cancel" => { + // payload is the job execution id (UUID) + if let Ok(id) = payload.parse::() { + let job_id = JobId::from(id); + if running_registry.cancel(&job_id) { + tracing::info!( + job_id = %job_id, + "cancelled running job via NOTIFY" + ); + } + } + } + _ => { + // job_execution channel — wake tracker + if supported_job_types.iter().any(|jt| jt.as_str() == payload) { + tracker.job_execution_inserted(); + } + } } } Err(_) => { @@ -274,6 +301,7 @@ impl JobPoller { let job_lost_interval = self.config.job_lost_interval; let pool = self.repo.pool().clone(); let instance_id = self.instance_id; + let running_registry = self.running_registry.clone(); let clock = self.clock.clone(); OwnedTaskHandle::new(spawn_named_task!( "job-poller-keep-alive-handler", @@ -304,6 +332,29 @@ impl JobPoller { { Ok(_) => { failures = 0; + // Fallback: pick up missed NOTIFY by checking for cancelled jobs + if let Ok(rows) = sqlx::query_as::<_, CancelledJobRow>( + r#" + SELECT id + FROM job_executions + WHERE poller_instance_id = $1 + AND state = 'running' + AND cancelled_at IS NOT NULL + "#, + ) + .bind(instance_id) + .fetch_all(&pool) + .await + { + for row in rows { + if running_registry.cancel(&row.id) { + tracing::info!( + job_id = %row.id, + "cancelled running job via keep-alive fallback" + ); + } + } + } job_lost_interval / 4 } Err(e) => { @@ -337,6 +388,9 @@ impl JobPoller { let retry_settings = self.registry.retry_settings(&job.job_type).clone(); let repo = Arc::clone(&self.repo); let tracker = self.tracker.clone(); + let running_registry = self.running_registry.clone(); + let cancel_token = running_registry.register(job.id); + let cancel_token_for_monitor = cancel_token.clone(); let instance_id = self.instance_id; let clock = self.clock.clone(); span.record("now", tracing::field::display(clock.now())); @@ -361,6 +415,8 @@ impl JobPoller { job.id, runner, instance_id, + running_registry, + cancel_token, clock, ) .execute_job(polled_job, shutdown_rx) @@ -371,7 +427,10 @@ impl JobPoller { }); let mut shutdown_rx = self.shutdown_tx.subscribe(); + let cancel_timeout = self.config.cancel_timeout; let shutdown_timeout = self.config.shutdown_timeout; + let monitor_repo = Arc::clone(&self.repo); + let monitor_clock = self.clock.clone(); #[cfg_attr( not(all(feature = "tokio-task-names", tokio_unstable)), allow(unused_variables) @@ -385,7 +444,36 @@ impl JobPoller { tokio::select! { _ = &mut job_handle => { - // Job completed - no need for shutdown coordination + // Job completed - no need for shutdown/cancel coordination + } + _ = cancel_token_for_monitor.cancelled() => { + // Cancellation requested — give the job time to exit cooperatively, + // then force-abort if it hasn't finished. + async { + if tokio::time::timeout(cancel_timeout, &mut job_handle).await.is_ok() { + tracing::info!("Job completed cooperatively after cancel signal"); + } else { + tracing::warn!("Job did not finish within cancel timeout, aborting"); + job_handle.abort(); + let _ = (&mut job_handle).await; + + // The dispatcher was aborted before it could record events. + // Record ExecutionCancelled + JobCancelled ourselves. + if let Err(e) = force_cancel_job( + &monitor_repo, + &monitor_clock, + job_id, + instance_id, + ).await { + tracing::error!(error = %e, "failed to record force-cancel events"); + } + } + }.instrument(tracing::info_span!( + parent: None, + "job.cancel_coordination", + job_id = %job_id, + job_type = %job_type, + )).await; } Ok(shutdown_notifier) = shutdown_rx.recv() => { let (send, recv) = tokio::sync::oneshot::channel(); @@ -432,6 +520,33 @@ impl JobPoller { } } +/// Called by the monitor task after force-aborting a running job that did not +/// cooperate within the cancel timeout. Records cancellation events and removes +/// the execution row. +async fn force_cancel_job( + repo: &JobRepo, + clock: &ClockHandle, + job_id: JobId, + instance_id: uuid::Uuid, +) -> Result<(), JobError> { + let mut op = repo.begin_op_with_clock(clock).await?; + let mut job = repo.find_by_id(job_id).await?; + sqlx::query( + r#" + DELETE FROM job_executions + WHERE id = $1 AND poller_instance_id = $2 + "#, + ) + .bind(job_id) + .bind(instance_id) + .execute(op.as_executor()) + .await?; + job.cancel_job(); + repo.update_in_op(&mut op, &mut job).await?; + op.commit().await?; + Ok(()) +} + #[instrument(name = "job.poll_jobs", level = "debug", skip(pool, supported_job_types, clock), fields(n_jobs_to_poll, instance_id = %instance_id, n_jobs_found = tracing::field::Empty), err)] async fn poll_jobs( pool: &PgPool, @@ -443,13 +558,17 @@ async fn poll_jobs( let now = clock.now(); Span::current().record("now", tracing::field::display(now)); - let rows = sqlx::query_as!( - JobPollRow, + let job_type_strings: Vec = supported_job_types + .iter() + .map(|jt| jt.as_str().to_owned()) + .collect(); + let rows: Vec = sqlx::query_as( r#" WITH eligible AS ( SELECT id, queue_id, execute_at, execution_state_json, attempt_index FROM job_executions WHERE state = 'pending' + AND cancelled_at IS NULL AND job_type = ANY($4) AND NOT EXISTS ( SELECT 1 FROM job_executions AS running @@ -490,26 +609,26 @@ async fn poll_jobs( ) SELECT * FROM ( SELECT - u.id AS "id?: JobId", - u.data_json AS "data_json?: JsonValue", - u.attempt_index AS "attempt_index?", - NULL::INTERVAL AS "max_wait?: PgInterval" + u.id, + u.data_json, + u.attempt_index, + NULL::INTERVAL AS max_wait FROM updated u UNION ALL SELECT - NULL::UUID AS "id?: JobId", - NULL::JSONB AS "data_json?: JsonValue", - NULL::INT AS "attempt_index?", - mw.wait_time AS "max_wait?: PgInterval" + NULL::UUID, + NULL::JSONB, + NULL::INT, + mw.wait_time FROM min_wait mw WHERE NOT EXISTS (SELECT 1 FROM updated) ) AS result "#, - n_jobs_to_poll as i32, - now, - instance_id, - supported_job_types as _, ) + .bind(n_jobs_to_poll as i32) + .bind(now) + .bind(instance_id) + .bind(&job_type_strings) .fetch_all(pool) .await?; @@ -517,13 +636,18 @@ async fn poll_jobs( Ok(JobPollResult::from_rows(rows)) } +#[derive(sqlx::FromRow)] +struct CancelledJobRow { + id: JobId, +} + #[derive(Debug)] enum JobPollResult { Jobs(Vec), WaitTillNextJob(Duration), } -#[derive(Debug)] +#[derive(Debug, sqlx::FromRow)] struct JobPollRow { id: Option, data_json: Option, diff --git a/src/runner.rs b/src/runner.rs index 4938579..4fd17b7 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -52,6 +52,8 @@ pub trait JobInitializer: Send + Sync + 'static { pub enum JobCompletion { /// Job finished successfully; mark the record as completed. Complete, + /// Job detected cancellation and exited cooperatively. + Cancelled, #[cfg(feature = "es-entity")] /// Job finished and returns an `EsEntity` operation that the job service will commit. CompleteWithOp(es_entity::DbOp<'static>), diff --git a/src/running_registry.rs b/src/running_registry.rs new file mode 100644 index 0000000..a700760 --- /dev/null +++ b/src/running_registry.rs @@ -0,0 +1,47 @@ +//! Registry of currently running jobs and their cancellation tokens. + +use dashmap::DashMap; +use std::sync::Arc; +use tokio_util::sync::CancellationToken; + +use crate::JobId; + +/// Tracks running jobs and provides cooperative cancellation via [`CancellationToken`]. +/// +/// Each running job registers a token when dispatched. Cross-node cancel +/// signals (NOTIFY or keep-alive fallback) look up the token here and +/// trigger cancellation. +#[derive(Clone)] +pub(crate) struct RunningJobRegistry { + inner: Arc>, +} + +impl RunningJobRegistry { + pub fn new() -> Self { + Self { + inner: Arc::new(DashMap::new()), + } + } + + /// Register a new running job and return its cancellation token. + pub fn register(&self, id: JobId) -> CancellationToken { + let token = CancellationToken::new(); + self.inner.insert(id, token.clone()); + token + } + + /// Signal cancellation for a running job. Returns `true` if the job was found. + pub fn cancel(&self, id: &JobId) -> bool { + if let Some(entry) = self.inner.get(id) { + entry.cancel(); + true + } else { + false + } + } + + /// Remove a job from the registry (called on completion/failure/abort). + pub fn remove(&self, id: &JobId) { + self.inner.remove(id); + } +} diff --git a/tests/job.rs b/tests/job.rs index 9da3927..aec19db 100644 --- a/tests/job.rs +++ b/tests/job.rs @@ -3,8 +3,8 @@ mod helpers; use async_trait::async_trait; use chrono::{DateTime, Utc}; use job::{ - ArtificialClockConfig, ClockHandle, CurrentJob, Job, JobCompletion, JobId, JobInitializer, - JobRunner, JobSpawner, JobSpec, JobSvcConfig, JobType, Jobs, error::JobError, + ArtificialClockConfig, CancelResult, ClockHandle, CurrentJob, Job, JobCompletion, JobId, + JobInitializer, JobRunner, JobSpawner, JobSpec, JobSvcConfig, JobType, Jobs, error::JobError, }; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -658,7 +658,12 @@ async fn test_cancel_pending_job() -> anyhow::Result<()> { .await?; // Cancel the pending job - jobs.cancel_job(job_id).await?; + let result = jobs.cancel_job(job_id).await?; + assert!( + matches!(result, CancelResult::CancelledWhilePending), + "Expected CancelledWhilePending, got {:?}", + result, + ); // Verify it's findable as a cancelled/completed entity let found = jobs.find(job_id).await?; @@ -709,15 +714,15 @@ async fn test_cancel_running_job_fails() -> anyhow::Result<()> { assert!(attempts < 100, "Job never started"); } - // Cancel on a running job should fail - let result = jobs.cancel_job(job_id).await; + // Cancel on a running job should signal cancellation + let result = jobs.cancel_job(job_id).await?; assert!( - matches!(result, Err(JobError::CannotCancelJob)), - "Cancelling a running job should return JobNotPending, got err: {:?}", - result.err(), + matches!(result, CancelResult::CancelledWhileRunning { .. }), + "Expected CancelledWhileRunning, got {:?}", + result, ); - // Release the job so it completes normally + // Release the job so it completes release.notify_one(); // Wait for completion @@ -767,8 +772,13 @@ async fn test_cancel_already_completed_job_is_idempotent() -> anyhow::Result<()> assert!(attempts < 100, "Job never completed"); } - // Cancel on an already completed job is a no-op - jobs.cancel_job(job_id).await?; + // Cancel on an already completed job returns AlreadyCompleted + let result = jobs.cancel_job(job_id).await?; + assert!( + matches!(result, CancelResult::AlreadyCompleted), + "Expected AlreadyCompleted, got {:?}", + result, + ); let job = jobs.find(job_id).await?; assert!( From 51f3e6e9d0bce4d9883b1308d7b3f6446ad8755d Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Sun, 15 Mar 2026 22:06:31 +0100 Subject: [PATCH 2/3] fix(test): increase polling timeout for flaky test_create_and_run_job MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Increase max_attempts from 50 to 100 (5s → 10s) to reduce CI flakiness. The bulk spawn test already uses 100 attempts; align this test similarly. Our robust cancel changes (RunningJobRegistry, CancellationToken, monitor task select! branches) do not affect the normal job dispatch path — the cancelled_at IS NULL filter is a no-op for new jobs, and the DashMap operations are lock-free with trivial overhead. Co-Authored-By: Claude Opus 4.6 --- tests/job.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/job.rs b/tests/job.rs index aec19db..578ae20 100644 --- a/tests/job.rs +++ b/tests/job.rs @@ -76,7 +76,7 @@ async fn test_create_and_run_job() -> anyhow::Result<()> { .expect("Failed to create and spawn job"); let mut attempts = 0; - let max_attempts = 50; + let max_attempts = 100; loop { tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; let job = jobs.find(job.id).await?; From b1cd6ef2af3f412f20836ce27d4d8f5565eae9f7 Mon Sep 17 00:00:00 2001 From: Justin Carter Date: Mon, 16 Mar 2026 16:28:37 +0100 Subject: [PATCH 3/3] fix: address code review issues for robust cancel (#54) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(entity): make cancel_job idempotent to prevent double events When a cancel signal fires, both the dispatcher (cooperative cancel) and the monitor task (force-abort timeout) can race to finalize the same job. Both paths call job.cancel_job() which unconditionally pushed ExecutionCancelled + JobCancelled events, leading to duplicate events. Now cancel_job() checks if the job is already cancelled before pushing events, making the operation safely idempotent. Co-Authored-By: Claude Opus 4.6 * fix(job): wrap pending cancel DELETE and events in single transaction Previously, the cancel_job() method for pending jobs performed the DELETE of the execution row on the raw pool, then opened a new transaction to record cancellation events. A crash between the DELETE and the commit would leave the execution row gone but the job entity never marked as cancelled. Now both the DELETE and event recording happen inside the same atomic operation, matching the pattern used by cancel_and_complete_job in the dispatcher. Co-Authored-By: Claude Opus 4.6 * test(job): add integration tests for cooperative cancel and force-abort Test A (cooperative cancel): Creates a runner that select!s on cancellation_requested() and returns JobCompletion::Cancelled when triggered. Verifies the job ends up cancelled and completed. Test B (force-abort): Creates a runner that blocks forever (ignores cancellation). Configures a short cancel_timeout (1 second) via JobPollerConfig. Verifies the monitor task force-aborts and records cancellation events. Co-Authored-By: Claude Opus 4.6 * refactor(job): extract shared cancel finalization into repo helper The cancel_and_complete_job (dispatcher) and force_cancel_job (poller) functions were nearly identical: begin_op → find job → DELETE execution row → cancel_job() → update → commit. If one was updated the other would diverge. Extract finalize_cancelled_job() in repo.rs and call it from both sites. Co-Authored-By: Claude Opus 4.6 * fix(poller): restore compile-time query checking in poll_jobs The sqlx::query_as! macro was replaced with runtime sqlx::query_as when adding the cancelled_at IS NULL filter, losing compile-time schema validation. Restore query_as! with explicit column type overrides (id as "id: JobId") and pass job_type_strings with `as _` cast for the ANY($4) parameter. Co-Authored-By: Claude Opus 4.6 * refactor(job): rename RunningJobRegistry to CancellationTokens The struct is a map from JobId to CancellationToken — it doesn't track general running-job information (that's JobTracker's role). The new name better describes its narrow responsibility. Co-Authored-By: Claude Opus 4.6 --------- Co-authored-by: Claude Opus 4.6 --- ...ing_registry.rs => cancellation_tokens.rs} | 16 +- src/dispatcher.rs | 29 +-- src/entity.rs | 3 + src/lib.rs | 53 ++--- src/poller.rs | 67 +++--- src/repo.rs | 31 +++ tests/job.rs | 195 +++++++++++++++++- 7 files changed, 301 insertions(+), 93 deletions(-) rename src/{running_registry.rs => cancellation_tokens.rs} (65%) diff --git a/src/running_registry.rs b/src/cancellation_tokens.rs similarity index 65% rename from src/running_registry.rs rename to src/cancellation_tokens.rs index a700760..670ff6c 100644 --- a/src/running_registry.rs +++ b/src/cancellation_tokens.rs @@ -1,4 +1,4 @@ -//! Registry of currently running jobs and their cancellation tokens. +//! Maps running job IDs to their cancellation tokens. use dashmap::DashMap; use std::sync::Arc; @@ -6,17 +6,17 @@ use tokio_util::sync::CancellationToken; use crate::JobId; -/// Tracks running jobs and provides cooperative cancellation via [`CancellationToken`]. +/// Maps each running job to its [`CancellationToken`]. /// -/// Each running job registers a token when dispatched. Cross-node cancel -/// signals (NOTIFY or keep-alive fallback) look up the token here and -/// trigger cancellation. +/// When a job is dispatched, a token is registered here. Cross-node cancel +/// signals (NOTIFY or keep-alive fallback) look up the token and trigger +/// cancellation. #[derive(Clone)] -pub(crate) struct RunningJobRegistry { +pub(crate) struct CancellationTokens { inner: Arc>, } -impl RunningJobRegistry { +impl CancellationTokens { pub fn new() -> Self { Self { inner: Arc::new(DashMap::new()), @@ -40,7 +40,7 @@ impl RunningJobRegistry { } } - /// Remove a job from the registry (called on completion/failure/abort). + /// Remove a job from the map (called on completion/failure/abort). pub fn remove(&self, id: &JobId) { self.inner.remove(id); } diff --git a/src/dispatcher.rs b/src/dispatcher.rs index 5b258ed..8f7d2c7 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -9,8 +9,8 @@ use tracing::{Span, instrument}; use std::{panic::AssertUnwindSafe, sync::Arc}; use super::{ - JobId, current::CurrentJob, entity::RetryPolicy, error::JobError, repo::JobRepo, runner::*, - running_registry::RunningJobRegistry, tracker::JobTracker, + JobId, cancellation_tokens::CancellationTokens, current::CurrentJob, entity::RetryPolicy, + error::JobError, repo::JobRepo, runner::*, tracker::JobTracker, }; #[derive(Debug)] @@ -25,7 +25,7 @@ pub(crate) struct JobDispatcher { retry_settings: RetrySettings, runner: Option>, tracker: Arc, - running_registry: RunningJobRegistry, + cancellation_tokens: CancellationTokens, cancel_token: CancellationToken, job_id: JobId, rescheduled: bool, @@ -41,7 +41,7 @@ impl JobDispatcher { id: JobId, runner: Box, instance_id: uuid::Uuid, - running_registry: RunningJobRegistry, + cancellation_tokens: CancellationTokens, cancel_token: CancellationToken, clock: ClockHandle, ) -> Self { @@ -50,7 +50,7 @@ impl JobDispatcher { retry_settings, runner: Some(runner), tracker, - running_registry, + cancellation_tokens, cancel_token, job_id: id, rescheduled: false, @@ -348,22 +348,7 @@ impl JobDispatcher { #[instrument(name = "job.cancel_and_complete_job", skip(self), fields(id = %id))] async fn cancel_and_complete_job(&mut self, id: JobId) -> Result<(), JobError> { - let mut op = self.repo.begin_op_with_clock(&self.clock).await?; - let mut job = self.repo.find_by_id(&id).await?; - sqlx::query!( - r#" - DELETE FROM job_executions - WHERE id = $1 AND poller_instance_id = $2 - "#, - id as JobId, - self.instance_id - ) - .execute(op.as_executor()) - .await?; - job.cancel_job(); - self.repo.update_in_op(&mut op, &mut job).await?; - op.commit().await?; - Ok(()) + super::repo::finalize_cancelled_job(&self.repo, &self.clock, id, self.instance_id).await } #[instrument(name = "job.reschedule_job", skip(self, op), fields(id = %id, reschedule_at = %reschedule_at, attempt = 1))] @@ -395,7 +380,7 @@ impl JobDispatcher { impl Drop for JobDispatcher { fn drop(&mut self) { - self.running_registry.remove(&self.job_id); + self.cancellation_tokens.remove(&self.job_id); self.tracker.job_completed(self.rescheduled) } } diff --git a/src/entity.rs b/src/entity.rs index 543f6bd..f80d4c0 100644 --- a/src/entity.rs +++ b/src/entity.rs @@ -195,6 +195,9 @@ impl Job { } pub(super) fn cancel_job(&mut self) { + if self.cancelled() { + return; + } self.events.push(JobEvent::ExecutionCancelled); self.events.push(JobEvent::JobCancelled); } diff --git a/src/lib.rs b/src/lib.rs index b5219aa..e63237f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -205,6 +205,7 @@ #![cfg_attr(feature = "fail-on-warnings", deny(clippy::all))] #![forbid(unsafe_code)] +mod cancellation_tokens; mod config; mod current; mod dispatcher; @@ -215,12 +216,12 @@ mod poller; mod registry; mod repo; mod runner; -mod running_registry; mod spawner; mod tracker; pub mod error; +use es_entity::AtomicOperation; use tracing::instrument; use std::sync::{Arc, Mutex}; @@ -236,10 +237,10 @@ pub use registry::*; pub use runner::*; pub use spawner::*; +use cancellation_tokens::CancellationTokens; use error::*; use poller::*; use repo::*; -use running_registry::RunningJobRegistry; #[derive(sqlx::FromRow)] struct CancelRunningRow { @@ -273,7 +274,7 @@ pub struct Jobs { config: JobSvcConfig, repo: Arc, registry: Arc>>, - running_registry: RunningJobRegistry, + cancellation_tokens: CancellationTokens, poller_handle: Option>, clock: ClockHandle, } @@ -303,13 +304,13 @@ impl Jobs { let repo = Arc::new(JobRepo::new(&pool)); let registry = Arc::new(Mutex::new(Some(JobRegistry::new()))); - let running_registry = RunningJobRegistry::new(); + let cancellation_tokens = CancellationTokens::new(); let clock = config.clock.clone(); Ok(Self { repo, config, registry, - running_registry, + cancellation_tokens, poller_handle: None, clock, }) @@ -471,7 +472,7 @@ impl Jobs { self.config.poller_config.clone(), Arc::clone(&self.repo), registry, - self.running_registry.clone(), + self.cancellation_tokens.clone(), self.clock.clone(), ) .start() @@ -540,24 +541,28 @@ impl Jobs { } // 4. Try to cancel a pending job — DELETE from job_executions where state = 'pending' - let pending_result = sqlx::query( - r#" - DELETE FROM job_executions - WHERE id = $1 AND state = 'pending' - "#, - ) - .bind(id) - .execute(self.repo.pool()) - .await?; - - if pending_result.rows_affected() > 0 { - // Record events + // Both the DELETE and event recording happen in the same transaction so + // a crash between the two cannot leave the execution row gone but the + // job entity un-cancelled. + { let mut op = self.repo.begin_op_with_clock(&self.clock).await?; - let mut job = self.repo.find_by_id(id).await?; - job.cancel_job(); - self.repo.update_in_op(&mut op, &mut job).await?; - op.commit().await?; - return Ok(CancelResult::CancelledWhilePending); + let pending_result = sqlx::query!( + r#" + DELETE FROM job_executions + WHERE id = $1 AND state = 'pending' + "#, + id as JobId, + ) + .execute(op.as_executor()) + .await?; + + if pending_result.rows_affected() > 0 { + let mut job = self.repo.find_by_id(id).await?; + job.cancel_job(); + self.repo.update_in_op(&mut op, &mut job).await?; + op.commit().await?; + return Ok(CancelResult::CancelledWhilePending); + } } // 5. Try to cancel a running job — SET cancelled_at @@ -576,7 +581,7 @@ impl Jobs { if let Some(row) = running_row { let poller_instance_id = row.poller_instance_id.unwrap_or(uuid::Uuid::nil()); // Also try local cancel in case we're on the same node - self.running_registry.cancel(&id); + self.cancellation_tokens.cancel(&id); return Ok(CancelResult::CancelledWhileRunning { poller_instance_id }); } diff --git a/src/poller.rs b/src/poller.rs index d76814b..4caa922 100644 --- a/src/poller.rs +++ b/src/poller.rs @@ -13,8 +13,8 @@ use std::{ }; use super::{ - JobId, config::JobPollerConfig, dispatcher::*, error::JobError, handle::OwnedTaskHandle, - registry::JobRegistry, repo::JobRepo, running_registry::RunningJobRegistry, + JobId, cancellation_tokens::CancellationTokens, config::JobPollerConfig, dispatcher::*, + error::JobError, handle::OwnedTaskHandle, registry::JobRegistry, repo::JobRepo, tracker::JobTracker, }; @@ -41,7 +41,7 @@ pub(crate) struct JobPoller { config: JobPollerConfig, repo: Arc, registry: JobRegistry, - running_registry: RunningJobRegistry, + cancellation_tokens: CancellationTokens, tracker: Arc, instance_id: uuid::Uuid, shutdown_tx: tokio::sync::broadcast::Sender< @@ -63,7 +63,7 @@ pub(crate) struct JobPollerHandle { max_jobs_per_process: usize, repo: Arc, #[allow(dead_code)] - pub(crate) running_registry: RunningJobRegistry, + pub(crate) cancellation_tokens: CancellationTokens, instance_id: uuid::Uuid, clock: ClockHandle, } @@ -75,7 +75,7 @@ impl JobPoller { config: JobPollerConfig, repo: Arc, registry: JobRegistry, - running_registry: RunningJobRegistry, + cancellation_tokens: CancellationTokens, clock: ClockHandle, ) -> Self { let (shutdown_tx, _) = tokio::sync::broadcast::channel::< @@ -89,7 +89,7 @@ impl JobPoller { repo, config, registry, - running_registry, + cancellation_tokens, instance_id: uuid::Uuid::now_v7(), shutdown_tx, clock, @@ -102,7 +102,7 @@ impl JobPoller { let keep_alive_handle = self.start_keep_alive_handler(); let shutdown_tx = self.shutdown_tx.clone(); let repo = Arc::clone(&self.repo); - let running_registry = self.running_registry.clone(); + let cancellation_tokens = self.cancellation_tokens.clone(); let instance_id = self.instance_id; let shutdown_timeout = self.config.shutdown_timeout; let max_jobs_per_process = self.config.max_jobs_per_process; @@ -123,7 +123,7 @@ impl JobPoller { shutdown_tx, shutdown_called: Arc::new(AtomicBool::new(false)), repo, - running_registry, + cancellation_tokens, instance_id, shutdown_timeout, max_jobs_per_process, @@ -215,7 +215,7 @@ impl JobPoller { listener.listen("job_execution").await?; listener.listen("job_cancel").await?; let tracker = self.tracker.clone(); - let running_registry = self.running_registry.clone(); + let cancellation_tokens = self.cancellation_tokens.clone(); let supported_job_types = self.registry.registered_job_types(); Ok(OwnedTaskHandle::new(spawn_named_task!( "job-poller-listener", @@ -230,7 +230,7 @@ impl JobPoller { // payload is the job execution id (UUID) if let Ok(id) = payload.parse::() { let job_id = JobId::from(id); - if running_registry.cancel(&job_id) { + if cancellation_tokens.cancel(&job_id) { tracing::info!( job_id = %job_id, "cancelled running job via NOTIFY" @@ -301,7 +301,7 @@ impl JobPoller { let job_lost_interval = self.config.job_lost_interval; let pool = self.repo.pool().clone(); let instance_id = self.instance_id; - let running_registry = self.running_registry.clone(); + let cancellation_tokens = self.cancellation_tokens.clone(); let clock = self.clock.clone(); OwnedTaskHandle::new(spawn_named_task!( "job-poller-keep-alive-handler", @@ -347,7 +347,7 @@ impl JobPoller { .await { for row in rows { - if running_registry.cancel(&row.id) { + if cancellation_tokens.cancel(&row.id) { tracing::info!( job_id = %row.id, "cancelled running job via keep-alive fallback" @@ -388,8 +388,8 @@ impl JobPoller { let retry_settings = self.registry.retry_settings(&job.job_type).clone(); let repo = Arc::clone(&self.repo); let tracker = self.tracker.clone(); - let running_registry = self.running_registry.clone(); - let cancel_token = running_registry.register(job.id); + let cancellation_tokens = self.cancellation_tokens.clone(); + let cancel_token = cancellation_tokens.register(job.id); let cancel_token_for_monitor = cancel_token.clone(); let instance_id = self.instance_id; let clock = self.clock.clone(); @@ -415,7 +415,7 @@ impl JobPoller { job.id, runner, instance_id, - running_registry, + cancellation_tokens, cancel_token, clock, ) @@ -522,29 +522,14 @@ impl JobPoller { /// Called by the monitor task after force-aborting a running job that did not /// cooperate within the cancel timeout. Records cancellation events and removes -/// the execution row. +/// the execution row. Delegates to the shared `finalize_cancelled_job` helper. async fn force_cancel_job( repo: &JobRepo, clock: &ClockHandle, job_id: JobId, instance_id: uuid::Uuid, ) -> Result<(), JobError> { - let mut op = repo.begin_op_with_clock(clock).await?; - let mut job = repo.find_by_id(job_id).await?; - sqlx::query( - r#" - DELETE FROM job_executions - WHERE id = $1 AND poller_instance_id = $2 - "#, - ) - .bind(job_id) - .bind(instance_id) - .execute(op.as_executor()) - .await?; - job.cancel_job(); - repo.update_in_op(&mut op, &mut job).await?; - op.commit().await?; - Ok(()) + super::repo::finalize_cancelled_job(repo, clock, job_id, instance_id).await } #[instrument(name = "job.poll_jobs", level = "debug", skip(pool, supported_job_types, clock), fields(n_jobs_to_poll, instance_id = %instance_id, n_jobs_found = tracing::field::Empty), err)] @@ -562,7 +547,8 @@ async fn poll_jobs( .iter() .map(|jt| jt.as_str().to_owned()) .collect(); - let rows: Vec = sqlx::query_as( + let rows = sqlx::query_as!( + JobPollRow, r#" WITH eligible AS ( SELECT id, queue_id, execute_at, execution_state_json, attempt_index @@ -607,7 +593,12 @@ async fn poll_jobs( WHERE je.id = selected_jobs.id RETURNING je.id, selected_jobs.data_json, je.attempt_index ) - SELECT * FROM ( + SELECT + id as "id: JobId", + data_json, + attempt_index, + max_wait + FROM ( SELECT u.id, u.data_json, @@ -624,11 +615,11 @@ async fn poll_jobs( WHERE NOT EXISTS (SELECT 1 FROM updated) ) AS result "#, + n_jobs_to_poll as i32, + now, + instance_id, + &job_type_strings as _, ) - .bind(n_jobs_to_poll as i32) - .bind(now) - .bind(instance_id) - .bind(&job_type_strings) .fetch_all(pool) .await?; diff --git a/src/repo.rs b/src/repo.rs index 5cd84b0..b7dd276 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -1,8 +1,10 @@ +use es_entity::clock::ClockHandle; use sqlx::PgPool; use es_entity::*; use super::entity::*; +use super::error::JobError; use crate::JobId; #[derive(EsRepo, Clone)] @@ -28,6 +30,35 @@ impl JobRepo { } } +/// Atomically delete the execution row and record cancellation events. +/// +/// Used by both the dispatcher (cooperative cancel) and the monitor task +/// (force-abort) to finalize a cancelled job. Runs the DELETE and event +/// recording in a single transaction to avoid partial state. +pub(crate) async fn finalize_cancelled_job( + repo: &JobRepo, + clock: &ClockHandle, + job_id: JobId, + instance_id: uuid::Uuid, +) -> Result<(), JobError> { + let mut op = repo.begin_op_with_clock(clock).await?; + let mut job = repo.find_by_id(job_id).await?; + sqlx::query!( + r#" + DELETE FROM job_executions + WHERE id = $1 AND poller_instance_id = $2 + "#, + job_id as JobId, + instance_id, + ) + .execute(op.as_executor()) + .await?; + job.cancel_job(); + repo.update_in_op(&mut op, &mut job).await?; + op.commit().await?; + Ok(()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/tests/job.rs b/tests/job.rs index 578ae20..f51d364 100644 --- a/tests/job.rs +++ b/tests/job.rs @@ -4,7 +4,8 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use job::{ ArtificialClockConfig, CancelResult, ClockHandle, CurrentJob, Job, JobCompletion, JobId, - JobInitializer, JobRunner, JobSpawner, JobSpec, JobSvcConfig, JobType, Jobs, error::JobError, + JobInitializer, JobPollerConfig, JobRunner, JobSpawner, JobSpec, JobSvcConfig, JobType, Jobs, + error::JobError, }; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -789,3 +790,195 @@ async fn test_cancel_already_completed_job_is_idempotent() -> anyhow::Result<()> Ok(()) } + +// -- Cooperative cancellation test infrastructure -- + +/// A job initializer whose runner detects cancellation via `select!` and +/// returns `JobCompletion::Cancelled`. +struct CooperativeCancelInitializer { + started: Arc, +} + +impl JobInitializer for CooperativeCancelInitializer { + type Config = TestJobConfig; + + fn job_type(&self) -> JobType { + JobType::new("cooperative-cancel") + } + + fn init( + &self, + _job: &Job, + _: JobSpawner, + ) -> Result, Box> { + Ok(Box::new(CooperativeCancelRunner { + started: Arc::clone(&self.started), + })) + } +} + +struct CooperativeCancelRunner { + started: Arc, +} + +#[async_trait] +impl JobRunner for CooperativeCancelRunner { + async fn run( + &self, + current_job: CurrentJob, + ) -> Result> { + self.started.notify_one(); + tokio::select! { + _ = current_job.cancellation_requested() => { + Ok(JobCompletion::Cancelled) + } + _ = tokio::time::sleep(tokio::time::Duration::from_secs(60)) => { + Ok(JobCompletion::Complete) + } + } + } +} + +#[tokio::test] +async fn test_cooperative_cancel() -> anyhow::Result<()> { + let pool = helpers::init_pool().await?; + let config = JobSvcConfig::builder() + .pool(pool) + .build() + .expect("Failed to build JobsConfig"); + + let mut jobs = Jobs::init(config).await?; + + let started = Arc::new(Notify::new()); + let spawner = jobs.add_initializer(CooperativeCancelInitializer { + started: Arc::clone(&started), + }); + + jobs.start_poll() + .await + .expect("Failed to start job polling"); + + let job_id = JobId::new(); + spawner.spawn(job_id, TestJobConfig { delay_ms: 0 }).await?; + + // Wait for the job to start running + tokio::time::timeout(tokio::time::Duration::from_secs(5), started.notified()) + .await + .expect("Job never started"); + + // Cancel the running job — the runner should detect it and return Cancelled + let result = jobs.cancel_job(job_id).await?; + assert!( + matches!(result, CancelResult::CancelledWhileRunning { .. }), + "Expected CancelledWhileRunning, got {:?}", + result, + ); + + // Wait for the job entity to be marked cancelled + let mut attempts = 0; + loop { + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + let job = jobs.find(job_id).await?; + if job.cancelled() && job.completed() { + break; + } + attempts += 1; + assert!(attempts < 50, "Job was not marked cancelled within timeout"); + } + + Ok(()) +} + +/// A job initializer whose runner ignores cancellation entirely (blocks forever). +struct BlockingJobInitializer { + started: Arc, +} + +impl JobInitializer for BlockingJobInitializer { + type Config = TestJobConfig; + + fn job_type(&self) -> JobType { + JobType::new("blocking-forever") + } + + fn init( + &self, + _job: &Job, + _: JobSpawner, + ) -> Result, Box> { + Ok(Box::new(BlockingJobRunner { + started: Arc::clone(&self.started), + })) + } +} + +struct BlockingJobRunner { + started: Arc, +} + +#[async_trait] +impl JobRunner for BlockingJobRunner { + async fn run( + &self, + _current_job: CurrentJob, + ) -> Result> { + self.started.notify_one(); + // Block forever — ignores cancellation + std::future::pending::<()>().await; + Ok(JobCompletion::Complete) + } +} + +#[tokio::test] +async fn test_force_abort_cancel() -> anyhow::Result<()> { + let pool = helpers::init_pool().await?; + let config = JobSvcConfig::builder() + .pool(pool) + .poller_config(JobPollerConfig { + cancel_timeout: std::time::Duration::from_secs(1), + ..Default::default() + }) + .build() + .expect("Failed to build JobsConfig"); + + let mut jobs = Jobs::init(config).await?; + + let started = Arc::new(Notify::new()); + let spawner = jobs.add_initializer(BlockingJobInitializer { + started: Arc::clone(&started), + }); + + jobs.start_poll() + .await + .expect("Failed to start job polling"); + + let job_id = JobId::new(); + spawner.spawn(job_id, TestJobConfig { delay_ms: 0 }).await?; + + // Wait for the job to start running + tokio::time::timeout(tokio::time::Duration::from_secs(5), started.notified()) + .await + .expect("Job never started"); + + // Cancel the running job — the runner ignores it, so force-abort will kick in + let result = jobs.cancel_job(job_id).await?; + assert!( + matches!(result, CancelResult::CancelledWhileRunning { .. }), + "Expected CancelledWhileRunning, got {:?}", + result, + ); + + // Wait for the force-abort to cancel the job (cancel_timeout is 1s) + let mut attempts = 0; + loop { + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + let job = jobs.find(job_id).await?; + if job.cancelled() && job.completed() { + break; + } + attempts += 1; + assert!(attempts < 30, "Job was not force-cancelled within timeout"); + } + + Ok(()) +}