Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ edition = "2021"
description = "Create Parallel EVM"

[dependencies]
revm = "14.0.0"
revm = { package = "revm", git = "https://github.com/nekomoto911/revm", branch = "v14.0.0" }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use the custom revm project

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At present, we need to add serde traits to some basic types of revm for test.
In the future, there will be more places where we need to apply patches (e.g. add some debug log) to the revm version we are using.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe consider maintainning revm repo in Galxe organization.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe consider maintainning revm repo in Galxe organization.

agree, let's do it

revm-primitives = { package = "revm-primitives", git = "https://github.com/nekomoto911/revm", branch = "v14.0.0" }
fastrace = "0.7"
tracing = "0.1.40"
ahash = { version = "0.8.11", features = ["serde"] }
Expand Down Expand Up @@ -33,7 +34,7 @@ criterion = "0.5.1"
metrics-util = "0.17.0"
walkdir = "2.5.0"
rayon = "1.10.0"
revme = "0.10.3"
revme = { package = "revme", git = "https://github.com/nekomoto911/revm", branch = "v14.0.0" }
fastrace = { version = "0.7", features = ["enable"] }
fastrace-jaeger = "0.7"
tikv-jemallocator = "0.5.0"
Expand Down
6 changes: 4 additions & 2 deletions benches/gigagas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ fn bench(c: &mut Criterion, name: &str, db: InMemoryDB, txs: Vec<TxEnv>) {
let root = Span::root(format!("{name} Grevm Parallel"), SpanContext::random());
let _guard = root.set_local_parent();
metrics::with_local_recorder(&recorder, || {
let executor = GrevmScheduler::new(
let mut executor = GrevmScheduler::new(
black_box(SpecId::LATEST),
black_box(env.clone()),
black_box(db.clone()),
black_box(txs.clone()),
None,
);
let _ = executor.parallel_execute();

Expand All @@ -92,11 +93,12 @@ fn bench(c: &mut Criterion, name: &str, db: InMemoryDB, txs: Vec<TxEnv>) {
let root = Span::root(format!("{name} Grevm Sequential"), SpanContext::random());
let _guard = root.set_local_parent();
metrics::with_local_recorder(&recorder, || {
let executor = GrevmScheduler::new(
let mut executor = GrevmScheduler::new(
black_box(SpecId::LATEST),
black_box(env.clone()),
black_box(db.clone()),
black_box(txs.clone()),
None,
);
let _ = executor.force_sequential_execute();

Expand Down
6 changes: 4 additions & 2 deletions benches/mainnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,25 @@ fn benchmark_mainnet(c: &mut Criterion) {

group.bench_function("Grevm Parallel", |b| {
b.iter(|| {
let executor = GrevmScheduler::new(
let mut executor = GrevmScheduler::new(
black_box(spec_id),
black_box(env.clone()),
black_box(db.clone()),
black_box(txs.clone()),
None,
);
executor.parallel_execute()
})
});

group.bench_function("Grevm Sequential", |b| {
b.iter(|| {
let executor = GrevmScheduler::new(
let mut executor = GrevmScheduler::new(
black_box(spec_id),
black_box(env.clone()),
black_box(db.clone()),
black_box(txs.clone()),
None,
);
executor.force_sequential_execute()
})
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use tokio::runtime::{Builder, Runtime};
mod hint;
mod partition;
mod scheduler;
mod storage;
/// Manages storage-related operations.
pub mod storage;
mod tx_dependency;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};

Expand Down
82 changes: 40 additions & 42 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
fork_join_util,
hint::ParallelExecutionHints,
partition::PartitionExecutor,
storage::{LazyUpdateValue, SchedulerDB},
storage::{LazyUpdateValue, SchedulerDB, State},
tx_dependency::{DependentTxsVec, TxDependency},
GrevmError, LocationAndType, ResultAndTransition, TransactionStatus, TxId, CPU_CORES,
GREVM_RUNTIME, MAX_NUM_ROUND,
Expand All @@ -18,11 +18,10 @@ use std::{
use ahash::{AHashMap as HashMap, AHashSet as HashSet};
use metrics::{counter, gauge};
use revm::{
db::{states::bundle_state::BundleRetention, BundleState},
primitives::{
AccountInfo, Address, Bytecode, EVMError, Env, ExecutionResult, SpecId, TxEnv, B256, U256,
},
CacheState, DatabaseRef, EvmBuilder,
CacheState, DatabaseCommit, DatabaseRef, EvmBuilder,
};
use tracing::info;

Expand Down Expand Up @@ -70,8 +69,6 @@ struct ExecuteMetrics {
commit_transition_time: metrics::Counter,
/// Time taken to execute transactions in sequential(in nanoseconds).
sequential_execute_time: metrics::Counter,
/// Time taken to build output(in nanoseconds).
build_output_time: metrics::Counter,
}

impl Default for ExecuteMetrics {
Expand All @@ -97,16 +94,13 @@ impl Default for ExecuteMetrics {
merge_write_set_time: counter!("grevm.merge_write_set_time"),
commit_transition_time: counter!("grevm.commit_transition_time"),
sequential_execute_time: counter!("grevm.sequential_execute_time"),
build_output_time: counter!("grevm.build_output_time"),
}
}
}

/// The output of the execution of a block.
#[derive(Debug)]
pub struct ExecuteOutput {
/// The changed state of the block after execution.
pub state: BundleState,
/// All the results of the transactions in the block.
pub results: Vec<ExecutionResult>,
}
Expand Down Expand Up @@ -161,7 +155,7 @@ where
/// The database utilized by the scheduler.
/// It is shared among the partition executors,
/// allowing them to read the final state from previous rounds.
database: Arc<SchedulerDB<DB>>,
pub database: Arc<SchedulerDB<DB>>,

/// The dependency relationship between transactions.
/// Used to construct the next round of transaction partitions.
Expand Down Expand Up @@ -208,12 +202,14 @@ impl<Error> DatabaseRef for DatabaseWrapper<Error> {
}
}

/// Creates a new GrevmScheduler instance.
/// Creates a new GrevmScheduler instance using DB type without 'static constraint.
/// If `state` is not None, it will be used as the initial state before the block execution.
pub fn new_grevm_scheduler<DB>(
spec_id: SpecId,
env: Env,
db: DB,
txs: Vec<TxEnv>,
state: Option<Box<State>>,
) -> GrevmScheduler<DatabaseWrapper<DB::Error>>
where
DB: DatabaseRef + Send + Sync,
Expand All @@ -230,7 +226,7 @@ where
>(boxed)
};
let db: DatabaseWrapper<DB::Error> = DatabaseWrapper(db);
GrevmScheduler::new(spec_id, env, db, Arc::new(txs))
GrevmScheduler::new(spec_id, env, db, Arc::new(txs), state)
}

impl<DB> GrevmScheduler<DB>
Expand All @@ -239,8 +235,14 @@ where
DB::Error: Send + Sync,
{
/// Creates a new GrevmScheduler instance.
#[fastrace::trace]
pub fn new(spec_id: SpecId, env: Env, db: DB, txs: Arc<Vec<TxEnv>>) -> Self {
/// If `state` is not None, it will be used as the initial state before the block execution.
pub fn new(
spec_id: SpecId,
env: Env,
db: DB,
txs: Arc<Vec<TxEnv>>,
state: Option<Box<State>>,
) -> Self {
let coinbase = env.block.coinbase;
let num_partitions = *CPU_CORES * 2 + 1; // 2 * cpu + 1 for initial partition number
let num_txs = txs.len();
Expand All @@ -250,7 +252,7 @@ where
env,
coinbase,
txs,
database: Arc::new(SchedulerDB::new(db)),
database: Arc::new(SchedulerDB::new(state.unwrap_or_default(), db)),
tx_dependencies: TxDependency::new(num_txs),
tx_states: Arc::new(vec![TxState::new(); num_txs]),
num_partitions,
Expand Down Expand Up @@ -303,13 +305,18 @@ where
}

let start = Instant::now();
GREVM_RUNTIME.block_on(async {
let mut tasks = vec![];
for executor in &self.partition_executors {
let executor = executor.clone();
tasks.push(GREVM_RUNTIME.spawn(async move { executor.write().unwrap().execute() }));
}
futures::future::join_all(tasks).await;
// Do not block tokio runtime if we are in async context
tokio::task::block_in_place(|| {
GREVM_RUNTIME.block_on(async {
let mut tasks = vec![];
for executor in &self.partition_executors {
let executor = executor.clone();
tasks.push(
GREVM_RUNTIME.spawn(async move { executor.write().unwrap().execute() }),
);
}
futures::future::join_all(tasks).await;
})
});
self.metrics.parallel_execute_time.increment(start.elapsed().as_nanos() as u64);

Expand Down Expand Up @@ -501,7 +508,7 @@ where
let database = Arc::get_mut(&mut self.database).unwrap();
if self.num_finality_txs < self.txs.len() {
// Merging these states is only useful when there is a next round of execution.
Self::merge_not_modified_state(&mut database.cache, partition_state);
Self::merge_not_modified_state(&mut database.state.cache, partition_state);
}

#[allow(invalid_reference_casting)]
Expand Down Expand Up @@ -599,21 +606,6 @@ where
Ok(())
}

#[fastrace::trace]
fn build_output(&mut self) -> ExecuteOutput {
let start = Instant::now();
// MUST drop the `PartitionExecutor::scheduler_db` before get mut
self.partition_executors.clear();
let database = Arc::get_mut(&mut self.database).unwrap();
database.merge_transitions(BundleRetention::Reverts);
let output = ExecuteOutput {
state: std::mem::take(&mut database.bundle_state),
results: std::mem::take(&mut self.results),
};
self.metrics.build_output_time.increment(start.elapsed().as_nanos() as u64);
output
}

#[fastrace::trace]
fn parse_hints(&mut self) {
let start = Instant::now();
Expand Down Expand Up @@ -643,7 +635,7 @@ where

if self.txs.len() < self.num_partitions && !force_parallel {
self.execute_remaining_sequential()?;
return Ok(self.build_output());
return Ok(ExecuteOutput { results: std::mem::take(&mut self.results) });
}

if !force_sequential {
Expand All @@ -668,25 +660,31 @@ where
self.execute_remaining_sequential()?;
}

Ok(self.build_output())
Ok(ExecuteOutput { results: std::mem::take(&mut self.results) })
}

/// Execute transactions in parallel.
pub fn parallel_execute(mut self) -> Result<ExecuteOutput, GrevmError<DB::Error>> {
pub fn parallel_execute(&mut self) -> Result<ExecuteOutput, GrevmError<DB::Error>> {
self.evm_execute(None, true, None)
}

/// Execute transactions parallelly with or without hints.
pub fn force_parallel_execute(
mut self,
&mut self,
with_hints: bool,
num_partitions: Option<usize>,
) -> Result<ExecuteOutput, GrevmError<DB::Error>> {
self.evm_execute(Some(false), with_hints, num_partitions)
}

/// Execute transactions sequentially.
pub fn force_sequential_execute(mut self) -> Result<ExecuteOutput, GrevmError<DB::Error>> {
pub fn force_sequential_execute(&mut self) -> Result<ExecuteOutput, GrevmError<DB::Error>> {
self.evm_execute(Some(true), false, None)
}

/// Take the state of the scheduler.
/// It is typically called after the execution.
pub fn take_state(self) -> Box<State> {
Arc::try_unwrap(self.database).ok().unwrap().state
}
}
Loading
Loading