Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ target/
.env
.bacon-locations
.claude/settings.local.json
.mcp.json
46 changes: 45 additions & 1 deletion src/current.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use es_entity::clock::ClockHandle;
use serde::{Serialize, de::DeserializeOwned};
use sqlx::PgPool;

use super::{JobId, error::JobError};
use std::sync::Arc;

use super::{JobId, entity::JobResult, error::JobError, repo::JobRepo};

/// Context provided to a [`JobRunner`](crate::JobRunner) while a job is executing.
pub struct CurrentJob {
Expand All @@ -16,6 +18,7 @@ pub struct CurrentJob {
tokio::sync::mpsc::Sender<tokio::sync::oneshot::Receiver<()>>,
>,
clock: ClockHandle,
repo: Arc<JobRepo>,
}

impl CurrentJob {
Expand All @@ -28,6 +31,7 @@ impl CurrentJob {
tokio::sync::mpsc::Sender<tokio::sync::oneshot::Receiver<()>>,
>,
clock: ClockHandle,
repo: Arc<JobRepo>,
) -> Self {
Self {
id,
Expand All @@ -36,6 +40,7 @@ impl CurrentJob {
execution_state_json: execution_state,
shutdown_rx,
clock,
repo,
}
}

Expand Down Expand Up @@ -122,6 +127,45 @@ impl CurrentJob {
Ok(ret)
}

/// Attach or update the result value for this job execution.
///
/// The value is serialized to JSON and written to the database right away,
/// where [`Jobs::await_completion`](crate::Jobs::await_completion) can pick
/// it up. Calls overwrite each other: the **last** value set is the one
/// callers observe. That makes incremental progress reporting possible — a
/// batch job can call `set_result` after every chunk so partial progress
/// survives a later failure.
pub async fn set_result<T: Serialize>(&self, result: &T) -> Result<(), JobError> {
    let serialized = JobResult::try_from(result).map_err(JobError::CouldNotSerializeResult)?;
    let mut db_op = self.repo.begin_op_with_clock(&self.clock).await?;
    let mut job = self.repo.find_by_id_in_op(&mut db_op, self.id).await?;
    // Idempotent no-op: when the value is unchanged there is nothing to
    // persist, so the transaction is dropped (rolled back) uncommitted.
    if !job.update_result(serialized).did_execute() {
        return Ok(());
    }
    self.repo.update_in_op(&mut db_op, &mut job).await?;
    db_op.commit().await?;
    Ok(())
}

/// Attach or update the result value as part of an existing database
/// operation.
///
/// Composable variant of [`set_result`](Self::set_result): the result write
/// joins the caller's open atomic operation, so it commits — or rolls back —
/// together with the rest of that transaction.
pub async fn set_result_in_op(
    &self,
    op: &mut impl es_entity::AtomicOperation,
    result: &impl Serialize,
) -> Result<(), JobError> {
    let serialized = JobResult::try_from(result).map_err(JobError::CouldNotSerializeResult)?;
    let mut job = self.repo.find_by_id_in_op(&mut *op, self.id).await?;
    // Unchanged value — idempotent, skip the UPDATE entirely.
    if !job.update_result(serialized).did_execute() {
        return Ok(());
    }
    self.repo.update_in_op(op, &mut job).await?;
    Ok(())
}

/// Wait for a shutdown signal. Returns `true` if shutdown was requested.
///
/// Job runners can use this to detect when the application is shutting down
Expand Down
1 change: 1 addition & 0 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ impl JobDispatcher {
polled_job.data_json,
shutdown_rx,
self.clock.clone(),
Arc::clone(&self.repo),
);
self.tracker.dispatch_job();
match Self::dispatch_job(self.runner.take().expect("runner"), current_job).await {
Expand Down
97 changes: 97 additions & 0 deletions src/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,65 @@ use es_entity::{context::TracingContext, *};

use crate::{JobId, error::JobError};

/// Newtype wrapper around a raw JSON value representing the result produced by a
/// job runner via [`CurrentJob::set_result`](crate::CurrentJob::set_result).
///
/// Using a dedicated type instead of bare `serde_json::Value` gives call sites
/// semantic clarity and prevents accidental mix-ups with other JSON payloads
/// (config, execution state, etc.).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct JobResult(serde_json::Value);

impl JobResult {
    /// Consume the wrapper and return the inner JSON value.
    pub fn into_inner(self) -> serde_json::Value {
        self.0
    }

    /// Return a reference to the inner JSON value.
    pub fn as_value(&self) -> &serde_json::Value {
        &self.0
    }

    /// Deserialize the result into a typed struct.
    ///
    /// Deserializes from a borrowed `&Value` (which implements
    /// `Deserializer`), avoiding the full clone of the JSON tree that
    /// `serde_json::from_value(self.0.clone())` would require.
    pub fn deserialize<T: serde::de::DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
        serde::de::Deserialize::deserialize(&self.0)
    }

    /// Serialize a value into a `JobResult`.
    pub fn try_from<T: Serialize>(value: &T) -> Result<Self, serde_json::Error> {
        serde_json::to_value(value).map(Self)
    }
}

/// Outcome returned by [`Jobs::await_completion`](crate::Jobs::await_completion),
/// carrying both the terminal state and an optional result value.
#[derive(Debug, Clone)]
pub struct JobCompletionResult {
    state: JobTerminalState,
    result: Option<JobResult>,
}

impl JobCompletionResult {
    pub(crate) fn new(state: JobTerminalState, result: Option<JobResult>) -> Self {
        Self { state, result }
    }

    /// The terminal state the job reached.
    pub fn state(&self) -> JobTerminalState {
        self.state
    }

    /// Deserialize the result value into a typed struct.
    ///
    /// Returns `Ok(None)` when the runner never attached a result.
    pub fn result<T: serde::de::DeserializeOwned>(&self) -> Result<Option<T>, serde_json::Error> {
        self.result
            .as_ref()
            .map(|raw| raw.deserialize())
            .transpose()
    }
}

/// Terminal outcome of a job lifecycle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobTerminalState {
Expand Down Expand Up @@ -79,6 +138,9 @@ pub enum JobEvent {
ExecutionErrored {
error: String,
},
ResultUpdated {
result: JobResult,
},
JobCompleted,
Cancelled,
AttemptCounterReset,
Expand Down Expand Up @@ -223,6 +285,27 @@ impl Job {
}
}

/// Returns the raw result value attached to this job, if any.
///
/// Walks the event stream newest-first so the most recent `ResultUpdated`
/// wins (last write wins).
pub(crate) fn raw_result(&self) -> Option<&JobResult> {
    self.events.iter_all().rev().find_map(|event| match event {
        JobEvent::ResultUpdated { result } => Some(result),
        _ => None,
    })
}

/// Deserialize the result value into a typed struct.
///
/// Returns `Ok(None)` when no result has ever been attached.
pub fn result<T: serde::de::DeserializeOwned>(&self) -> Result<Option<T>, serde_json::Error> {
    self.raw_result().map(|raw| raw.deserialize()).transpose()
}

pub(crate) fn inject_tracing_parent(&self) {
if let JobEvent::Initialized {
tracing_context: Some(tracing_context),
Expand Down Expand Up @@ -292,6 +375,19 @@ impl Job {
self.events.push(JobEvent::JobCompleted);
}

/// Attach or overwrite the result value for this job.
///
/// Returns [`Idempotent::AlreadyApplied`] when the new value is identical
/// to the current one, allowing callers to skip the DB round-trip.
pub(crate) fn update_result(&mut self, result: JobResult) -> es_entity::Idempotent<()> {
// Scan events newest-first; bail out early (AlreadyApplied) if the most
// recent ResultUpdated already holds a JSON value equal to the new one.
// NOTE(review): the guard only compares against prior ResultUpdated events
// it encounters in the reverse scan — macro semantics are defined by
// es_entity's idempotency_guard!; confirm there if in doubt.
idempotency_guard!(
self.events.iter_all().rev(),
already_applied: JobEvent::ResultUpdated { result: existing } if *existing.as_value() == *result.as_value()
);
// Value differs (or no prior result): record the overwrite as a new event.
self.events.push(JobEvent::ResultUpdated { result });
es_entity::Idempotent::Executed(())
}

pub(super) fn maybe_schedule_retry(
&mut self,
now: DateTime<Utc>,
Expand Down Expand Up @@ -362,6 +458,7 @@ impl TryFromEvents<JobEvent> for Job {
JobEvent::ExecutionCompleted => {}
JobEvent::ExecutionAborted { .. } => {}
JobEvent::ExecutionErrored { .. } => {}
JobEvent::ResultUpdated { .. } => {}
JobEvent::JobCompleted => {}
JobEvent::Cancelled => {}
JobEvent::AttemptCounterReset => {}
Expand Down
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub enum JobError {
JobInitError(String),
#[error("JobError - BadState: {0}")]
CouldNotSerializeExecutionState(serde_json::Error),
#[error("JobError - BadResult: {0}")]
CouldNotSerializeResult(serde_json::Error),
#[error("JobError - BadConfig: {0}")]
CouldNotSerializeConfig(serde_json::Error),
#[error("JobError - NoInitializerPresent")]
Expand Down
20 changes: 13 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ use std::time::Duration;

pub use config::*;
pub use current::*;
pub use entity::{Job, JobTerminalState, JobType};
pub use entity::{Job, JobCompletionResult, JobResult, JobTerminalState, JobType};
pub use es_entity::clock::{Clock, ClockController, ClockHandle};
pub use migrate::*;
pub use registry::*;
Expand Down Expand Up @@ -532,7 +532,8 @@ impl Jobs {
}

/// Block until the given job reaches a terminal state (completed, errored, or
/// cancelled) and return the outcome.
/// cancelled) and return the outcome together with any result value the
/// runner attached via [`CurrentJob::set_result`].
///
/// When `timeout` is `Some(duration)`, the call returns
/// [`JobError::TimedOut`] if the job has not reached a terminal state
Expand All @@ -550,18 +551,23 @@ impl Jobs {
&self,
id: JobId,
timeout: Option<Duration>,
) -> Result<JobTerminalState, JobError> {
) -> Result<JobCompletionResult, JobError> {
// Fail fast if the job doesn't exist — avoids a 5-minute silent hang
// in the waiter manager for a JobId that will never resolve.
self.find(id).await?;
let rx = self.router.wait_for_terminal(id);
match timeout {
let state = match timeout {
Some(duration) => tokio::time::timeout(duration, rx)
.await
.map_err(|_| JobError::TimedOut(id))?
.map_err(|_| JobError::AwaitCompletionShutdown(id)),
None => rx.await.map_err(|_| JobError::AwaitCompletionShutdown(id)),
}
.map_err(|_| JobError::AwaitCompletionShutdown(id))?,
None => rx
.await
.map_err(|_| JobError::AwaitCompletionShutdown(id))?,
};
// Load job to retrieve any result value set by the runner
let job = self.find(id).await?;
Ok(JobCompletionResult::new(state, job.raw_result().cloned()))
}

/// Non-blocking check for job completion.
Expand Down
Loading
Loading