Skip to content

Commit ef3f2cb

Browse files
authored
opt: Concurrent build the output and reverts (#19)
Two optimization: 1. Concurrent build the output and reverts. According to the trace, currently building reverts does not consume time, but it takes time to transmit and insert `HashMap` through channels. 2. Use `AHashMap` instead of `HashMap`, because HashMap's performance is unstable.
1 parent 7284b13 commit ef3f2cb

File tree

6 files changed

+125
-33
lines changed

6 files changed

+125
-33
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ description = "Create Parallel EVM"
88
revm = "14.0.0"
99
fastrace = "0.7"
1010
tracing = "0.1.40"
11+
ahash = { version = "0.8.11", features = ["serde"] }
12+
rayon = "1.10.0"
1113

1214
# Alloy
1315
alloy-chains = "0.1.18"

src/lib.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use revm::{
1919
use std::{
2020
cmp::min,
2121
fmt::{Display, Formatter},
22-
sync::atomic::{AtomicUsize, Ordering},
2322
thread,
2423
};
2524
use tokio::runtime::{Builder, Runtime};
@@ -28,6 +27,7 @@ mod partition;
2827
mod scheduler;
2928
mod storage;
3029
mod tx_dependency;
30+
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
3131

3232
lazy_static! {
3333
static ref CPU_CORES: usize = thread::available_parallelism().map(|n| n.get()).unwrap_or(8);
@@ -161,20 +161,14 @@ where
161161
F: Fn(usize, usize, usize) + Send + Sync + 'scope,
162162
{
163163
let parallel_cnt = num_partitions.unwrap_or(*CPU_CORES * 2 + 1);
164-
let index = AtomicUsize::new(0);
165164
let remaining = num_elements % parallel_cnt;
166165
let chunk_size = num_elements / parallel_cnt;
167-
thread::scope(|scope| {
168-
for _ in 0..parallel_cnt {
169-
scope.spawn(|| {
170-
let index = index.fetch_add(1, Ordering::SeqCst);
171-
let start_pos = chunk_size * index + min(index, remaining);
172-
let mut end_pos = start_pos + chunk_size;
173-
if index < remaining {
174-
end_pos += 1;
175-
}
176-
f(start_pos, end_pos, index);
177-
});
166+
(0..parallel_cnt).into_par_iter().for_each(|index| {
167+
let start_pos = chunk_size * index + min(index, remaining);
168+
let mut end_pos = start_pos + chunk_size;
169+
if index < remaining {
170+
end_pos += 1;
178171
}
172+
f(start_pos, end_pos, index);
179173
});
180174
}

src/partition.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@ use crate::{
33
LocationAndType, PartitionId, ResultAndTransition, SharedTxStates, TransactionStatus, TxId,
44
TxState,
55
};
6+
use ahash::{AHashMap as HashMap, AHashSet as HashSet};
67
use revm::{
78
primitives::{Address, EVMError, Env, ResultAndState, SpecId, TxEnv, TxKind},
89
DatabaseRef, EvmBuilder,
910
};
1011
use std::{
11-
collections::{BTreeSet, HashMap, HashSet},
12+
collections::BTreeSet,
1213
sync::Arc,
1314
time::{Duration, Instant},
1415
};

src/scheduler.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,3 @@
1-
use std::{
2-
collections::{BTreeSet, HashMap, HashSet},
3-
ops::DerefMut,
4-
sync::{Arc, RwLock},
5-
time::{Duration, Instant},
6-
};
7-
81
use crate::{
92
fork_join_util,
103
hint::ParallelExecutionHints,
@@ -14,7 +7,15 @@ use crate::{
147
GrevmError, LocationAndType, ResultAndTransition, TransactionStatus, TxId, CPU_CORES,
158
GREVM_RUNTIME, MAX_NUM_ROUND,
169
};
10+
use fastrace::Span;
11+
use std::{
12+
collections::BTreeSet,
13+
ops::DerefMut,
14+
sync::{Arc, RwLock},
15+
time::{Duration, Instant},
16+
};
1717

18+
use ahash::{AHashMap as HashMap, AHashSet as HashSet};
1819
use metrics::{counter, gauge};
1920
use revm::{
2021
db::{states::bundle_state::BundleRetention, BundleState},
@@ -238,6 +239,7 @@ where
238239
DB::Error: Send + Sync,
239240
{
240241
/// Creates a new GrevmScheduler instance.
242+
#[fastrace::trace]
241243
pub fn new(spec_id: SpecId, env: Env, db: DB, txs: Arc<Vec<TxEnv>>) -> Self {
242244
let coinbase = env.block.coinbase;
243245
let num_partitions = *CPU_CORES * 2 + 1; // 2 * cpu + 1 for initial partition number
@@ -507,12 +509,16 @@ where
507509
unsafe { &mut *(&(*self.tx_states) as *const Vec<TxState> as *mut Vec<TxState>) };
508510
let mut miner_updates = Vec::with_capacity(finality_tx_cnt);
509511
let start_txid = self.num_finality_txs - finality_tx_cnt;
512+
513+
let span = Span::enter_with_local_parent("database commit transitions");
510514
for txid in start_txid..self.num_finality_txs {
511515
miner_updates.push(tx_states[txid].execute_result.miner_update.clone());
512516
database
513517
.commit_transition(std::mem::take(&mut tx_states[txid].execute_result.transition));
514518
self.results.push(tx_states[txid].execute_result.result.clone().unwrap());
515519
}
520+
drop(span);
521+
516522
// Each transaction updates three accounts: from, to, and coinbase.
517523
// If every tx updates the coinbase account, it will cause conflicts across all txs.
518524
// Therefore, we handle miner rewards separately. We don't record miner’s address in r/w

src/storage.rs

Lines changed: 98 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
1-
use crate::{LocationAndType, LocationSet};
1+
use crate::{fork_join_util, LocationAndType, LocationSet};
2+
use ahash::{AHashMap, AHashSet};
3+
use fastrace::Span;
24
use revm::{
35
db::{
46
states::{bundle_state::BundleRetention, CacheAccount},
5-
BundleState, PlainAccount,
7+
AccountRevert, BundleAccount, BundleState, PlainAccount,
68
},
79
precompile::Address,
810
primitives::{Account, AccountInfo, Bytecode, EvmState, B256, BLOCK_HASH_HISTORY, U256},
911
CacheState, Database, DatabaseRef, TransitionAccount, TransitionState,
1012
};
1113
use std::{
12-
collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet},
13-
sync::Arc,
14+
collections::{btree_map, hash_map, BTreeMap, HashMap},
15+
sync::{
16+
atomic::{AtomicUsize, Ordering},
17+
Arc, Mutex,
18+
},
1419
};
1520

1621
/// LazyUpdateValue is used to update the balance of the miner's account.
@@ -68,6 +73,87 @@ impl LazyUpdateValue {
6873
}
6974
}
7075

76+
trait ParallelBundleState {
77+
fn parallel_apply_transitions_and_create_reverts(
78+
&mut self,
79+
transitions: TransitionState,
80+
retention: BundleRetention,
81+
);
82+
}
83+
84+
impl ParallelBundleState for BundleState {
85+
#[fastrace::trace]
86+
fn parallel_apply_transitions_and_create_reverts(
87+
&mut self,
88+
transitions: TransitionState,
89+
retention: BundleRetention,
90+
) {
91+
assert!(self.state.is_empty());
92+
let include_reverts = retention.includes_reverts();
93+
// pessimistically pre-allocate assuming _all_ accounts changed.
94+
let reverts_capacity = if include_reverts { transitions.transitions.len() } else { 0 };
95+
let transitions = transitions.transitions;
96+
let addresses: Vec<Address> = transitions.keys().cloned().collect();
97+
let reverts: Vec<Option<(Address, AccountRevert)>> = vec![None; reverts_capacity];
98+
let bundle_state: Vec<Option<(Address, BundleAccount)>> = vec![None; transitions.len()];
99+
let state_size = AtomicUsize::new(0);
100+
let contracts = Mutex::new(HashMap::new());
101+
102+
let span = Span::enter_with_local_parent("parallel create reverts");
103+
fork_join_util(transitions.len(), None, |start_pos, end_pos, _| {
104+
#[allow(invalid_reference_casting)]
105+
let reverts = unsafe {
106+
&mut *(&reverts as *const Vec<Option<(Address, AccountRevert)>>
107+
as *mut Vec<Option<(Address, AccountRevert)>>)
108+
};
109+
#[allow(invalid_reference_casting)]
110+
let addresses =
111+
unsafe { &mut *(&addresses as *const Vec<Address> as *mut Vec<Address>) };
112+
#[allow(invalid_reference_casting)]
113+
let bundle_state = unsafe {
114+
&mut *(&bundle_state as *const Vec<Option<(Address, BundleAccount)>>
115+
as *mut Vec<Option<(Address, BundleAccount)>>)
116+
};
117+
118+
for pos in start_pos..end_pos {
119+
let address = addresses[pos];
120+
let transition = transitions.get(&address).cloned().unwrap();
121+
// add new contract if it was created/changed.
122+
if let Some((hash, new_bytecode)) = transition.has_new_contract() {
123+
contracts.lock().unwrap().insert(hash, new_bytecode.clone());
124+
}
125+
let present_bundle = transition.present_bundle_account();
126+
let revert = transition.create_revert();
127+
if let Some(revert) = revert {
128+
state_size.fetch_add(present_bundle.size_hint(), Ordering::Relaxed);
129+
bundle_state[pos] = Some((address, present_bundle));
130+
if include_reverts {
131+
reverts[pos] = Some((address, revert));
132+
}
133+
}
134+
}
135+
});
136+
self.state_size = state_size.load(Ordering::Acquire);
137+
drop(span);
138+
139+
// much faster than bundle_state.into_iter().filter_map(|r| r).collect()
140+
self.state.reserve(transitions.len());
141+
for bundle in bundle_state {
142+
if let Some((address, state)) = bundle {
143+
self.state.insert(address, state);
144+
}
145+
}
146+
let mut final_reverts = Vec::with_capacity(reverts_capacity);
147+
for revert in reverts {
148+
if let Some(r) = revert {
149+
final_reverts.push(r);
150+
}
151+
}
152+
self.reverts.push(final_reverts);
153+
self.contracts = contracts.into_inner().unwrap();
154+
}
155+
}
156+
71157
/// SchedulerDB is a database wrapper that manages state transitions and caching for the EVM.
72158
/// It maintains a cache of committed data, a transition state for ongoing transactions, and a
73159
/// bundle state for finalizing block state changes. It also tracks block hashes for quick access.
@@ -143,7 +229,8 @@ impl<DB> SchedulerDB<DB> {
143229
#[fastrace::trace]
144230
pub(crate) fn merge_transitions(&mut self, retention: BundleRetention) {
145231
if let Some(transition_state) = self.transition_state.as_mut().map(TransitionState::take) {
146-
self.bundle_state.apply_transitions_and_create_reverts(transition_state, retention);
232+
self.bundle_state
233+
.parallel_apply_transitions_and_create_reverts(transition_state, retention);
147234
}
148235
}
149236
}
@@ -345,7 +432,7 @@ pub(crate) struct PartitionDB<DB> {
345432
pub block_hashes: BTreeMap<u64, B256>,
346433

347434
/// Record the read set of current tx, will be consumed after the execution of each tx
348-
tx_read_set: HashMap<LocationAndType, Option<U256>>,
435+
tx_read_set: AHashMap<LocationAndType, Option<U256>>,
349436
}
350437

351438
impl<DB> PartitionDB<DB> {
@@ -355,12 +442,12 @@ impl<DB> PartitionDB<DB> {
355442
cache: CacheState::new(false),
356443
scheduler_db,
357444
block_hashes: BTreeMap::new(),
358-
tx_read_set: HashMap::new(),
445+
tx_read_set: AHashMap::new(),
359446
}
360447
}
361448

362449
/// consume the read set after evm.transact() for each tx
363-
pub(crate) fn take_read_set(&mut self) -> HashMap<LocationAndType, Option<U256>> {
450+
pub(crate) fn take_read_set(&mut self) -> AHashMap<LocationAndType, Option<U256>> {
364451
core::mem::take(&mut self.tx_read_set)
365452
}
366453

@@ -373,7 +460,7 @@ impl<DB> PartitionDB<DB> {
373460
) -> (LocationSet, LazyUpdateValue, bool) {
374461
let mut miner_update = LazyUpdateValue::default();
375462
let mut remove_miner = true;
376-
let mut write_set = HashSet::new();
463+
let mut write_set = AHashSet::new();
377464
for (address, account) in &mut *changes {
378465
if account.is_selfdestructed() {
379466
write_set.insert(LocationAndType::Code(*address));
@@ -483,9 +570,9 @@ where
483570
/// transaction.
484571
pub(crate) fn check_read_set(
485572
&mut self,
486-
read_set: &HashMap<LocationAndType, Option<U256>>,
573+
read_set: &AHashMap<LocationAndType, Option<U256>>,
487574
) -> bool {
488-
let mut visit_account = HashSet::new();
575+
let mut visit_account = AHashSet::new();
489576
for (location, _) in read_set {
490577
match location {
491578
LocationAndType::Basic(address) => {

src/tx_dependency.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
use smallvec::SmallVec;
22
use std::{
33
cmp::{min, Reverse},
4-
collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, VecDeque},
4+
collections::{BTreeMap, BTreeSet, BinaryHeap, VecDeque},
55
};
66

77
use crate::{fork_join_util, LocationAndType, SharedTxStates, TxId};
88

99
pub(crate) type DependentTxsVec = SmallVec<[TxId; 1]>;
1010

11+
use ahash::AHashMap as HashMap;
12+
1113
const RAW_TRANSFER_WEIGHT: usize = 1;
1214

1315
/// TxDependency is used to store the dependency relationship between transactions.

0 commit comments

Comments
 (0)