Skip to content
Merged
6 changes: 6 additions & 0 deletions crates/broker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ pub struct MarketConf {
/// - "lock_cycle_price": Process lock-and-fulfill orders by highest ETH price per cycle, then fulfill-after-lock-expire randomly
#[serde(default, alias = "expired_order_fulfillment_priority")]
pub order_commitment_priority: OrderCommitmentPriority,
/// Whether to cancel Bento proving sessions when the order is no longer actionable
/// If false (default), Bento proving continues even if the order cannot be fulfilled in the
/// market. This should remain false to avoid losing partial PoVW jobs.
#[serde(default)]
pub cancel_proving_expired_orders: bool,
}

impl Default for MarketConf {
Expand Down Expand Up @@ -321,6 +326,7 @@ impl Default for MarketConf {
max_concurrent_preflights: defaults::max_concurrent_preflights(),
order_pricing_priority: OrderPricingPriority::default(),
order_commitment_priority: OrderCommitmentPriority::default(),
cancel_proving_expired_orders: false,
}
}
}
Expand Down
194 changes: 107 additions & 87 deletions crates/broker/src/proving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::pending;
use std::time::Duration;

use crate::{
Expand All @@ -37,11 +36,11 @@ pub enum ProvingErr {
#[error("{code} Proving failed after retries: {0:#}", code = self.code())]
ProvingFailed(anyhow::Error),

#[error("{code} Request fulfilled by another prover", code = self.code())]
ExternallyFulfilled,
#[error("{code} Order not actionable, should cancel: {0}", code = self.code())]
ShouldCancel(&'static str),

#[error("{code} Proving timed out", code = self.code())]
ProvingTimedOut,
#[error("{code} Proof completed but order not actionable: {0}", code = self.code())]
CompletedNotActionable(&'static str),

#[error("{code} Unexpected error: {0:#}", code = self.code())]
UnexpectedError(#[from] anyhow::Error),
Expand All @@ -53,8 +52,8 @@ impl CodedError for ProvingErr {
fn code(&self) -> &str {
match self {
ProvingErr::ProvingFailed(_) => "[B-PRO-501]",
ProvingErr::ExternallyFulfilled => "[B-PRO-502]",
ProvingErr::ProvingTimedOut => "[B-PRO-503]",
ProvingErr::ShouldCancel(_) => "[B-PRO-502]",
ProvingErr::CompletedNotActionable(_) => "[B-PRO-503]",
ProvingErr::UnexpectedError(_) => "[B-PRO-500]",
}
}
Expand All @@ -80,18 +79,6 @@ impl ProvingService {
Ok(Self { db, prover, config, order_state_tx, priority_requestors })
}

async fn cancel_stark_session(&self, proof_id: &str, order_id: &str, reason: &str) {
if let Err(err) = self.prover.cancel_stark(proof_id).await {
tracing::warn!(
"Failed to cancel proof {} for {} order {}: {}",
proof_id,
reason,
order_id,
err
);
}
}

async fn monitor_proof_internal(
&self,
order_id: &str,
Expand Down Expand Up @@ -208,42 +195,46 @@ impl ProvingService {

let proof_id = order.proof_id.as_ref().context("Order should have proof ID")?;

// Check config: should we cancel jobs when orders become not actionable?
let should_cancel_on_not_actionable =
self.config.lock_all().map(|c| c.market.cancel_proving_expired_orders).unwrap_or(false);

let timeout_duration = {
let expiry_timestamp_secs =
order.expire_timestamp.expect("Order should have expiry set");
let expiry_timestamp_secs = order.request.expires_at();
let now = now_timestamp();
Duration::from_secs(expiry_timestamp_secs.saturating_sub(now))
};
// Only subscribe to order state events for FulfillAfterLockExpire orders
let mut order_state_rx = if matches!(
order.fulfillment_type,
FulfillmentType::FulfillAfterLockExpire
) {
let rx = self.order_state_tx.subscribe();

// Track whether order is not actionable (expired or fulfilled externally)
let mut not_actionable_reason: Option<&'static str> = None;

let mut order_state_rx = self.order_state_tx.subscribe();
if matches!(order.fulfillment_type, FulfillmentType::FulfillAfterLockExpire)
|| order.expire_timestamp.unwrap() < now_timestamp()
{
// Check if the order has already been fulfilled before starting proof
match self.db.is_request_fulfilled(request_id).await {
Ok(true) => {
tracing::debug!(
"Order {} (request {}) was already fulfilled, skipping proof",
"Order {} (request {}) was already fulfilled before monitoring started",
order_id,
request_id
);
self.cancel_stark_session(proof_id, &order_id, "externally fulfilled").await;
return Err(ProvingErr::ExternallyFulfilled);
if should_cancel_on_not_actionable {
return Err(ProvingErr::ShouldCancel("Already fulfilled"));
} else {
not_actionable_reason = Some("Already fulfilled");
}
}
Ok(false) => Some(rx),
Ok(false) => {}
Err(e) => {
tracing::warn!(
"Failed to check fulfillment status for order {}, will continue proving: {e:?}",
order_id,
);
Some(rx)
}
}
} else {
None
};
}

let monitor_task = self.monitor_proof_internal(
&order_id,
Expand All @@ -253,48 +244,83 @@ impl ProvingService {
);
tokio::pin!(monitor_task);

// Note: this timeout may not exactly match the order expiry exactly due to
// discrepancy between wall clock and monotonic clock from the timeout,
// but this time, along with aggregation and submission time, should never
// exceed the actual order expiry.
let timeout_future = tokio::time::sleep(timeout_duration);
tokio::pin!(timeout_future);

let order_status = loop {
tokio::select! {
// Proof monitoring completed
res = &mut monitor_task => {
break res.with_context(|| {
let status = res.with_context(|| {
format!("Monitoring proof failed for order {order_id}, proof_id: {proof_id}")
}).map_err(ProvingErr::ProvingFailed)?;

// If order not actionable, return error instead of success
if let Some(reason) = not_actionable_reason {
tracing::info!(
"Proof completed for order {} but order not actionable: {}",
order_id,
reason
);
return Err(ProvingErr::CompletedNotActionable(reason));
}

break status;
}
// Timeout occurred

// Request expiry timeout
_ = &mut timeout_future => {
tracing::debug!(
"Proving timed out for order {}, cancelling proof {}",
order_id,
proof_id
);
self.cancel_stark_session(proof_id, &order_id, "timed out").await;
return Err(ProvingErr::ProvingTimedOut);
}
// External fulfillment notification (only active for FulfillAfterLockExpire orders)
Some(recv_res) = async {
match &mut order_state_rx {
Some(rx) => Some(rx.recv().await),
None => pending::<Option<Result<OrderStateChange, tokio::sync::broadcast::error::RecvError>>>().await,
tracing::debug!("Order {order_id} expired during proving");

if should_cancel_on_not_actionable {
return Err(ProvingErr::ShouldCancel("Order expired"));
} else {
tracing::debug!("Waiting for proof completion for capacity tracking");
not_actionable_reason = Some("Order expired");
// Disarm timeout so it doesn't fire again
timeout_future.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(365 * 24 * 60 * 60));
}
} => {
}

// External fulfillment notification
recv_res = order_state_rx.recv() => {
match recv_res {
Ok(OrderStateChange::Fulfilled { request_id: fulfilled_request_id }) if fulfilled_request_id == request_id => {
tracing::debug!(
"Order {} (request {}) was fulfilled by another prover, cancelling proof {}",
order_id,
request_id,
proof_id
);
self.cancel_stark_session(proof_id, &order_id, "externally fulfilled").await;
return Err(ProvingErr::ExternallyFulfilled);
// Determine if this makes the order not actionable based on order type
let is_not_actionable = match order.fulfillment_type {
FulfillmentType::FulfillAfterLockExpire => {
// Always not actionable - someone else fulfilled it
true
}
FulfillmentType::LockAndFulfill => {
// Only not actionable if lock already expired
now_timestamp() >= order.request.lock_expires_at()
}
FulfillmentType::FulfillWithoutLocking => {
// Always not actionable - someone else fulfilled it
true
}
};

if is_not_actionable {
tracing::debug!(
"Order {} (request {}) fulfilled externally and not actionable",
order_id,
request_id
);

if should_cancel_on_not_actionable {
return Err(ProvingErr::ShouldCancel("Externally fulfilled"));
} else {
tracing::debug!("Waiting for proof completion for capacity tracking");
not_actionable_reason = Some("Externally fulfilled");
}
} else {
tracing::trace!(
"Order {} fulfilled externally but lock not expired yet, continuing",
order_id
);
}
}
Ok(_) => {
// Fulfillment for a different request, continue monitoring
Expand Down Expand Up @@ -356,9 +382,17 @@ impl ProvingService {
tracing::error!("Failed to set aggregation status for order {order_id}: {e:?}");
}
}
Err(ProvingErr::ExternallyFulfilled) => {
tracing::info!("Order {order_id} was fulfilled by another prover, cancelled proof");
handle_order_failure(&self.db, &order_id, "Externally fulfilled").await;
Err(ProvingErr::ShouldCancel(reason)) => {
tracing::info!("Order {order_id} not actionable, cancelling proof: {reason}");
// Config says to cancel - release capacity immediately
cancel_proof_and_fail_order(&self.prover, &self.db, &self.config, &order, reason)
.await;
}
Err(ProvingErr::CompletedNotActionable(reason)) => {
tracing::info!(
"Order {order_id} proof completed but order not actionable: {reason}"
);
handle_order_failure(&self.db, &order_id, reason).await;
}
Err(err) => {
tracing::error!(
Expand All @@ -378,31 +412,17 @@ impl ProvingService {
self.db.get_active_proofs().await.context("Failed to get active proofs")?;

tracing::info!("Found {} proofs currently proving", current_proofs.len());
let now = crate::now_timestamp();
for order in current_proofs {
let order_id = order.id();
if order.expire_timestamp.unwrap() < now {
tracing::warn!("Order {} had expired on proving task start", order_id);
cancel_proof_and_fail_order(
&self.prover,
&self.db,
&order,
"Order expired on startup",
)
.await;
}
let prove_serv = self.clone();

if order.proof_id.is_none() {
tracing::error!("Order in status Proving missing proof_id: {order_id}");
handle_order_failure(&prove_serv.db, &order_id, "Proving status missing proof_id")
.await;
handle_order_failure(&self.db, &order_id, "Proving status missing proof_id").await;
continue;
}

// TODO: Manage these tasks in a joinset?
// They should all be fail-able without triggering a larger failure so it should be
// fine.
// Spawn monitoring task - it will handle expiry/cancellation based on config
let prove_serv = self.clone();
tokio::spawn(async move { prove_serv.prove_and_update_db(order).await });
}

Expand Down Expand Up @@ -753,7 +773,7 @@ mod tests {
let request_id = U256::from(123);
let proof_id = prover.prove_stark(&image_id, &input_id, vec![]).await.unwrap();

// Test 1: FulfillAfterLockExpire order cancelled by matching fulfillment event
// Test 1: FulfillAfterLockExpire order waits for completion when fulfilled externally
let order = create_test_order(
request_id,
image_id.clone(),
Expand All @@ -771,13 +791,13 @@ mod tests {
proving_service_clone.monitor_proof_with_timeout(order_clone).await
});

// Send fulfillment event for the same request - should cancel proof
// Send fulfillment event - should wait for proof completion (default config)
send_order_state_event(order_state_tx.clone(), OrderStateChange::Fulfilled { request_id })
.await;

let result = monitor_task.await.unwrap();
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Request fulfilled by another prover"));
assert!(result.unwrap_err().to_string().contains("order not actionable"));

// Test 2: FulfillAfterLockExpire order ignores different request ID
let request_id_2 = U256::from(456);
Expand Down Expand Up @@ -812,6 +832,6 @@ mod tests {
assert!(result_2.is_ok());
assert_eq!(result_2.unwrap(), OrderStatus::PendingAgg);

assert!(logs_contain("was fulfilled by another prover"));
assert!(logs_contain("fulfilled externally"));
}
}
20 changes: 4 additions & 16 deletions crates/broker/src/reaper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,13 @@ pub enum ReaperError {

#[error("{code} Config error {0}", code = self.code())]
ConfigReadErr(#[from] ConfigErr),

#[error("{code} Failed to update expired order status: {0}", code = self.code())]
UpdateFailed(anyhow::Error),
}

impl CodedError for ReaperError {
fn code(&self) -> &str {
match self {
ReaperError::DbError(_) => "[B-REAP-001]",
ReaperError::ConfigReadErr(_) => "[B-REAP-002]",
ReaperError::UpdateFailed(_) => "[B-REAP-003]",
}
}
}
Expand Down Expand Up @@ -80,19 +76,11 @@ impl ReaperTask {
cancel_proof_and_fail_order(
&self.prover,
&self.db,
&self.config,
&order,
"Order expired in reaper",
)
.await;
match self.db.set_order_failure(&order_id, "Order expired").await {
Ok(()) => {
warn!("Order {} has expired, marked as failed", order_id);
}
Err(err) => {
error!("Failed to update status for expired order {}: {}", order_id, err);
return Err(ReaperError::UpdateFailed(err.into()));
}
}
}
}

Expand Down Expand Up @@ -263,9 +251,9 @@ mod tests {
let stored_expired2 = db.get_order(&expired_order2.id()).await.unwrap().unwrap();

assert_eq!(stored_expired1.status, OrderStatus::Failed);
assert_eq!(stored_expired1.error_msg, Some("Order expired".to_string()));
assert_eq!(stored_expired1.error_msg, Some("Order expired in reaper".to_string()));
assert_eq!(stored_expired2.status, OrderStatus::Failed);
assert_eq!(stored_expired2.error_msg, Some("Order expired".to_string()));
assert_eq!(stored_expired2.error_msg, Some("Order expired in reaper".to_string()));

// Check non-expired orders remain unchanged
let stored_active = db.get_order(&active_order.id()).await.unwrap().unwrap();
Expand Down Expand Up @@ -311,7 +299,7 @@ mod tests {
for order in orders {
let stored_order = db.get_order(&order.id()).await.unwrap().unwrap();
assert_eq!(stored_order.status, OrderStatus::Failed);
assert_eq!(stored_order.error_msg, Some("Order expired".to_string()));
assert_eq!(stored_order.error_msg, Some("Order expired in reaper".to_string()));
}
}
}
Loading
Loading