Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ target/
.env
.bacon-locations
.claude/settings.local.json
.mcp.json
35 changes: 35 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ tokio = { workspace = true }
uuid = { workspace = true }
rand = { workspace = true }

dashmap = { workspace = true }
tokio-util = { workspace = true }

schemars = { workspace = true, optional = true }

[dev-dependencies]
Expand All @@ -58,4 +61,6 @@ tokio = { version = "1.50", features = ["rt-multi-thread", "macros"] }
uuid = { version = "1.22", features = ["serde", "v7"] }
futures = "0.3"
rand = "0.10"
dashmap = "6"
tokio-util = "0.7"
schemars = { version = "1.0", features = ["derive", "chrono04", "rust_decimal1"] }
20 changes: 20 additions & 0 deletions migrations/20250905000000_add_cancelled_at.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- Add cancelled_at column to support cooperative job cancellation.
-- When set to a non-NULL value, a running job is marked for cancellation.

ALTER TABLE job_executions ADD COLUMN cancelled_at TIMESTAMPTZ;

-- When cancelled_at transitions from NULL to non-NULL, fire a NOTIFY so
-- the poller instance running the job can cancel it promptly.
CREATE OR REPLACE FUNCTION notify_job_cancel() RETURNS TRIGGER AS $$
BEGIN
    -- Only the first NULL -> non-NULL transition of cancelled_at is announced;
    -- the payload is the row id rendered as text on the 'job_cancel' channel.
    IF NEW.cancelled_at IS NOT NULL AND OLD.cancelled_at IS NULL THEN
        PERFORM pg_notify('job_cancel', NEW.id::text);
    END IF;

    -- Nothing is returned to the caller; the function exists for its NOTIFY side effect.
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Row-level trigger that announces job cancellations.
-- The WHEN clause filters at the trigger level, so updates to job_executions
-- that do not move cancelled_at from NULL to a non-NULL value never invoke
-- the trigger function at all.
CREATE TRIGGER job_executions_notify_cancel_trigger
    AFTER UPDATE ON job_executions
    FOR EACH ROW
    WHEN (OLD.cancelled_at IS NULL AND NEW.cancelled_at IS NOT NULL)
    EXECUTE FUNCTION notify_job_cancel();
47 changes: 47 additions & 0 deletions src/cancellation_tokens.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//! Maps running job IDs to their cancellation tokens.

use dashmap::DashMap;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;

use crate::JobId;

/// Maps each running job to its [`CancellationToken`].
///
/// When a job is dispatched, a token is registered here. Cross-node cancel
/// signals (NOTIFY or keep-alive fallback) look up the token and trigger
/// cancellation.
///
/// The map lives behind an [`Arc`], so cloning this handle yields another
/// view onto the same set of registered tokens rather than an independent copy.
#[derive(Clone)]
pub(crate) struct CancellationTokens {
// Shared, concurrently accessible job-id -> token table.
inner: Arc<DashMap<JobId, CancellationToken>>,
}

impl CancellationTokens {
    /// Create an empty registry that tracks no jobs yet.
    pub fn new() -> Self {
        let inner = Arc::new(DashMap::new());
        Self { inner }
    }

    /// Start tracking `id` and hand back the token that cancels it.
    ///
    /// A fresh [`CancellationToken`] is stored in the map under `id`, and a
    /// clone of that same token is returned to the caller.
    pub fn register(&self, id: JobId) -> CancellationToken {
        let stored = CancellationToken::new();
        let handle = stored.clone();
        self.inner.insert(id, stored);
        handle
    }

    /// Trigger cancellation of the job registered under `id`.
    ///
    /// Returns `true` when a token was found (and cancelled), `false` when no
    /// token is registered for `id`.
    pub fn cancel(&self, id: &JobId) -> bool {
        match self.inner.get(id) {
            Some(token) => {
                token.cancel();
                true
            }
            None => false,
        }
    }

    /// Stop tracking `id` (called on completion/failure/abort).
    ///
    /// Removing an id that is not present is a no-op.
    pub fn remove(&self, id: &JobId) {
        let _ = self.inner.remove(id);
    }
}
9 changes: 9 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ pub struct JobPollerConfig {
#[serde(default = "default_shutdown_timeout")]
/// How long to wait for jobs to complete gracefully during shutdown before rescheduling them.
pub shutdown_timeout: Duration,
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[serde(default = "default_cancel_timeout")]
/// How long to wait for a cancelled job to finish cooperatively before force-aborting it.
pub cancel_timeout: Duration,
}

impl Default for JobPollerConfig {
Expand All @@ -33,6 +37,7 @@ impl Default for JobPollerConfig {
max_jobs_per_process: default_max_jobs_per_process(),
min_jobs_per_process: default_min_jobs_per_process(),
shutdown_timeout: default_shutdown_timeout(),
cancel_timeout: default_cancel_timeout(),
}
}
}
Expand Down Expand Up @@ -161,3 +166,7 @@ fn default_min_jobs_per_process() -> usize {
/// Fallback value for `shutdown_timeout` when the configuration does not set it: five seconds.
fn default_shutdown_timeout() -> Duration {
    const SHUTDOWN_TIMEOUT_SECS: u64 = 5;
    Duration::from_secs(SHUTDOWN_TIMEOUT_SECS)
}

/// Fallback value for `cancel_timeout` when the configuration does not set it: five seconds.
fn default_cancel_timeout() -> Duration {
    const CANCEL_TIMEOUT_SECS: u64 = 5;
    Duration::from_secs(CANCEL_TIMEOUT_SECS)
}
37 changes: 37 additions & 0 deletions src/current.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use es_entity::clock::ClockHandle;
use serde::{Serialize, de::DeserializeOwned};
use sqlx::PgPool;
use tokio_util::sync::CancellationToken;

use super::{JobId, error::JobError};

Expand All @@ -15,6 +16,7 @@ pub struct CurrentJob {
shutdown_rx: tokio::sync::broadcast::Receiver<
tokio::sync::mpsc::Sender<tokio::sync::oneshot::Receiver<()>>,
>,
cancel_token: CancellationToken,
clock: ClockHandle,
}

Expand All @@ -27,6 +29,7 @@ impl CurrentJob {
shutdown_rx: tokio::sync::broadcast::Receiver<
tokio::sync::mpsc::Sender<tokio::sync::oneshot::Receiver<()>>,
>,
cancel_token: CancellationToken,
clock: ClockHandle,
) -> Self {
Self {
Expand All @@ -35,6 +38,7 @@ impl CurrentJob {
pool,
execution_state_json: execution_state,
shutdown_rx,
cancel_token,
clock,
}
}
Expand Down Expand Up @@ -155,4 +159,37 @@ impl CurrentJob {
pub fn is_shutdown_requested(&mut self) -> bool {
    // A successful non-blocking receive on the shutdown channel means a
    // shutdown signal was pending; any receive error is reported as `false`.
    matches!(self.shutdown_rx.try_recv(), Ok(_))
}

/// Completes once cancellation has been requested for this job.
///
/// This awaits the job's own cancellation token, so — in contrast to
/// [`shutdown_requested`](Self::shutdown_requested) — it reacts to this
/// individual job being cancelled (e.g. via
/// [`Jobs::cancel_job`](crate::Jobs::cancel_job)).
///
/// # Example
///
/// ```no_run
/// # use job::CurrentJob;
/// # async fn example(current_job: CurrentJob) {
/// tokio::select! {
///     _ = current_job.cancellation_requested() => {
///         // Cancellation requested, clean up and exit
///         return;
///     }
///     result = do_work() => {
///         // Normal work completion
///     }
/// }
/// # }
/// # async fn do_work() {}
/// ```
pub async fn cancellation_requested(&self) {
    let cancelled = self.cancel_token.cancelled();
    cancelled.await
}

/// Report, without waiting, whether this job has been asked to cancel.
///
/// Returns `true` once the job's cancellation token has been triggered and
/// `false` otherwise.
pub fn is_cancellation_requested(&self) -> bool {
    let token = &self.cancel_token;
    token.is_cancelled()
}
}
27 changes: 24 additions & 3 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use es_entity::AtomicOperation;
use es_entity::clock::ClockHandle;
use futures::FutureExt;
use serde_json::Value as JsonValue;
use tokio_util::sync::CancellationToken;
use tracing::{Span, instrument};

use std::{panic::AssertUnwindSafe, sync::Arc};

use super::{
JobId, current::CurrentJob, entity::RetryPolicy, error::JobError, repo::JobRepo, runner::*,
tracker::JobTracker,
JobId, cancellation_tokens::CancellationTokens, current::CurrentJob, entity::RetryPolicy,
error::JobError, repo::JobRepo, runner::*, tracker::JobTracker,
};

#[derive(Debug)]
Expand All @@ -24,25 +25,34 @@ pub(crate) struct JobDispatcher {
retry_settings: RetrySettings,
runner: Option<Box<dyn JobRunner>>,
tracker: Arc<JobTracker>,
cancellation_tokens: CancellationTokens,
cancel_token: CancellationToken,
job_id: JobId,
rescheduled: bool,
instance_id: uuid::Uuid,
clock: ClockHandle,
}
impl JobDispatcher {
#[allow(clippy::too_many_arguments)]
pub fn new(
repo: Arc<JobRepo>,
tracker: Arc<JobTracker>,
retry_settings: RetrySettings,
_id: JobId,
id: JobId,
runner: Box<dyn JobRunner>,
instance_id: uuid::Uuid,
cancellation_tokens: CancellationTokens,
cancel_token: CancellationToken,
clock: ClockHandle,
) -> Self {
Self {
repo,
retry_settings,
runner: Some(runner),
tracker,
cancellation_tokens,
cancel_token,
job_id: id,
rescheduled: false,
instance_id,
clock,
Expand Down Expand Up @@ -88,6 +98,7 @@ impl JobDispatcher {
self.repo.pool().clone(),
polled_job.data_json,
shutdown_rx,
self.cancel_token.clone(),
self.clock.clone(),
);
self.tracker.dispatch_job();
Expand All @@ -96,6 +107,10 @@ impl JobDispatcher {
span.record("conclusion", "Error");
self.fail_job(job.id, e, polled_job.attempt).await?
}
Ok(JobCompletion::Cancelled) => {
span.record("conclusion", "Cancelled");
self.cancel_and_complete_job(job.id).await?;
}
Ok(JobCompletion::Complete) => {
span.record("conclusion", "Complete");
let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
Expand Down Expand Up @@ -331,6 +346,11 @@ impl JobDispatcher {
Ok(())
}

/// Finalize a job whose runner reported that it was cancelled.
///
/// Delegates to `repo::finalize_cancelled_job`, passing this dispatcher's
/// repository, clock and poller instance id together with the job `id`, and
/// returns whatever result that helper produces.
#[instrument(name = "job.cancel_and_complete_job", skip(self), fields(id = %id))]
async fn cancel_and_complete_job(&mut self, id: JobId) -> Result<(), JobError> {
    let instance_id = self.instance_id;
    let (repo, clock) = (&self.repo, &self.clock);
    super::repo::finalize_cancelled_job(repo, clock, id, instance_id).await
}

#[instrument(name = "job.reschedule_job", skip(self, op), fields(id = %id, reschedule_at = %reschedule_at, attempt = 1))]
async fn reschedule_job(
&mut self,
Expand Down Expand Up @@ -360,6 +380,7 @@ impl JobDispatcher {

impl Drop for JobDispatcher {
    /// Release per-job bookkeeping when the dispatcher goes away: the job's
    /// cancellation token is unregistered first, then the tracker is told the
    /// job finished (and whether it was rescheduled).
    fn drop(&mut self) {
        let was_rescheduled = self.rescheduled;
        self.cancellation_tokens.remove(&self.job_id);
        self.tracker.job_completed(was_rescheduled);
    }
}
Loading
Loading