Skip to content

Commit c2d072e

Browse files
feat(await): add optional timeout and poll_completion to await_completion (#62)
* feat(await): add optional timeout to await_completion Add an optional `timeout: Option<Duration>` parameter to `await_completion` so callers can limit how long they wait for a job to reach a terminal state. When the timeout elapses, a new `JobError::TimedOut` is returned. Passing `None` preserves the original indefinite-wait behaviour. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * feat(poll): add non-blocking poll_completion method Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 069332e commit c2d072e

3 files changed

Lines changed: 127 additions & 6 deletions

File tree

src/error.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ pub enum JobError {
4949
"JobError - AwaitCompletionShutdown: notification channel closed while awaiting job {0}"
5050
)]
5151
AwaitCompletionShutdown(JobId),
52+
#[error(
53+
"JobError - TimedOut: job {0} did not reach terminal state within the specified timeout"
54+
)]
55+
TimedOut(JobId),
5256
}
5357

5458
impl From<Box<dyn std::error::Error>> for JobError {

src/lib.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ use es_entity::AtomicOperation;
225225
use tracing::instrument;
226226

227227
use std::sync::{Arc, Mutex};
228+
use std::time::Duration;
228229

229230
pub use config::*;
230231
pub use current::*;
@@ -533,18 +534,44 @@ impl Jobs {
533534
/// Block until the given job reaches a terminal state (completed, errored, or
534535
/// cancelled) and return the outcome.
535536
///
537+
/// When `timeout` is `Some(duration)`, the call returns
538+
/// [`JobError::TimedOut`] if the job has not reached a terminal state
539+
/// within the specified duration. Pass `None` to wait indefinitely.
540+
///
536541
/// # Errors
537542
///
538543
/// Returns [`JobError::Find`] if the job does not exist.
544+
/// Returns [`JobError::TimedOut`] if the timeout elapses before the job
545+
/// reaches a terminal state.
539546
/// Returns [`JobError::AwaitCompletionShutdown`] if the notification channel is
540547
/// dropped (e.g., during shutdown) before delivering the terminal state.
541548
#[instrument(name = "job.await_completion", skip(self))]
542-
pub async fn await_completion(&self, id: JobId) -> Result<JobTerminalState, JobError> {
549+
pub async fn await_completion(
550+
&self,
551+
id: JobId,
552+
timeout: Option<Duration>,
553+
) -> Result<JobTerminalState, JobError> {
543554
// Fail fast if the job doesn't exist — avoids a 5-minute silent hang
544555
// in the waiter manager for a JobId that will never resolve.
545556
self.find(id).await?;
546557
let rx = self.router.wait_for_terminal(id);
547-
rx.await.map_err(|_| JobError::AwaitCompletionShutdown(id))
558+
match timeout {
559+
Some(duration) => tokio::time::timeout(duration, rx)
560+
.await
561+
.map_err(|_| JobError::TimedOut(id))?
562+
.map_err(|_| JobError::AwaitCompletionShutdown(id)),
563+
None => rx.await.map_err(|_| JobError::AwaitCompletionShutdown(id)),
564+
}
565+
}
566+
567+
/// Non-blocking check for job completion.
568+
///
569+
/// Returns the terminal state immediately if the job has reached one,
570+
/// or `None` if it is still running.
571+
#[instrument(name = "job.poll_completion", skip(self))]
572+
pub async fn poll_completion(&self, id: JobId) -> Result<Option<JobTerminalState>, JobError> {
573+
let job = self.find(id).await?;
574+
Ok(job.terminal_state())
548575
}
549576

550577
/// Gracefully shut down the job poller.

tests/job.rs

Lines changed: 94 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use job::{
88
};
99
use serde::{Deserialize, Serialize};
1010
use std::sync::Arc;
11+
use std::time::Duration;
1112
use tokio::sync::{Mutex, Notify};
1213

1314
#[derive(Debug, Serialize, Deserialize)]
@@ -857,7 +858,7 @@ async fn test_await_completion_on_success() -> anyhow::Result<()> {
857858
.await?;
858859

859860
let jobs_clone = jobs.clone();
860-
let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id).await });
861+
let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id, None).await });
861862

862863
let state = handle.await??;
863864
assert_eq!(state, JobTerminalState::Completed);
@@ -881,7 +882,7 @@ async fn test_await_completion_on_error() -> anyhow::Result<()> {
881882
spawner.spawn(job_id, FailingJobConfig).await?;
882883

883884
let jobs_clone = jobs.clone();
884-
let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id).await });
885+
let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id, None).await });
885886

886887
let state = handle.await??;
887888
assert_eq!(state, JobTerminalState::Errored);
@@ -911,7 +912,7 @@ async fn test_await_completion_on_cancel() -> anyhow::Result<()> {
911912
.await?;
912913

913914
let jobs_clone = jobs.clone();
914-
let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id).await });
915+
let handle = tokio::spawn(async move { jobs_clone.await_completion(job_id, None).await });
915916

916917
// Give the waiter time to register before cancelling
917918
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
@@ -955,8 +956,97 @@ async fn test_await_completion_already_completed() -> anyhow::Result<()> {
955956
}
956957

957958
// Now call await_completion — should return immediately
958-
let state = jobs.await_completion(job_id).await?;
959+
let state = jobs.await_completion(job_id, None).await?;
959960
assert_eq!(state, JobTerminalState::Completed);
960961

961962
Ok(())
962963
}
964+
965+
#[tokio::test]
966+
async fn test_poll_completion() -> anyhow::Result<()> {
967+
let pool = helpers::init_pool().await?;
968+
let config = JobSvcConfig::builder()
969+
.pool(pool)
970+
.build()
971+
.expect("Failed to build JobsConfig");
972+
973+
let mut jobs = Jobs::init(config).await?;
974+
let spawner = jobs.add_initializer(TestJobInitializer {
975+
job_type: JobType::new("poll-completion-job"),
976+
});
977+
jobs.start_poll().await?;
978+
979+
// Spawn a job scheduled far in the future so it stays pending
980+
let job_id = JobId::new();
981+
let schedule_at = chrono::Utc::now() + chrono::Duration::hours(24);
982+
spawner
983+
.spawn_at(job_id, TestJobConfig { delay_ms: 10 }, schedule_at)
984+
.await?;
985+
986+
// Poll immediately — job hasn't completed yet
987+
let state = jobs.poll_completion(job_id).await?;
988+
assert_eq!(state, None, "Pending job should return None");
989+
990+
// Now spawn a quick job that will complete fast
991+
let quick_id = JobId::new();
992+
spawner
993+
.spawn(quick_id, TestJobConfig { delay_ms: 10 })
994+
.await?;
995+
996+
// Wait for the quick job to finish
997+
let mut attempts = 0;
998+
loop {
999+
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1000+
let job = jobs.find(quick_id).await?;
1001+
if job.completed() {
1002+
break;
1003+
}
1004+
attempts += 1;
1005+
assert!(attempts < 100, "Quick job never completed");
1006+
}
1007+
1008+
// Poll the completed job — should return terminal state
1009+
let state = jobs.poll_completion(quick_id).await?;
1010+
assert_eq!(
1011+
state,
1012+
Some(JobTerminalState::Completed),
1013+
"Completed job should return Some(Completed)"
1014+
);
1015+
1016+
Ok(())
1017+
}
1018+
1019+
#[tokio::test]
1020+
async fn test_await_completion_timeout() -> anyhow::Result<()> {
1021+
let pool = helpers::init_pool().await?;
1022+
let config = JobSvcConfig::builder()
1023+
.pool(pool)
1024+
.build()
1025+
.expect("Failed to build JobsConfig");
1026+
1027+
let mut jobs = Jobs::init(config).await?;
1028+
let spawner = jobs.add_initializer(TestJobInitializer {
1029+
job_type: JobType::new("await-timeout-job"),
1030+
});
1031+
jobs.start_poll().await?;
1032+
1033+
// Spawn a job scheduled far in the future so it never completes during the test
1034+
let job_id = JobId::new();
1035+
let schedule_at = chrono::Utc::now() + chrono::Duration::hours(24);
1036+
spawner
1037+
.spawn_at(job_id, TestJobConfig { delay_ms: 50 }, schedule_at)
1038+
.await?;
1039+
1040+
// Call await_completion with a short timeout
1041+
let result = jobs
1042+
.await_completion(job_id, Some(Duration::from_millis(200)))
1043+
.await;
1044+
1045+
assert!(
1046+
matches!(result, Err(JobError::TimedOut(id)) if id == job_id),
1047+
"Expected TimedOut error, got: {:?}",
1048+
result,
1049+
);
1050+
1051+
Ok(())
1052+
}

0 commit comments

Comments
 (0)