Skip to content

Commit

Permalink
optimize state cache
Browse files Browse the repository at this point in the history
  • Loading branch information
nekomoto911 committed Oct 14, 2024
1 parent 7c4a46a commit c143d8d
Show file tree
Hide file tree
Showing 5 changed files with 416 additions and 212 deletions.
10 changes: 5 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use lazy_static::lazy_static;
use revm::primitives::{Address, EVMError, EVMResultGeneric, ExecutionResult, U256};
use revm::TransitionAccount;
use revm::primitives::{Address, EVMError, EVMResultGeneric, EvmState, ExecutionResult, U256};
use std::cmp::min;
use std::fmt::{Display, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering};
Expand All @@ -13,13 +12,14 @@ mod storage;
mod tx_dependency;

lazy_static! {
static ref CPU_CORES: usize = thread::available_parallelism().map(|n| n.get()).unwrap_or(8);
static ref CPU_CORES: usize =
std::thread::available_parallelism().map(|n| n.get()).unwrap_or(8);
}

lazy_static! {
static ref GREVM_RUNTIME: Runtime = Builder::new_multi_thread()
// .worker_threads(1) // for debug
.worker_threads(thread::available_parallelism().map(|n| n.get() * 2).unwrap_or(8))
.worker_threads(std::thread::available_parallelism().map(|n| n.get() * 2).unwrap_or(8))
.thread_name("grevm-tokio-runtime")
.enable_all()
.build()
Expand Down Expand Up @@ -75,7 +75,7 @@ pub struct ResultAndTransition {
/// Status of execution
pub result: Option<ExecutionResult>,
/// State that got updated
pub transition: Vec<(Address, TransitionAccount)>,
pub transition: EvmState,
/// Rewards to miner
pub rewards: u128,
}
Expand Down
11 changes: 8 additions & 3 deletions src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,12 @@ where
if read_set.is_disjoint(&update_write_set) {
self.read_set.push(read_set);
self.write_set.push(write_set);
evm.db_mut().temporary_commit_transition(&execute_state.transition);
// All accounts should be present inside cache.
for address in execute_state.transition.keys() {
// FIXME(gravity): error handling
let _ = evm.db_mut().load_cache_account(*address);
}
evm.db_mut().temporary_commit(&execute_state.transition);
self.execute_results.push(Ok(execute_state));
should_rerun = false;
self.metrics.reusable_tx_cnt += 1;
Expand All @@ -182,10 +187,10 @@ where
result_and_state.state.remove(&self.coinbase);
}
// temporary commit to cache_db, to make use the remaining txs can read the updated data
let transition = evm.db_mut().temporary_commit(result_and_state.state);
evm.db_mut().temporary_commit(&result_and_state.state);
self.execute_results.push(Ok(ResultAndTransition {
result: Some(result_and_state.result),
transition,
transition: result_and_state.state,
rewards: rewards.unwrap_or(0),
}));
}
Expand Down
89 changes: 53 additions & 36 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,19 @@ use crate::hint::ParallelExecutionHints;
use crate::partition::{
OrderedVectorExt, PartitionExecutor, PreRoundContext, PreUnconfirmedContext,
};
use crate::storage::SchedulerDB;
use crate::storage::{SchedulerDB, StateCache};
use crate::tx_dependency::{DependentTxsVec, TxDependency};
use crate::{
fork_join_util, GrevmError, LocationAndType, PartitionId, TxId, CPU_CORES, GREVM_RUNTIME,
MAX_NUM_ROUND,
fork_join_util, GrevmError, LocationAndType, TxId, CPU_CORES, GREVM_RUNTIME, MAX_NUM_ROUND,
};

use metrics::{counter, gauge, histogram};
use metrics::{counter, gauge};
use revm::db::states::bundle_state::BundleRetention;
use revm::db::BundleState;
use revm::primitives::{
AccountInfo, Address, Bytecode, EVMError, Env, ExecutionResult, SpecId, TxEnv, B256, U256,
};
use revm::{CacheState, DatabaseRef, EvmBuilder};
use revm::{DatabaseRef, EvmBuilder};

pub struct ExecuteMetrics {
/// Number of times parallel execution is called.
Expand Down Expand Up @@ -460,12 +459,20 @@ where
// update and pruning tx dependencies
self.update_and_pruning_dependency();

let partition_state: Vec<CacheState> = self
let partition_state: Vec<StateCache> = self
.partition_executors
.iter()
.map(|executor| {
let mut executor = executor.write().unwrap();
std::mem::take(&mut executor.partition_db.cache)
let mut cache = std::mem::take(&mut executor.partition_db.cache);
if !executor.partition_db.miner_involved {
// Remove miner account in partitions that only treat it as a reward account.
// We have preload it into scheduler state cache, and will handle rewards
// separately. Only when miner account participates in the transactions should
// it be merged into the scheduler state cache.
cache.accounts.remove(&self.coinbase);
}
cache
})
.collect();

Expand All @@ -475,14 +482,40 @@ where
drop(span);
let database = Arc::get_mut(&mut self.database).unwrap();

Self::merge_not_modified_state(&mut database.cache, partition_state);
if self.num_finality_txs == self.txs.len() {
// Merge partition state to scheduler directly if there are no conflicts in the entire
// partition.
let span = Span::enter_with_local_parent("directly merge accounts");
for state in partition_state {
database.cache.merge_accounts(state);
}

let mut rewards: u128 = 0;
for ctx in finality_txs.into_values() {
rewards += ctx.execute_state.rewards;
self.results.push(ctx.execute_state.result.unwrap());
}
database
.increment_balances(vec![(self.coinbase, rewards)])
.map_err(|err| GrevmError::EvmError(EVMError::Database(err)))?;

self.metrics.validate_time.increment(start.elapsed().as_nanos() as u64);
return Ok(());
}

// Must merge loaded state to database cache before commit changes
let span = Span::enter_with_local_parent("merge loaded state");
for state in partition_state {
database.cache.merge_loaded_state(state);
}
std::mem::drop(span);

let span = Span::enter_with_local_parent("commit transition to scheduler db");
let mut rewards: u128 = 0;
for ctx in finality_txs.into_values() {
rewards += ctx.execute_state.rewards;
self.results.push(ctx.execute_state.result.unwrap());
database.commit_transition(ctx.execute_state.transition);
database.commit(&ctx.execute_state.transition);
}
// Each transaction updates three accounts: from, to, and coinbase.
// If every tx updates the coinbase account, it will cause conflicts across all txs.
Expand All @@ -498,27 +531,6 @@ where
Ok(())
}

/// Merge not modified state from partition to scheduler. These data are just loaded from
/// database, so we can merge them to state as original value for next round.
#[fastrace::trace]
fn merge_not_modified_state(state: &mut CacheState, partition_state: Vec<CacheState>) {
for partition in partition_state {
// merge account state that is not modified
for (address, account) in partition.accounts {
if account.status.is_not_modified() && state.accounts.get(&address).is_none() {
state.accounts.insert(address, account);
}
}

// merge contract code
for (hash, code) in partition.contracts {
if state.contracts.get(&hash).is_none() {
state.contracts.insert(hash, code);
}
}
}
}

#[fastrace::trace]
fn execute_remaining_sequential(&mut self) -> Result<(), GrevmError<DB::Error>> {
self.metrics.sequential_execute_calls.increment(1);
Expand All @@ -539,7 +551,7 @@ where
}
match evm.transact() {
Ok(result_and_state) => {
evm.db_mut().commit(result_and_state.state);
evm.db_mut().commit(&result_and_state.state);
self.results.push(result_and_state.result);
}
Err(err) => return Err(GrevmError::EvmError(err)),
Expand All @@ -562,13 +574,18 @@ where
// MUST drop the `PartitionExecutor::scheduler_db` before get mut
this.partition_executors.clear();
let database = Arc::get_mut(&mut this.database).unwrap();
database.merge_transitions(BundleRetention::Reverts);
ExecuteOutput {
state: std::mem::take(&mut database.bundle_state),
results: std::mem::take(&mut this.results),
}
let state = database.create_bundle_state(BundleRetention::Reverts);
ExecuteOutput { state, results: std::mem::take(&mut this.results) }
};

{
// Preload coinbase account
let db = Arc::get_mut(&mut self.database).unwrap();
let _ = db
.load_cache_account(self.coinbase)
.map_err(|err| GrevmError::EvmError(EVMError::Database(err)))?;
}

if self.txs.len() < self.num_partitions && !force_parallel {
self.execute_remaining_sequential()?;
return Ok(build_execute_output(self));
Expand Down
Loading

0 comments on commit c143d8d

Please sign in to comment.