diff --git a/bento/crates/taskdb/src/lib.rs b/bento/crates/taskdb/src/lib.rs index 9287993f9..2fc39cef6 100644 --- a/bento/crates/taskdb/src/lib.rs +++ b/bento/crates/taskdb/src/lib.rs @@ -153,6 +153,30 @@ pub async fn create_job( } // TODO: fix this clippy allow +pub async fn reset_failed_tasks(pool: &PgPool, job_id: &Uuid) -> Result { + let updated = sqlx::query( + r#" + UPDATE tasks + SET state = 'ready', waiting_on = 0, updated_at = now() + WHERE job_id = $1 + AND state IN ('failed','pending') + AND job_id IN (SELECT id FROM jobs WHERE state != 'done') + AND NOT EXISTS ( + SELECT 1 + FROM tasks p + WHERE p.task_id = ANY(SELECT jsonb_array_elements_text(tasks.prerequisites)) + AND p.job_id = tasks.job_id + AND p.state NOT IN ('done') + ) + "# + ) + .bind(job_id) + .execute(pool) + .await? + .rows_affected(); + Ok(updated) +} + #[allow(clippy::too_many_arguments)] pub async fn create_task( pool: &PgPool, diff --git a/bento/crates/workflow/src/tasks/mod.rs b/bento/crates/workflow/src/tasks/mod.rs index 904d823f4..847fa6c72 100644 --- a/bento/crates/workflow/src/tasks/mod.rs +++ b/bento/crates/workflow/src/tasks/mod.rs @@ -43,6 +43,15 @@ pub(crate) fn serialize_obj(item: &T) -> Result> { /// Deserializes a an encoded function pub(crate) fn deserialize_obj Deserialize<'de>>(encoded: &[u8]) -> Result { - let decoded = bincode::deserialize(encoded)?; - Ok(decoded) + match bincode::deserialize(encoded) { + Ok(decoded) => Ok(decoded), + Err(e) => { + let msg = format!("{:?}", e); + if msg.contains("EOF") || msg.contains("end of file") { + // Add explicit marker so task runners can detect & trigger selective reset + return Err(anyhow::anyhow!("[B-TASK-EOF] Corrupted segment data: {}", msg)); + } + Err(anyhow::anyhow!(e)) + } + } } diff --git a/crates/broker/src/db/mod.rs b/crates/broker/src/db/mod.rs index 0c382d0a3..7a1762869 100644 --- a/crates/broker/src/db/mod.rs +++ b/crates/broker/src/db/mod.rs @@ -192,6 +192,15 @@ pub trait BrokerDb { ) -> Result<(), DbError>; async fn get_batch(&self, batch_id: usize) -> Result; + /// Reset an order back to PendingProving and clear its proof_id. + /// Also remove associated tasks/jobs in the taskdb for the given job_id. + /// + /// This mirrors the old reset-order.sh: + /// 1. Find proof_id (job_id) from SQLite + /// 2. Delete job, tasks, task_deps from Postgres + /// 3. Reset SQLite order status and clear proof_id + async fn reset_order(&self, order_id: &str) -> Result<(), DbError>; + #[cfg(test)] async fn add_order(&self, order: &Order) -> Result<(), DbError>; #[cfg(test)] @@ -310,6 +319,64 @@ struct DbLockedRequest { #[async_trait] impl BrokerDb for SqliteDb { + async fn reset_order(&self, order_id: &str) -> Result<(), DbError> { + // Step 1: Lookup proof_id (job_id) from SQLite + let job_id: Option = sqlx::query_scalar( + r#"SELECT json_extract(data, '$.proof_id') FROM orders WHERE id = ?"#, + ) + .bind(order_id) + .fetch_optional(&self.pool) + .await + .map_err(DbError::from)?; + + let Some(job_id) = job_id else { + return Err(DbError::NotFound(format!( + "Order {order_id} not found or missing proof_id" + ))); + }; + + // Step 2: Delete job, tasks, and deps from Postgres + sqlx::query("DELETE FROM public.task_deps WHERE job_id = $1") + .bind(&job_id) + .execute(&self.pg_pool) + .await + .map_err(DbError::from)?; + sqlx::query("DELETE FROM public.tasks WHERE job_id = $1") + .bind(&job_id) + .execute(&self.pg_pool) + .await + .map_err(DbError::from)?; + sqlx::query("DELETE FROM public.jobs WHERE id = $1") + .bind(&job_id) + .execute(&self.pg_pool) + .await + .map_err(DbError::from)?; + + // Step 3: Reset SQLite order row back to PendingProving and clear proof_id + let res = sqlx::query( + r#" + UPDATE orders + SET data = json_set( + json_set(data, '$.status', 'PendingProving'), + '$.proof_id', NULL + ) + WHERE id = ?; + "#, + ) + .bind(order_id) + .execute(&self.pool) + .await?; + + if res.rows_affected() == 0 { + return Err(DbError::OrderNotFound(order_id.to_string())); + } + + tracing::info!( + "[reset-order] order {order_id} reset to PendingProving; job {job_id} deleted in taskdb" + ); + + Ok(()) + } #[cfg(test)] #[instrument(level = "trace", skip_all, fields(id = %format!("{}", order.id())))] async fn add_order(&self, order: &Order) -> Result<(), DbError> { diff --git a/crates/broker/src/proving.rs b/crates/broker/src/proving.rs index 0066f8463..f0394413a 100644 --- a/crates/broker/src/proving.rs +++ b/crates/broker/src/proving.rs @@ -347,7 +347,78 @@ impl ProvingService { proof_retry_count ); - handle_order_failure(&self.db, &order_id, "Proving failed").await; + let err_str = format!("{err:?}"); + if err_str.contains("[B-BON-005] Prover failure: SessionId") { + if let Some(job_id) = order.proof_id.clone() { + tracing::warn!("[auto-reset] Detected prover failure [B-BON-005], resetting order {order_id} job {job_id}"); + if let Err(e) = self.db.reset_order(&order_id).await { + tracing::error!( + "[auto-reset] Failed to reset order {order_id} with job {job_id}: {:?}", + e + ); + } + } else { + tracing::warn!("[auto-reset] Prover failure triggered but no proof_id present for order {order_id}"); + } + } else if err_str.contains("[B-TASK-EOF]") { + use std::collections::hash_map::Entry; + use tokio::time::{sleep, Duration}; + static MAX_SEGMENT_RETRIES: usize = 3; + + thread_local! { + static SEGMENT_RETRY_MAP: std::cell::RefCell> = + std::cell::RefCell::new(std::collections::HashMap::new()); + } + + if let Some(job_id) = order.proof_id.clone() { + let mut exceeded = false; + SEGMENT_RETRY_MAP.with(|map| { + let mut map = map.borrow_mut(); + match map.entry(format!("{order_id}:{job_id}")) { + Entry::Occupied(mut e) => { + let count = e.get_mut(); + *count += 1; + if *count > MAX_SEGMENT_RETRIES { + exceeded = true; + e.remove(); + } + } + Entry::Vacant(v) => { + v.insert(1); + } + } + }); + + if exceeded { + tracing::warn!( + "[segment-retry] Order {order_id}, job {job_id} exceeded {MAX_SEGMENT_RETRIES} retries — escalating to full reset" + ); + if let Err(e) = self.db.reset_order(&order_id).await { + tracing::error!( + "[segment-retry] Failed to reset order {order_id} after exceeded retries: {:?}", + e + ); + } + } else { + tracing::warn!( + "[segment-retry] Detected corrupted segment data for order {order_id}, job {job_id}. Retrying failed task (attempt <= {MAX_SEGMENT_RETRIES})" + ); + sleep(Duration::from_secs(2)).await; + if let Err(e) = self.db.retry_corrupted_task(&job_id).await { + tracing::error!( + "[segment-retry] Failed to retry corrupted segment for job {job_id}: {:?}", + e + ); + } + } + } else { + tracing::warn!( + "[segment-retry] Corrupted task marker detected but no proof_id/job_id present for order {order_id}" + ); + } + } else { + handle_order_failure(&self.db, &order_id, "Proving failed").await; + } } } }