grevm(opt): optimize cache state #2

Closed · wants to merge 1 commit
10 changes: 5 additions & 5 deletions src/lib.rs
@@ -1,6 +1,5 @@
 use lazy_static::lazy_static;
-use revm::primitives::{Address, EVMError, EVMResultGeneric, ExecutionResult, U256};
-use revm::TransitionAccount;
+use revm::primitives::{Address, EVMError, EVMResultGeneric, EvmState, ExecutionResult, U256};
 use std::cmp::min;
 use std::fmt::{Display, Formatter};
 use std::sync::atomic::{AtomicUsize, Ordering};
@@ -13,13 +12,14 @@ mod storage;
 mod tx_dependency;
 
 lazy_static! {
-    static ref CPU_CORES: usize = thread::available_parallelism().map(|n| n.get()).unwrap_or(8);
+    static ref CPU_CORES: usize =
+        std::thread::available_parallelism().map(|n| n.get()).unwrap_or(8);
 }
 
 lazy_static! {
     static ref GREVM_RUNTIME: Runtime = Builder::new_multi_thread()
         // .worker_threads(1) // for debug
-        .worker_threads(thread::available_parallelism().map(|n| n.get() * 2).unwrap_or(8))
+        .worker_threads(std::thread::available_parallelism().map(|n| n.get() * 2).unwrap_or(8))
         .thread_name("grevm-tokio-runtime")
         .enable_all()
         .build()
@@ -75,7 +75,7 @@ pub struct ResultAndTransition {
     /// Status of execution
     pub result: Option<ExecutionResult>,
     /// State that got updated
-    pub transition: Vec<(Address, TransitionAccount)>,
+    pub transition: EvmState,
     /// Rewards to miner
     pub rewards: u128,
 }
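Note, not part of this diff: EvmState is revm's post-execution state map (an alias for HashMap<Address, Account>), so ResultAndTransition now carries the state returned by transact() directly instead of pre-built (Address, TransitionAccount) pairs. A minimal sketch of how a consumer can walk the new field; the helper name is hypothetical:

    use revm::primitives::{Address, EvmState};

    // Hypothetical helper: list the accounts touched by a finalized transaction,
    // e.g. so they can be faulted into a cache before the transition is committed.
    fn touched_accounts(transition: &EvmState) -> Vec<Address> {
        // EvmState maps each touched Address to its post-execution Account
        // (info, storage slots, and status flags).
        transition.keys().copied().collect()
    }
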
11 changes: 8 additions & 3 deletions src/partition.rs
@@ -156,7 +156,12 @@ where
         if read_set.is_disjoint(&update_write_set) {
             self.read_set.push(read_set);
             self.write_set.push(write_set);
-            evm.db_mut().temporary_commit_transition(&execute_state.transition);
+            // All accounts should be present inside cache.
+            for address in execute_state.transition.keys() {
+                // FIXME(gravity): error handling
+                let _ = evm.db_mut().load_cache_account(*address);
+            }
+            evm.db_mut().temporary_commit(&execute_state.transition);
             self.execute_results.push(Ok(execute_state));
             should_rerun = false;
             self.metrics.reusable_tx_cnt += 1;
@@ -182,10 +187,10 @@
             result_and_state.state.remove(&self.coinbase);
         }
         // temporary commit to cache_db, to make use the remaining txs can read the updated data
-        let transition = evm.db_mut().temporary_commit(result_and_state.state);
+        evm.db_mut().temporary_commit(&result_and_state.state);
         self.execute_results.push(Ok(ResultAndTransition {
             result: Some(result_and_state.result),
-            transition,
+            transition: result_and_state.state,
             rewards: rewards.unwrap_or(0),
         }));
     }
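Side note, not part of this diff: the FIXME above silently drops the result of load_cache_account. If the surrounding method were made fallible, the loop could propagate database errors the same way the scheduler does elsewhere in this PR; a hedged sketch, assuming a Result return type and the crate's GrevmError/EVMError wrappers:

    for address in execute_state.transition.keys() {
        evm.db_mut()
            .load_cache_account(*address)
            .map_err(|err| GrevmError::EvmError(EVMError::Database(err)))?;
    }
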
89 changes: 53 additions & 36 deletions src/scheduler.rs
@@ -9,20 +9,19 @@ use crate::hint::ParallelExecutionHints;
 use crate::partition::{
     OrderedVectorExt, PartitionExecutor, PreRoundContext, PreUnconfirmedContext,
 };
-use crate::storage::SchedulerDB;
+use crate::storage::{SchedulerDB, StateCache};
 use crate::tx_dependency::{DependentTxsVec, TxDependency};
 use crate::{
-    fork_join_util, GrevmError, LocationAndType, PartitionId, TxId, CPU_CORES, GREVM_RUNTIME,
-    MAX_NUM_ROUND,
+    fork_join_util, GrevmError, LocationAndType, TxId, CPU_CORES, GREVM_RUNTIME, MAX_NUM_ROUND,
 };
 
-use metrics::{counter, gauge, histogram};
+use metrics::{counter, gauge};
 use revm::db::states::bundle_state::BundleRetention;
 use revm::db::BundleState;
 use revm::primitives::{
     AccountInfo, Address, Bytecode, EVMError, Env, ExecutionResult, SpecId, TxEnv, B256, U256,
 };
-use revm::{CacheState, DatabaseRef, EvmBuilder};
+use revm::{DatabaseRef, EvmBuilder};
 
 pub struct ExecuteMetrics {
     /// Number of times parallel execution is called.
@@ -460,12 +459,20 @@ where
         // update and pruning tx dependencies
         self.update_and_pruning_dependency();
 
-        let partition_state: Vec<CacheState> = self
+        let partition_state: Vec<StateCache> = self
             .partition_executors
             .iter()
             .map(|executor| {
                 let mut executor = executor.write().unwrap();
-                std::mem::take(&mut executor.partition_db.cache)
+                let mut cache = std::mem::take(&mut executor.partition_db.cache);
+                if !executor.partition_db.miner_involved {
+                    // Remove the miner account from partitions that only treat it as a reward
+                    // account. We have preloaded it into the scheduler state cache and will
+                    // handle rewards separately; only when the miner account participates in
+                    // the transactions should it be merged into the scheduler state cache.
+                    cache.accounts.remove(&self.coinbase);
+                }
+                cache
             })
             .collect();
 
@@ -475,14 +482,40 @@
         drop(span);
         let database = Arc::get_mut(&mut self.database).unwrap();
 
-        Self::merge_not_modified_state(&mut database.cache, partition_state);
+        if self.num_finality_txs == self.txs.len() {
+            // Merge partition state into the scheduler directly if there are no conflicts in
+            // the entire partition.
+            let span = Span::enter_with_local_parent("directly merge accounts");
+            for state in partition_state {
+                database.cache.merge_accounts(state);
+            }
+
+            let mut rewards: u128 = 0;
+            for ctx in finality_txs.into_values() {
+                rewards += ctx.execute_state.rewards;
+                self.results.push(ctx.execute_state.result.unwrap());
+            }
+            database
+                .increment_balances(vec![(self.coinbase, rewards)])
+                .map_err(|err| GrevmError::EvmError(EVMError::Database(err)))?;
+
+            self.metrics.validate_time.increment(start.elapsed().as_nanos() as u64);
+            return Ok(());
+        }
+
+        // Must merge loaded state into the database cache before committing changes
+        let span = Span::enter_with_local_parent("merge loaded state");
+        for state in partition_state {
+            database.cache.merge_loaded_state(state);
+        }
+        std::mem::drop(span);
+
         let span = Span::enter_with_local_parent("commit transition to scheduler db");
         let mut rewards: u128 = 0;
         for ctx in finality_txs.into_values() {
             rewards += ctx.execute_state.rewards;
             self.results.push(ctx.execute_state.result.unwrap());
-            database.commit_transition(ctx.execute_state.transition);
+            database.commit(&ctx.execute_state.transition);
         }
         // Each transaction updates three accounts: from, to, and coinbase.
         // If every tx updates the coinbase account, it will cause conflicts across all txs.
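Side note, not part of this diff: the two comment lines above are why rewards travel in ResultAndTransition::rewards instead of in each transaction's state. Conflict detection boils down to a disjointness check between read and write sets (see the is_disjoint call in partition.rs above); if every transaction wrote the coinbase account, no two transactions could ever be confirmed in parallel, so the coinbase write is stripped per transaction and the miner balance is incremented once. A toy illustration of that check, with locations simplified to plain addresses:

    use revm::primitives::Address;
    use std::collections::HashSet;

    // Toy conflict check: a later transaction conflicts with an earlier one if it read
    // anything the earlier one wrote. A coinbase write shared by every transaction would
    // make each pair conflict, which is why rewards are settled separately.
    fn conflicts(read_set: &HashSet<Address>, prior_write_set: &HashSet<Address>) -> bool {
        !read_set.is_disjoint(prior_write_set)
    }
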
@@ -498,27 +531,6 @@ where
         Ok(())
     }
 
-    /// Merge not modified state from partition to scheduler. These data are just loaded from
-    /// database, so we can merge them to state as original value for next round.
-    #[fastrace::trace]
-    fn merge_not_modified_state(state: &mut CacheState, partition_state: Vec<CacheState>) {
-        for partition in partition_state {
-            // merge account state that is not modified
-            for (address, account) in partition.accounts {
-                if account.status.is_not_modified() && state.accounts.get(&address).is_none() {
-                    state.accounts.insert(address, account);
-                }
-            }
-
-            // merge contract code
-            for (hash, code) in partition.contracts {
-                if state.contracts.get(&hash).is_none() {
-                    state.contracts.insert(hash, code);
-                }
-            }
-        }
-    }
-
     #[fastrace::trace]
     fn execute_remaining_sequential(&mut self) -> Result<(), GrevmError<DB::Error>> {
         self.metrics.sequential_execute_calls.increment(1);
@@ -539,7 +551,7 @@
             }
             match evm.transact() {
                 Ok(result_and_state) => {
-                    evm.db_mut().commit(result_and_state.state);
+                    evm.db_mut().commit(&result_and_state.state);
                     self.results.push(result_and_state.result);
                 }
                 Err(err) => return Err(GrevmError::EvmError(err)),
@@ -562,13 +574,18 @@
             // MUST drop the `PartitionExecutor::scheduler_db` before get mut
             this.partition_executors.clear();
             let database = Arc::get_mut(&mut this.database).unwrap();
-            database.merge_transitions(BundleRetention::Reverts);
-            ExecuteOutput {
-                state: std::mem::take(&mut database.bundle_state),
-                results: std::mem::take(&mut this.results),
-            }
+            let state = database.create_bundle_state(BundleRetention::Reverts);
+            ExecuteOutput { state, results: std::mem::take(&mut this.results) }
         };
 
+        {
+            // Preload coinbase account
+            let db = Arc::get_mut(&mut self.database).unwrap();
+            let _ = db
+                .load_cache_account(self.coinbase)
+                .map_err(|err| GrevmError::EvmError(EVMError::Database(err)))?;
+        }
+
         if self.txs.len() < self.num_partitions && !force_parallel {
             self.execute_remaining_sequential()?;
             return Ok(build_execute_output(self));
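Side note, not part of this diff: StateCache, merge_accounts, and merge_loaded_state live in src/storage.rs, which this view does not show. Judging from the merge_not_modified_state function deleted above, merge_loaded_state most likely keeps only the entries a partition merely loaded from the database and that the scheduler cache does not already hold, while merge_accounts (the no-conflict fast path) also takes over modified accounts. A hedged sketch of the loaded-state merge under those assumptions, using a stand-in struct for the real StateCache:

    use revm::db::states::CacheAccount;
    use revm::primitives::{Address, Bytecode, B256};
    use std::collections::HashMap;

    // Stand-in for the StateCache defined in src/storage.rs; field names are assumptions.
    struct ToyStateCache {
        accounts: HashMap<Address, CacheAccount>,
        contracts: HashMap<B256, Bytecode>,
    }

    // Assumed shape of merge_loaded_state, reconstructed from the deleted
    // merge_not_modified_state above: keep accounts that were only read (never modified)
    // and are not cached yet, and reuse any bytecode the partition loaded.
    fn merge_loaded_state(state: &mut ToyStateCache, partition: ToyStateCache) {
        for (address, account) in partition.accounts {
            if account.status.is_not_modified() && !state.accounts.contains_key(&address) {
                state.accounts.insert(address, account);
            }
        }
        for (hash, code) in partition.contracts {
            // bytecode is immutable, so any copy the partition loaded can be reused
            state.contracts.entry(hash).or_insert(code);
        }
    }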