Skip to content

Commit 93540fc

Browse files
bodymindartsclaude
andcommitted
refactor(job): remove cancel_job API
Consumers handle cancellation at their own layer, so the crate-level cancel_job method, Cancelled event variant, CannotCancelJob error, and related tests are no longer needed. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 63f3f3b commit 93540fc

5 files changed

Lines changed: 6 additions & 241 deletions

File tree

src/entity.rs

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,6 @@ pub enum JobTerminalState {
7878
Completed,
7979
/// The job exhausted its retries and was marked as errored.
8080
Errored,
81-
/// The job was explicitly cancelled before completion.
82-
Cancelled,
8381
}
8482

8583
#[derive(Clone, Eq, Hash, PartialEq, Debug, Serialize, Deserialize, sqlx::Type)]
@@ -143,7 +141,6 @@ pub enum JobEvent {
143141
result: JobResult,
144142
},
145143
JobCompleted,
146-
Cancelled,
147144
AttemptCounterReset,
148145
}
149146

@@ -251,33 +248,23 @@ impl Job {
251248
serde_json::from_value(self.config.clone())
252249
}
253250

254-
/// Returns `true` once the job has emitted a `JobCompleted` or `Cancelled` event.
251+
/// Returns `true` once the job has emitted a `JobCompleted` event.
255252
pub fn completed(&self) -> bool {
256253
self.events
257254
.iter_all()
258255
.rev()
259-
.any(|event| matches!(event, JobEvent::JobCompleted | JobEvent::Cancelled))
260-
}
261-
262-
/// Returns `true` if the job was cancelled.
263-
pub fn cancelled(&self) -> bool {
264-
self.events
265-
.iter_all()
266-
.rev()
267-
.any(|event| matches!(event, JobEvent::Cancelled))
256+
.any(|event| matches!(event, JobEvent::JobCompleted))
268257
}
269258

270259
/// Determine the terminal state of a job, if it has reached one.
271260
///
272-
/// - `Cancelled` if a `Cancelled` event exists
273261
/// - `Errored` if `JobCompleted` exists and the last execution event before it was
274262
/// `ExecutionErrored`
275263
/// - `Completed` if `JobCompleted` exists (normal completion)
276264
/// - `None` if the job has not reached a terminal state
277265
pub fn terminal_state(&self) -> Option<JobTerminalState> {
278266
let mut rev = self.events.iter_all().rev();
279267
match rev.next()? {
280-
JobEvent::Cancelled => Some(JobTerminalState::Cancelled),
281268
JobEvent::JobCompleted => match rev.next() {
282269
Some(JobEvent::ExecutionErrored { .. }) => Some(JobTerminalState::Errored),
283270
_ => Some(JobTerminalState::Completed),
@@ -350,14 +337,6 @@ impl Job {
350337
self.events.push(JobEvent::JobCompleted);
351338
}
352339

353-
pub(crate) fn cancel(&mut self) -> es_entity::Idempotent<()> {
354-
if self.completed() {
355-
return es_entity::Idempotent::AlreadyApplied;
356-
}
357-
self.events.push(JobEvent::Cancelled);
358-
es_entity::Idempotent::Executed(())
359-
}
360-
361340
pub(super) fn schedule_retry(
362341
&mut self,
363342
error: String,
@@ -461,7 +440,6 @@ impl TryFromEvents<JobEvent> for Job {
461440
JobEvent::ExecutionErrored { .. } => {}
462441
JobEvent::ResultUpdated { .. } => {}
463442
JobEvent::JobCompleted => {}
464-
JobEvent::Cancelled => {}
465443
JobEvent::AttemptCounterReset => {}
466444
}
467445
}

src/error.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,6 @@ pub enum JobError {
3939
DuplicateId(Option<String>),
4040
#[error("JobError - DuplicateUniqueJobType: {0:?}")]
4141
DuplicateUniqueJobType(Option<String>),
42-
#[error(
43-
"JobError - CannotCancelJob: job is not in pending state (may be running or already completed)"
44-
)]
45-
CannotCancelJob,
4642
#[error("JobError - Config: {0}")]
4743
Config(String),
4844
#[error("JobError - Migration: {0}")]

src/lib.rs

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,6 @@ mod tracker;
221221

222222
pub mod error;
223223

224-
use es_entity::AtomicOperation;
225224
use tracing::instrument;
226225

227226
use std::sync::{Arc, Mutex};
@@ -497,42 +496,13 @@ impl Jobs {
497496
Ok(self.repo.find_by_id(id).await?)
498497
}
499498

500-
/// Cancel a pending job, removing it from the execution queue.
501-
///
502-
/// This operation is idempotent — calling it on an already cancelled or
503-
/// completed job is a no-op. If the job exists but is currently running
504-
/// (not pending), returns [`JobError::CannotCancelJob`].
505-
#[instrument(name = "job.cancel_job", skip(self))]
506-
pub async fn cancel_job(&self, id: JobId) -> Result<(), JobError> {
507-
let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
508-
let mut job = self.repo.find_by_id(id).await?;
509-
510-
if job.cancel().did_execute() {
511-
let result = sqlx::query!(
512-
r#"DELETE FROM job_executions WHERE id = $1 AND state = 'pending'"#,
513-
id as JobId,
514-
)
515-
.execute(op.as_executor())
516-
.await?;
517-
518-
if result.rows_affected() == 0 {
519-
return Err(JobError::CannotCancelJob);
520-
}
521-
522-
self.repo.update_in_op(&mut op, &mut job).await?;
523-
op.commit().await?;
524-
}
525-
526-
Ok(())
527-
}
528-
529499
/// Returns a reference to the clock used by this job service.
530500
pub fn clock(&self) -> &ClockHandle {
531501
&self.clock
532502
}
533503

534-
/// Block until the given job reaches a terminal state (completed, errored, or
535-
/// cancelled) and return the outcome together with any result value the
504+
/// Block until the given job reaches a terminal state (completed or errored)
505+
/// and return the outcome together with any result value the
536506
/// runner attached via [`CurrentJob::set_result`].
537507
///
538508
/// When `timeout` is `Some(duration)`, the call returns

src/notification_router.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ impl JobNotificationRouter {
134134
match has_execution_row(&pool, job_id).await {
135135
Ok(false) => {
136136
// No execution row → job is terminal, but the caller
137-
// needs to know *how* it ended (Completed vs Errored
138-
// vs Cancelled), so we load the entity's event history.
137+
// needs to know *how* it ended (Completed vs Errored),
138+
// so we load the entity's event history.
139139
// On transient DB failure the waiter is parked for the
140140
// sweep to retry.
141141
match load_terminal_state(&repo, job_id).await {

tests/job.rs

Lines changed: 0 additions & 179 deletions
Original file line numberDiff line numberDiff line change
@@ -651,151 +651,6 @@ async fn test_bulk_spawn_empty_batch() -> anyhow::Result<()> {
651651
Ok(())
652652
}
653653

654-
#[tokio::test]
655-
async fn test_cancel_pending_job() -> anyhow::Result<()> {
656-
let pool = helpers::init_pool().await?;
657-
let config = JobSvcConfig::builder()
658-
.pool(pool)
659-
.build()
660-
.expect("Failed to build JobsConfig");
661-
662-
let mut jobs = Jobs::init(config).await?;
663-
let spawner = jobs.add_initializer(TestJobInitializer {
664-
job_type: JobType::new("cancel-pending-job"),
665-
});
666-
667-
// Spawn a job scheduled far in the future so it stays pending
668-
let job_id = JobId::new();
669-
let schedule_at = chrono::Utc::now() + chrono::Duration::hours(24);
670-
spawner
671-
.spawn_at(job_id, TestJobConfig { delay_ms: 50 }, schedule_at)
672-
.await?;
673-
674-
// Cancel the pending job
675-
jobs.cancel_job(job_id).await?;
676-
677-
// Verify it's findable as a cancelled/completed entity
678-
let found = jobs.find(job_id).await?;
679-
assert!(found.completed(), "Cancelled job should be completed");
680-
assert!(found.cancelled(), "Job should be marked as cancelled");
681-
682-
Ok(())
683-
}
684-
685-
#[tokio::test]
686-
async fn test_cancel_running_job_fails() -> anyhow::Result<()> {
687-
let pool = helpers::init_pool().await?;
688-
let config = JobSvcConfig::builder()
689-
.pool(pool)
690-
.build()
691-
.expect("Failed to build JobsConfig");
692-
693-
let mut jobs = Jobs::init(config).await?;
694-
695-
let started = Arc::new(Mutex::new(Vec::<String>::new()));
696-
let completed = Arc::new(Mutex::new(Vec::<String>::new()));
697-
let release = Arc::new(Notify::new());
698-
699-
let spawner = jobs.add_initializer(QueueJobInitializer {
700-
job_type: JobType::new("cancel-running-test"),
701-
started: Arc::clone(&started),
702-
completed: Arc::clone(&completed),
703-
release: Arc::clone(&release),
704-
});
705-
706-
jobs.start_poll()
707-
.await
708-
.expect("Failed to start job polling");
709-
710-
let job_id = JobId::new();
711-
spawner
712-
.spawn(job_id, QueueJobConfig { label: "X".into() })
713-
.await?;
714-
715-
// Wait for the job to start running
716-
let mut attempts = 0;
717-
loop {
718-
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
719-
if !started.lock().await.is_empty() {
720-
break;
721-
}
722-
attempts += 1;
723-
assert!(attempts < 100, "Job never started");
724-
}
725-
726-
// Cancel on a running job should fail
727-
let result = jobs.cancel_job(job_id).await;
728-
assert!(
729-
matches!(result, Err(JobError::CannotCancelJob)),
730-
"Cancelling a running job should return JobNotPending, got err: {:?}",
731-
result.err(),
732-
);
733-
734-
// Release the job so it completes normally
735-
release.notify_one();
736-
737-
// Wait for completion
738-
let mut attempts = 0;
739-
loop {
740-
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
741-
let job = jobs.find(job_id).await?;
742-
if job.completed() {
743-
break;
744-
}
745-
attempts += 1;
746-
assert!(attempts < 100, "Job never completed");
747-
}
748-
749-
Ok(())
750-
}
751-
752-
#[tokio::test]
753-
async fn test_cancel_already_completed_job_is_idempotent() -> anyhow::Result<()> {
754-
let pool = helpers::init_pool().await?;
755-
let config = JobSvcConfig::builder()
756-
.pool(pool)
757-
.build()
758-
.expect("Failed to build JobsConfig");
759-
760-
let mut jobs = Jobs::init(config).await?;
761-
let spawner = jobs.add_initializer(TestJobInitializer {
762-
job_type: JobType::new("cancel-completed-job"),
763-
});
764-
765-
jobs.start_poll()
766-
.await
767-
.expect("Failed to start job polling");
768-
769-
let job_id = JobId::new();
770-
spawner
771-
.spawn(job_id, TestJobConfig { delay_ms: 10 })
772-
.await?;
773-
774-
// Wait for the job to complete
775-
let mut attempts = 0;
776-
loop {
777-
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
778-
let job = jobs.find(job_id).await?;
779-
if job.completed() {
780-
break;
781-
}
782-
attempts += 1;
783-
assert!(attempts < 100, "Job never completed");
784-
}
785-
786-
// Cancel on an already completed job is a no-op
787-
jobs.cancel_job(job_id).await?;
788-
789-
let job = jobs.find(job_id).await?;
790-
assert!(
791-
!job.cancelled(),
792-
"Completed job should not be marked cancelled"
793-
);
794-
assert!(job.completed(), "Job should still be completed");
795-
796-
Ok(())
797-
}
798-
799654
// -- await_completion tests --
800655

801656
/// An initializer whose runner always returns an error.
@@ -891,40 +746,6 @@ async fn test_await_completion_on_error() -> anyhow::Result<()> {
891746
Ok(())
892747
}
893748

894-
#[tokio::test]
895-
async fn test_await_completion_on_cancel() -> anyhow::Result<()> {
896-
let pool = helpers::init_pool().await?;
897-
let config = JobSvcConfig::builder()
898-
.pool(pool)
899-
.build()
900-
.expect("Failed to build JobsConfig");
901-
902-
let mut jobs = Jobs::init(config).await?;
903-
let spawner = jobs.add_initializer(TestJobInitializer {
904-
job_type: JobType::new("await-cancel-job"),
905-
});
906-
jobs.start_poll().await?;
907-
908-
// Spawn a job scheduled far in the future so it stays pending
909-
let job_id = JobId::new();
910-
let schedule_at = chrono::Utc::now() + chrono::Duration::hours(24);
911-
spawner
912-
.spawn_at(job_id, TestJobConfig { delay_ms: 50 }, schedule_at)
913-
.await?;
914-
915-
let jobs_clone = jobs.clone();
916-
let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id, None).await });
917-
918-
// Give the waiter time to register before cancelling
919-
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
920-
jobs.cancel_job(job_id).await?;
921-
922-
let outcome = handle.await??;
923-
assert_eq!(outcome.state(), JobTerminalState::Cancelled);
924-
925-
Ok(())
926-
}
927-
928749
#[tokio::test]
929750
async fn test_await_completion_already_completed() -> anyhow::Result<()> {
930751
let pool = helpers::init_pool().await?;

0 commit comments

Comments
 (0)