Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ pub enum JobError {
"JobError - AwaitCompletionShutdown: notification channel closed while awaiting job {0}"
)]
AwaitCompletionShutdown(JobId),
#[error(
"JobError - TimedOut: job {0} did not reach terminal state within the specified timeout"
)]
TimedOut(JobId),
}

impl From<Box<dyn std::error::Error>> for JobError {
Expand Down
31 changes: 29 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ use es_entity::AtomicOperation;
use tracing::instrument;

use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use config::*;
pub use current::*;
Expand Down Expand Up @@ -535,18 +536,44 @@ impl Jobs {
/// Block until the given job reaches a terminal state (completed, errored, or
/// cancelled) and return the outcome.
///
/// When `timeout` is `Some(duration)`, the call returns
/// [`JobError::TimedOut`] if the job has not reached a terminal state
/// within the specified duration. Pass `None` to wait indefinitely.
///
/// # Errors
///
/// Returns [`JobError::Find`] if the job does not exist.
/// Returns [`JobError::TimedOut`] if the timeout elapses before the job
/// reaches a terminal state.
/// Returns [`JobError::AwaitCompletionShutdown`] if the notification channel is
/// dropped (e.g., during shutdown) before delivering the terminal state.
#[instrument(name = "job.await_completion", skip(self))]
pub async fn await_completion(&self, id: JobId) -> Result<JobTerminalState, JobError> {
pub async fn await_completion(
&self,
id: JobId,
timeout: Option<Duration>,
) -> Result<JobTerminalState, JobError> {
// Fail fast if the job doesn't exist — avoids a 5-minute silent hang
// in the waiter manager for a JobId that will never resolve.
self.find(id).await?;
let rx = self.router.wait_for_terminal(id);
rx.await.map_err(|_| JobError::AwaitCompletionShutdown(id))
match timeout {
Some(duration) => tokio::time::timeout(duration, rx)
.await
.map_err(|_| JobError::TimedOut(id))?
.map_err(|_| JobError::AwaitCompletionShutdown(id)),
None => rx.await.map_err(|_| JobError::AwaitCompletionShutdown(id)),
}
}

/// Non-blocking check for job completion.
///
/// Returns the terminal state immediately if the job has reached one,
/// or `None` if it is still running.
#[instrument(name = "job.poll_completion", skip(self))]
pub async fn poll_completion(&self, id: JobId) -> Result<Option<JobTerminalState>, JobError> {
let job = self.find(id).await?;
Ok(job.terminal_state())
}

/// Gracefully shut down the job poller.
Expand Down
98 changes: 94 additions & 4 deletions tests/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use job::{
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, Notify};

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -858,7 +859,7 @@ async fn test_await_completion_on_success() -> anyhow::Result<()> {
.await?;

let jobs_clone = jobs.clone();
let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id).await });
let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id, None).await });

let state = handle.await??;
assert_eq!(state, JobTerminalState::Completed);
Expand All @@ -882,7 +883,7 @@ async fn test_await_completion_on_error() -> anyhow::Result<()> {
spawner.spawn(job_id, FailingJobConfig).await?;

let jobs_clone = jobs.clone();
let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id).await });
let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id, None).await });

let state = handle.await??;
assert_eq!(state, JobTerminalState::Errored);
Expand Down Expand Up @@ -912,7 +913,7 @@ async fn test_await_completion_on_cancel() -> anyhow::Result<()> {
.await?;

let jobs_clone = jobs.clone();
let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id).await });
let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id, None).await });

// Give the waiter time to register before cancelling
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
Expand Down Expand Up @@ -956,8 +957,97 @@ async fn test_await_completion_already_completed() -> anyhow::Result<()> {
}

// Now call await_completion — should return immediately
let state = jobs.await_completion(job_id).await?;
let state = jobs.await_completion(job_id, None).await?;
assert_eq!(state, JobTerminalState::Completed);

Ok(())
}

#[tokio::test]
async fn test_poll_completion() -> anyhow::Result<()> {
let pool = helpers::init_pool().await?;
let config = JobSvcConfig::builder()
.pool(pool)
.build()
.expect("Failed to build JobsConfig");

let mut jobs = Jobs::init(config).await?;
let spawner = jobs.add_initializer(TestJobInitializer {
job_type: JobType::new("poll-completion-job"),
});
jobs.start_poll().await?;

// Spawn a job scheduled far in the future so it stays pending
let job_id = JobId::new();
let schedule_at = chrono::Utc::now() + chrono::Duration::hours(24);
spawner
.spawn_at(job_id, TestJobConfig { delay_ms: 10 }, schedule_at)
.await?;

// Poll immediately — job hasn't completed yet
let state = jobs.poll_completion(job_id).await?;
assert_eq!(state, None, "Pending job should return None");

// Now spawn a quick job that will complete fast
let quick_id = JobId::new();
spawner
.spawn(quick_id, TestJobConfig { delay_ms: 10 })
.await?;

// Wait for the quick job to finish
let mut attempts = 0;
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let job = jobs.find(quick_id).await?;
if job.completed() {
break;
}
attempts += 1;
assert!(attempts < 100, "Quick job never completed");
}

// Poll the completed job — should return terminal state
let state = jobs.poll_completion(quick_id).await?;
assert_eq!(
state,
Some(JobTerminalState::Completed),
"Completed job should return Some(Completed)"
);

Ok(())
}

#[tokio::test]
async fn test_await_completion_timeout() -> anyhow::Result<()> {
let pool = helpers::init_pool().await?;
let config = JobSvcConfig::builder()
.pool(pool)
.build()
.expect("Failed to build JobsConfig");

let mut jobs = Jobs::init(config).await?;
let spawner = jobs.add_initializer(TestJobInitializer {
job_type: JobType::new("await-timeout-job"),
});
jobs.start_poll().await?;

// Spawn a job scheduled far in the future so it never completes during the test
let job_id = JobId::new();
let schedule_at = chrono::Utc::now() + chrono::Duration::hours(24);
spawner
.spawn_at(job_id, TestJobConfig { delay_ms: 50 }, schedule_at)
.await?;

// Call await_completion with a short timeout
let result = jobs
.await_completion(job_id, Some(Duration::from_millis(200)))
.await;

assert!(
matches!(result, Err(JobError::TimedOut(id)) if id == job_id),
"Expected TimedOut error, got: {:?}",
result,
);

Ok(())
}
Loading