diff --git a/benches/gigagas.rs b/benches/gigagas.rs index e78bad1..a51bc1d 100644 --- a/benches/gigagas.rs +++ b/benches/gigagas.rs @@ -16,6 +16,11 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use fastrace::{collector::Config, prelude::*}; use fastrace_jaeger::JaegerReporter; use grevm::GrevmScheduler; +use metrics::{SharedString, Unit}; +use metrics_util::{ + debugging::{DebugValue, DebuggingRecorder}, + CompositeKey, MetricKind, +}; use rand::Rng; use revm::primitives::{alloy_primitives::U160, Address, Env, SpecId, TransactTo, TxEnv, U256}; use std::{collections::HashMap, sync::Arc}; @@ -25,49 +30,96 @@ const GIGA_GAS: u64 = 1_000_000_000; #[global_allocator] static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; +fn get_metrics_counter_value( + snapshot: &HashMap<CompositeKey, (Option<Unit>, Option<SharedString>, DebugValue)>, + name: &'static str, +) -> u64 { + match snapshot + .get(&CompositeKey::new(MetricKind::Counter, metrics::Key::from_static_name(name))) + { + Some((_, _, DebugValue::Counter(value))) => *value, + _ => panic!("{:?} not found", name), + } +} + fn bench(c: &mut Criterion, name: &str, db: InMemoryDB, txs: Vec<TxEnv>) { let mut env = Env::default(); env.cfg.chain_id = NamedChain::Mainnet.into(); env.block.coinbase = Address::from(U160::from(common::MINER_ADDRESS)); let db = Arc::new(db); + let txs = Arc::new(txs); - let mut group = c.benchmark_group(name); + let mut group = c.benchmark_group(format!("{}({} txs)", name, txs.len())); group.bench_function("Origin Sequential", |b| { b.iter(|| { common::execute_revm_sequential( black_box(db.clone()), black_box(SpecId::LATEST), black_box(env.clone()), - black_box(txs.clone()), + black_box(&*txs), ) }) }); + + let mut num_iter: usize = 0; + let mut execution_time_ns: u64 = 0; group.bench_function("Grevm Parallel", |b| { b.iter(|| { + num_iter += 1; + let recorder = DebuggingRecorder::new(); let root = Span::root(format!("{name} Grevm Parallel"), SpanContext::random()); let _guard =
root.set_local_parent(); - let executor = GrevmScheduler::new( - black_box(SpecId::LATEST), - black_box(env.clone()), - black_box(db.clone()), - black_box(txs.clone()), - ); - executor.parallel_execute() + metrics::with_local_recorder(&recorder, || { + let executor = GrevmScheduler::new( + black_box(SpecId::LATEST), + black_box(env.clone()), + black_box(db.clone()), + black_box(txs.clone()), + ); + let _ = executor.parallel_execute(); + + let snapshot = recorder.snapshotter().snapshot().into_hashmap(); + execution_time_ns += + get_metrics_counter_value(&snapshot, "grevm.parallel_execute_time"); + execution_time_ns += get_metrics_counter_value(&snapshot, "grevm.validate_time"); + }); }) }); + println!( + "{} Grevm Parallel average execution time: {:.2} ms", + name, + execution_time_ns as f64 / num_iter as f64 / 1000000.0 + ); + + let mut num_iter: usize = 0; + let mut execution_time_ns: u64 = 0; group.bench_function("Grevm Sequential", |b| { b.iter(|| { + num_iter += 1; + let recorder = DebuggingRecorder::new(); let root = Span::root(format!("{name} Grevm Sequential"), SpanContext::random()); let _guard = root.set_local_parent(); - let executor = GrevmScheduler::new( - black_box(SpecId::LATEST), - black_box(env.clone()), - black_box(db.clone()), - black_box(txs.clone()), - ); - executor.force_sequential_execute() + metrics::with_local_recorder(&recorder, || { + let executor = GrevmScheduler::new( + black_box(SpecId::LATEST), + black_box(env.clone()), + black_box(db.clone()), + black_box(txs.clone()), + ); + let _ = executor.force_sequential_execute(); + + let snapshot = recorder.snapshotter().snapshot().into_hashmap(); + execution_time_ns += + get_metrics_counter_value(&snapshot, "grevm.sequential_execute_time"); + }); }) }); + println!( + "{} Grevm Sequential average execution time: {:.2} ms", + name, + execution_time_ns as f64 / num_iter as f64 / 1000000.0 + ); + group.finish(); } @@ -96,15 +148,19 @@ fn bench_raw_transfers(c: &mut Criterion, db_latency_us: u64) { 
); } -fn get_account_idx(num_eoa: usize, hot_start_idx: usize, hot_ratio: f64) -> usize { +fn pick_account_idx(num_eoa: usize, hot_ratio: f64) -> usize { if hot_ratio <= 0.0 { // Uniform workload - rand::random::<usize>() % num_eoa - } else if rand::thread_rng().gen_range(0.0..1.0) < hot_ratio { + return rand::random::<usize>() % num_eoa; + } + + // Let `hot_ratio` of transactions conducted by 10% of hot accounts + let hot_start_idx = (num_eoa as f64 * 0.9) as usize; + if rand::thread_rng().gen_range(0.0..1.0) < hot_ratio { // Access hot hot_start_idx + rand::random::<usize>() % (num_eoa - hot_start_idx) } else { - rand::random::<usize>() % (num_eoa - hot_start_idx) + rand::random::<usize>() % hot_start_idx } } @@ -119,9 +175,6 @@ fn bench_dependent_raw_transfers( let mut db = InMemoryDB::new(accounts, Default::default(), Default::default()); db.latency_us = db_latency_us; - // Let 10% of the accounts be hot accounts - let hot_start_idx = common::START_ADDRESS + (num_eoa as f64 * 0.9) as usize; - bench( c, "Dependent Raw Transfers", @@ -129,10 +182,10 @@ fn bench_dependent_raw_transfers( (0..block_size) .map(|_| { let from = Address::from(U160::from( - common::START_ADDRESS + get_account_idx(num_eoa, hot_start_idx, hot_ratio), + common::START_ADDRESS + pick_account_idx(num_eoa, hot_ratio), )); let to = Address::from(U160::from( - common::START_ADDRESS + get_account_idx(num_eoa, hot_start_idx, hot_ratio), + common::START_ADDRESS + pick_account_idx(num_eoa, hot_ratio), )); TxEnv { caller: from, @@ -203,12 +256,9 @@ fn benchmark_dependent_erc20( let mut txs = Vec::with_capacity(block_size); let sca = sca[0]; - // Let 10% of the accounts be hot accounts - let hot_start_idx = common::START_ADDRESS + (num_eoa as f64 * 0.9) as usize; - for _ in 0..block_size { - let from = eoa[get_account_idx(num_eoa, hot_start_idx, hot_ratio)]; - let to = eoa[get_account_idx(num_eoa, hot_start_idx, hot_ratio)]; + let from = eoa[pick_account_idx(num_eoa, hot_ratio)]; + let to = eoa[pick_account_idx(num_eoa, hot_ratio)];
let tx = TxEnv { caller: from, transact_to: TransactTo::Call(sca), @@ -250,19 +300,15 @@ fn bench_hybrid(c: &mut Criterion, db_latency_us: u64, num_eoa: usize, hot_ratio (GIGA_GAS as f64 * 0.2 / erc20::ESTIMATED_GAS_USED as f64).ceil() as usize; let num_uniswap = (GIGA_GAS as f64 * 0.2 / uniswap::ESTIMATED_GAS_USED as f64).ceil() as usize; - // Let 10% of the accounts be hot accounts - let hot_start_idx = common::START_ADDRESS + (num_eoa as f64 * 0.9) as usize; let mut state = common::mock_block_accounts(common::START_ADDRESS, num_eoa); let eoa_addresses = state.keys().cloned().collect::<Vec<Address>>(); let mut txs = Vec::with_capacity(num_native_transfer + num_erc20_transfer + num_uniswap); for _ in 0..num_native_transfer { - let from = Address::from(U160::from( - common::START_ADDRESS + get_account_idx(num_eoa, hot_start_idx, hot_ratio), - )); - let to = Address::from(U160::from( - common::START_ADDRESS + get_account_idx(num_eoa, hot_start_idx, hot_ratio), - )); + let from = + Address::from(U160::from(common::START_ADDRESS + pick_account_idx(num_eoa, hot_ratio))); + let to = + Address::from(U160::from(common::START_ADDRESS + pick_account_idx(num_eoa, hot_ratio))); let tx = TxEnv { caller: from, transact_to: TransactTo::Call(to), @@ -280,10 +326,10 @@ fn bench_hybrid(c: &mut Criterion, db_latency_us: u64, num_eoa: usize, hot_ratio for (sca_addr, _) in erc20_contract_accounts.iter() { for _ in 0..(num_erc20_transfer / NUM_ERC20_SCA) { let from = Address::from(U160::from( - common::START_ADDRESS + get_account_idx(num_eoa, hot_start_idx, hot_ratio), + common::START_ADDRESS + pick_account_idx(num_eoa, hot_ratio), )); let to = Address::from(U160::from( - common::START_ADDRESS + get_account_idx(num_eoa, hot_start_idx, hot_ratio), + common::START_ADDRESS + pick_account_idx(num_eoa, hot_ratio), )); let tx = TxEnv { caller: from, @@ -315,7 +361,7 @@ fn bench_hybrid(c: &mut Criterion, db_latency_us: u64, num_eoa: usize, hot_ratio txs.push(TxEnv { caller: Address::from(U160::from( -
common::START_ADDRESS + get_account_idx(num_eoa, hot_start_idx, hot_ratio), + common::START_ADDRESS + pick_account_idx(num_eoa, hot_ratio), )), gas_limit: uniswap::GAS_LIMIT, gas_price: U256::from(0xb2d05e07u64), diff --git a/benches/mainnet.rs b/benches/mainnet.rs index cac2138..1673728 100644 --- a/benches/mainnet.rs +++ b/benches/mainnet.rs @@ -32,6 +32,7 @@ fn benchmark_mainnet(c: &mut Criterion) { } _ => panic!("Missing transaction data"), }; + let txs = Arc::new(txs); let mut env = Env::default(); env.cfg.chain_id = NamedChain::Mainnet.into(); @@ -45,7 +46,7 @@ fn benchmark_mainnet(c: &mut Criterion) { black_box(db.clone()), black_box(spec_id), black_box(env.clone()), - black_box(txs.clone()), + black_box(&*txs), ) }) }); diff --git a/src/scheduler.rs b/src/scheduler.rs index a9c3417..a18dd34 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -59,14 +59,16 @@ struct ExecuteMetrics { parse_hints_time: metrics::Counter, /// Time taken to partition transactions(in nanoseconds). partition_tx_time: metrics::Counter, - /// Time taken to validate transactions(in nanoseconds). + /// Time taken to execute transactions in parallel(in nanoseconds). parallel_execute_time: metrics::Counter, - /// Time taken to execute + /// Time taken to validate transactions(in nanoseconds). validate_time: metrics::Counter, /// Time taken to merge write set. merge_write_set_time: metrics::Counter, /// Time taken to commit transition commit_transition_time: metrics::Counter, + /// Time taken to execute transactions in sequential(in nanoseconds). + sequential_execute_time: metrics::Counter, /// Time taken to build output(in nanoseconds). 
build_output_time: metrics::Counter, } @@ -93,6 +95,7 @@ impl Default for ExecuteMetrics { validate_time: counter!("grevm.validate_time"), merge_write_set_time: counter!("grevm.merge_write_set_time"), commit_transition_time: counter!("grevm.commit_transition_time"), + sequential_execute_time: counter!("grevm.sequential_execute_time"), build_output_time: counter!("grevm.build_output_time"), } } @@ -226,7 +229,7 @@ where >(boxed) }; let db: DatabaseWrapper<DB::Error> = DatabaseWrapper(db); - GrevmScheduler::new(spec_id, env, db, txs) + GrevmScheduler::new(spec_id, env, db, Arc::new(txs)) } impl<DB> GrevmScheduler<DB> @@ -235,7 +238,7 @@ where DB::Error: Send + Sync, { /// Creates a new GrevmScheduler instance. - pub fn new(spec_id: SpecId, env: Env, db: DB, txs: Vec<TxEnv>) -> Self { + pub fn new(spec_id: SpecId, env: Env, db: DB, txs: Arc<Vec<TxEnv>>) -> Self { let coinbase = env.block.coinbase; let num_partitions = *CPU_CORES * 2 + 1; // 2 * cpu + 1 for initial partition number let num_txs = txs.len(); @@ -244,7 +247,7 @@ where spec_id, env, coinbase, - txs: Arc::new(txs), + txs, database: Arc::new(SchedulerDB::new(db)), tx_dependencies: TxDependency::new(num_txs), tx_states: Arc::new(vec![TxState::new(); num_txs]), @@ -561,6 +564,7 @@ where /// Fall back to sequential execution for the remaining transactions.
#[fastrace::trace] fn execute_remaining_sequential(&mut self) -> Result<(), GrevmError<DB::Error>> { + let start = Instant::now(); self.metrics.sequential_execute_calls.increment(1); self.metrics.sequential_tx_cnt.increment((self.txs.len() - self.num_finality_txs) as u64); // MUST drop the `PartitionExecutor::scheduler_db` before get mut @@ -585,6 +589,7 @@ where Err(err) => return Err(GrevmError::EvmError(err)), } } + self.metrics.sequential_execute_time.increment(start.elapsed().as_nanos() as u64); Ok(()) } diff --git a/tests/common/execute.rs b/tests/common/execute.rs index 87ed5ed..095ed96 100644 --- a/tests/common/execute.rs +++ b/tests/common/execute.rs @@ -3,7 +3,7 @@ use alloy_rpc_types::Block; use metrics_util::debugging::{DebugValue, DebuggingRecorder}; use alloy_chains::NamedChain; -use grevm::{ExecuteOutput, GrevmError, GrevmScheduler}; +use grevm::{ExecuteOutput, GrevmScheduler}; use revm::{ db::{ states::{bundle_state::BundleRetention, StorageSlot}, @@ -113,17 +113,17 @@ pub(crate) fn compare_evm_execute( env.cfg.chain_id = NamedChain::Mainnet.into(); env.block.coinbase = Address::from(U160::from(MINER_ADDRESS)); let db = Arc::new(db); + let txs = Arc::new(txs); let start = Instant::now(); let sequential = GrevmScheduler::new(SpecId::LATEST, env.clone(), db.clone(), txs.clone()); let sequential_result = sequential.force_sequential_execute(); println!("Grevm sequential execute time: {}ms", start.elapsed().as_millis()); - let mut parallel_result = Err(GrevmError::UnreachableError(String::from("Init"))); - metrics::with_local_recorder(&recorder, || { + let parallel_result = metrics::with_local_recorder(&recorder, || { let start = Instant::now(); let parallel = GrevmScheduler::new(SpecId::LATEST, env.clone(), db.clone(), txs.clone()); // set determined partitions - parallel_result = parallel.force_parallel_execute(with_hints, Some(23)); + let parallel_result = parallel.force_parallel_execute(with_hints, Some(23)); println!("Grevm parallel execute time: {}ms",
start.elapsed().as_millis()); let snapshot = recorder.snapshotter().snapshot(); @@ -133,10 +133,11 @@ pub(crate) fn compare_evm_execute( assert_eq!(*metric, value); } } + parallel_result }); let start = Instant::now(); - let reth_result = execute_revm_sequential(db.clone(), SpecId::LATEST, env.clone(), txs.clone()); + let reth_result = execute_revm_sequential(db.clone(), SpecId::LATEST, env.clone(), &*txs); println!("Origin sequential execute time: {}ms", start.elapsed().as_millis()); let mut max_gas_spent = 0; @@ -176,7 +177,7 @@ pub(crate) fn execute_revm_sequential( db: DB, spec_id: SpecId, env: Env, - txs: Vec<TxEnv>, + txs: &[TxEnv], ) -> Result<ExecuteOutput, EVMError<DB::Error>> where DB: DatabaseRef, @@ -192,7 +193,7 @@ where let mut results = Vec::with_capacity(txs.len()); for tx in txs { - *evm.tx_mut() = tx; + *evm.tx_mut() = tx.clone(); let result_and_state = evm.transact()?; evm.db_mut().commit(result_and_state.state); results.push(result_and_state.result); diff --git a/tests/ethereum/main.rs b/tests/ethereum/main.rs index 1d97aa2..728af3d 100644 --- a/tests/ethereum/main.rs +++ b/tests/ethereum/main.rs @@ -9,6 +9,8 @@ // "re-testing". // - Help outline the minimal state commitment logic for pevm.
+#![allow(missing_docs)] + use crate::common::storage::InMemoryDB; use alloy_chains::NamedChain; use grevm::{GrevmError, GrevmScheduler}; @@ -25,7 +27,7 @@ use revme::cmd::statetest::{ models::{Env, SpecName, TestSuite, TestUnit, TransactionParts, TxPartIndices}, utils::recover_address, }; -use std::{collections::HashMap, fs, path::Path}; +use std::{collections::HashMap, fs, path::Path, sync::Arc}; use walkdir::{DirEntry, WalkDir}; #[path = "../common/mod.rs"] @@ -137,7 +139,7 @@ fn run_test_unit(path: &Path, unit: TestUnit) { match ( test.expect_exception.as_deref(), - GrevmScheduler::new(spec_name.to_spec_id(), env, db.clone(), vec![tx_env.unwrap()]) + GrevmScheduler::new(spec_name.to_spec_id(), env, db.clone(), Arc::new(vec![tx_env.unwrap()])) .parallel_execute(), ) { // EIP-2681 diff --git a/tests/mainnet.rs b/tests/mainnet.rs index a83b402..3d6820a 100644 --- a/tests/mainnet.rs +++ b/tests/mainnet.rs @@ -1,10 +1,12 @@ +#![allow(missing_docs)] + mod common; use std::sync::Arc; use alloy_chains::NamedChain; use alloy_rpc_types::{Block, BlockTransactions}; use common::{compat, storage::InMemoryDB}; -use grevm::{GrevmError, GrevmScheduler}; +use grevm::GrevmScheduler; use metrics_util::debugging::DebuggingRecorder; use revm::primitives::{Env, TxEnv}; @@ -15,6 +17,7 @@ fn test_execute_alloy(block: Block, db: InMemoryDB) { BlockTransactions::Full(txs) => txs.into_iter().map(|tx| compat::get_tx_env(tx)).collect(), _ => panic!("Missing transaction data"), }; + let txs = Arc::new(txs); let mut env = Env::default(); env.cfg.chain_id = NamedChain::Mainnet.into(); @@ -22,20 +25,19 @@ fn test_execute_alloy(block: Block, db: InMemoryDB) { let db = Arc::new(db); - let reth_result = - common::execute_revm_sequential(db.clone(), spec_id, env.clone(), txs.clone()); + let reth_result = common::execute_revm_sequential(db.clone(), spec_id, env.clone(), &*txs); // create registry for metrics let recorder = DebuggingRecorder::new(); - let mut parallel_result = 
Err(GrevmError::UnreachableError(String::from("Init"))); - metrics::with_local_recorder(&recorder, || { + let parallel_result = metrics::with_local_recorder(&recorder, || { let executor = GrevmScheduler::new(spec_id, env, db, txs); - parallel_result = executor.force_parallel_execute(true, Some(23)); + let parallel_result = executor.force_parallel_execute(true, Some(23)); let snapshot = recorder.snapshotter().snapshot(); for (key, unit, desc, value) in snapshot.into_vec() { println!("metrics: {} => value: {:?}", key.key().name(), value); } + parallel_result }); common::compare_execution_result( diff --git a/tests/native_transfers.rs b/tests/native_transfers.rs index 9f68ee6..5f8dcab 100644 --- a/tests/native_transfers.rs +++ b/tests/native_transfers.rs @@ -1,3 +1,5 @@ +#![allow(missing_docs)] + pub mod common; use crate::common::{MINER_ADDRESS, START_ADDRESS};