feat(job): add result passing and unified notification router#60
feat(job): add result passing and unified notification router#60bodymindarts merged 9 commits intomainfrom
Conversation
1fbff49 to
43843d7
Compare
src/current.rs
Outdated
| /// each chunk so that partial progress is preserved even on failure. | ||
| pub fn set_result<T: Serialize>(&self, result: &T) -> Result<(), JobError> { | ||
| let json = | ||
| serde_json::to_value(result).map_err(JobError::CouldNotSerializeExecutionState)?; |
There was a problem hiding this comment.
This reuses the execution state error variant for result serialization. Might be worth a dedicated CouldNotSerializeResult variant so the error message points to the
right place when debugging.
src/dispatcher.rs
Outdated
| id: JobId, | ||
| error: JobError, | ||
| attempt: u32, | ||
| result: Option<serde_json::Value>, |
There was a problem hiding this comment.
It seems a bit ugly to pass around this json like this. Would it be worth wrapping it with a newtype?
Allow job runners to attach a result value that callers receive through await_completion. The result flows from runner → OnceLock → entity event → JobCompletionResult without requiring any new migrations (backward-compatible serde(default) on the JobCompleted event variant). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The new await_completion tests registered pollers for the shared "test-job" type. Since nextest runs each test as a separate process sharing the same Postgres database, pollers from different processes competed for the same jobs. When a process exited before completing a stolen job (tokio runtime drops, cancelling the shutdown task), the job was left orphaned in 'running' state — causing test_cancel_already_completed_job_is_idempotent to time out waiting for its job to complete. Fix: give each await_completion test its own job type via a new AwaitTestJobInitializer so cross-process pollers never interfere. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ss poller interference Each test now gets its own unique job type via TestJobInitializer so that nextest parallel processes don't steal each other's jobs from the shared database. This extends the await_completion fix to all remaining tests that shared the "test-job" type, which caused flaky timeouts. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace OnceLock with Mutex<Option> so callers can call set_result multiple times. The last value set before completion or error is persisted — enabling incremental progress tracking in batch jobs. Partial results are preserved on error so callers can see how far a job got before failing. - Remove ResultAlreadySet error variant (multiple calls now allowed) - Update dispatcher to use Mutex-based result holder - Add tests for incremental set_result and partial progress on error Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…t newtype Add a dedicated CouldNotSerializeResult error variant so result serialization failures in set_result point to the right place instead of reusing the ExecutionState variant. Introduce a JobResult newtype wrapping serde_json::Value to give semantic meaning to result payloads and prevent accidental mix-ups with other JSON values (config, execution state). Used throughout dispatcher, entity, current job, and public API. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
d1185b6 to
a43f225
Compare
src/entity.rs
Outdated
| Self(value) | ||
| } | ||
|
|
||
| /// Return a reference to the inner JSON value. |
There was a problem hiding this comment.
| /// Return a reference to the inner JSON value. | |
| /// Consume the wrapper and return the inner JSON value. |
src/current.rs
Outdated
| pub fn set_result<T: Serialize>(&self, result: &T) -> Result<(), JobError> { | ||
| let json = serde_json::to_value(result).map_err(JobError::CouldNotSerializeResult)?; | ||
| let mut guard = self.result.lock().expect("result mutex poisoned"); | ||
| *guard = Some(JobResult::new(json)); | ||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
This is a weird pattern, of setting the result in-memory on the CurrentJob. I think we could store it on the job_execution instead, or even on the entity. Preferably on the entity, IMO, if that does not add a bunch of complexity. Since I think the job_execution is also a bit hacky, and we should move towards making the entity the one source of truth
There was a problem hiding this comment.
Probably the job_execution is most practical for now
…g in memory Replace the in-memory Arc<Mutex<Option<JobResult>>> pattern with direct database persistence via the repo. CurrentJob now holds an Arc<JobRepo> and set_result is async — each call loads the job entity, pushes a ResultUpdated event, and persists immediately. This ensures partial progress is durable even if the process crashes mid-execution. Key changes: - Add JobEvent::ResultUpdated variant; Job::result() scans for it - Remove result param from complete_job, error_job, maybe_schedule_retry - CurrentJob gains repo handle; set_result and set_result_in_op are async - Dispatcher no longer extracts results from an Arc<Mutex> after run() Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
src/current.rs
Outdated
| *guard = Some(JobResult::new(json)); | ||
| let job_result = JobResult::new(json); | ||
| let mut op = self.repo.begin_op_with_clock(&self.clock).await?; | ||
| let mut job = self.repo.find_by_id(self.id).await?; |
There was a problem hiding this comment.
use find_by_id_in_op when op is in scope.. same below in in_op version.
src/current.rs
Outdated
| let job_result = JobResult::new(json); | ||
| let mut op = self.repo.begin_op_with_clock(&self.clock).await?; | ||
| let mut job = self.repo.find_by_id(self.id).await?; | ||
| job.update_result(job_result); |
There was a problem hiding this comment.
update_result should return es_entity::Idempotent and we only do another DB roundtrip if did_execute().
- Fix into_inner doc comment to accurately describe consuming behavior - Use find_by_id_in_op in set_result_in_op to load entity within the existing transaction scope - Make update_result return Idempotent<()> to skip DB persist when the result value is unchanged Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
src/entity.rs
Outdated
| /// Returns [`Idempotent::AlreadyApplied`] when the new value is identical | ||
| /// to the current one, allowing callers to skip the DB round-trip. | ||
| pub(crate) fn update_result(&mut self, result: JobResult) -> es_entity::Idempotent<()> { | ||
| if let Some(existing) = self.result() |
There was a problem hiding this comment.
use idempotency_guard!
src/current.rs
Outdated
| /// preserved even on failure. | ||
| pub async fn set_result<T: Serialize>(&self, result: &T) -> Result<(), JobError> { | ||
| let json = serde_json::to_value(result).map_err(JobError::CouldNotSerializeResult)?; | ||
| let job_result = JobResult::new(json); |
There was a problem hiding this comment.
shouldn't this be job_result = JobResult::try_from(T)?
src/entity.rs
Outdated
| } | ||
|
|
||
| /// Returns the result wrapper, if any. | ||
| pub fn result(&self) -> Option<&JobResult> { |
There was a problem hiding this comment.
I don't think we need this... rename typed_result to result.
- Use idempotency_guard! macro in update_result instead of manual check - Add JobResult::try_from() for serialization instead of JobResult::new() - Remove raw result() accessor; rename typed_result to result on both Job and JobCompletionResult so callers only get typed results Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
HonestMajority
left a comment
There was a problem hiding this comment.
Now it looks great!
Summary
CurrentJob::set_result()that callers receive throughJobs::await_completion()OnceLock→ entity event →JobCompletionResult(lock-free, write-once)Key changes
CurrentJob::set_result<T: Serialize>()— runners call this to attach a resultJobCompletionResult— new return type fromawait_completion()carrying both terminal state and optional resultJobEvent::JobCompleted { result }— backward-compatible (serde(default)) result storage in entity events; no new migration neededJobError::ResultAlreadySet— returned ifset_resultis called more than onceErr, it's still captured)Test plan
test_await_completion_returns_result— runner sets result, caller receives typed valuetest_await_completion_returns_partial_result_on_error— runner sets result then errors, caller gets both errored state and partial resulttest_await_completion_no_result— runner doesn't set result, caller getsNoneJobCompletionResultreturn typenix flake checkpasses (fmt, clippy, deny, audit)🤖 Generated with Claude Code