feat(job): batch await and completion result helpers#73
Merged
HonestMajority merged 4 commits intomainfrom Mar 31, 2026
Merged
Conversation
Add CurrentJob::await_jobs() for shutdown-aware batch awaiting, Jobs::await_completions() for batch awaiting with timeout support, JobCompletionResults extension trait with failed_count/all_succeeded, and is_completed/is_errored convenience methods on JobCompletionResult. These eliminate five identical copies of await_job_completions and four identical failed_count helpers duplicated across lana-bank consumers. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
7ddc0e5 to
3f8c73f
Compare
The batch await primitive belongs on the Jobs service layer only. Consumers that need shutdown awareness can compose Jobs::await_completions with CurrentJob::shutdown_requested via tokio::select! themselves. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Match the documentation style of await_completion: describe the behaviour, link to related methods, and list all error variants. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Batch timeout error misleadingly blames first job ID
- I added a batch-specific timeout error (
JobError::BatchTimedOut(Vec<JobId>)) and updatedawait_completionsto return it so timeout diagnostics no longer misattribute the first job ID.
- I added a batch-specific timeout error (
Or push these changes by commenting:
@cursor push 867f920fa7
Preview (867f920fa7)
diff --git a/src/error.rs b/src/error.rs
--- a/src/error.rs
+++ b/src/error.rs
@@ -51,6 +51,10 @@
"JobError - TimedOut: job {0} did not reach terminal state within the specified timeout"
)]
TimedOut(JobId),
+ #[error(
+ "JobError - BatchTimedOut: jobs {0:?} did not all reach terminal state within the specified timeout"
+ )]
+ BatchTimedOut(Vec<JobId>),
}
impl From<Box<dyn std::error::Error>> for JobError {
diff --git a/src/lib.rs b/src/lib.rs
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -551,16 +551,16 @@
/// have finished.
///
/// When `timeout` is `Some(duration)`, the call returns
- /// [`JobError::TimedOut`] if the batch has not fully resolved within the
- /// specified duration. Pass `None` to wait indefinitely.
+ /// [`JobError::BatchTimedOut`] if the batch has not fully resolved within
+ /// the specified duration. Pass `None` to wait indefinitely.
///
/// An empty `ids` slice returns an empty `Vec` immediately.
///
/// # Errors
///
/// Returns [`JobError::Find`] if any job in the batch does not exist.
- /// Returns [`JobError::TimedOut`] if the timeout elapses before every job
- /// reaches a terminal state.
+ /// Returns [`JobError::BatchTimedOut`] if the timeout elapses before every
+ /// job reaches a terminal state.
/// Returns [`JobError::AwaitCompletionShutdown`] if the notification channel
/// is dropped (e.g., during shutdown) before all jobs have resolved.
#[instrument(name = "job.await_completions", skip(self))]
@@ -577,12 +577,9 @@
.map(|id| self.await_completion(*id, None))
.collect();
let results = match timeout {
- Some(duration) => {
- let first_id = ids[0];
- tokio::time::timeout(duration, futures::future::join_all(futs))
- .await
- .map_err(|_| JobError::TimedOut(first_id))?
- }
+ Some(duration) => tokio::time::timeout(duration, futures::future::join_all(futs))
+ .await
+ .map_err(|_| JobError::BatchTimedOut(ids.to_vec()))?,
None => futures::future::join_all(futs).await,
};
results.into_iter().collect()
diff --git a/tests/job.rs b/tests/job.rs
--- a/tests/job.rs
+++ b/tests/job.rs
@@ -1578,22 +1578,33 @@
});
jobs.start_poll().await?;
- // Schedule a job far in the future so it never completes
- let job_id = JobId::new();
+ // Spawn one quick job that should complete within timeout
+ let quick_id = JobId::new();
+ spawner
+ .spawn(quick_id, TestJobConfig { delay_ms: 10 })
+ .await?;
+
+ // Schedule another job far in the future so the batch never fully completes
+ let stuck_id = JobId::new();
let schedule_at = chrono::Utc::now() + chrono::Duration::hours(24);
spawner
- .spawn_at(job_id, TestJobConfig { delay_ms: 50 }, schedule_at)
+ .spawn_at(stuck_id, TestJobConfig { delay_ms: 50 }, schedule_at)
.await?;
+ let ids = vec![quick_id, stuck_id];
let result = jobs
- .await_completions(&[job_id], Some(Duration::from_millis(200)))
+ .await_completions(&ids, Some(Duration::from_secs(1)))
.await;
- assert!(
- matches!(result, Err(JobError::TimedOut(_))),
- "Expected TimedOut error, got: {:?}",
- result,
- );
+ match result {
+ Err(JobError::BatchTimedOut(timed_out_ids)) => {
+ assert_eq!(
+ timed_out_ids, ids,
+ "Batch timeout should report all batch IDs"
+ );
+ }
+ other => panic!("Expected BatchTimedOut error, got: {:?}", other),
+ }
Ok(())
}This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.
Move terminal-state and result types out of entity.rs into outcome.rs and rename for clarity: - JobResult → JobReturnValue - JobCompletionResult → JobOutcome - JobCompletionResults → JobOutcomes - ResultUpdated event → ReturnValueUpdated - raw_result() → raw_return_value() - update_result() → update_return_value() JobTerminalState stays as-is. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
bodymindarts
approved these changes
Mar 31, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.


Summary
Adds batch-level primitives to the job crate so consumers don't need to roll their own when coordinating fan-out workflows (spawn N child jobs, await all, inspect results).
Jobs::await_completions(ids, timeout)— batch counterpart ofawait_completion. Awaits multiple jobs concurrently with an optional timeout covering the entire batch.JobCompletionResultstrait — extension trait onVec<JobCompletionResult>and[JobCompletionResult]providingfailed_count()andall_succeeded(). Replaces identical free-function copies scattered across lana-bank consumers.JobCompletionResult::is_completed()/is_errored()— convenience predicates used by the trait above.Context
lana-bank's EOD pipeline has 4–5 coordinator jobs that all fan out child jobs and then await them. Each one carried its own copy of
await_job_completionsandfailed_count. This PR moves the reusable parts into the job crate. Shutdown-awaretokio::select!remains at the consumer level since that's an application concern.🤖 Generated with Claude Code
Note
Medium Risk
Introduces new public APIs and renames existing completion/result types (
JobCompletionResult/JobResult->JobOutcome/JobReturnValue), which is a breaking change for consumers and could affect completion/result handling semantics if migrated incorrectly.Overview
Adds a batch API
Jobs::await_completions(ids, timeout)that awaits multiple jobs concurrently under a single optional timeout and returns aVec<JobOutcome>.Refactors completion/result types into a new
outcomemodule:JobResult/JobCompletionResultbecomeJobReturnValue/JobOutcome, job events renameResultUpdated->ReturnValueUpdated, andawait_completionnow returnsJobOutcome. Adds aJobOutcomesextension trait (forVec<JobOutcome>and slices) withfailed_count()/all_succeeded()plus convenience predicates onJobOutcome.Updates exports and tests to use the new types and validates batch waiting, empty batches, timeouts, and the new outcome helpers.
Written by Cursor Bugbot for commit a4753a8. This will update automatically on new commits. Configure here.