diff --git a/src/error.rs b/src/error.rs index c982e05..8e408ab 100644 --- a/src/error.rs +++ b/src/error.rs @@ -49,6 +49,10 @@ pub enum JobError { "JobError - AwaitCompletionShutdown: notification channel closed while awaiting job {0}" )] AwaitCompletionShutdown(JobId), + #[error( + "JobError - TimedOut: job {0} did not reach terminal state within the specified timeout" + )] + TimedOut(JobId), } impl From> for JobError { diff --git a/src/lib.rs b/src/lib.rs index 8f073c0..a190d69 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -225,6 +225,7 @@ use es_entity::AtomicOperation; use tracing::instrument; use std::sync::{Arc, Mutex}; +use std::time::Duration; pub use config::*; pub use current::*; @@ -535,18 +536,44 @@ impl Jobs { /// Block until the given job reaches a terminal state (completed, errored, or /// cancelled) and return the outcome. /// + /// When `timeout` is `Some(duration)`, the call returns + /// [`JobError::TimedOut`] if the job has not reached a terminal state + /// within the specified duration. Pass `None` to wait indefinitely. + /// /// # Errors /// /// Returns [`JobError::Find`] if the job does not exist. + /// Returns [`JobError::TimedOut`] if the timeout elapses before the job + /// reaches a terminal state. /// Returns [`JobError::AwaitCompletionShutdown`] if the notification channel is /// dropped (e.g., during shutdown) before delivering the terminal state. #[instrument(name = "job.await_completion", skip(self))] - pub async fn await_completion(&self, id: JobId) -> Result { + pub async fn await_completion( + &self, + id: JobId, + timeout: Option, + ) -> Result { // Fail fast if the job doesn't exist — avoids a 5-minute silent hang // in the waiter manager for a JobId that will never resolve. self.find(id).await?; let rx = self.router.wait_for_terminal(id); - rx.await.map_err(|_| JobError::AwaitCompletionShutdown(id)) + match timeout { + Some(duration) => tokio::time::timeout(duration, rx) + .await + .map_err(|_| JobError::TimedOut(id))? + .map_err(|_| JobError::AwaitCompletionShutdown(id)), + None => rx.await.map_err(|_| JobError::AwaitCompletionShutdown(id)), + } + } + + /// Non-blocking check for job completion. + /// + /// Returns the terminal state immediately if the job has reached one, + /// or `None` if it is still running. + #[instrument(name = "job.poll_completion", skip(self))] + pub async fn poll_completion(&self, id: JobId) -> Result, JobError> { + let job = self.find(id).await?; + Ok(job.terminal_state()) } /// Gracefully shut down the job poller. diff --git a/tests/job.rs b/tests/job.rs index 149bb77..1c34532 100644 --- a/tests/job.rs +++ b/tests/job.rs @@ -9,6 +9,7 @@ use job::{ }; use serde::{Deserialize, Serialize}; use std::sync::Arc; +use std::time::Duration; use tokio::sync::{Mutex, Notify}; #[derive(Debug, Serialize, Deserialize)] @@ -858,7 +859,7 @@ async fn test_await_completion_on_success() -> anyhow::Result<()> { .await?; let jobs_clone = jobs.clone(); - let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id).await }); + let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id, None).await }); let state = handle.await??; assert_eq!(state, JobTerminalState::Completed); @@ -882,7 +883,7 @@ async fn test_await_completion_on_error() -> anyhow::Result<()> { spawner.spawn(job_id, FailingJobConfig).await?; let jobs_clone = jobs.clone(); - let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id).await }); + let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id, None).await }); let state = handle.await??; assert_eq!(state, JobTerminalState::Errored); @@ -912,7 +913,7 @@ async fn test_await_completion_on_cancel() -> anyhow::Result<()> { .await?; let jobs_clone = jobs.clone(); - let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id).await }); + let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id, None).await }); // Give the waiter time to register before cancelling tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; @@ -956,8 +957,97 @@ async fn test_await_completion_already_completed() -> anyhow::Result<()> { } // Now call await_completion — should return immediately - let state = jobs.await_completion(job_id).await?; + let state = jobs.await_completion(job_id, None).await?; assert_eq!(state, JobTerminalState::Completed); Ok(()) } + +#[tokio::test] +async fn test_poll_completion() -> anyhow::Result<()> { + let pool = helpers::init_pool().await?; + let config = JobSvcConfig::builder() + .pool(pool) + .build() + .expect("Failed to build JobsConfig"); + + let mut jobs = Jobs::init(config).await?; + let spawner = jobs.add_initializer(TestJobInitializer { + job_type: JobType::new("poll-completion-job"), + }); + jobs.start_poll().await?; + + // Spawn a job scheduled far in the future so it stays pending + let job_id = JobId::new(); + let schedule_at = chrono::Utc::now() + chrono::Duration::hours(24); + spawner + .spawn_at(job_id, TestJobConfig { delay_ms: 10 }, schedule_at) + .await?; + + // Poll immediately — job hasn't completed yet + let state = jobs.poll_completion(job_id).await?; + assert_eq!(state, None, "Pending job should return None"); + + // Now spawn a quick job that will complete fast + let quick_id = JobId::new(); + spawner + .spawn(quick_id, TestJobConfig { delay_ms: 10 }) + .await?; + + // Wait for the quick job to finish + let mut attempts = 0; + loop { + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + let job = jobs.find(quick_id).await?; + if job.completed() { + break; + } + attempts += 1; + assert!(attempts < 100, "Quick job never completed"); + } + + // Poll the completed job — should return terminal state + let state = jobs.poll_completion(quick_id).await?; + assert_eq!( + state, + Some(JobTerminalState::Completed), + "Completed job should return Some(Completed)" + ); + + Ok(()) +} + +#[tokio::test] +async fn test_await_completion_timeout() -> anyhow::Result<()> { + let pool = helpers::init_pool().await?; + let config = JobSvcConfig::builder() + .pool(pool) + .build() + .expect("Failed to build JobsConfig"); + + let mut jobs = Jobs::init(config).await?; + let spawner = jobs.add_initializer(TestJobInitializer { + job_type: JobType::new("await-timeout-job"), + }); + jobs.start_poll().await?; + + // Spawn a job scheduled far in the future so it never completes during the test + let job_id = JobId::new(); + let schedule_at = chrono::Utc::now() + chrono::Duration::hours(24); + spawner + .spawn_at(job_id, TestJobConfig { delay_ms: 50 }, schedule_at) + .await?; + + // Call await_completion with a short timeout + let result = jobs + .await_completion(job_id, Some(Duration::from_millis(200))) + .await; + + assert!( + matches!(result, Err(JobError::TimedOut(id)) if id == job_id), + "Expected TimedOut error, got: {:?}", + result, + ); + + Ok(()) +}