From 6b48c0ed7d174f41c212fb1c8361b1a764626a5d Mon Sep 17 00:00:00 2001 From: James Date: Wed, 7 Jan 2026 13:37:06 -0500 Subject: [PATCH] refactor: clean up sim round, remove an allocation --- crates/sim/src/env/shared.rs | 5 +++-- crates/sim/src/env/sim_env.rs | 37 ++++++++++++++++++++++------------- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/crates/sim/src/env/shared.rs b/crates/sim/src/env/shared.rs index 6dc966ed..924d8d3e 100644 --- a/crates/sim/src/env/shared.rs +++ b/crates/sim/src/env/shared.rs @@ -119,8 +119,9 @@ where trace!(score = %best.as_ref().map(|candidate| candidate.score).unwrap_or_default(), "Read outcome from channel"); let outcome = best.as_ref()?; - // Remove the item from the cache. - let item = self.sim_items().remove(outcome.cache_rank)?; + // Remove the item from the cache. Disallow it, as it will never be + // valid again. + let item = self.sim_items().remove_and_disallow(outcome.cache_rank)?; // We can expect here as all of our simulations are done and cleaned up. let inner = Arc::get_mut(&mut self.inner).expect("sims dropped already"); diff --git a/crates/sim/src/env/sim_env.rs b/crates/sim/src/env/sim_env.rs index e3275792..aa892602 100644 --- a/crates/sim/src/env/sim_env.rs +++ b/crates/sim/src/env/sim_env.rs @@ -1,10 +1,11 @@ use crate::{env::RollupEnv, HostEnv, SimCache, SimDb, SimItem, SimOutcomeWithCache}; use alloy::{consensus::TxEnvelope, hex}; use core::fmt; +use parking_lot::Mutex; use signet_bundle::{RecoveredBundle, SignetEthBundleDriver, SignetEthBundleError}; use signet_evm::SignetInspector; use signet_types::constants::SignetSystemConstants; -use std::{borrow::Cow, sync::Arc}; +use std::borrow::Cow; use tokio::sync::{mpsc, watch}; use tracing::{instrument, trace, trace_span, warn}; use trevm::{ @@ -33,6 +34,9 @@ pub struct SimEnv>, } impl fmt::Debug for SimEnv { @@ -46,14 +50,21 @@ impl fmt::Debug for SimEnv SimEnv { /// Create a new `SimEnv` instance. - pub const fn new( + pub fn new( rollup: RollupEnv, host: HostEnv, finish_by: std::time::Instant, concurrency_limit: usize, sim_items: SimCache, ) -> Self { - Self { rollup, host, finish_by, concurrency_limit, sim_items } + Self { + rollup, + host, + finish_by, + concurrency_limit, + sim_items, + sim_buffer: Mutex::new(Vec::with_capacity(concurrency_limit)), + } } /// Get a reference to the rollup environment. @@ -255,14 +266,16 @@ where } pub(crate) fn sim_round( - self: Arc, + &self, max_gas: u64, max_host_gas: u64, best_tx: watch::Sender>, ) { - // Pull the `n` best items from the cache. - let active_sim = match self.sim_items.read_best_valid( - self.concurrency_limit, + // This lock is cheap. There should never be contention. + let mut active_sim = self.sim_buffer.lock(); + + match self.sim_items.write_best_valid_to( + active_sim.spare_capacity_mut(), &self.rollup_env().db(), &self.host_env().db(), ) { @@ -280,14 +293,10 @@ where let outer_ref = &outer; let _og = outer.enter(); - // to be used in the scope - let this_ref = self.clone(); - std::thread::scope(|scope| { // Spawn a thread per bundle to simulate. - for (cache_rank, item) in active_sim.into_iter() { + for (cache_rank, item) in active_sim.drain(..) { let c = candidates.clone(); - let this_ref = this_ref.clone(); scope.spawn(move || { let identifier = item.identifier(); let _ig = trace_span!(parent: outer_ref, "sim_task", %identifier).entered(); @@ -295,7 +304,7 @@ where // If simulation is succesful, send the outcome via the // channel. - match this_ref.simulate(cache_rank, &item) { + match self.simulate(cache_rank, &item) { Ok(candidate) if candidate.score.is_zero() => { trace!("zero score candidate, skipping"); } @@ -320,7 +329,7 @@ where }; // fall through applies to all errors, occurs if // the simulation fails or the gas limit is exceeded. - this_ref.sim_items.remove(cache_rank); + self.sim_items.remove(cache_rank); }); } // Drop the TX so that the channel is closed when all threads