Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions crates/sim/src/env/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ where
trace!(score = %best.as_ref().map(|candidate| candidate.score).unwrap_or_default(), "Read outcome from channel");
let outcome = best.as_ref()?;

// Remove the item from the cache.
let item = self.sim_items().remove(outcome.cache_rank)?;
// Remove the item from the cache. Disallow it, as it will never be
// valid again.
let item = self.sim_items().remove_and_disallow(outcome.cache_rank)?;

// We can expect here as all of our simulations are done and cleaned up.
let inner = Arc::get_mut(&mut self.inner).expect("sims dropped already");
Expand Down
37 changes: 23 additions & 14 deletions crates/sim/src/env/sim_env.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use crate::{env::RollupEnv, HostEnv, SimCache, SimDb, SimItem, SimOutcomeWithCache};
use alloy::{consensus::TxEnvelope, hex};
use core::fmt;
use parking_lot::Mutex;
use signet_bundle::{RecoveredBundle, SignetEthBundleDriver, SignetEthBundleError};
use signet_evm::SignetInspector;
use signet_types::constants::SignetSystemConstants;
use std::{borrow::Cow, sync::Arc};
use std::borrow::Cow;
use tokio::sync::{mpsc, watch};
use tracing::{instrument, trace, trace_span, warn};
use trevm::{
Expand Down Expand Up @@ -33,6 +34,9 @@ pub struct SimEnv<RuDb, HostDb, RuInsp = NoOpInspector, HostInsp = NoOpInspector

/// The maximum number of concurrent simulations to run.
concurrency_limit: usize,

/// A buffer for simulation results.
sim_buffer: Mutex<Vec<(u128, SimItem)>>,
}

impl<RuDb, HostDb, RuInsp, HostInsp> fmt::Debug for SimEnv<RuDb, HostDb, RuInsp, HostInsp> {
Expand All @@ -46,14 +50,21 @@ impl<RuDb, HostDb, RuInsp, HostInsp> fmt::Debug for SimEnv<RuDb, HostDb, RuInsp,

impl<RuDb, HostDb, RuInsp, HostInsp> SimEnv<RuDb, HostDb, RuInsp, HostInsp> {
/// Create a new `SimEnv` instance.
pub const fn new(
pub fn new(
rollup: RollupEnv<RuDb, RuInsp>,
host: HostEnv<HostDb, HostInsp>,
finish_by: std::time::Instant,
concurrency_limit: usize,
sim_items: SimCache,
) -> Self {
Self { rollup, host, finish_by, concurrency_limit, sim_items }
Self {
rollup,
host,
finish_by,
concurrency_limit,
sim_items,
sim_buffer: Mutex::new(Vec::with_capacity(concurrency_limit)),
}
}

/// Get a reference to the rollup environment.
Expand Down Expand Up @@ -255,14 +266,16 @@ where
}

pub(crate) fn sim_round(
self: Arc<Self>,
&self,
max_gas: u64,
max_host_gas: u64,
best_tx: watch::Sender<Option<SimOutcomeWithCache>>,
) {
// Pull the `n` best items from the cache.
let active_sim = match self.sim_items.read_best_valid(
self.concurrency_limit,
// This lock is cheap. There should never be contention.
let mut active_sim = self.sim_buffer.lock();

match self.sim_items.write_best_valid_to(
active_sim.spare_capacity_mut(),
&self.rollup_env().db(),
&self.host_env().db(),
) {
Expand All @@ -280,22 +293,18 @@ where
let outer_ref = &outer;
let _og = outer.enter();

// to be used in the scope
let this_ref = self.clone();

std::thread::scope(|scope| {
// Spawn a thread per bundle to simulate.
for (cache_rank, item) in active_sim.into_iter() {
for (cache_rank, item) in active_sim.drain(..) {
let c = candidates.clone();
let this_ref = this_ref.clone();
scope.spawn(move || {
let identifier = item.identifier();
let _ig = trace_span!(parent: outer_ref, "sim_task", %identifier).entered();

// If simulation is succesful, send the outcome via the
// channel.

match this_ref.simulate(cache_rank, &item) {
match self.simulate(cache_rank, &item) {
Ok(candidate) if candidate.score.is_zero() => {
trace!("zero score candidate, skipping");
}
Expand All @@ -320,7 +329,7 @@ where
};
// fall through applies to all errors, occurs if
// the simulation fails or the gas limit is exceeded.
this_ref.sim_items.remove(cache_rank);
self.sim_items.remove(cache_rank);
});
}
// Drop the TX so that the channel is closed when all threads
Expand Down
Loading