Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 2 additions & 24 deletions src/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ pub enum JobTerminalState {
Completed,
/// The job exhausted its retries and was marked as errored.
Errored,
/// The job was explicitly cancelled before completion.
Cancelled,
}

#[derive(Clone, Eq, Hash, PartialEq, Debug, Serialize, Deserialize, sqlx::Type)]
Expand Down Expand Up @@ -143,7 +141,6 @@ pub enum JobEvent {
result: JobResult,
},
JobCompleted,
Cancelled,
AttemptCounterReset,
}

Expand Down Expand Up @@ -251,33 +248,23 @@ impl Job {
serde_json::from_value(self.config.clone())
}

/// Returns `true` once the job has emitted a `JobCompleted` or `Cancelled` event.
/// Returns `true` once the job has emitted a `JobCompleted` event.
pub fn completed(&self) -> bool {
self.events
.iter_all()
.rev()
.any(|event| matches!(event, JobEvent::JobCompleted | JobEvent::Cancelled))
}

/// Returns `true` if the job was cancelled.
pub fn cancelled(&self) -> bool {
self.events
.iter_all()
.rev()
.any(|event| matches!(event, JobEvent::Cancelled))
.any(|event| matches!(event, JobEvent::JobCompleted))
}

/// Determine the terminal state of a job, if it has reached one.
///
/// - `Cancelled` if a `Cancelled` event exists
/// - `Errored` if `JobCompleted` exists and the last execution event before it was
/// `ExecutionErrored`
/// - `Completed` if `JobCompleted` exists (normal completion)
/// - `None` if the job has not reached a terminal state
pub fn terminal_state(&self) -> Option<JobTerminalState> {
let mut rev = self.events.iter_all().rev();
match rev.next()? {
JobEvent::Cancelled => Some(JobTerminalState::Cancelled),
JobEvent::JobCompleted => match rev.next() {
Some(JobEvent::ExecutionErrored { .. }) => Some(JobTerminalState::Errored),
_ => Some(JobTerminalState::Completed),
Expand Down Expand Up @@ -350,14 +337,6 @@ impl Job {
self.events.push(JobEvent::JobCompleted);
}

pub(crate) fn cancel(&mut self) -> es_entity::Idempotent<()> {
if self.completed() {
return es_entity::Idempotent::AlreadyApplied;
}
self.events.push(JobEvent::Cancelled);
es_entity::Idempotent::Executed(())
}

pub(super) fn schedule_retry(
&mut self,
error: String,
Expand Down Expand Up @@ -461,7 +440,6 @@ impl TryFromEvents<JobEvent> for Job {
JobEvent::ExecutionErrored { .. } => {}
JobEvent::ResultUpdated { .. } => {}
JobEvent::JobCompleted => {}
JobEvent::Cancelled => {}
JobEvent::AttemptCounterReset => {}
}
}
Expand Down
4 changes: 0 additions & 4 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ pub enum JobError {
DuplicateId(Option<String>),
#[error("JobError - DuplicateUniqueJobType: {0:?}")]
DuplicateUniqueJobType(Option<String>),
#[error(
"JobError - CannotCancelJob: job is not in pending state (may be running or already completed)"
)]
CannotCancelJob,
#[error("JobError - Config: {0}")]
Config(String),
#[error("JobError - Migration: {0}")]
Expand Down
34 changes: 2 additions & 32 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ mod tracker;

pub mod error;

use es_entity::AtomicOperation;
use tracing::instrument;

use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -497,42 +496,13 @@ impl Jobs {
Ok(self.repo.find_by_id(id).await?)
}

/// Cancel a pending job, removing it from the execution queue.
///
/// This operation is idempotent — calling it on an already cancelled or
/// completed job is a no-op. If the job exists but is currently running
/// (not pending), returns [`JobError::CannotCancelJob`].
#[instrument(name = "job.cancel_job", skip(self))]
pub async fn cancel_job(&self, id: JobId) -> Result<(), JobError> {
let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
let mut job = self.repo.find_by_id(id).await?;

if job.cancel().did_execute() {
let result = sqlx::query!(
r#"DELETE FROM job_executions WHERE id = $1 AND state = 'pending'"#,
id as JobId,
)
.execute(op.as_executor())
.await?;

if result.rows_affected() == 0 {
return Err(JobError::CannotCancelJob);
}

self.repo.update_in_op(&mut op, &mut job).await?;
op.commit().await?;
}

Ok(())
}

/// Returns a reference to the clock used by this job service.
pub fn clock(&self) -> &ClockHandle {
&self.clock
}

/// Block until the given job reaches a terminal state (completed, errored, or
/// cancelled) and return the outcome together with any result value the
/// Block until the given job reaches a terminal state (completed or errored)
/// and return the outcome together with any result value the
/// runner attached via [`CurrentJob::set_result`].
///
/// When `timeout` is `Some(duration)`, the call returns
Expand Down
4 changes: 2 additions & 2 deletions src/notification_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ impl JobNotificationRouter {
match has_execution_row(&pool, job_id).await {
Ok(false) => {
// No execution row → job is terminal, but the caller
// needs to know *how* it ended (Completed vs Errored
// vs Cancelled), so we load the entity's event history.
// needs to know *how* it ended (Completed vs Errored),
// so we load the entity's event history.
// On transient DB failure the waiter is parked for the
// sweep to retry.
match load_terminal_state(&repo, job_id).await {
Expand Down
179 changes: 0 additions & 179 deletions tests/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,151 +651,6 @@ async fn test_bulk_spawn_empty_batch() -> anyhow::Result<()> {
Ok(())
}

#[tokio::test]
async fn test_cancel_pending_job() -> anyhow::Result<()> {
let pool = helpers::init_pool().await?;
let config = JobSvcConfig::builder()
.pool(pool)
.build()
.expect("Failed to build JobsConfig");

let mut jobs = Jobs::init(config).await?;
let spawner = jobs.add_initializer(TestJobInitializer {
job_type: JobType::new("cancel-pending-job"),
});

// Spawn a job scheduled far in the future so it stays pending
let job_id = JobId::new();
let schedule_at = chrono::Utc::now() + chrono::Duration::hours(24);
spawner
.spawn_at(job_id, TestJobConfig { delay_ms: 50 }, schedule_at)
.await?;

// Cancel the pending job
jobs.cancel_job(job_id).await?;

// Verify it's findable as a cancelled/completed entity
let found = jobs.find(job_id).await?;
assert!(found.completed(), "Cancelled job should be completed");
assert!(found.cancelled(), "Job should be marked as cancelled");

Ok(())
}

#[tokio::test]
async fn test_cancel_running_job_fails() -> anyhow::Result<()> {
let pool = helpers::init_pool().await?;
let config = JobSvcConfig::builder()
.pool(pool)
.build()
.expect("Failed to build JobsConfig");

let mut jobs = Jobs::init(config).await?;

let started = Arc::new(Mutex::new(Vec::<String>::new()));
let completed = Arc::new(Mutex::new(Vec::<String>::new()));
let release = Arc::new(Notify::new());

let spawner = jobs.add_initializer(QueueJobInitializer {
job_type: JobType::new("cancel-running-test"),
started: Arc::clone(&started),
completed: Arc::clone(&completed),
release: Arc::clone(&release),
});

jobs.start_poll()
.await
.expect("Failed to start job polling");

let job_id = JobId::new();
spawner
.spawn(job_id, QueueJobConfig { label: "X".into() })
.await?;

// Wait for the job to start running
let mut attempts = 0;
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
if !started.lock().await.is_empty() {
break;
}
attempts += 1;
assert!(attempts < 100, "Job never started");
}

// Cancel on a running job should fail
let result = jobs.cancel_job(job_id).await;
assert!(
matches!(result, Err(JobError::CannotCancelJob)),
"Cancelling a running job should return JobNotPending, got err: {:?}",
result.err(),
);

// Release the job so it completes normally
release.notify_one();

// Wait for completion
let mut attempts = 0;
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let job = jobs.find(job_id).await?;
if job.completed() {
break;
}
attempts += 1;
assert!(attempts < 100, "Job never completed");
}

Ok(())
}

#[tokio::test]
async fn test_cancel_already_completed_job_is_idempotent() -> anyhow::Result<()> {
let pool = helpers::init_pool().await?;
let config = JobSvcConfig::builder()
.pool(pool)
.build()
.expect("Failed to build JobsConfig");

let mut jobs = Jobs::init(config).await?;
let spawner = jobs.add_initializer(TestJobInitializer {
job_type: JobType::new("cancel-completed-job"),
});

jobs.start_poll()
.await
.expect("Failed to start job polling");

let job_id = JobId::new();
spawner
.spawn(job_id, TestJobConfig { delay_ms: 10 })
.await?;

// Wait for the job to complete
let mut attempts = 0;
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let job = jobs.find(job_id).await?;
if job.completed() {
break;
}
attempts += 1;
assert!(attempts < 100, "Job never completed");
}

// Cancel on an already completed job is a no-op
jobs.cancel_job(job_id).await?;

let job = jobs.find(job_id).await?;
assert!(
!job.cancelled(),
"Completed job should not be marked cancelled"
);
assert!(job.completed(), "Job should still be completed");

Ok(())
}

// -- await_completion tests --

/// An initializer whose runner always returns an error.
Expand Down Expand Up @@ -891,40 +746,6 @@ async fn test_await_completion_on_error() -> anyhow::Result<()> {
Ok(())
}

#[tokio::test]
async fn test_await_completion_on_cancel() -> anyhow::Result<()> {
let pool = helpers::init_pool().await?;
let config = JobSvcConfig::builder()
.pool(pool)
.build()
.expect("Failed to build JobsConfig");

let mut jobs = Jobs::init(config).await?;
let spawner = jobs.add_initializer(TestJobInitializer {
job_type: JobType::new("await-cancel-job"),
});
jobs.start_poll().await?;

// Spawn a job scheduled far in the future so it stays pending
let job_id = JobId::new();
let schedule_at = chrono::Utc::now() + chrono::Duration::hours(24);
spawner
.spawn_at(job_id, TestJobConfig { delay_ms: 50 }, schedule_at)
.await?;

let jobs_clone = jobs.clone();
let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id, None).await });

// Give the waiter time to register before cancelling
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
jobs.cancel_job(job_id).await?;

let outcome = handle.await??;
assert_eq!(outcome.state(), JobTerminalState::Cancelled);

Ok(())
}

#[tokio::test]
async fn test_await_completion_already_completed() -> anyhow::Result<()> {
let pool = helpers::init_pool().await?;
Expand Down
Loading