Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ target/
.env
.bacon-locations
.claude/settings.local.json
.mcp.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ tokio = { workspace = true }
uuid = { workspace = true }
rand = { workspace = true }

dashmap = { workspace = true }
tokio-util = { workspace = true }
schemars = { workspace = true, optional = true }

[dev-dependencies]
Expand All @@ -58,4 +60,6 @@ tokio = { version = "1.50", features = ["rt-multi-thread", "macros"] }
uuid = { version = "1.22", features = ["serde", "v7"] }
futures = "0.3"
rand = "0.10"
dashmap = "6.1"
tokio-util = "0.7"
schemars = { version = "1.0", features = ["derive", "chrono04", "rust_decimal1"] }
1 change: 1 addition & 0 deletions migrations/20250904065521_job_setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ CREATE TABLE jobs (
id UUID PRIMARY KEY,
unique_per_type BOOLEAN NOT NULL,
job_type VARCHAR NOT NULL,
cancelled_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE UNIQUE INDEX idx_unique_job_type ON jobs (job_type) WHERE unique_per_type = TRUE;
Expand Down
42 changes: 42 additions & 0 deletions src/cancellation_tokens.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//! Shared store of per-job cancellation tokens.

use dashmap::DashMap;
use tokio_util::sync::CancellationToken;

use crate::JobId;

/// Thread-safe store mapping running job IDs to their cancellation tokens.
///
/// Tokens are inserted when a job is dispatched and removed when it completes
/// (or is cancelled). The notification router calls [`cancel`] when a
/// `job_cancel` event arrives; the poller sweep also calls it as a safety net.
pub(crate) struct CancellationTokens {
tokens: DashMap<JobId, CancellationToken>,
}

impl CancellationTokens {
pub fn new() -> Self {
Self {
tokens: DashMap::new(),
}
}

/// Insert a new token for `job_id` and return a clone the runner can observe.
pub fn insert(&self, job_id: JobId) -> CancellationToken {
let token = CancellationToken::new();
self.tokens.insert(job_id, token.clone());
token
}

/// Remove the token without cancelling it (used on normal completion).
pub fn remove(&self, job_id: &JobId) {
self.tokens.remove(job_id);
}

/// Cancel the token for `job_id`, signalling the running job to stop.
pub fn cancel(&self, job_id: &JobId) {
if let Some((_, token)) = self.tokens.remove(job_id) {
token.cancel();
}
}
}
9 changes: 9 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ pub struct JobPollerConfig {
#[serde(default = "default_shutdown_timeout")]
/// How long to wait for jobs to complete gracefully during shutdown before rescheduling them.
pub shutdown_timeout: Duration,
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[serde(default = "default_cancel_timeout")]
/// Grace period after cancellation is requested before force-aborting a running job.
pub cancel_timeout: Duration,
}

impl Default for JobPollerConfig {
Expand All @@ -33,6 +37,7 @@ impl Default for JobPollerConfig {
max_jobs_per_process: default_max_jobs_per_process(),
min_jobs_per_process: default_min_jobs_per_process(),
shutdown_timeout: default_shutdown_timeout(),
cancel_timeout: default_cancel_timeout(),
}
}
}
Expand Down Expand Up @@ -161,3 +166,7 @@ fn default_min_jobs_per_process() -> usize {
fn default_shutdown_timeout() -> Duration {
Duration::from_secs(5)
}

fn default_cancel_timeout() -> Duration {
Duration::from_secs(30)
}
42 changes: 42 additions & 0 deletions src/current.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
use es_entity::clock::ClockHandle;
use serde::{Serialize, de::DeserializeOwned};
use sqlx::PgPool;
use tokio_util::sync::CancellationToken;

use std::sync::{Arc, Mutex};

use super::{JobId, error::JobError};

Expand All @@ -16,9 +19,12 @@ pub struct CurrentJob {
tokio::sync::mpsc::Sender<tokio::sync::oneshot::Receiver<()>>,
>,
clock: ClockHandle,
result: Arc<Mutex<Option<serde_json::Value>>>,
cancel_token: CancellationToken,
}

impl CurrentJob {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
id: JobId,
attempt: u32,
Expand All @@ -28,6 +34,8 @@ impl CurrentJob {
tokio::sync::mpsc::Sender<tokio::sync::oneshot::Receiver<()>>,
>,
clock: ClockHandle,
result: Arc<Mutex<Option<serde_json::Value>>>,
cancel_token: CancellationToken,
) -> Self {
Self {
id,
Expand All @@ -36,6 +44,8 @@ impl CurrentJob {
execution_state_json: execution_state,
shutdown_rx,
clock,
result,
cancel_token,
}
}

Expand Down Expand Up @@ -122,6 +132,22 @@ impl CurrentJob {
Ok(ret)
}

/// Attach or update the result value for this job execution.
///
/// The result is serialized to JSON and will be available to callers via
/// [`Jobs::await_completion`](crate::Jobs::await_completion). Each call
/// overwrites the previous value — the **last** value set before the job
/// completes (or errors) is what gets persisted. This allows incremental
/// progress updates; for example, a batch job can call `set_result` after
/// each chunk so that partial progress is preserved even on failure.
pub fn set_result<T: Serialize>(&self, result: &T) -> Result<(), JobError> {
let json =
serde_json::to_value(result).map_err(JobError::CouldNotSerializeExecutionState)?;
let mut guard = self.result.lock().expect("result mutex poisoned");
*guard = Some(json);
Ok(())
}

/// Wait for a shutdown signal. Returns `true` if shutdown was requested.
///
/// Job runners can use this to detect when the application is shutting down
Expand Down Expand Up @@ -155,4 +181,20 @@ impl CurrentJob {
pub fn is_shutdown_requested(&mut self) -> bool {
self.shutdown_rx.try_recv().is_ok()
}

/// Non-blocking check if cancellation has been requested for this job.
///
/// Returns `true` once the job has been cancelled via [`Jobs::cancel_job`](crate::Jobs::cancel_job).
/// Job runners should check this periodically and return
/// [`JobCompletion::Cancelled`](crate::JobCompletion::Cancelled) when `true`.
pub fn cancellation_requested(&self) -> bool {
self.cancel_token.is_cancelled()
}

/// Returns a future that resolves when cancellation is requested.
///
/// Useful in `tokio::select!` branches for cooperative cancellation.
pub async fn cancellation_notified(&self) {
self.cancel_token.cancelled().await;
}
}
Loading
Loading