Skip to content

Commit 3e8cc66

Browse files
committed
metrics: add metrics to ananlze bottleneck
1 parent fd3d73c commit 3e8cc66

File tree

7 files changed

+144
-34
lines changed

7 files changed

+144
-34
lines changed

.github/workflows/grevm-fmt.yml

+3
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,6 @@ jobs:
2828

2929
- name: Run cargo fmt
3030
run: cargo +nightly fmt --check
31+
32+
- name: Run cargo check
33+
run: RUSTFLAGS="-D warnings" cargo check

src/hint.rs

+22-8
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
use crate::{fork_join_util, LocationAndType, SharedTxStates, TxState};
12
use revm::primitives::{
23
alloy_primitives::U160, keccak256, ruint::UintTryFrom, Address, Bytes, TxEnv, TxKind, B256,
34
U256,
45
};
5-
use std::sync::Arc;
6-
7-
use crate::{fork_join_util, LocationAndType, SharedTxStates, TxState};
6+
use std::sync::{
7+
atomic::{AtomicU64, Ordering},
8+
Arc,
9+
};
810

911
/// This module provides functionality for parsing and handling execution hints
1012
/// for parallel transaction execution in the context of Ethereum-like blockchains.
@@ -80,11 +82,13 @@ pub(crate) struct ParallelExecutionHints {
8082
/// Shared transaction states that will be updated with read/write sets
8183
/// based on the contract interactions.
8284
tx_states: SharedTxStates,
85+
pub unknown_contract_tx_cnt: u64,
86+
pub raw_tx_cnt: u64,
8387
}
8488

8589
impl ParallelExecutionHints {
8690
pub(crate) fn new(tx_states: SharedTxStates) -> Self {
87-
Self { tx_states }
91+
Self { tx_states, unknown_contract_tx_cnt: 0, raw_tx_cnt: 0 }
8892
}
8993

9094
/// Obtain a mutable reference to shared transaction states, and parse execution hints for each
@@ -98,7 +102,9 @@ impl ParallelExecutionHints {
98102
/// no conflicts between transactions, making the `Mutex` approach unnecessarily verbose and
99103
/// cumbersome.
100104
#[fastrace::trace]
101-
pub(crate) fn parse_hints(&self, txs: Arc<Vec<TxEnv>>) {
105+
pub(crate) fn parse_hints(&mut self, txs: Arc<Vec<TxEnv>>) {
106+
let num_unknown_contract_tx = AtomicU64::new(0);
107+
let num_raw_tx = AtomicU64::new(0);
102108
// Utilize fork-join utility to process transactions in parallel
103109
fork_join_util(txs.len(), None, |start_tx, end_tx, _| {
104110
#[allow(invalid_reference_casting)]
@@ -122,18 +128,26 @@ impl ParallelExecutionHints {
122128
&tx_env.data,
123129
rw_set,
124130
) {
131+
num_unknown_contract_tx.fetch_add(1, Ordering::Relaxed);
125132
rw_set.insert_location(
126133
LocationAndType::Basic(to_address),
127134
RWType::WriteOnly,
128135
);
129136
}
130-
} else if to_address != tx_env.caller {
131-
rw_set
132-
.insert_location(LocationAndType::Basic(to_address), RWType::ReadWrite);
137+
} else {
138+
num_raw_tx.fetch_add(1, Ordering::Relaxed);
139+
if to_address != tx_env.caller {
140+
rw_set.insert_location(
141+
LocationAndType::Basic(to_address),
142+
RWType::ReadWrite,
143+
);
144+
}
133145
}
134146
}
135147
}
136148
});
149+
self.unknown_contract_tx_cnt = num_unknown_contract_tx.load(Ordering::Acquire);
150+
self.raw_tx_cnt = num_raw_tx.load(Ordering::Acquire);
137151
}
138152

139153
/// This function computes the storage slot using the provided slot number and a vector of

src/lib.rs

+5
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ pub mod storage;
3030
mod tx_dependency;
3131
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
3232

33+
lazy_static! {
34+
static ref DEBUG_BOTTLENECK: bool =
35+
std::env::var("DEBUG_BOTTLENECK").map_or(false, |v| v == "on");
36+
}
37+
3338
lazy_static! {
3439
static ref CPU_CORES: usize = thread::available_parallelism().map(|n| n.get()).unwrap_or(8);
3540
}

src/partition.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,10 @@ where
122122
*evm.tx_mut() = tx.clone();
123123
evm.db_mut().current_txid = txid;
124124
evm.db_mut().raw_transfer = true; // no need to wait miner rewards
125-
let mut raw_transfer = true;
126-
if let Ok(Some(info)) = evm.db_mut().basic(tx.caller) {
127-
raw_transfer = info.is_empty_code_hash();
128-
}
125+
let mut raw_transfer = false;
129126
if let TxKind::Call(to) = tx.transact_to {
130127
if let Ok(Some(info)) = evm.db_mut().basic(to) {
131-
raw_transfer &= info.is_empty_code_hash();
128+
raw_transfer = info.is_empty_code_hash();
132129
}
133130
}
134131
evm.db_mut().raw_transfer = raw_transfer;

src/scheduler.rs

+35-14
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::{
55
storage::{SchedulerDB, State},
66
tx_dependency::{DependentTxsVec, TxDependency},
77
GrevmError, LocationAndType, ResultAndTransition, TransactionStatus, TxId, CPU_CORES,
8-
GREVM_RUNTIME, MAX_NUM_ROUND,
8+
DEBUG_BOTTLENECK, GREVM_RUNTIME, MAX_NUM_ROUND,
99
};
1010
use ahash::{AHashMap as HashMap, AHashSet as HashSet};
1111
use atomic::Atomic;
@@ -75,6 +75,10 @@ struct ExecuteMetrics {
7575
commit_transition_time: metrics::Histogram,
7676
/// Time taken to execute transactions in sequential(in nanoseconds).
7777
sequential_execute_time: metrics::Histogram,
78+
/// Number of transactions that failed to parse contract
79+
unknown_contract_tx_cnt: metrics::Histogram,
80+
/// Number of raw transactions
81+
raw_tx_cnt: metrics::Histogram,
7882
}
7983

8084
impl Default for ExecuteMetrics {
@@ -100,6 +104,8 @@ impl Default for ExecuteMetrics {
100104
merge_write_set_time: histogram!("grevm.merge_write_set_time"),
101105
commit_transition_time: histogram!("grevm.commit_transition_time"),
102106
sequential_execute_time: histogram!("grevm.sequential_execute_time"),
107+
unknown_contract_tx_cnt: histogram!("grevm.unknown_contract_tx_cnt"),
108+
raw_tx_cnt: histogram!("grevm.raw_tx_cnt"),
103109
}
104110
}
105111
}
@@ -127,6 +133,8 @@ struct ExecuteMetricsCollector {
127133
merge_write_set_time: u64,
128134
commit_transition_time: u64,
129135
sequential_execute_time: u64,
136+
unknown_contract_tx_cnt: u64,
137+
raw_tx_cnt: u64,
130138
}
131139

132140
impl ExecuteMetricsCollector {
@@ -152,6 +160,8 @@ impl ExecuteMetricsCollector {
152160
execute_metrics.merge_write_set_time.record(self.merge_write_set_time as f64);
153161
execute_metrics.commit_transition_time.record(self.commit_transition_time as f64);
154162
execute_metrics.sequential_execute_time.record(self.sequential_execute_time as f64);
163+
execute_metrics.unknown_contract_tx_cnt.record(self.unknown_contract_tx_cnt as f64);
164+
execute_metrics.raw_tx_cnt.record(self.raw_tx_cnt as f64);
155165
}
156166
}
157167

@@ -348,9 +358,10 @@ where
348358

349359
/// Get the partitioned transactions by dependencies.
350360
#[fastrace::trace]
351-
pub(crate) fn partition_transactions(&mut self) {
361+
pub(crate) fn partition_transactions(&mut self, round: usize) {
352362
// compute and assign partitioned_txs
353363
let start = Instant::now();
364+
self.tx_dependencies.round = Some(round);
354365
self.partitioned_txs = self.tx_dependencies.fetch_best_partitions(self.num_partitions);
355366
self.num_partitions = self.partitioned_txs.len();
356367
let mut max = 0;
@@ -412,13 +423,15 @@ where
412423
let start = Instant::now();
413424
let mut merged_write_set: HashMap<LocationAndType, BTreeSet<TxId>> = HashMap::new();
414425
let mut end_skip_id = self.num_finality_txs;
415-
for txid in self.num_finality_txs..self.tx_states.len() {
416-
if self.tx_states[txid].tx_status == TransactionStatus::SkipValidation &&
417-
end_skip_id == txid
418-
{
419-
end_skip_id += 1;
420-
} else {
421-
break;
426+
if !(*DEBUG_BOTTLENECK) {
427+
for txid in self.num_finality_txs..self.tx_states.len() {
428+
if self.tx_states[txid].tx_status == TransactionStatus::SkipValidation &&
429+
end_skip_id == txid
430+
{
431+
end_skip_id += 1;
432+
} else {
433+
break;
434+
}
422435
}
423436
}
424437
if end_skip_id != self.tx_states.len() {
@@ -438,8 +451,7 @@ where
438451
/// and there is no need to record the dependency and dependent relationships of these
439452
/// transactions. Thus achieving the purpose of pruning.
440453
#[fastrace::trace]
441-
fn update_and_pruning_dependency(&mut self) {
442-
let num_finality_txs = self.num_finality_txs;
454+
fn update_and_pruning_dependency(&mut self, num_finality_txs: usize) {
443455
if num_finality_txs == self.txs.len() {
444456
return;
445457
}
@@ -524,6 +536,13 @@ where
524536
}
525537
}
526538
});
539+
if *DEBUG_BOTTLENECK && self.num_finality_txs == 0 {
540+
// Use the read-write set to build accurate dependencies,
541+
// and try to find the bottleneck
542+
self.update_and_pruning_dependency(0);
543+
self.tx_dependencies.round = None;
544+
self.tx_dependencies.fetch_best_partitions(self.num_partitions);
545+
}
527546
miner_involved_txs.into_iter().collect()
528547
}
529548

@@ -651,7 +670,7 @@ where
651670
let miner_involved_txs = self.generate_unconfirmed_txs();
652671
let finality_tx_cnt = self.find_continuous_min_txid()?;
653672
// update and pruning tx dependencies
654-
self.update_and_pruning_dependency();
673+
self.update_and_pruning_dependency(self.num_finality_txs);
655674
self.commit_transition(finality_tx_cnt)?;
656675
let mut rewards_accumulators = RewardsAccumulators::new();
657676
for txid in miner_involved_txs {
@@ -721,10 +740,12 @@ where
721740
#[fastrace::trace]
722741
fn parse_hints(&mut self) {
723742
let start = Instant::now();
724-
let hints = ParallelExecutionHints::new(self.tx_states.clone());
743+
let mut hints = ParallelExecutionHints::new(self.tx_states.clone());
725744
hints.parse_hints(self.txs.clone());
726745
self.tx_dependencies.init_tx_dependency(self.tx_states.clone());
727746
self.metrics.parse_hints_time += start.elapsed().as_nanos() as u64;
747+
self.metrics.unknown_contract_tx_cnt += hints.unknown_contract_tx_cnt;
748+
self.metrics.raw_tx_cnt += hints.raw_tx_cnt;
728749
}
729750

730751
#[fastrace::trace]
@@ -754,7 +775,7 @@ where
754775
let mut round = 0;
755776
while round < MAX_NUM_ROUND {
756777
if self.num_finality_txs < self.txs.len() {
757-
self.partition_transactions();
778+
self.partition_transactions(round);
758779
if self.num_partitions == 1 && !force_parallel {
759780
break;
760781
}

src/tx_dependency.rs

+76-6
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1+
use crate::{fork_join_util, LocationAndType, SharedTxStates, TxId, DEBUG_BOTTLENECK};
12
use smallvec::SmallVec;
23
use std::{
3-
cmp::{min, Reverse},
4+
cmp::{max, min, Reverse},
45
collections::{BTreeMap, BTreeSet, BinaryHeap, VecDeque},
56
};
67

7-
use crate::{fork_join_util, LocationAndType, SharedTxStates, TxId};
8-
98
pub(crate) type DependentTxsVec = SmallVec<[TxId; 1]>;
109

11-
use ahash::AHashMap as HashMap;
10+
use ahash::{AHashMap as HashMap, AHashSet as HashSet};
11+
use metrics::counter;
1212

1313
const RAW_TRANSFER_WEIGHT: usize = 1;
1414

@@ -31,6 +31,8 @@ pub(crate) struct TxDependency {
3131
// type, while in the second round, weights can be assigned based on tx_running_time.
3232
#[allow(dead_code)]
3333
tx_weight: Option<Vec<usize>>,
34+
35+
pub round: Option<usize>,
3436
}
3537

3638
impl TxDependency {
@@ -40,19 +42,21 @@ impl TxDependency {
4042
num_finality_txs: 0,
4143
tx_running_time: None,
4244
tx_weight: None,
45+
round: None,
4346
}
4447
}
4548

4649
#[fastrace::trace]
4750
pub fn init_tx_dependency(&mut self, tx_states: SharedTxStates) {
4851
let mut last_write_tx: HashMap<LocationAndType, TxId> = HashMap::new();
4952
for (txid, rw_set) in tx_states.iter().enumerate() {
50-
let dependencies = &mut self.tx_dependency[txid];
53+
let mut dependencies = HashSet::new();
5154
for (location, _) in rw_set.read_set.iter() {
5255
if let Some(previous) = last_write_tx.get(location) {
53-
dependencies.push(*previous);
56+
dependencies.insert(*previous);
5457
}
5558
}
59+
self.tx_dependency[txid] = dependencies.into_iter().collect();
5660
for location in rw_set.write_set.iter() {
5761
last_write_tx.insert(location.clone(), txid);
5862
}
@@ -139,6 +143,7 @@ impl TxDependency {
139143
}
140144
txid -= 1;
141145
}
146+
self.skew_analyze(&weighted_group);
142147

143148
let num_partitions = min(partition_count, num_group);
144149
if num_partitions == 0 {
@@ -204,4 +209,69 @@ impl TxDependency {
204209
self.tx_dependency = tx_dependency;
205210
self.num_finality_txs = num_finality_txs;
206211
}
212+
213+
fn skew_analyze(&self, weighted_group: &BTreeMap<usize, Vec<DependentTxsVec>>) {
214+
if !(*DEBUG_BOTTLENECK) {
215+
return;
216+
}
217+
if let Some(0) = self.round {
218+
counter!("grevm.total_block_cnt").increment(1);
219+
}
220+
let num_finality_txs = self.num_finality_txs;
221+
let num_txs = num_finality_txs + self.tx_dependency.len();
222+
let num_remaining = self.tx_dependency.len();
223+
if num_txs < 64 || num_remaining < num_txs / 3 {
224+
return;
225+
}
226+
let mut subgraph = BTreeSet::new();
227+
if let Some((_, groups)) = weighted_group.last_key_value() {
228+
if groups[0].len() >= num_remaining / 3 {
229+
subgraph.extend(groups[0].clone());
230+
}
231+
}
232+
if subgraph.is_empty() {
233+
return;
234+
}
235+
236+
// ChainLength -> ChainNumber
237+
let mut chains = BTreeMap::new();
238+
let mut visited = HashSet::new();
239+
for txid in subgraph.iter().rev() {
240+
if !visited.contains(txid) {
241+
let mut txid = *txid;
242+
let mut chain_len = 0;
243+
while !visited.contains(&txid) {
244+
chain_len += 1;
245+
visited.insert(txid);
246+
let dep: BTreeSet<TxId> =
247+
self.tx_dependency[txid - num_finality_txs].clone().into_iter().collect();
248+
for dep_id in dep.into_iter().rev() {
249+
if !visited.contains(&dep_id) {
250+
txid = dep_id;
251+
break;
252+
}
253+
}
254+
}
255+
let chain_num = chains.get(&chain_len).cloned().unwrap_or(0) + 1;
256+
chains.insert(chain_len, chain_num);
257+
}
258+
}
259+
260+
let graph_len = subgraph.len();
261+
let tip = self.round.map(|r| format!("round{}", r)).unwrap_or(String::from("none"));
262+
counter!("grevm.large_graph_block_cnt", "tip" => tip.clone()).increment(1);
263+
if let Some((chain_len, _)) = chains.last_key_value() {
264+
let chain_len = *chain_len;
265+
if chain_len > graph_len * 2 / 3 {
266+
// Long chain
267+
counter!("grevm.large_graph", "type" => "chain", "tip" => tip.clone()).increment(1);
268+
} else if chain_len < max(3, graph_len / 8) {
269+
// Star Graph
270+
counter!("grevm.large_graph", "type" => "star", "tip" => tip.clone()).increment(1);
271+
} else {
272+
// Fork Graph
273+
counter!("grevm.large_graph", "type" => "fork", "tip" => tip.clone()).increment(1);
274+
}
275+
}
276+
}
207277
}

tests/common/execute.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ pub(crate) fn compare_evm_execute<DB>(
144144

145145
let snapshot = recorder.snapshotter().snapshot();
146146
for (key, _, _, value) in snapshot.into_vec() {
147-
println!("metrics: {} => value: {:?}", key.key().name(), value);
147+
println!("metrics: {} => value: {:?}", key.key(), value);
148148
if let Some(metric) = parallel_metrics.get(key.key().name()) {
149149
let v = match value {
150150
DebugValue::Counter(v) => v as usize,

0 commit comments

Comments
 (0)