diff --git a/src/entity.rs b/src/entity.rs index 62f9993..3204d56 100644 --- a/src/entity.rs +++ b/src/entity.rs @@ -78,8 +78,6 @@ pub enum JobTerminalState { Completed, /// The job exhausted its retries and was marked as errored. Errored, - /// The job was explicitly cancelled before completion. - Cancelled, } #[derive(Clone, Eq, Hash, PartialEq, Debug, Serialize, Deserialize, sqlx::Type)] @@ -143,7 +141,6 @@ pub enum JobEvent { result: JobResult, }, JobCompleted, - Cancelled, AttemptCounterReset, } @@ -251,25 +248,16 @@ impl Job { serde_json::from_value(self.config.clone()) } - /// Returns `true` once the job has emitted a `JobCompleted` or `Cancelled` event. + /// Returns `true` once the job has emitted a `JobCompleted` event. pub fn completed(&self) -> bool { self.events .iter_all() .rev() - .any(|event| matches!(event, JobEvent::JobCompleted | JobEvent::Cancelled)) - } - - /// Returns `true` if the job was cancelled. - pub fn cancelled(&self) -> bool { - self.events - .iter_all() - .rev() - .any(|event| matches!(event, JobEvent::Cancelled)) + .any(|event| matches!(event, JobEvent::JobCompleted)) } /// Determine the terminal state of a job, if it has reached one. /// - /// - `Cancelled` if a `Cancelled` event exists /// - `Errored` if `JobCompleted` exists and the last execution event before it was /// `ExecutionErrored` /// - `Completed` if `JobCompleted` exists (normal completion) @@ -277,7 +265,6 @@ impl Job { pub fn terminal_state(&self) -> Option { let mut rev = self.events.iter_all().rev(); match rev.next()? { - JobEvent::Cancelled => Some(JobTerminalState::Cancelled), JobEvent::JobCompleted => match rev.next() { Some(JobEvent::ExecutionErrored { .. }) => Some(JobTerminalState::Errored), _ => Some(JobTerminalState::Completed), @@ -350,14 +337,6 @@ impl Job { self.events.push(JobEvent::JobCompleted); } - pub(crate) fn cancel(&mut self) -> es_entity::Idempotent<()> { - if self.completed() { - return es_entity::Idempotent::AlreadyApplied; - } - self.events.push(JobEvent::Cancelled); - es_entity::Idempotent::Executed(()) - } - pub(super) fn schedule_retry( &mut self, error: String, @@ -461,7 +440,6 @@ impl TryFromEvents for Job { JobEvent::ExecutionErrored { .. } => {} JobEvent::ResultUpdated { .. } => {} JobEvent::JobCompleted => {} - JobEvent::Cancelled => {} JobEvent::AttemptCounterReset => {} } } diff --git a/src/error.rs b/src/error.rs index 731dd6a..7c6739d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -39,10 +39,6 @@ pub enum JobError { DuplicateId(Option), #[error("JobError - DuplicateUniqueJobType: {0:?}")] DuplicateUniqueJobType(Option), - #[error( - "JobError - CannotCancelJob: job is not in pending state (may be running or already completed)" - )] - CannotCancelJob, #[error("JobError - Config: {0}")] Config(String), #[error("JobError - Migration: {0}")] diff --git a/src/lib.rs b/src/lib.rs index aa4fea4..aaf8e84 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -221,7 +221,6 @@ mod tracker; pub mod error; -use es_entity::AtomicOperation; use tracing::instrument; use std::sync::{Arc, Mutex}; @@ -497,42 +496,13 @@ impl Jobs { Ok(self.repo.find_by_id(id).await?) } - /// Cancel a pending job, removing it from the execution queue. - /// - /// This operation is idempotent — calling it on an already cancelled or - /// completed job is a no-op. If the job exists but is currently running - /// (not pending), returns [`JobError::CannotCancelJob`]. - #[instrument(name = "job.cancel_job", skip(self))] - pub async fn cancel_job(&self, id: JobId) -> Result<(), JobError> { - let mut op = self.repo.begin_op_with_clock(&self.clock).await?; - let mut job = self.repo.find_by_id(id).await?; - - if job.cancel().did_execute() { - let result = sqlx::query!( - r#"DELETE FROM job_executions WHERE id = $1 AND state = 'pending'"#, - id as JobId, - ) - .execute(op.as_executor()) - .await?; - - if result.rows_affected() == 0 { - return Err(JobError::CannotCancelJob); - } - - self.repo.update_in_op(&mut op, &mut job).await?; - op.commit().await?; - } - - Ok(()) - } - /// Returns a reference to the clock used by this job service. pub fn clock(&self) -> &ClockHandle { &self.clock } - /// Block until the given job reaches a terminal state (completed, errored, or - /// cancelled) and return the outcome together with any result value the + /// Block until the given job reaches a terminal state (completed or errored) + /// and return the outcome together with any result value the /// runner attached via [`CurrentJob::set_result`]. /// /// When `timeout` is `Some(duration)`, the call returns diff --git a/src/notification_router.rs b/src/notification_router.rs index 2217cd5..6f4c14c 100644 --- a/src/notification_router.rs +++ b/src/notification_router.rs @@ -134,8 +134,8 @@ impl JobNotificationRouter { match has_execution_row(&pool, job_id).await { Ok(false) => { // No execution row → job is terminal, but the caller - // needs to know *how* it ended (Completed vs Errored - // vs Cancelled), so we load the entity's event history. + // needs to know *how* it ended (Completed vs Errored), + // so we load the entity's event history. // On transient DB failure the waiter is parked for the // sweep to retry. match load_terminal_state(&repo, job_id).await { diff --git a/tests/job.rs b/tests/job.rs index 1ee6ba4..4348c75 100644 --- a/tests/job.rs +++ b/tests/job.rs @@ -651,151 +651,6 @@ async fn test_bulk_spawn_empty_batch() -> anyhow::Result<()> { Ok(()) } -#[tokio::test] -async fn test_cancel_pending_job() -> anyhow::Result<()> { - let pool = helpers::init_pool().await?; - let config = JobSvcConfig::builder() - .pool(pool) - .build() - .expect("Failed to build JobsConfig"); - - let mut jobs = Jobs::init(config).await?; - let spawner = jobs.add_initializer(TestJobInitializer { - job_type: JobType::new("cancel-pending-job"), - }); - - // Spawn a job scheduled far in the future so it stays pending - let job_id = JobId::new(); - let schedule_at = chrono::Utc::now() + chrono::Duration::hours(24); - spawner - .spawn_at(job_id, TestJobConfig { delay_ms: 50 }, schedule_at) - .await?; - - // Cancel the pending job - jobs.cancel_job(job_id).await?; - - // Verify it's findable as a cancelled/completed entity - let found = jobs.find(job_id).await?; - assert!(found.completed(), "Cancelled job should be completed"); - assert!(found.cancelled(), "Job should be marked as cancelled"); - - Ok(()) -} - -#[tokio::test] -async fn test_cancel_running_job_fails() -> anyhow::Result<()> { - let pool = helpers::init_pool().await?; - let config = JobSvcConfig::builder() - .pool(pool) - .build() - .expect("Failed to build JobsConfig"); - - let mut jobs = Jobs::init(config).await?; - - let started = Arc::new(Mutex::new(Vec::::new())); - let completed = Arc::new(Mutex::new(Vec::::new())); - let release = Arc::new(Notify::new()); - - let spawner = jobs.add_initializer(QueueJobInitializer { - job_type: JobType::new("cancel-running-test"), - started: Arc::clone(&started), - completed: Arc::clone(&completed), - release: Arc::clone(&release), - }); - - jobs.start_poll() - .await - .expect("Failed to start job polling"); - - let job_id = JobId::new(); - spawner - .spawn(job_id, QueueJobConfig { label: "X".into() }) - .await?; - - // Wait for the job to start running - let mut attempts = 0; - loop { - tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; - if !started.lock().await.is_empty() { - break; - } - attempts += 1; - assert!(attempts < 100, "Job never started"); - } - - // Cancel on a running job should fail - let result = jobs.cancel_job(job_id).await; - assert!( - matches!(result, Err(JobError::CannotCancelJob)), - "Cancelling a running job should return JobNotPending, got err: {:?}", - result.err(), - ); - - // Release the job so it completes normally - release.notify_one(); - - // Wait for completion - let mut attempts = 0; - loop { - tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; - let job = jobs.find(job_id).await?; - if job.completed() { - break; - } - attempts += 1; - assert!(attempts < 100, "Job never completed"); - } - - Ok(()) -} - -#[tokio::test] -async fn test_cancel_already_completed_job_is_idempotent() -> anyhow::Result<()> { - let pool = helpers::init_pool().await?; - let config = JobSvcConfig::builder() - .pool(pool) - .build() - .expect("Failed to build JobsConfig"); - - let mut jobs = Jobs::init(config).await?; - let spawner = jobs.add_initializer(TestJobInitializer { - job_type: JobType::new("cancel-completed-job"), - }); - - jobs.start_poll() - .await - .expect("Failed to start job polling"); - - let job_id = JobId::new(); - spawner - .spawn(job_id, TestJobConfig { delay_ms: 10 }) - .await?; - - // Wait for the job to complete - let mut attempts = 0; - loop { - tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; - let job = jobs.find(job_id).await?; - if job.completed() { - break; - } - attempts += 1; - assert!(attempts < 100, "Job never completed"); - } - - // Cancel on an already completed job is a no-op - jobs.cancel_job(job_id).await?; - - let job = jobs.find(job_id).await?; - assert!( - !job.cancelled(), - "Completed job should not be marked cancelled" - ); - assert!(job.completed(), "Job should still be completed"); - - Ok(()) -} - // -- await_completion tests -- /// An initializer whose runner always returns an error. @@ -891,40 +746,6 @@ async fn test_await_completion_on_error() -> anyhow::Result<()> { Ok(()) } -#[tokio::test] -async fn test_await_completion_on_cancel() -> anyhow::Result<()> { - let pool = helpers::init_pool().await?; - let config = JobSvcConfig::builder() - .pool(pool) - .build() - .expect("Failed to build JobsConfig"); - - let mut jobs = Jobs::init(config).await?; - let spawner = jobs.add_initializer(TestJobInitializer { - job_type: JobType::new("await-cancel-job"), - }); - jobs.start_poll().await?; - - // Spawn a job scheduled far in the future so it stays pending - let job_id = JobId::new(); - let schedule_at = chrono::Utc::now() + chrono::Duration::hours(24); - spawner - .spawn_at(job_id, TestJobConfig { delay_ms: 50 }, schedule_at) - .await?; - - let jobs_clone = jobs.clone(); - let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id, None).await }); - - // Give the waiter time to register before cancelling - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - jobs.cancel_job(job_id).await?; - - let outcome = handle.await??; - assert_eq!(outcome.state(), JobTerminalState::Cancelled); - - Ok(()) -} - #[tokio::test] async fn test_await_completion_already_completed() -> anyhow::Result<()> { let pool = helpers::init_pool().await?;