diff --git a/src/scheduler.rs b/src/scheduler.rs index 9107621..ef8ee8e 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -11,8 +11,7 @@ use ahash::{AHashMap as HashMap, AHashSet as HashSet}; use atomic::Atomic; use dashmap::DashSet; use fastrace::Span; -use lazy_static::lazy_static; -use metrics::{gauge, histogram}; +use metrics::{counter, gauge, histogram}; use revm::{ primitives::{ AccountInfo, Address, Bytecode, EVMError, Env, ExecutionResult, SpecId, TxEnv, B256, U256, @@ -32,6 +31,7 @@ use tokio::sync::Notify; use tracing::info; struct ExecuteMetrics { + block_height: metrics::Counter, /// Number of times parallel execution is called. parallel_execute_calls: metrics::Gauge, /// Number of times sequential execution is called. @@ -84,6 +84,7 @@ struct ExecuteMetrics { impl Default for ExecuteMetrics { fn default() -> Self { Self { + block_height: counter!("grevm.block_height"), parallel_execute_calls: gauge!("grevm.parallel_round_calls"), sequential_execute_calls: gauge!("grevm.sequential_execute_calls"), total_tx_cnt: histogram!("grevm.total_tx_cnt"), @@ -113,6 +114,7 @@ impl Default for ExecuteMetrics { /// Collect metrics and report #[derive(Default)] struct ExecuteMetricsCollector { + block_height: u64, parallel_execute_calls: u64, sequential_execute_calls: u64, total_tx_cnt: u64, @@ -140,6 +142,7 @@ struct ExecuteMetricsCollector { impl ExecuteMetricsCollector { fn report(&self) { let execute_metrics = ExecuteMetrics::default(); + execute_metrics.block_height.absolute(self.block_height); execute_metrics.parallel_execute_calls.set(self.parallel_execute_calls as f64); execute_metrics.sequential_execute_calls.set(self.sequential_execute_calls as f64); execute_metrics.total_tx_cnt.record(self.total_tx_cnt as f64); @@ -337,7 +340,6 @@ where let coinbase = env.block.coinbase; let num_partitions = *CPU_CORES * 2 + 1; // 2 * cpu + 1 for initial partition number let num_txs = txs.len(); - info!("Parallel execute {} txs of SpecId {:?}", num_txs, spec_id); Self { spec_id, env, @@ -550,12 +552,12 @@ where /// If the smallest TxID is a conflict transaction, return an error. #[fastrace::trace] fn find_continuous_min_txid(&mut self) -> Result> { - let mut min_execute_time = Duration::from_secs(u64::MAX); + let mut sum_execute_time = Duration::from_secs(0); let mut max_execute_time = Duration::from_secs(0); for executor in &self.partition_executors { let mut executor = executor.write().unwrap(); self.metrics.reusable_tx_cnt += executor.metrics.reusable_tx_cnt; - min_execute_time = min_execute_time.min(executor.metrics.execute_time); + sum_execute_time += executor.metrics.execute_time; max_execute_time = max_execute_time.max(executor.metrics.execute_time); if executor.assigned_txs[0] == self.num_finality_txs && self.tx_states[self.num_finality_txs].tx_status == TransactionStatus::Conflict @@ -568,7 +570,9 @@ where let mut conflict_tx_cnt = 0; let mut unconfirmed_tx_cnt = 0; let mut finality_tx_cnt = 0; - self.metrics.partition_et_diff += (max_execute_time - min_execute_time).as_nanos() as u64; + let avg_execution_time = + sum_execute_time.as_nanos() / self.partition_executors.len() as u128; + self.metrics.partition_et_diff += (max_execute_time.as_nanos() - avg_execution_time) as u64; #[allow(invalid_reference_casting)] let tx_states = unsafe { &mut *(&(*self.tx_states) as *const Vec as *mut Vec) }; @@ -755,6 +759,14 @@ where with_hints: bool, num_partitions: Option, ) -> Result> { + let block_height: u64 = self.env.block.number.try_into().unwrap_or(0); + info!( + "Parallel execute {} txs: block={}, SpecId={:?}", + self.txs.len(), + block_height, + self.spec_id + ); + self.metrics.block_height = block_height; if with_hints { self.parse_hints(); } diff --git a/src/tx_dependency.rs b/src/tx_dependency.rs index 004f184..08a1752 100644 --- a/src/tx_dependency.rs +++ b/src/tx_dependency.rs @@ -8,7 +8,7 @@ use std::{ pub(crate) type DependentTxsVec = SmallVec<[TxId; 1]>; use ahash::{AHashMap as HashMap, AHashSet as HashSet}; -use metrics::counter; +use metrics::{counter, histogram}; const RAW_TRANSFER_WEIGHT: usize = 1; @@ -214,22 +214,26 @@ impl TxDependency { if !(*DEBUG_BOTTLENECK) { return; } - if let Some(0) = self.round { - counter!("grevm.total_block_cnt").increment(1); - } let num_finality_txs = self.num_finality_txs; let num_txs = num_finality_txs + self.tx_dependency.len(); let num_remaining = self.tx_dependency.len(); - if num_txs < 64 || num_remaining < num_txs / 3 { - return; + if let Some(0) = self.round { + counter!("grevm.total_block_cnt").increment(1); } let mut subgraph = BTreeSet::new(); if let Some((_, groups)) = weighted_group.last_key_value() { + if self.round.is_none() { + let largest_ratio = groups[0].len() as f64 / num_remaining as f64; + histogram!("grevm.large_graph_ratio").record(largest_ratio); + if groups[0].len() >= num_remaining / 2 { + counter!("grevm.low_parallelism_cnt").increment(1); + } + } if groups[0].len() >= num_remaining / 3 { subgraph.extend(groups[0].clone()); } } - if subgraph.is_empty() { + if num_txs < 64 || num_remaining < num_txs / 3 || subgraph.is_empty() { return; } @@ -265,7 +269,7 @@ impl TxDependency { if chain_len > graph_len * 2 / 3 { // Long chain counter!("grevm.large_graph", "type" => "chain", "tip" => tip.clone()).increment(1); - } else if chain_len < max(3, graph_len / 8) { + } else if chain_len < max(3, graph_len / 6) { // Star Graph counter!("grevm.large_graph", "type" => "star", "tip" => tip.clone()).increment(1); } else {