Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions bento/crates/taskdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,30 @@ pub async fn create_job(
}

// TODO: fix this clippy allow
/// Re-queues the resettable tasks of a job so the scheduler picks them up again.
///
/// A task is reset back to `ready` (with its wait counter cleared and
/// `updated_at` bumped) only when all of the following hold:
/// - it belongs to `job_id` and is currently `failed` or `pending`,
/// - the owning job itself is not already `done`,
/// - every prerequisite task of the same job is in the `done` state.
///
/// Returns the number of task rows that were reset.
pub async fn reset_failed_tasks(pool: &PgPool, job_id: &Uuid) -> Result<u64, TaskDbErr> {
    let result = sqlx::query(
        r#"
        UPDATE tasks
        SET state = 'ready', waiting_on = 0, updated_at = now()
        WHERE job_id = $1
        AND state IN ('failed','pending')
        AND job_id IN (SELECT id FROM jobs WHERE state != 'done')
        AND NOT EXISTS (
            SELECT 1
            FROM tasks p
            WHERE p.task_id = ANY(SELECT jsonb_array_elements_text(tasks.prerequisites))
            AND p.job_id = tasks.job_id
            AND p.state NOT IN ('done')
        )
        "#
    )
    .bind(job_id)
    .execute(pool)
    .await?;
    Ok(result.rows_affected())
}

#[allow(clippy::too_many_arguments)]
pub async fn create_task(
pool: &PgPool,
Expand Down
13 changes: 11 additions & 2 deletions bento/crates/workflow/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ pub(crate) fn serialize_obj<T: Serialize>(item: &T) -> Result<Vec<u8>> {

/// Deserializes an encoded value produced by `serialize_obj`.
///
/// On a truncated input — bincode reporting EOF / unexpected end of file —
/// the error is tagged with the `[B-TASK-EOF]` marker so task runners can
/// detect corrupted segment data and trigger a selective reset instead of
/// treating it as a plain failure.
pub(crate) fn deserialize_obj<T: for<'de> Deserialize<'de>>(encoded: &[u8]) -> Result<T> {
    match bincode::deserialize(encoded) {
        Ok(decoded) => Ok(decoded),
        Err(e) => {
            // NOTE(review): classifying the error by substring of its Debug
            // output is brittle across bincode versions; matching on the
            // concrete bincode::ErrorKind::Io with std::io::ErrorKind::UnexpectedEof
            // would be sturdier — confirm before tightening.
            let msg = format!("{:?}", e);
            if msg.contains("EOF") || msg.contains("end of file") {
                // Add explicit marker so task runners can detect & trigger selective reset
                return Err(anyhow::anyhow!("[B-TASK-EOF] Corrupted segment data: {}", msg));
            }
            Err(anyhow::anyhow!(e))
        }
    }
}
67 changes: 67 additions & 0 deletions crates/broker/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,15 @@ pub trait BrokerDb {
) -> Result<(), DbError>;
async fn get_batch(&self, batch_id: usize) -> Result<Batch, DbError>;

/// Reset an order back to PendingProving and clear its proof_id.
/// Also remove associated tasks/jobs in the taskdb for the given job_id.
///
/// This mirrors the old reset-order.sh:
/// 1. Find proof_id (job_id) from SQLite
/// 2. Delete job, tasks, task_deps from Postgres
/// 3. Reset SQLite order status and clear proof_id
async fn reset_order(&self, order_id: &str) -> Result<(), DbError>;

#[cfg(test)]
async fn add_order(&self, order: &Order) -> Result<(), DbError>;
#[cfg(test)]
Expand Down Expand Up @@ -310,6 +319,64 @@ struct DbLockedRequest {

#[async_trait]
impl BrokerDb for SqliteDb {
async fn reset_order(&self, order_id: &str) -> Result<(), DbError> {
// Step 1: Lookup proof_id (job_id) from SQLite
let job_id: Option<String> = sqlx::query_scalar(
r#"SELECT json_extract(data, '$.proof_id') FROM orders WHERE id = ?"#,
)
.bind(order_id)
.fetch_optional(&self.pool)
.await
.map_err(DbError::from)?;

let Some(job_id) = job_id else {
return Err(DbError::NotFound(format!(
"Order {order_id} not found or missing proof_id"
)));
};

// Step 2: Delete job, tasks, and deps from Postgres
sqlx::query("DELETE FROM public.task_deps WHERE job_id = $1")
.bind(&job_id)
.execute(&self.pg_pool)
.await
.map_err(DbError::from)?;
sqlx::query("DELETE FROM public.tasks WHERE job_id = $1")
.bind(&job_id)
.execute(&self.pg_pool)
.await
.map_err(DbError::from)?;
sqlx::query("DELETE FROM public.jobs WHERE id = $1")
.bind(&job_id)
.execute(&self.pg_pool)
.await
.map_err(DbError::from)?;

// Step 3: Reset SQLite order row back to PendingProving and clear proof_id
let res = sqlx::query(
r#"
UPDATE orders
SET data = json_set(
json_set(data, '$.status', 'PendingProving'),
'$.proof_id', NULL
)
WHERE id = ?;
"#,
)
.bind(order_id)
.execute(&self.pool)
.await?;

if res.rows_affected() == 0 {
return Err(DbError::OrderNotFound(order_id.to_string()));
}

tracing::info!(
"[reset-order] order {order_id} reset to PendingProving; job {job_id} deleted in taskdb"
);

Ok(())
}
#[cfg(test)]
#[instrument(level = "trace", skip_all, fields(id = %format!("{}", order.id())))]
async fn add_order(&self, order: &Order) -> Result<(), DbError> {
Expand Down
73 changes: 72 additions & 1 deletion crates/broker/src/proving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,78 @@ impl ProvingService {
proof_retry_count
);

handle_order_failure(&self.db, &order_id, "Proving failed").await;
// Classify the proving failure by scanning its Debug output and pick a
// recovery path: full order reset, bounded segment retry, or plain failure.
//
// NOTE(review): substring-matching on Debug-formatted errors ("[B-BON-005]",
// "[B-TASK-EOF]") is brittle — a typed error variant would be sturdier.
let err_str = format!("{err:?}");
if err_str.contains("[B-BON-005] Prover failure: SessionId") {
    // Prover-session failure: wipe the job in taskdb and re-queue the order.
    if let Some(job_id) = order.proof_id.clone() {
        tracing::warn!("[auto-reset] Detected prover failure [B-BON-005], resetting order {order_id} job {job_id}");
        if let Err(e) = self.db.reset_order(&order_id).await {
            tracing::error!(
                "[auto-reset] Failed to reset order {order_id} with job {job_id}: {:?}",
                e
            );
        }
    } else {
        // No job to reset — nothing recorded in proof_id yet.
        tracing::warn!("[auto-reset] Prover failure triggered but no proof_id present for order {order_id}");
    }
} else if err_str.contains("[B-TASK-EOF]") {
    // Corrupted segment data: retry the failed task up to MAX_SEGMENT_RETRIES
    // times, then escalate to a full order reset.
    use std::collections::hash_map::Entry;
    use tokio::time::{sleep, Duration};
    static MAX_SEGMENT_RETRIES: usize = 3;

    // NOTE(review): this retry map is `thread_local!`, but this is async code —
    // tokio's work-stealing scheduler may resume the future on a different OS
    // thread each time, so each worker thread keeps its own independent count
    // and the MAX_SEGMENT_RETRIES cap may trigger late or never. Entries are
    // also never removed on success. A shared map on `self` (e.g. behind a
    // Mutex) would count reliably — confirm and fix.
    thread_local! {
        static SEGMENT_RETRY_MAP: std::cell::RefCell<std::collections::HashMap<String, usize>> =
            std::cell::RefCell::new(std::collections::HashMap::new());
    }

    if let Some(job_id) = order.proof_id.clone() {
        let mut exceeded = false;
        // Bump the retry count for this (order, job) pair; drop the entry and
        // flag escalation once the cap is crossed.
        SEGMENT_RETRY_MAP.with(|map| {
            let mut map = map.borrow_mut();
            match map.entry(format!("{order_id}:{job_id}")) {
                Entry::Occupied(mut e) => {
                    let count = e.get_mut();
                    *count += 1;
                    if *count > MAX_SEGMENT_RETRIES {
                        exceeded = true;
                        e.remove();
                    }
                }
                Entry::Vacant(v) => {
                    // First retry for this pair.
                    v.insert(1);
                }
            }
        });

        if exceeded {
            // Retries exhausted: fall back to a full reset of the order/job.
            tracing::warn!(
                "[segment-retry] Order {order_id}, job {job_id} exceeded {MAX_SEGMENT_RETRIES} retries — escalating to full reset"
            );
            if let Err(e) = self.db.reset_order(&order_id).await {
                tracing::error!(
                    "[segment-retry] Failed to reset order {order_id} after exceeded retries: {:?}",
                    e
                );
            }
        } else {
            tracing::warn!(
                "[segment-retry] Detected corrupted segment data for order {order_id}, job {job_id}. Retrying failed task (attempt <= {MAX_SEGMENT_RETRIES})"
            );
            // Brief async back-off before re-queueing the corrupted task
            // (tokio sleep — does not block the worker thread).
            sleep(Duration::from_secs(2)).await;
            if let Err(e) = self.db.retry_corrupted_task(&job_id).await {
                tracing::error!(
                    "[segment-retry] Failed to retry corrupted segment for job {job_id}: {:?}",
                    e
                );
            }
        }
    } else {
        // Marker seen but no job recorded — cannot target a retry.
        tracing::warn!(
            "[segment-retry] Corrupted task marker detected but no proof_id/job_id present for order {order_id}"
        );
    }
} else {
    // Unclassified proving error: mark the order failed as before.
    handle_order_failure(&self.db, &order_id, "Proving failed").await;
}
}
}
}
Expand Down