diff --git a/crates/bundle/src/lib.rs b/crates/bundle/src/lib.rs
index ee267132..d8eea553 100644
--- a/crates/bundle/src/lib.rs
+++ b/crates/bundle/src/lib.rs
@@ -52,5 +52,5 @@ pub use call::{SignetBundleDriver, SignetCallBundle, SignetCallBundleResponse};
 mod send;
 pub use send::{
     BundleInspector, BundleRecoverError, RecoverError, RecoveredBundle, SignetEthBundle,
-    SignetEthBundleDriver, SignetEthBundleError, SignetEthBundleInsp,
+    SignetEthBundleDriver, SignetEthBundleError, SignetEthBundleInsp, TxRequirement,
 };
diff --git a/crates/bundle/src/send/bundle.rs b/crates/bundle/src/send/bundle.rs
index 66026645..bef4a868 100644
--- a/crates/bundle/src/send/bundle.rs
+++ b/crates/bundle/src/send/bundle.rs
@@ -16,7 +16,7 @@ use trevm::{
     BundleError,
 };

-use crate::{BundleRecoverError, RecoveredBundle};
+use crate::{BundleRecoverError, RecoverError, RecoveredBundle};

 /// The inspector type required by the Signet bundle driver.
 pub type BundleInspector = Layered;
@@ -110,6 +110,10 @@ impl SignetEthBundle {
     /// Create a [`RecoveredBundle`] from this bundle by decoding and recovering
     /// all transactions, taking ownership of the bundle.
     pub fn try_into_recovered(self) -> Result<RecoveredBundle, BundleRecoverError> {
+        if self.txs().is_empty() {
+            return Err(BundleRecoverError::new(RecoverError::EmptyBundle, false, 0));
+        }
+
         let txs = self.recover_txs().collect::<Result<Vec<_>, _>>()?;
         let host_txs = self.recover_host_txs().collect::<Result<Vec<_>, _>>()?;
diff --git a/crates/bundle/src/send/decoded.rs b/crates/bundle/src/send/decoded.rs
index ad0679bb..ceed22b8 100644
--- a/crates/bundle/src/send/decoded.rs
+++ b/crates/bundle/src/send/decoded.rs
@@ -1,9 +1,20 @@
 use alloy::{
-    consensus::{transaction::Recovered, TxEnvelope},
-    primitives::{Address, TxHash},
+    consensus::{transaction::Recovered, Transaction, TxEnvelope},
+    primitives::{Address, TxHash, U256},
     serde::OtherFields,
 };

+/// Transaction requirement info for a single transaction.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct TxRequirement {
+    /// Signer address.
+    pub signer: Address,
+    /// Nonce.
+    pub nonce: u64,
+    /// Max spend: `(max_fee_per_gas * gas_limit) + value`.
+    pub balance: U256,
+}
+
 /// Version of [`SignetEthBundle`] with decoded transactions.
 ///
 /// [`SignetEthBundle`]: crate::send::bundle::SignetEthBundle
@@ -49,14 +60,15 @@ pub struct RecoveredBundle {
 impl RecoveredBundle {
     /// Instantiator. Generally recommend instantiating via conversion from
     /// [`SignetEthBundle`] via [`SignetEthBundle::try_into_recovered`] or
-    /// [`SignetEthBundle::try_to_recovered`].
+    /// [`SignetEthBundle::try_to_recovered`]. Unlike those conversions, this
+    /// constructor permits empty bundles; it exists for testing only.
     ///
     /// [`SignetEthBundle`]: crate::send::bundle::SignetEthBundle
     /// [`SignetEthBundle::try_into_recovered`]: crate::send::bundle::SignetEthBundle::try_into_recovered
     /// [`SignetEthBundle::try_to_recovered`]: crate::send::bundle::SignetEthBundle::try_to_recovered
     #[doc(hidden)]
     #[allow(clippy::too_many_arguments)]
-    pub const fn new(
+    pub const fn new_unchecked(
         txs: Vec<Recovered<TxEnvelope>>,
         host_txs: Vec<Recovered<TxEnvelope>>,
         block_number: u64,
@@ -106,21 +118,68 @@ impl RecoveredBundle {
         self.host_txs.drain(..)
     }

+    /// Get an iterator over the transaction requirements:
+    /// - signer address
+    /// - nonce
+    /// - min_balance (`(max_fee_per_gas * gas_limit) + value`)
+    pub fn tx_reqs(&self) -> impl Iterator<Item = TxRequirement> + '_ {
+        self.txs.iter().map(|tx| {
+            let balance = U256::from(tx.max_fee_per_gas() * tx.gas_limit() as u128) + tx.value();
+            TxRequirement { signer: tx.signer(), nonce: tx.nonce(), balance }
+        })
+    }
+
+    /// Get an iterator over the host transaction requirements:
+    /// - signer address
+    /// - nonce
+    /// - min_balance (`(max_fee_per_gas * gas_limit) + value`)
+    pub fn host_tx_reqs(&self) -> impl Iterator<Item = TxRequirement> + '_ {
+        self.host_txs.iter().map(|tx| {
+            let balance = U256::from(tx.max_fee_per_gas() * tx.gas_limit() as u128) + tx.value();
+            TxRequirement { signer: tx.signer(), nonce: tx.nonce(), balance }
+        })
+    }
+
     /// Getter for block_number, a standard bundle prop.
     pub const fn block_number(&self) -> u64 {
         self.block_number
     }

+    /// Get the valid timestamp range for this bundle.
+    pub const fn valid_timestamp_range(&self) -> std::ops::RangeInclusive<u64> {
+        let min = if let Some(min) = self.min_timestamp { min } else { 0 };
+        let max = if let Some(max) = self.max_timestamp { max } else { u64::MAX };
+        min..=max
+    }
+
     /// Getter for min_timestamp, a standard bundle prop.
-    pub const fn min_timestamp(&self) -> Option<u64> {
+    pub const fn raw_min_timestamp(&self) -> Option<u64> {
         self.min_timestamp
     }

+    /// Getter for [`Self::raw_min_timestamp`], with a default of `0`.
+    pub const fn min_timestamp(&self) -> u64 {
+        if let Some(min) = self.min_timestamp {
+            min
+        } else {
+            0
+        }
+    }
+
     /// Getter for max_timestamp, a standard bundle prop.
-    pub const fn max_timestamp(&self) -> Option<u64> {
+    pub const fn raw_max_timestamp(&self) -> Option<u64> {
         self.max_timestamp
     }

+    /// Getter for [`Self::raw_max_timestamp`], with a default of `u64::MAX`.
+    pub const fn max_timestamp(&self) -> u64 {
+        if let Some(max) = self.max_timestamp {
+            max
+        } else {
+            u64::MAX
+        }
+    }
+
     /// Getter for reverting_tx_hashes, a standard bundle prop.
     pub const fn reverting_tx_hashes(&self) -> &[TxHash] {
         self.reverting_tx_hashes.as_slice()
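Illustrative only, not part of the patch: a minimal sketch of how a caller might consume the per-transaction requirements introduced above. It assumes only the `TxRequirement` fields from this diff; the `print_reqs` helper is hypothetical, not a crate API.

fn print_reqs(bundle: &signet_bundle::RecoveredBundle) {
    for req in bundle.tx_reqs() {
        // `balance` is the worst-case spend: max_fee_per_gas * gas_limit + value.
        println!("signer={} nonce={} min_balance={}", req.signer, req.nonce, req.balance);
    }
}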
diff --git a/crates/bundle/src/send/error.rs b/crates/bundle/src/send/error.rs
index 83b12bb0..fa592a98 100644
--- a/crates/bundle/src/send/error.rs
+++ b/crates/bundle/src/send/error.rs
@@ -9,9 +9,14 @@ use trevm::{
 /// bundles.
 #[derive(Debug, thiserror::Error)]
 pub enum RecoverError {
+    /// Bundle is empty. Bundles must contain at least one RU transaction.
+    #[error("Bundle must contain at least one RU transaction")]
+    EmptyBundle,
+
     /// Error occurred while decoding the transaction.
     #[error(transparent)]
     Decoding(#[from] Eip2718Error),
+
     /// Error occurred while recovering the signature.
     #[error(transparent)]
     Recovering(#[from] alloy::consensus::crypto::RecoveryError),
diff --git a/crates/bundle/src/send/mod.rs b/crates/bundle/src/send/mod.rs
index 86939da7..244747db 100644
--- a/crates/bundle/src/send/mod.rs
+++ b/crates/bundle/src/send/mod.rs
@@ -2,7 +2,7 @@ mod bundle;
 pub use bundle::{BundleInspector, SignetEthBundle};

 mod decoded;
-pub use decoded::RecoveredBundle;
+pub use decoded::{RecoveredBundle, TxRequirement};

 mod driver;
 pub use driver::{SignetEthBundleDriver, SignetEthBundleInsp};
diff --git a/crates/sim/Cargo.toml b/crates/sim/Cargo.toml
index 4bd85990..19546507 100644
--- a/crates/sim/Cargo.toml
+++ b/crates/sim/Cargo.toml
@@ -23,7 +23,8 @@ trevm.workspace = true
 thiserror.workspace = true
 parking_lot.workspace = true
+lru = "0.16.2"

 [dev-dependencies]
 tracing-subscriber.workspace = true
-alloy = { workspace = true, features = ["getrandom"] }
\ No newline at end of file
+alloy = { workspace = true, features = ["getrandom"] }
diff --git a/crates/sim/src/built.rs b/crates/sim/src/built.rs
index 29bc1383..dbd665a7 100644
--- a/crates/sim/src/built.rs
+++ b/crates/sim/src/built.rs
@@ -6,7 +6,7 @@ use alloy::{
 use core::fmt;
 use signet_bundle::RecoveredBundle;
 use signet_zenith::{encode_txns, Alloy2718Coder};
-use std::sync::OnceLock;
+use std::sync::{Arc, OnceLock};
 use tracing::trace;

 /// A block that has been built by the simulator.
@@ -137,8 +137,8 @@ impl BuiltBlock {
         self.host_gas_used += item.host_gas_used;

         match item.item {
-            SimItem::Bundle(bundle) => self.ingest_bundle(*bundle),
-            SimItem::Tx(tx) => self.ingest_tx(*tx),
+            SimItem::Bundle(bundle) => self.ingest_bundle(Arc::unwrap_or_clone(bundle)),
+            SimItem::Tx(tx) => self.ingest_tx(Arc::unwrap_or_clone(tx)),
         }
     }
diff --git a/crates/sim/src/error.rs b/crates/sim/src/cache/error.rs
similarity index 100%
rename from crates/sim/src/error.rs
rename to crates/sim/src/cache/error.rs
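Illustrative only: the `BuiltBlock` change above swaps `Box` for `Arc` and relies on `Arc::unwrap_or_clone` from the standard library, which moves the value out when the `Arc` is uniquely owned and clones only when other references exist. A minimal sketch of that behaviour:

use std::sync::Arc;

fn unwrap_or_clone_demo() {
    let unique = Arc::new(vec![1u8, 2, 3]);
    // Refcount is 1, so the Vec is moved out without cloning.
    let moved = Arc::unwrap_or_clone(unique);

    let shared = Arc::new(vec![4u8, 5, 6]);
    let keep = Arc::clone(&shared);
    // Another Arc is still alive, so the Vec is cloned.
    let cloned = Arc::unwrap_or_clone(shared);

    assert_eq!(moved, vec![1, 2, 3]);
    assert_eq!(cloned, *keep);
}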
diff --git a/crates/sim/src/item.rs b/crates/sim/src/cache/item.rs
similarity index 50%
rename from crates/sim/src/item.rs
rename to crates/sim/src/cache/item.rs
index 0007bd83..c462d7c0 100644
--- a/crates/sim/src/item.rs
+++ b/crates/sim/src/cache/item.rs
@@ -1,24 +1,26 @@
-use crate::CacheError;
+use crate::{cache::StateSource, CacheError, SimItemValidity};
 use alloy::{
     consensus::{
         transaction::{Recovered, SignerRecoverable},
         Transaction, TxEnvelope,
     },
-    primitives::TxHash,
+    primitives::{Address, TxHash, U256},
 };
-use signet_bundle::{RecoveredBundle, SignetEthBundle};
+use signet_bundle::{RecoveredBundle, SignetEthBundle, TxRequirement};
 use std::{
     borrow::{Borrow, Cow},
+    collections::BTreeMap,
     hash::Hash,
+    sync::Arc,
 };

-/// An item that can be simulated.
+/// An item that can be simulated, wrapped in an `Arc` for cheap cloning.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum SimItem {
     /// A bundle to be simulated.
-    Bundle(Box<RecoveredBundle>),
+    Bundle(Arc<RecoveredBundle>),
     /// A transaction to be simulated.
-    Tx(Box<Recovered<TxEnvelope>>),
+    Tx(Arc<Recovered<TxEnvelope>>),
 }

 impl TryFrom<SignetEthBundle> for SimItem {
@@ -57,7 +59,7 @@ impl SimItem {

     /// Get the bundle if it is a bundle.
-    pub const fn as_bundle(&self) -> Option<&RecoveredBundle> {
+    pub fn as_bundle(&self) -> Option<&RecoveredBundle> {
         match self {
             Self::Bundle(bundle) => Some(bundle),
             Self::Tx(_) => None,
@@ -65,7 +67,7 @@ impl SimItem {
     }

     /// Get the transaction if it is a transaction.
-    pub const fn as_tx(&self) -> Option<&Recovered<TxEnvelope>> {
+    pub fn as_tx(&self) -> Option<&Recovered<TxEnvelope>> {
         match self {
             Self::Bundle(_) => None,
             Self::Tx(tx) => Some(tx),
@@ -110,6 +112,128 @@ impl SimItem {
             Self::Tx(tx) => SimIdentifier::Tx(*tx.inner().hash()),
         }
     }
+
+    fn check_tx<S>(&self, source: &S) -> Result<SimItemValidity, Box<dyn core::error::Error>>
+    where
+        S: StateSource,
+    {
+        let item = self.as_tx().expect("SimItem is not a Tx");
+
+        let total = U256::from(item.max_fee_per_gas() * item.gas_limit() as u128) + item.value();
+
+        source
+            .map(&item.signer(), |info| {
+                // if the chain nonce is greater than the tx nonce, it is
+                // no longer valid
+                if info.nonce > item.nonce() {
+                    return SimItemValidity::Never;
+                }
+                // if the chain nonce is less than the tx nonce, we need to wait
+                if info.nonce < item.nonce() {
+                    return SimItemValidity::Future;
+                }
+                // if the balance is insufficient, we need to wait
+                if info.balance < total {
+                    return SimItemValidity::Future;
+                }
+                // nonce is equal and balance is sufficient
+                SimItemValidity::Now
+            })
+            .map_err(Into::into)
+    }
+
+    fn check_bundle_tx_list<S>(
+        items: impl Iterator<Item = TxRequirement>,
+        source: &S,
+    ) -> Result<SimItemValidity, S::Error>
+    where
+        S: StateSource,
+    {
+        // For bundles, we want to check the nonce of each transaction. To do
+        // this, we build a small in-memory cache so that if the same signer
+        // appears again, we can reuse the nonce info. We do not check balances
+        // after the first tx, as they may have changed due to prior txs in the
+        // bundle.

+        let mut nonce_cache: BTreeMap<Address, u64> = BTreeMap::new();
+        let mut items = items.peekable();
+
+        // Peek to perform the balance check for the first tx
+        if let Some(first) = items.peek() {
+            let info = source.account_details(&first.signer)?;
+
+            // check balance for the first tx is sufficient
+            if first.balance > info.balance {
+                return Ok(SimItemValidity::Future);
+            }
+
+            // Cache the nonce. This will be used for the first tx.
+            nonce_cache.insert(first.signer, info.nonce);
+        }
+
+        for requirement in items {
+            let state_nonce = match nonce_cache.get(&requirement.signer) {
+                Some(cached_nonce) => *cached_nonce,
+                None => {
+                    let nonce = source.nonce(&requirement.signer)?;
+                    nonce_cache.insert(requirement.signer, nonce);
+                    nonce
+                }
+            };
+
+            if requirement.nonce < state_nonce {
+                return Ok(SimItemValidity::Never);
+            }
+            if requirement.nonce > state_nonce {
+                return Ok(SimItemValidity::Future);
+            }
+
+            // Increment the cached nonce for the next transaction from this
+            // signer. The map _must_ have the entry, as we just either loaded
+            // or stored it above.
+            nonce_cache.entry(requirement.signer).and_modify(|n| *n += 1);
+        }
+
+        // All transactions passed
+        Ok(SimItemValidity::Now)
+    }
+
+    fn check_bundle<S, S2>(
+        &self,
+        source: &S,
+        host_source: &S2,
+    ) -> Result<SimItemValidity, Box<dyn core::error::Error>>
+    where
+        S: StateSource,
+        S2: StateSource,
+    {
+        let item = self.as_bundle().expect("SimItem is not a Bundle");
+
+        let ru_tx = Self::check_bundle_tx_list(item.tx_reqs(), source)?;
+        let host_tx = Self::check_bundle_tx_list(item.host_tx_reqs(), host_source)?;
+
+        // Check both the regular txs and the host txs.
+        Ok(ru_tx.min(host_tx))
+    }
+
+    /// Check if the item is valid against the provided state sources.
+    ///
+    /// This will check that nonces and balances are sufficient for the item to
+    /// be included on the current state.
+    pub fn check<S, S2>(
+        &self,
+        source: &S,
+        host_source: &S2,
+    ) -> Result<SimItemValidity, Box<dyn core::error::Error>>
+    where
+        S: StateSource,
+        S2: StateSource,
+    {
+        match self {
+            SimItem::Bundle(_) => self.check_bundle(source, host_source),
+            SimItem::Tx(_) => self.check_tx(source),
+        }
+    }
 }

 /// A simulation cache item identifier.
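Illustrative only: the bundle check above walks nonces per signer, consuming one nonce per transaction and checking the balance of the first transaction only. A standalone sketch of that walk, with chain state supplied as a plain map instead of a `StateSource` (all names here are hypothetical):

use std::collections::BTreeMap;

#[derive(Debug, PartialEq)]
enum Validity { Never, Future, Now }

/// `reqs` is a list of (signer, nonce) pairs in bundle order; `chain` maps
/// each signer to its next expected nonce.
fn walk_nonces(reqs: &[(u8, u64)], chain: &BTreeMap<u8, u64>) -> Validity {
    let mut next: BTreeMap<u8, u64> = BTreeMap::new();
    for (signer, nonce) in reqs {
        let expected =
            *next.entry(*signer).or_insert_with(|| chain.get(signer).copied().unwrap_or(0));
        if *nonce < expected {
            return Validity::Never; // nonce already used on chain
        }
        if *nonce > expected {
            return Validity::Future; // gap: may become valid later
        }
        next.insert(*signer, expected + 1); // consume the nonce within the bundle
    }
    Validity::Now
}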
diff --git a/crates/sim/src/cache/mod.rs b/crates/sim/src/cache/mod.rs
new file mode 100644
index 00000000..4ec67e82
--- /dev/null
+++ b/crates/sim/src/cache/mod.rs
@@ -0,0 +1,14 @@
+mod error;
+pub use error::CacheError;
+
+mod item;
+pub use item::{SimIdentifier, SimItem};
+
+mod state;
+pub use state::StateSource;
+
+mod store;
+pub use store::SimCache;
+
+mod validity;
+pub use validity::SimItemValidity;
diff --git a/crates/sim/src/cache/state.rs b/crates/sim/src/cache/state.rs
new file mode 100644
index 00000000..d308675e
--- /dev/null
+++ b/crates/sim/src/cache/state.rs
@@ -0,0 +1,53 @@
+use alloy::primitives::{Address, U256};
+
+/// Account information including nonce and balance. This is partially modeled
+/// after [`revm::AccountInfo`], but only includes the fields we care about.
+///
+/// [`revm::AccountInfo`]: trevm::revm::state::AccountInfo
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct AcctInfo {
+    pub nonce: u64,
+    pub balance: U256,
+    pub has_code: bool,
+}
+
+/// A source for nonce and balance information. Exists to simplify type bounds
+/// in various places.
+pub trait StateSource {
+    /// The error type for state lookups, usually a database error.
+    type Error: core::error::Error + 'static;
+
+    /// Get account details for an address.
+    fn account_details(&self, address: &Address) -> Result<AcctInfo, Self::Error>;
+
+    /// Get the nonce for an address. This should return the NEXT EXPECTED
+    /// nonce, i.e. `0` for an address that has never sent a transaction, `1`
+    /// for an address that has sent exactly one transaction, and so on.
+    fn nonce(&self, address: &Address) -> Result<u64, Self::Error> {
+        self.account_details(address).map(|info| info.nonce)
+    }
+
+    /// Get the balance for an address.
+    fn balance(&self, address: &Address) -> Result<U256, Self::Error> {
+        self.account_details(address).map(|info| info.balance)
+    }
+
+    /// Run an arbitrary check on the account details for an address.
+    fn map<T, F: FnOnce(&AcctInfo) -> T>(&self, address: &Address, f: F) -> Result<T, Self::Error> {
+        self.account_details(address).map(|info| f(&info))
+    }
+}
+
+impl<Db> StateSource for Db
+where
+    Db: trevm::revm::DatabaseRef<Error: core::error::Error + 'static>,
+{
+    type Error = Db::Error;
+
+    fn account_details(&self, address: &Address) -> Result<AcctInfo, Self::Error> {
+        let info = self.basic_ref(*address)?.unwrap_or_default();
+
+        let has_code = info.code_hash() != trevm::revm::primitives::KECCAK_EMPTY;
+
+        Ok(AcctInfo { nonce: info.nonce, balance: info.balance, has_code })
+    }
+}
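Illustrative only: a minimal in-memory `StateSource` for tests, assuming `StateSource` and `AcctInfo` from the new `cache::state` module are reachable from the caller's scope. `MapSource` is a hypothetical helper, not a crate API.

use alloy::primitives::{Address, U256};
use std::{collections::HashMap, convert::Infallible};

/// Maps an address to (nonce, balance); missing addresses read as fresh accounts.
struct MapSource(HashMap<Address, (u64, U256)>);

impl StateSource for MapSource {
    type Error = Infallible;

    fn account_details(&self, address: &Address) -> Result<AcctInfo, Self::Error> {
        let (nonce, balance) = self.0.get(address).copied().unwrap_or((0, U256::ZERO));
        Ok(AcctInfo { nonce, balance, has_code: false })
    }
}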
diff --git a/crates/sim/src/cache.rs b/crates/sim/src/cache/store.rs
similarity index 52%
rename from crates/sim/src/cache.rs
rename to crates/sim/src/cache/store.rs
index 97720b7e..d0d0770f 100644
--- a/crates/sim/src/cache.rs
+++ b/crates/sim/src/cache/store.rs
@@ -1,10 +1,14 @@
-use crate::{item::SimIdentifier, CacheError, SimItem};
+use crate::cache::{CacheError, SimIdentifier, SimItem, StateSource};
 use alloy::consensus::{transaction::Recovered, TxEnvelope};
 use core::fmt;
+use lru::LruCache;
 use parking_lot::RwLock;
 use signet_bundle::{RecoveredBundle, SignetEthBundle};
 use std::{
     collections::{BTreeMap, HashSet},
+    mem::MaybeUninit,
+    num::NonZeroUsize,
+    ops::Deref,
     sync::Arc,
 };

@@ -13,7 +17,7 @@ use std::{
 /// This cache is used to store the items that are being simulated.
 #[derive(Clone)]
 pub struct SimCache {
-    inner: Arc<RwLock<CacheInner>>,
+    inner: Arc<RwLock<CacheStore>>,
     capacity: usize,
 }

@@ -32,24 +36,125 @@ impl SimCache {
     /// Create a new `SimCache` instance, with a default capacity of `100`.
     pub fn new() -> Self {
-        Self { inner: Arc::new(RwLock::new(CacheInner::new())), capacity: 100 }
+        Self { inner: Arc::new(RwLock::new(CacheStore::new())), capacity: 100 }
     }

     /// Create a new `SimCache` instance with a given capacity.
     pub fn with_capacity(capacity: usize) -> Self {
-        Self { inner: Arc::new(RwLock::new(CacheInner::new())), capacity }
+        Self { inner: Arc::new(RwLock::new(CacheStore::new())), capacity }
+    }
+
+    /// Fill a buffer with the best items in the cache, up to its capacity.
+    pub fn write_best_to(&self, buf: &mut [MaybeUninit<(u128, SimItem)>]) -> usize {
+        let cache = self.inner.read();
+        cache.items.iter().rev().zip(buf.iter_mut()).for_each(|((cache_rank, item), slot)| {
+            // Cloning the Arc into the MaybeUninit slot
+            slot.write((*cache_rank, item.clone()));
+        });
+        // We wrote the minimum of what was in the cache and the buffer
+        std::cmp::min(cache.items.len(), buf.len())
     }

     /// Get an iterator over the best items in the cache.
     pub fn read_best(&self, n: usize) -> Vec<(u128, SimItem)> {
-        self.inner
-            .read()
+        let mut vec = Vec::with_capacity(n);
+        let n = self.write_best_to(vec.spare_capacity_mut());
+        // SAFETY: We just wrote n items.
+        unsafe { vec.set_len(n) };
+        vec
+    }
+
+    /// Iterate over the best items in the cache, writing only those that pass
+    /// preflight validity checks (nonce and initial fee) to the buffer.
+    ///
+    /// The state sources are used to validate the items against the current
+    /// nonce and balance, to prevent simulating invalid items.
+    ///
+    /// This will additionally remove items that can _never_ be valid from the
+    /// cache.
+    ///
+    /// When an error is encountered, the process stops and the error is
+    /// returned. At this point, the buffer may be partially written.
+    pub fn write_best_valid_to<S, S2>(
+        &self,
+        buf: &mut [MaybeUninit<(u128, SimItem)>],
+        source: &S,
+        host_source: &S2,
+    ) -> Result<usize, Box<dyn core::error::Error>>
+    where
+        S: StateSource,
+        S2: StateSource,
+    {
+        let mut cache = self.inner.upgradable_read();
+        let mut slots = buf.iter_mut();
+        let start = slots.len();
+
+        let mut never = Vec::new();
+
+        // Traverse the cache in reverse order (best items first), checking
+        // each item.
+        //
+        // Errors are shortcut by `try_for_each`. Passes are written to the
+        // buffer, consuming slots. Once no slots are left, the try_for_each
+        // returns early.
+        let res = cache
             .items
             .iter()
             .rev()
-            .take(n)
-            .map(|(cache_rank, item)| (*cache_rank, item.clone()))
-            .collect()
+            .map(|(rank, item)| {
+                item.check(source, host_source).map(|validity| (validity, rank, item))
+            })
+            .try_for_each(|result| {
+                if slots.len() == 0 {
+                    return Ok(());
+                }
+                let (validity, rank, item) = result?;
+
+                if validity.is_valid_now() {
+                    slots.next().expect("checked by len").write((*rank, item.clone()));
+                }
+                if validity.is_never_valid() {
+                    never.push(*rank);
+                }
+
+                Ok(())
+            })
+            .map(|_| start - slots.len());
+
+        cache.with_upgraded(|cache| {
+            // Remove never-valid items from the cache
+            never.iter().for_each(|rank| {
+                cache.remove_and_disallow(*rank);
+            });
+        });
+
+        res
+    }
+
+    /// Get up to the `n` best items in the cache that pass preflight validity
+    /// checks (nonce and initial fee). The returned vector may be smaller than
+    /// `n` if not enough valid items are found.
+    ///
+    /// This will additionally remove items that can _never_ be valid from the
+    /// cache.
+    ///
+    /// The state sources are used to validate the items against the current
+    /// nonce and balance, to prevent simulating invalid items.
+    pub fn read_best_valid<S, S2>(
+        &self,
+        n: usize,
+        source: &S,
+        host_source: &S2,
+    ) -> Result<Vec<(u128, SimItem)>, Box<dyn core::error::Error>>
+    where
+        S: StateSource,
+        S2: StateSource,
+    {
+        let mut vec = Vec::with_capacity(n);
+        let n = self.write_best_valid_to(vec.spare_capacity_mut(), source, host_source)?;
+        // SAFETY: We just wrote n items.
+        unsafe { vec.set_len(n) };
+        Ok(vec)
     }

     /// Get the number of items in the cache.
@@ -70,33 +175,13 @@ impl SimCache {
     /// Remove an item by key.
     pub fn remove(&self, cache_rank: u128) -> Option<SimItem> {
         let mut inner = self.inner.write();
-        if let Some(item) = inner.items.remove(&cache_rank) {
-            inner.seen.remove(item.identifier().as_bytes());
-            Some(item)
-        } else {
-            None
-        }
+        inner.remove(cache_rank)
     }

-    fn add_inner(inner: &mut CacheInner, mut cache_rank: u128, item: SimItem, capacity: usize) {
-        // Check if we've already seen this item - if so, don't add it
-        if !inner.seen.insert(item.identifier_owned()) {
-            return;
-        }
-
-        // If it has the same cache_rank, we decrement (prioritizing earlier items)
-        while inner.items.contains_key(&cache_rank) && cache_rank != 0 {
-            cache_rank = cache_rank.saturating_sub(1);
-        }
-
-        if inner.items.len() >= capacity {
-            // If we are at capacity, we need to remove the lowest score
-            if let Some((_, item)) = inner.items.pop_first() {
-                inner.seen.remove(&item.identifier_owned());
-            }
-        }
-
-        inner.items.insert(cache_rank, item.clone());
+    /// Remove an item by key, and prevent it from being re-added for a while.
+    pub fn remove_and_disallow(&self, cache_rank: u128) -> Option<SimItem> {
+        let mut inner = self.inner.write();
+        inner.remove_and_disallow(cache_rank)
     }

     /// Add a bundle to the cache.
@@ -110,7 +195,7 @@ impl SimCache {
         let cache_rank = item.calculate_total_fee(basefee);

         let mut inner = self.inner.write();
-        Self::add_inner(&mut inner, cache_rank, item, self.capacity);
+        inner.add_inner(cache_rank, item, self.capacity);

         Ok(())
     }
@@ -124,16 +209,7 @@ impl SimCache {
         Item: Into<SignetEthBundle>,
     {
         let mut inner = self.inner.write();
-
-        for item in item.into_iter() {
-            let item = item.into();
-            let Ok(item) = SimItem::try_from(item) else {
-                // Skip invalid bundles
-                continue;
-            };
-            let cache_rank = item.calculate_total_fee(basefee);
-            Self::add_inner(&mut inner, cache_rank, item, self.capacity);
-        }
+        inner.add_bundles(item, basefee, self.capacity);
     }

     /// Add a transaction to the cache.
@@ -142,7 +218,7 @@ impl SimCache {
         let cache_rank = item.calculate_total_fee(basefee);

         let mut inner = self.inner.write();
-        Self::add_inner(&mut inner, cache_rank, item, self.capacity);
+        inner.add_inner(cache_rank, item, self.capacity);
     }

     /// Add an iterator of transactions to the cache. This locks the cache only once
@@ -151,71 +227,166 @@ impl SimCache {
         I: IntoIterator<Item = Recovered<TxEnvelope>>,
     {
         let mut inner = self.inner.write();
-
-        for item in item.into_iter() {
-            let item = SimItem::from(item);
-            let cache_rank = item.calculate_total_fee(basefee);
-            Self::add_inner(&mut inner, cache_rank, item, self.capacity);
-        }
+        inner.add_txs(item, basefee, self.capacity);
     }

     /// Clean the cache by removing bundles that are not valid in the current
     /// block.
     pub fn clean(&self, block_number: u64, block_timestamp: u64) {
         let mut inner = self.inner.write();
-
-        // Trim to capacity by dropping lower fees.
-        while inner.items.len() > self.capacity {
-            if let Some((_, item)) = inner.items.pop_first() {
-                // Drop the identifier from the seen cache as well.
-                inner.seen.remove(item.identifier().as_bytes());
-            }
-        }
-
-        let CacheInner { ref mut items, ref mut seen } = *inner;
-
-        items.retain(|_, item| {
-            // Retain only items that are not bundles or are valid in the current block.
-            if let SimItem::Bundle(bundle) = item {
-                let should_keep = bundle.is_valid_at_block_number(block_number)
-                    && bundle.is_valid_at_timestamp(block_timestamp);
-
-                if !should_keep {
-                    seen.remove(item.identifier().as_bytes());
-                }
-
-                should_keep
-            } else {
-                true // Non-bundle items are retained
-            }
-        });
+        inner.clean(self.capacity, block_number, block_timestamp);
     }

     /// Clear the cache.
     pub fn clear(&self) {
         let mut inner = self.inner.write();
-        inner.items.clear();
-        inner.seen.clear();
+        inner.clear();
     }
 }

 /// Internal cache data, meant to be protected by a lock.
-struct CacheInner {
+struct CacheStore {
     /// Key is the cache_rank, unique ID within the cache && the item's order in the cache. Value is [`SimItem`] itself.
     items: BTreeMap<u128, SimItem>,
-    /// Key is the unique identifier for the [`SimItem`] - the UUID for bundles, tx hash for transactions.
+
+    /// Key is the unique identifier for the [`SimItem`] - the UUID for
+    /// bundles, tx hash for transactions.
     seen: HashSet<Vec<u8>>,
+
+    /// Identifiers of items that have been removed from the cache, as
+    /// they will never be valid again.
+    disallowed: LruCache<Vec<u8>, ()>,
 }

-impl fmt::Debug for CacheInner {
+impl fmt::Debug for CacheStore {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("CacheInner").finish()
     }
 }

-impl CacheInner {
+impl CacheStore {
     fn new() -> Self {
-        Self { items: BTreeMap::new(), seen: HashSet::new() }
+        Self {
+            items: BTreeMap::new(),
+            seen: HashSet::new(),
+            disallowed: LruCache::new(NonZeroUsize::new(128).unwrap()),
+        }
+    }
+
+    /// Add an item to the cache.
+    fn add_inner(&mut self, mut cache_rank: u128, item: SimItem, capacity: usize) {
+        // If the item is disallowed, we don't add it
+        if self.disallowed.contains(&item.identifier_owned()) {
+            return;
+        }
+
+        // Check if we've already seen this item - if so, don't add it
+        if !self.seen.insert(item.identifier_owned()) {
+            return;
+        }
+
+        // If it has the same cache_rank, we decrement (prioritizing earlier items)
+        while self.items.contains_key(&cache_rank) && cache_rank != 0 {
+            cache_rank = cache_rank.saturating_sub(1);
+        }
+
+        if self.items.len() >= capacity {
+            // If we are at capacity, we need to remove the lowest score
+            if let Some((_, item)) = self.items.pop_first() {
+                self.seen.remove(&item.identifier_owned());
+            }
+        }
+
+        self.items.insert(cache_rank, item.clone());
+    }
+
+    fn add_bundles<I, T>(&mut self, item: I, basefee: u64, capacity: usize)
+    where
+        I: IntoIterator<Item = T>,
+        T: Into<SignetEthBundle>,
+    {
+        for item in item.into_iter() {
+            let item = item.into();
+            let Ok(item) = SimItem::try_from(item) else {
+                // Skip invalid bundles
+                continue;
+            };
+            let cache_rank = item.calculate_total_fee(basefee);
+            self.add_inner(cache_rank, item, capacity);
+        }
+    }
+
+    fn add_txs<I>(&mut self, item: I, basefee: u64, capacity: usize)
+    where
+        I: IntoIterator<Item = Recovered<TxEnvelope>>,
+    {
+        for item in item.into_iter() {
+            let item = SimItem::from(item);
+            let cache_rank = item.calculate_total_fee(basefee);
+            self.add_inner(cache_rank, item, capacity);
+        }
+    }
+
+    /// Remove an item by key. This will also remove it from the seen set.
+    fn remove(&mut self, cache_rank: u128) -> Option<SimItem> {
+        if let Some(item) = self.items.remove(&cache_rank) {
+            self.seen.remove(item.identifier().as_bytes());
+            Some(item)
+        } else {
+            None
+        }
+    }
+
+    /// Remove an item by key, and prevent it from being re-added for a while.
+    /// This will also remove it from the seen set.
+    fn remove_and_disallow(&mut self, cache_rank: u128) -> Option<SimItem> {
+        self.remove(cache_rank).inspect(|item| {
+            self.disallowed.put(item.identifier_owned(), ());
+        })
+    }
+
+    /// Clean the cache by evicting the lowest-score items and removing bundles
+    /// that are not valid in the current block.
+    fn clean(&mut self, capacity: usize, block_number: u64, block_timestamp: u64) {
+        // Trim to capacity by dropping lower fees.
+        while self.items.len() > capacity {
+            if let Some(key) = self.items.keys().next() {
+                self.remove_and_disallow(*key);
+            }
+        }
+
+        self.items.retain(|_, item| {
+            // Retain only items that are not bundles or are valid in the current block.
+            if let SimItem::Bundle(bundle) = item.deref() {
+                let ts_range = bundle.valid_timestamp_range();
+                let bundle_block = bundle.block_number();
+
+                // NB: we don't need to recheck max_timestamp here, as `never`
+                // covers that.
+                let now = block_number == bundle_block && ts_range.contains(&block_timestamp);
+
+                // Never valid if the block number is past the bundle's target
+                // block or the timestamp is past the bundle's max timestamp.
+                let never =
+                    !now && (block_number > bundle_block || block_timestamp > *ts_range.end());
+
+                if !now {
+                    self.seen.remove(item.identifier().as_bytes());
+                }
+
+                if never {
+                    self.disallowed.put(item.identifier_owned(), ());
+                }
+
+                now
+            } else {
+                true // Non-bundle items are retained
+            }
+        });
+    }
+
+    fn clear(&mut self) {
+        self.items.clear();
+        self.seen.clear();
     }
 }

@@ -298,7 +469,7 @@ mod test {
         replacement_uuid: String,
     ) -> signet_bundle::RecoveredBundle {
         let tx = invalid_tx_with_score(gas_limit, mpfpg);
-        signet_bundle::RecoveredBundle::new(
+        signet_bundle::RecoveredBundle::new_unchecked(
             vec![tx],
             vec![],
             1,
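Illustrative only: how a caller might drive the new preflight API, assuming the `read_best_valid` signature reconstructed above. `rollup_db` and `host_db` are hypothetical handles; anything implementing `DatabaseRef` (and therefore `StateSource`) would work.

fn next_round_items<S, S2>(cache: &SimCache, rollup_db: &S, host_db: &S2) -> Vec<(u128, SimItem)>
where
    S: StateSource,
    S2: StateSource,
{
    // Pull at most 8 currently-valid items; never-valid items are evicted as a
    // side effect. On a state-lookup error, fall back to an empty round.
    cache.read_best_valid(8, rollup_db, host_db).unwrap_or_else(|err| {
        eprintln!("preflight failed: {err}");
        Vec::new()
    })
}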
diff --git a/crates/sim/src/cache/validity.rs b/crates/sim/src/cache/validity.rs
new file mode 100644
index 00000000..bd420ebd
--- /dev/null
+++ b/crates/sim/src/cache/validity.rs
@@ -0,0 +1,33 @@
+/// The validity status of a simulation item.
+///
+/// These are ordered from least to most valid. An item that is `Never` valid
+/// is always invalid, an item that is `Future` valid may become valid in the
+/// future, and an item that is `Now` valid is currently valid.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+pub enum SimItemValidity {
+    /// The item is invalid and should not be simulated.
+    Never,
+    /// The item is currently invalid, but may become valid in the future.
+    ///
+    /// For example, this may be due to nonce gaps.
+    Future,
+    /// The item is valid and can be simulated.
+    Now,
+}
+
+impl SimItemValidity {
+    /// Returns true if the item is valid now.
+    pub const fn is_valid_now(&self) -> bool {
+        matches!(self, SimItemValidity::Now)
+    }
+
+    /// Returns true if the item is never valid.
+    pub const fn is_never_valid(&self) -> bool {
+        matches!(self, SimItemValidity::Never)
+    }
+
+    /// Returns true if the item may be valid in the future.
+    pub const fn is_future_valid(&self) -> bool {
+        matches!(self, SimItemValidity::Future)
+    }
+}
diff --git a/crates/sim/src/env/sim_env.rs b/crates/sim/src/env/sim_env.rs
index 42a05323..e3275792 100644
--- a/crates/sim/src/env/sim_env.rs
+++ b/crates/sim/src/env/sim_env.rs
@@ -6,7 +6,7 @@ use signet_evm::SignetInspector;
 use signet_types::constants::SignetSystemConstants;
 use std::{borrow::Cow, sync::Arc};
 use tokio::sync::{mpsc, watch};
-use tracing::{instrument, trace, trace_span};
+use tracing::{instrument, trace, trace_span, warn};
 use trevm::{
     helpers::Ctx,
     revm::{
@@ -104,9 +104,9 @@ impl SimEnv
 where
-    RuDb: DatabaseRef + Send + Sync,
+    RuDb: DatabaseRef + Send + Sync,
     RuInsp: Inspector>> + Default + Sync,
-    HostDb: DatabaseRef + Send + Sync,
+    HostDb: DatabaseRef + Send + Sync,
     HostInsp: Inspector>> + Default + Sync,
 {
     /// Simulates a transaction in the context of a block.
@@ -261,7 +261,17 @@ where
         best_tx: watch::Sender>,
     ) {
         // Pull the `n` best items from the cache.
-        let active_sim = self.sim_items.read_best(self.concurrency_limit);
+        let active_sim = match self.sim_items.read_best_valid(
+            self.concurrency_limit,
+            &self.rollup_env().db(),
+            &self.host_env().db(),
+        ) {
+            Ok(items) => items,
+            Err(error) => {
+                warn!(%error, "State access error during sim round preflight");
+                return;
+            }
+        };

         // Create a channel to send the results back.
         let (candidates, mut candidates_rx) = mpsc::channel(self.concurrency_limit);
diff --git a/crates/sim/src/lib.rs b/crates/sim/src/lib.rs
index d55f670a..d841454e 100644
--- a/crates/sim/src/lib.rs
+++ b/crates/sim/src/lib.rs
@@ -19,17 +19,11 @@ mod built;
 pub use built::BuiltBlock;

 mod cache;
-pub use cache::SimCache;
+pub use cache::{CacheError, SimCache, SimIdentifier, SimItem, SimItemValidity, StateSource};

 mod env;
 pub use env::{HostEnv, RollupEnv, SharedSimEnv, SimEnv};

-mod error;
-pub use error::CacheError;
-
-mod item;
-pub use item::{SimIdentifier, SimItem};
-
 mod outcome;
 pub use outcome::SimOutcomeWithCache;
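Illustrative only: the derived `Ord` on `SimItemValidity` orders the variants `Never < Future < Now`, which is what lets `check_bundle` combine the rollup-side and host-side results pessimistically with `min`. A small sketch:

fn combined(ru: SimItemValidity, host: SimItemValidity) -> SimItemValidity {
    // The bundle is only valid now if both sides are valid now.
    ru.min(host)
}

#[test]
fn validity_ordering() {
    use SimItemValidity::*;
    assert!(Never < Future && Future < Now);
    assert_eq!(combined(Now, Future), Future);
    assert_eq!(combined(Now, Never), Never);
}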
diff --git a/crates/test-utils/tests/basic_sim.rs b/crates/test-utils/tests/basic_sim.rs
index 426d123b..8bdddf85 100644
--- a/crates/test-utils/tests/basic_sim.rs
+++ b/crates/test-utils/tests/basic_sim.rs
@@ -1,21 +1,70 @@
 use alloy::{
     consensus::{
-        constants::GWEI_TO_WEI, transaction::SignerRecoverable, Signed, TxEip1559, TxEnvelope,
+        constants::GWEI_TO_WEI,
+        transaction::{Recovered, SignerRecoverable},
+        Signed, Transaction, TxEip1559, TxEnvelope,
     },
+    eips::eip2718::Encodable2718,
     network::TxSigner,
     primitives::{Address, TxKind, U256},
+    rpc::types::mev::EthSendBundle,
     signers::Signature,
 };
+use signet_bundle::SignetEthBundle;
 use signet_test_utils::{
     evm::test_sim_env,
     test_constants::*,
     users::{TEST_SIGNERS, TEST_USERS},
 };
-use std::time::Instant;
+use std::time::{Duration, Instant};

+/// Tests the case where multiple transactions from the same sender with
+/// successive nonces are included in the same simulation batch.
+///
+/// It sets up 2 transactions from the same sender with the same nonce, then
+/// 2 more transactions with the next two nonces, and verifies that the
+/// simulation produces a block with 3 successful transactions.
+#[tokio::test]
+pub async fn successive_nonces() {
+    let builder = test_sim_env(Instant::now() + Duration::from_millis(200));
+
+    // Set up 4 sends from the same sender with successive nonces
+    let sender = &TEST_SIGNERS[0];
+    let to = TEST_USERS[1];
+
+    for nonce in 0..4u64 {
+        let tx = signed_send_with_mfpg(
+            sender,
+            to,
+            U256::from(1000),
+            GWEI_TO_WEI as u128 * 10,
+            nonce.saturating_sub(1), // cute little way to duplicate nonce 0
+        )
+        .await;
+        builder.sim_items().add_tx(tx, 0);
+    }
+
+    // Run the simulator
+    let built = builder.build().await;
+
+    assert_eq!(built.transactions().len(), 3);
+
+    // This asserts that the builder has sorted the transactions by nonce
+    assert!(built.transactions().windows(2).all(|w| {
+        let tx1 = w[0].as_eip1559().unwrap().tx().nonce;
+        let tx2 = w[1].as_eip1559().unwrap().tx().nonce;
+        tx1 < tx2
+    }));
+}
+
+/// This test simulates a transaction from each of the test signers,
+/// with escalating priority fees, and asserts that the simulation
+/// orders them correctly by priority fee.
 #[tokio::test]
 pub async fn complex_simulation() {
-    let builder = test_sim_env(Instant::now() + std::time::Duration::from_millis(200));
+    let builder = test_sim_env(Instant::now() + Duration::from_millis(200));

     // Set up 10 simple sends with escalating priority fee
     for (i, sender) in TEST_SIGNERS.iter().enumerate() {
@@ -24,15 +73,18 @@ pub async fn complex_simulation() {
             TEST_USERS[i],
             U256::from(1000),
             (10 - i) as u128 * GWEI_TO_WEI as u128,
+            0,
         )
         .await;
-        builder.sim_items().add_tx(tx.try_into_recovered().unwrap(), 0);
+        builder.sim_items().add_tx(tx, 0);
     }

-    // Set up the simulator
+    // Run the simulator
     let built = builder.build().await;

-    assert!(!built.transactions().is_empty());
+    // Should be 10 if all sim rounds ran; however, on CI this can be flaky
+    // (likely due to lower resources), so we assert that at least 5 succeeded.
+    assert!(built.transactions().len() >= 5);

     // This asserts that the builder has sorted the transactions by priority
     // fee.
@@ -43,6 +95,69 @@ pub async fn complex_simulation() {
     }));
 }

+/// Tests that the simulator correctly handles bundle future validity.
+///
+/// This makes a bundle with 2 txs from different senders: one from the first
+/// sender with nonce 1, and one from the second sender with nonce 0. We also
+/// ingest a standalone tx from the first sender with nonce 0 into the sim
+/// cache, which the bundle's nonce-1 tx depends on.
+///
+/// The simulator should output a block containing all 3 transactions: first
+/// the solo tx, then the bundle txns.
+#[tokio::test]
+async fn test_bundle_future_validity() {
+    signet_test_utils::init_tracing();
+
+    let builder = test_sim_env(Instant::now() + Duration::from_millis(200));
+
+    let sender_0 = &TEST_SIGNERS[0];
+    let sender_1 = &TEST_SIGNERS[1];
+
+    let to = TEST_USERS[2];
+
+    let bare_tx =
+        signed_send_with_mfpg(sender_0, to, U256::from(1000), GWEI_TO_WEI as u128 * 10, 0).await;
+    let bundle_tx_0 =
+        signed_send_with_mfpg(sender_0, to, U256::from(1000), GWEI_TO_WEI as u128 * 10, 1)
+            .await
+            .encoded_2718()
+            .into();
+    let bundle_tx_1 =
+        signed_send_with_mfpg(sender_1, to, U256::from(1000), GWEI_TO_WEI as u128 * 10, 0)
+            .await
+            .encoded_2718()
+            .into();
+
+    let bundle = SignetEthBundle {
+        bundle: EthSendBundle {
+            txs: vec![bundle_tx_0, bundle_tx_1],
+            replacement_uuid: Some(Default::default()),
+            ..Default::default()
+        },
+        host_txs: vec![],
+    };
+
+    // Add the bundle to the simulator
+    builder.sim_items().add_bundle(bundle, 0).unwrap();
+
+    // Run the simulator
+    let cache = builder.sim_items().clone();
+    let build_task = tokio::spawn(async move { builder.build().await });
+
+    // We inject the bare tx after a short delay to ensure it is added during
+    // the simulation. This checks that the bundle is simulated as
+    // `SimItemValidity::Future` at least once before the tx is added.
+    tokio::time::sleep(Duration::from_millis(50)).await;
+    cache.add_tx(bare_tx, 0);
+
+    let built = build_task.await.unwrap();
+
+    assert_eq!(built.transactions().len(), 3);
+    assert_eq!(built.transactions()[0].nonce(), 0);
+    // Bundle order is preserved
+    assert_eq!(built.transactions()[1].nonce(), 1);
+    assert_eq!(built.transactions()[2].nonce(), 0);
+    assert_eq!(built.transactions()[0].signer(), built.transactions()[1].signer());
+}
+
 // utilities below this point are reproduced from other places, however,
 // because this test modifies the _db_ rather than the _evm_,
 // we need to handle them slightly differently here.

@@ -50,9 +165,9 @@ pub async fn complex_simulation() {
 /// Modify an account with a closure and commit the modified account.
 ///
 /// This code is reproduced and modified from trevm
-fn send_with_mfpg(to: Address, value: U256, mpfpg: u128) -> TxEip1559 {
+fn send_with_mfpg(to: Address, value: U256, mpfpg: u128, nonce: u64) -> TxEip1559 {
     TxEip1559 {
-        nonce: 0,
+        nonce,
         gas_limit: 21_000,
         to: TxKind::Call(to),
         value,
@@ -68,9 +183,10 @@ async fn signed_send_with_mfpg<S: TxSigner<Signature>>(
     to: Address,
     value: U256,
     mpfpg: u128,
-) -> TxEnvelope {
-    let mut tx = send_with_mfpg(to, value, mpfpg);
+    nonce: u64,
+) -> Recovered<TxEnvelope> {
+    let mut tx = send_with_mfpg(to, value, mpfpg, nonce);
     let res = from.sign_transaction(&mut tx).await.unwrap();

-    Signed::new_unhashed(tx, res).into()
+    TxEnvelope::from(Signed::new_unhashed(tx, res)).try_into_recovered().unwrap()
 }
diff --git a/crates/types/src/primitives/block.rs b/crates/types/src/primitives/block.rs
index cf9bc1a6..8545703c 100644
--- a/crates/types/src/primitives/block.rs
+++ b/crates/types/src/primitives/block.rs
@@ -146,6 +146,26 @@ impl<H, T> SealedBlock<H, T> {
     pub const fn new_unchecked(header: SealedHeader<H>, body: AlloyBlockBody<T>) -> Self {
         Self { header, body }
     }
+
+    /// Create a new empty sealed block for testing.
+    #[doc(hidden)]
+    pub fn blank_for_testing() -> Self
+    where
+        H: Default,
+    {
+        Self { header: SealedHeader::new(H::default()), body: AlloyBlockBody::default() }
+    }
+
+    /// Create a new empty sealed block with the given header for testing.
+    #[doc(hidden)]
+    pub fn blank_with_header(header: H) -> Self {
+        Self { header: SealedHeader::new(header), body: AlloyBlockBody::default() }
+    }
+
+    /// Get the transactions in the block.
+    fn transactions(&self) -> &[T] {
+        &self.body.transactions
+    }
 }

 impl<H: BlockHeader, T> BlockHeader for SealedBlock<H, T> {
@@ -248,6 +268,26 @@ impl<H, T> RecoveredBlock<H, T> {
     pub const fn new(block: SealedBlock<H, T>, senders: Vec<Address>) -> Self {
         Self { block, senders }
     }
+
+    /// Create a new empty recovered block for testing.
+    #[doc(hidden)]
+    pub fn blank_for_testing() -> Self
+    where
+        H: Default,
+    {
+        Self { block: SealedBlock::blank_for_testing(), senders: Vec::new() }
+    }
+
+    /// Create a new empty recovered block with the given header for testing.
+    #[doc(hidden)]
+    pub fn blank_with_header(header: H) -> Self {
+        Self { block: SealedBlock::blank_with_header(header), senders: Vec::new() }
+    }
+
+    /// Get the transactions in the block.
+    pub fn transactions(&self) -> &[T] {
+        self.block.transactions()
+    }
 }

 impl<H: BlockHeader, T> BlockHeader for RecoveredBlock<H, T> {
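Illustrative only: a sketch of how the hidden test constructors might be used, assuming only the signatures added above and that `RecoveredBlock` is in scope; the wrapper function and its name are hypothetical.

#[cfg(test)]
fn empty_block_with<H, T>(header: H) -> RecoveredBlock<H, T> {
    // Wraps the header in a blank sealed block with no transactions and no
    // recovered senders.
    RecoveredBlock::blank_with_header(header)
}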