Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,23 @@ impl Task for MyTask {
}
```

### User Errors

Return user errors with structured data using `TaskError::user()`:

```rust
// With structured data (message extracted from "message" field if present)
Err(TaskError::user(json!({"message": "Not found", "code": 404})))

// With any serializable type
Err(TaskError::user(MyError { code: 404, details: "..." }))

// Simple string message
Err(TaskError::user_message("Something went wrong"))
```

The error data is serialized to JSON and stored in the database for debugging and analysis.

### TaskContext

The [`TaskContext`] provides methods for durable execution:
Expand Down Expand Up @@ -287,7 +304,9 @@ This is useful when you need to guarantee that a task is only enqueued if relate
| [`Task`] | Trait for defining task types |
| [`TaskContext`] | Context passed to task execution |
| [`TaskResult<T>`] | Result type alias for task returns |
| [`TaskError`] | Error type with control flow signals |
| [`TaskError`] | Error type with control flow signals and user errors |
| [`TaskError::user()`] | Helper to create user errors with JSON data |
| [`TaskError::user_message()`] | Helper to create string user errors |
| [`TaskHandle<T>`] | Handle to a spawned subtask (returned by `ctx.spawn()`) |

### Configuration
Expand Down
64 changes: 61 additions & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,22 @@ pub enum TaskError {
message: String,
},

/// An error from user task code.
/// A user error from task code.
///
/// This is the catch-all variant for errors returned by task implementations.
/// Use `anyhow::anyhow!()` or `?` on any error type to create this variant.
/// This variant stores a serialized user error for persistence and retrieval.
/// Created via [`TaskError::user()`] or [`TaskError::user_message()`].
#[error("{message}")]
User {
/// The error message (extracted from "message" field or stringified data)
message: String,
/// Serialized error data for storage/retrieval
error_data: JsonValue,
},

/// An internal error from user task code.
///
/// This is the catch-all variant for errors propagated via `?` on anyhow errors.
/// For structured user errors, prefer using [`TaskError::user()`].
#[error(transparent)]
TaskInternal(#[from] anyhow::Error),
}
Expand All @@ -118,6 +130,42 @@ pub enum TaskError {
/// Use this as the return type for [`Task::run`](crate::Task::run) implementations.
pub type TaskResult<T> = Result<T, TaskError>;

impl TaskError {
/// Create a user error from arbitrary JSON data.
///
/// If the JSON is an object with a "message" field, that's used for display.
/// Otherwise, the JSON is stringified for the display message.
///
/// ```ignore
/// // With structured data
/// Err(TaskError::user(json!({"message": "Not found", "code": 404})))
///
/// // With any serializable type
/// Err(TaskError::user(MyError { code: 404, details: "..." }))
/// ```
pub fn user(error_data: impl serde::Serialize) -> Self {
let error_data = serde_json::to_value(&error_data).unwrap_or(serde_json::Value::Null);
let message = error_data
.get("message")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.unwrap_or_else(|| error_data.to_string());
TaskError::User {
message,
error_data,
}
}

/// Create a user error from just a message string.
pub fn user_message(message: impl Into<String>) -> Self {
let message = message.into();
TaskError::User {
error_data: serde_json::Value::String(message.clone()),
message,
}
}
}

impl From<serde_json::Error> for TaskError {
fn from(err: serde_json::Error) -> Self {
TaskError::Serialization(err)
Expand Down Expand Up @@ -192,6 +240,16 @@ pub fn serialize_task_error(err: &TaskError) -> JsonValue {
"message": message,
})
}
TaskError::User {
message,
error_data,
} => {
serde_json::json!({
"name": "User",
"message": message,
"error_data": error_data,
})
}
TaskError::TaskInternal(e) => {
serde_json::json!({
"name": "TaskInternal",
Expand Down
10 changes: 8 additions & 2 deletions src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ use crate::error::{TaskError, TaskResult};
///
/// async fn run(url: Self::Params, mut ctx: TaskContext, state: AppState) -> TaskResult<Self::Output> {
/// let body = ctx.step("fetch", || async {
/// state.http_client.get(&url).send().await?.text().await
/// .map_err(|e| anyhow::anyhow!(e))
/// state.http_client.get(&url).send().await
/// .map_err(|e| anyhow::anyhow!("HTTP error: {}", e))?
/// .text().await
/// .map_err(|e| anyhow::anyhow!("HTTP error: {}", e))
/// }).await?;
/// Ok(body)
/// }
Expand All @@ -82,6 +84,10 @@ where
/// Use `?` freely - errors will propagate and the task will be retried
/// according to its [`RetryStrategy`](crate::RetryStrategy).
///
/// For user errors with structured data, use `TaskError::user(data)` where
/// data is any serializable value. For simple message errors, use
/// `TaskError::user_message("message")`.
///
/// The [`TaskContext`] provides methods for checkpointing, sleeping,
/// and waiting for events. See [`TaskContext`] for details.
///
Expand Down
33 changes: 14 additions & 19 deletions tests/common/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,7 @@ impl Task<()> for FailingTask {
type Output = ();

async fn run(params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult<Self::Output> {
Err(TaskError::TaskInternal(anyhow::anyhow!(
"{}",
params.error_message
)))
Err(TaskError::user_message(params.error_message.to_string()))
}
}

Expand Down Expand Up @@ -290,9 +287,9 @@ impl Task<()> for StepCountingTask {
.await?;

if params.fail_after_step2 {
return Err(TaskError::TaskInternal(anyhow::anyhow!(
"Intentional failure after step2"
)));
return Err(TaskError::user_message(
"Intentional failure after step2".to_string(),
));
}

let step3_value: String = ctx
Expand Down Expand Up @@ -512,9 +509,9 @@ impl Task<()> for FailingChildTask {
type Output = ();

async fn run(_params: Self::Params, _ctx: TaskContext, _state: ()) -> TaskResult<Self::Output> {
Err(TaskError::TaskInternal(anyhow::anyhow!(
"Child task failed intentionally"
)))
Err(TaskError::user_message(
"Child task failed intentionally".to_string(),
))
}
}

Expand Down Expand Up @@ -1031,9 +1028,7 @@ impl Task<()> for DeterministicReplayTask {
});

if should_fail {
return Err(TaskError::TaskInternal(anyhow::anyhow!(
"First attempt failure"
)));
return Err(TaskError::user_message("First attempt failure".to_string()));
}
}

Expand Down Expand Up @@ -1095,9 +1090,9 @@ impl Task<()> for EventThenFailTask {
});

if should_fail {
return Err(TaskError::TaskInternal(anyhow::anyhow!(
"First attempt failure after event"
)));
return Err(TaskError::user_message(
"First attempt failure after event".to_string(),
));
}

// Second attempt succeeds with the same payload (from checkpoint)
Expand Down Expand Up @@ -1241,9 +1236,9 @@ impl Task<()> for SpawnThenFailTask {
});

if should_fail {
return Err(TaskError::TaskInternal(anyhow::anyhow!(
"First attempt failure after spawn"
)));
return Err(TaskError::user_message(
"First attempt failure after spawn".to_string(),
));
}

// Second attempt - join child
Expand Down