Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ target/
.env
.bacon-locations
.claude/settings.local.json
.mcp.json
48 changes: 47 additions & 1 deletion src/current.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use es_entity::clock::ClockHandle;
use serde::{Serialize, de::DeserializeOwned};
use sqlx::PgPool;

use super::{JobId, error::JobError};
use std::sync::Arc;

use super::{JobId, entity::JobResult, error::JobError, repo::JobRepo};

/// Context provided to a [`JobRunner`](crate::JobRunner) while a job is executing.
pub struct CurrentJob {
Expand All @@ -16,6 +18,7 @@ pub struct CurrentJob {
tokio::sync::mpsc::Sender<tokio::sync::oneshot::Receiver<()>>,
>,
clock: ClockHandle,
repo: Arc<JobRepo>,
}

impl CurrentJob {
Expand All @@ -28,6 +31,7 @@ impl CurrentJob {
tokio::sync::mpsc::Sender<tokio::sync::oneshot::Receiver<()>>,
>,
clock: ClockHandle,
repo: Arc<JobRepo>,
) -> Self {
Self {
id,
Expand All @@ -36,6 +40,7 @@ impl CurrentJob {
execution_state_json: execution_state,
shutdown_rx,
clock,
repo,
}
}

Expand Down Expand Up @@ -122,6 +127,47 @@ impl CurrentJob {
Ok(ret)
}

/// Attach or update the result value for this job execution.
///
/// The result is serialized to JSON and persisted to the database
/// immediately. It will be available to callers via
/// [`Jobs::await_completion`](crate::Jobs::await_completion). Each call
/// overwrites the previous value — the **last** value set is what callers
/// see. This allows incremental progress updates; for example, a batch job
/// can call `set_result` after each chunk so that partial progress is
/// preserved even on failure.
pub async fn set_result<T: Serialize>(&self, result: &T) -> Result<(), JobError> {
let json = serde_json::to_value(result).map_err(JobError::CouldNotSerializeResult)?;
let job_result = JobResult::new(json);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be job_result = JobResult::try_from(T)?

let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
let mut job = self.repo.find_by_id_in_op(&mut op, self.id).await?;
if job.update_result(job_result).did_execute() {
self.repo.update_in_op(&mut op, &mut job).await?;
op.commit().await?;
}
Ok(())
}

/// Attach or update the result value as part of an existing database
/// operation.
///
/// This is the composable variant of [`set_result`](Self::set_result) —
/// callers provide an open atomic operation so the result write participates
/// in a larger transaction.
pub async fn set_result_in_op(
&self,
op: &mut impl es_entity::AtomicOperation,
result: &impl Serialize,
) -> Result<(), JobError> {
let json = serde_json::to_value(result).map_err(JobError::CouldNotSerializeResult)?;
let job_result = JobResult::new(json);
let mut job = self.repo.find_by_id_in_op(&mut *op, self.id).await?;
if job.update_result(job_result).did_execute() {
self.repo.update_in_op(op, &mut job).await?;
}
Ok(())
}

/// Wait for a shutdown signal. Returns `true` if shutdown was requested.
///
/// Job runners can use this to detect when the application is shutting down
Expand Down
1 change: 1 addition & 0 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ impl JobDispatcher {
polled_job.data_json,
shutdown_rx,
self.clock.clone(),
Arc::clone(&self.repo),
);
self.tracker.dispatch_job();
match Self::dispatch_job(self.runner.take().expect("runner"), current_job).await {
Expand Down
107 changes: 107 additions & 0 deletions src/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,72 @@ use es_entity::{context::TracingContext, *};

use crate::{JobId, error::JobError};

/// Newtype wrapper around a raw JSON value representing the result produced by a
/// job runner via [`CurrentJob::set_result`](crate::CurrentJob::set_result).
///
/// Using a dedicated type instead of bare `serde_json::Value` gives call sites
/// semantic clarity and prevents accidental mix-ups with other JSON payloads
/// (config, execution state, etc.).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct JobResult(serde_json::Value);

impl JobResult {
/// Wrap a raw JSON value.
pub(crate) fn new(value: serde_json::Value) -> Self {
Self(value)
}

/// Consume the wrapper and return the inner JSON value.
pub fn into_inner(self) -> serde_json::Value {
self.0
}

/// Return a reference to the inner JSON value.
pub fn as_value(&self) -> &serde_json::Value {
&self.0
}

/// Deserialize the result into a typed struct.
pub fn deserialize<T: serde::de::DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
serde_json::from_value(self.0.clone())
}
}

/// Outcome returned by [`Jobs::await_completion`](crate::Jobs::await_completion),
/// carrying both the terminal state and an optional result value.
#[derive(Debug, Clone)]
pub struct JobCompletionResult {
state: JobTerminalState,
result: Option<JobResult>,
}

impl JobCompletionResult {
pub(crate) fn new(state: JobTerminalState, result: Option<JobResult>) -> Self {
Self { state, result }
}

/// The terminal state the job reached.
pub fn state(&self) -> JobTerminalState {
self.state
}

/// Returns the result wrapper, if any.
pub fn result(&self) -> Option<&JobResult> {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this... rename typed_result to result.

self.result.as_ref()
}

/// Deserialize the result value into a typed struct.
pub fn typed_result<T: serde::de::DeserializeOwned>(
&self,
) -> Result<Option<T>, serde_json::Error> {
match &self.result {
Some(r) => r.deserialize().map(Some),
None => Ok(None),
}
}
}

/// Terminal outcome of a job lifecycle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobTerminalState {
Expand Down Expand Up @@ -79,6 +145,9 @@ pub enum JobEvent {
ExecutionErrored {
error: String,
},
ResultUpdated {
result: JobResult,
},
JobCompleted,
Cancelled,
AttemptCounterReset,
Expand Down Expand Up @@ -223,6 +292,29 @@ impl Job {
}
}

/// Returns the result value attached to this job, if any.
///
/// Scans for the latest `ResultUpdated` event (last write wins).
pub fn result(&self) -> Option<&JobResult> {
self.events.iter_all().rev().find_map(|event| {
if let JobEvent::ResultUpdated { result } = event {
Some(result)
} else {
None
}
})
}

/// Deserialize the result value into a typed struct.
pub fn typed_result<T: serde::de::DeserializeOwned>(
&self,
) -> Result<Option<T>, serde_json::Error> {
match self.result() {
Some(r) => r.deserialize().map(Some),
None => Ok(None),
}
}

pub(crate) fn inject_tracing_parent(&self) {
if let JobEvent::Initialized {
tracing_context: Some(tracing_context),
Expand Down Expand Up @@ -292,6 +384,20 @@ impl Job {
self.events.push(JobEvent::JobCompleted);
}

/// Attach or overwrite the result value for this job.
///
/// Returns [`Idempotent::AlreadyApplied`] when the new value is identical
/// to the current one, allowing callers to skip the DB round-trip.
pub(crate) fn update_result(&mut self, result: JobResult) -> es_entity::Idempotent<()> {
if let Some(existing) = self.result()
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use idempotency_guard!

&& *existing.as_value() == *result.as_value()
{
return es_entity::Idempotent::AlreadyApplied;
}
self.events.push(JobEvent::ResultUpdated { result });
es_entity::Idempotent::Executed(())
}

pub(super) fn maybe_schedule_retry(
&mut self,
now: DateTime<Utc>,
Expand Down Expand Up @@ -362,6 +468,7 @@ impl TryFromEvents<JobEvent> for Job {
JobEvent::ExecutionCompleted => {}
JobEvent::ExecutionAborted { .. } => {}
JobEvent::ExecutionErrored { .. } => {}
JobEvent::ResultUpdated { .. } => {}
JobEvent::JobCompleted => {}
JobEvent::Cancelled => {}
JobEvent::AttemptCounterReset => {}
Expand Down
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub enum JobError {
JobInitError(String),
#[error("JobError - BadState: {0}")]
CouldNotSerializeExecutionState(serde_json::Error),
#[error("JobError - BadResult: {0}")]
CouldNotSerializeResult(serde_json::Error),
#[error("JobError - BadConfig: {0}")]
CouldNotSerializeConfig(serde_json::Error),
#[error("JobError - NoInitializerPresent")]
Expand Down
20 changes: 13 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ use std::time::Duration;

pub use config::*;
pub use current::*;
pub use entity::{Job, JobTerminalState, JobType};
pub use entity::{Job, JobCompletionResult, JobResult, JobTerminalState, JobType};
pub use es_entity::clock::{Clock, ClockController, ClockHandle};
pub use migrate::*;
pub use registry::*;
Expand Down Expand Up @@ -532,7 +532,8 @@ impl Jobs {
}

/// Block until the given job reaches a terminal state (completed, errored, or
/// cancelled) and return the outcome.
/// cancelled) and return the outcome together with any result value the
/// runner attached via [`CurrentJob::set_result`].
///
/// When `timeout` is `Some(duration)`, the call returns
/// [`JobError::TimedOut`] if the job has not reached a terminal state
Expand All @@ -550,18 +551,23 @@ impl Jobs {
&self,
id: JobId,
timeout: Option<Duration>,
) -> Result<JobTerminalState, JobError> {
) -> Result<JobCompletionResult, JobError> {
// Fail fast if the job doesn't exist — avoids a 5-minute silent hang
// in the waiter manager for a JobId that will never resolve.
self.find(id).await?;
let rx = self.router.wait_for_terminal(id);
match timeout {
let state = match timeout {
Some(duration) => tokio::time::timeout(duration, rx)
.await
.map_err(|_| JobError::TimedOut(id))?
.map_err(|_| JobError::AwaitCompletionShutdown(id)),
None => rx.await.map_err(|_| JobError::AwaitCompletionShutdown(id)),
}
.map_err(|_| JobError::AwaitCompletionShutdown(id))?,
None => rx
.await
.map_err(|_| JobError::AwaitCompletionShutdown(id))?,
};
// Load job to retrieve any result value set by the runner
let job = self.find(id).await?;
Ok(JobCompletionResult::new(state, job.result().cloned()))
}

/// Non-blocking check for job completion.
Expand Down
Loading
Loading