@@ -11,7 +11,8 @@ use ahash::{AHashMap as HashMap, AHashSet as HashSet};
11
11
use atomic:: Atomic ;
12
12
use dashmap:: DashSet ;
13
13
use fastrace:: Span ;
14
- use metrics:: { counter, gauge} ;
14
+ use lazy_static:: lazy_static;
15
+ use metrics:: { gauge, histogram} ;
15
16
use revm:: {
16
17
primitives:: {
17
18
AccountInfo , Address , Bytecode , EVMError , Env , ExecutionResult , SpecId , TxEnv , B256 , U256 ,
@@ -32,77 +33,128 @@ use tracing::info;
32
33
33
34
struct ExecuteMetrics {
34
35
/// Number of times parallel execution is called.
35
- parallel_execute_calls : metrics:: Counter ,
36
+ parallel_execute_calls : metrics:: Gauge ,
36
37
/// Number of times sequential execution is called.
37
- sequential_execute_calls : metrics:: Counter ,
38
+ sequential_execute_calls : metrics:: Gauge ,
38
39
39
40
/// Total number of transactions.
40
- total_tx_cnt : metrics:: Counter ,
41
+ total_tx_cnt : metrics:: Histogram ,
41
42
/// Number of transactions executed in parallel.
42
- parallel_tx_cnt : metrics:: Counter ,
43
+ parallel_tx_cnt : metrics:: Histogram ,
43
44
/// Number of transactions executed sequentially.
44
- sequential_tx_cnt : metrics:: Counter ,
45
+ sequential_tx_cnt : metrics:: Histogram ,
45
46
/// Number of transactions that encountered conflicts.
46
- conflict_tx_cnt : metrics:: Counter ,
47
+ conflict_tx_cnt : metrics:: Histogram ,
47
48
/// Number of transactions that reached finality.
48
- finality_tx_cnt : metrics:: Counter ,
49
+ finality_tx_cnt : metrics:: Histogram ,
49
50
/// Number of transactions that are unconfirmed.
50
- unconfirmed_tx_cnt : metrics:: Counter ,
51
+ unconfirmed_tx_cnt : metrics:: Histogram ,
51
52
/// Number of reusable transactions.
52
- reusable_tx_cnt : metrics:: Counter ,
53
+ reusable_tx_cnt : metrics:: Histogram ,
53
54
/// Number of transactions that skip validation
54
- skip_validation_cnt : metrics:: Counter ,
55
+ skip_validation_cnt : metrics:: Histogram ,
55
56
56
57
/// Number of concurrent partitions.
57
- concurrent_partition_num : metrics:: Gauge ,
58
+ concurrent_partition_num : metrics:: Histogram ,
58
59
/// Execution time difference between partitions(in nanoseconds).
59
- partition_et_diff : metrics:: Gauge ,
60
+ partition_et_diff : metrics:: Histogram ,
60
61
/// Number of transactions difference between partitions.
61
- partition_tx_diff : metrics:: Gauge ,
62
+ partition_tx_diff : metrics:: Histogram ,
62
63
63
64
/// Time taken to parse execution hints(in nanoseconds).
64
- parse_hints_time : metrics:: Counter ,
65
+ parse_hints_time : metrics:: Histogram ,
65
66
/// Time taken to partition transactions(in nanoseconds).
66
- partition_tx_time : metrics:: Counter ,
67
+ partition_tx_time : metrics:: Histogram ,
67
68
/// Time taken to execute transactions in parallel(in nanoseconds).
68
- parallel_execute_time : metrics:: Counter ,
69
+ parallel_execute_time : metrics:: Histogram ,
69
70
/// Time taken to validate transactions(in nanoseconds).
70
- validate_time : metrics:: Counter ,
71
+ validate_time : metrics:: Histogram ,
71
72
/// Time taken to merge write set.
72
- merge_write_set_time : metrics:: Counter ,
73
+ merge_write_set_time : metrics:: Histogram ,
73
74
/// Time taken to commit transition
74
- commit_transition_time : metrics:: Counter ,
75
+ commit_transition_time : metrics:: Histogram ,
75
76
/// Time taken to execute transactions in sequential(in nanoseconds).
76
- sequential_execute_time : metrics:: Counter ,
77
+ sequential_execute_time : metrics:: Histogram ,
77
78
}
78
79
79
80
impl Default for ExecuteMetrics {
80
81
fn default ( ) -> Self {
81
82
Self {
82
- parallel_execute_calls : counter ! ( "grevm.parallel_round_calls" ) ,
83
- sequential_execute_calls : counter ! ( "grevm.sequential_execute_calls" ) ,
84
- total_tx_cnt : counter ! ( "grevm.total_tx_cnt" ) ,
85
- parallel_tx_cnt : counter ! ( "grevm.parallel_tx_cnt" ) ,
86
- sequential_tx_cnt : counter ! ( "grevm.sequential_tx_cnt" ) ,
87
- finality_tx_cnt : counter ! ( "grevm.finality_tx_cnt" ) ,
88
- conflict_tx_cnt : counter ! ( "grevm.conflict_tx_cnt" ) ,
89
- unconfirmed_tx_cnt : counter ! ( "grevm.unconfirmed_tx_cnt" ) ,
90
- reusable_tx_cnt : counter ! ( "grevm.reusable_tx_cnt" ) ,
91
- skip_validation_cnt : counter ! ( "grevm.skip_validation_cnt" ) ,
92
- concurrent_partition_num : gauge ! ( "grevm.concurrent_partition_num" ) ,
93
- partition_et_diff : gauge ! ( "grevm.partition_execution_time_diff" ) ,
94
- partition_tx_diff : gauge ! ( "grevm.partition_num_tx_diff" ) ,
95
- parse_hints_time : counter ! ( "grevm.parse_hints_time" ) ,
96
- partition_tx_time : counter ! ( "grevm.partition_tx_time" ) ,
97
- parallel_execute_time : counter ! ( "grevm.parallel_execute_time" ) ,
98
- validate_time : counter ! ( "grevm.validate_time" ) ,
99
- merge_write_set_time : counter ! ( "grevm.merge_write_set_time" ) ,
100
- commit_transition_time : counter ! ( "grevm.commit_transition_time" ) ,
101
- sequential_execute_time : counter ! ( "grevm.sequential_execute_time" ) ,
83
+ parallel_execute_calls : gauge ! ( "grevm.parallel_round_calls" ) ,
84
+ sequential_execute_calls : gauge ! ( "grevm.sequential_execute_calls" ) ,
85
+ total_tx_cnt : histogram ! ( "grevm.total_tx_cnt" ) ,
86
+ parallel_tx_cnt : histogram ! ( "grevm.parallel_tx_cnt" ) ,
87
+ sequential_tx_cnt : histogram ! ( "grevm.sequential_tx_cnt" ) ,
88
+ finality_tx_cnt : histogram ! ( "grevm.finality_tx_cnt" ) ,
89
+ conflict_tx_cnt : histogram ! ( "grevm.conflict_tx_cnt" ) ,
90
+ unconfirmed_tx_cnt : histogram ! ( "grevm.unconfirmed_tx_cnt" ) ,
91
+ reusable_tx_cnt : histogram ! ( "grevm.reusable_tx_cnt" ) ,
92
+ skip_validation_cnt : histogram ! ( "grevm.skip_validation_cnt" ) ,
93
+ concurrent_partition_num : histogram ! ( "grevm.concurrent_partition_num" ) ,
94
+ partition_et_diff : histogram ! ( "grevm.partition_execution_time_diff" ) ,
95
+ partition_tx_diff : histogram ! ( "grevm.partition_num_tx_diff" ) ,
96
+ parse_hints_time : histogram ! ( "grevm.parse_hints_time" ) ,
97
+ partition_tx_time : histogram ! ( "grevm.partition_tx_time" ) ,
98
+ parallel_execute_time : histogram ! ( "grevm.parallel_execute_time" ) ,
99
+ validate_time : histogram ! ( "grevm.validate_time" ) ,
100
+ merge_write_set_time : histogram ! ( "grevm.merge_write_set_time" ) ,
101
+ commit_transition_time : histogram ! ( "grevm.commit_transition_time" ) ,
102
+ sequential_execute_time : histogram ! ( "grevm.sequential_execute_time" ) ,
102
103
}
103
104
}
104
105
}
105
106
107
+ /// Collect metrics and report
108
+ #[ derive( Default ) ]
109
+ struct ExecuteMetricsCollector {
110
+ parallel_execute_calls : u64 ,
111
+ sequential_execute_calls : u64 ,
112
+ total_tx_cnt : u64 ,
113
+ parallel_tx_cnt : u64 ,
114
+ sequential_tx_cnt : u64 ,
115
+ conflict_tx_cnt : u64 ,
116
+ finality_tx_cnt : u64 ,
117
+ unconfirmed_tx_cnt : u64 ,
118
+ reusable_tx_cnt : u64 ,
119
+ skip_validation_cnt : u64 ,
120
+ concurrent_partition_num : u64 ,
121
+ partition_et_diff : u64 ,
122
+ partition_tx_diff : u64 ,
123
+ parse_hints_time : u64 ,
124
+ partition_tx_time : u64 ,
125
+ parallel_execute_time : u64 ,
126
+ validate_time : u64 ,
127
+ merge_write_set_time : u64 ,
128
+ commit_transition_time : u64 ,
129
+ sequential_execute_time : u64 ,
130
+ }
131
+
132
+ impl ExecuteMetricsCollector {
133
+ fn report ( & self ) {
134
+ let execute_metrics = ExecuteMetrics :: default ( ) ;
135
+ execute_metrics. parallel_execute_calls . set ( self . parallel_execute_calls as f64 ) ;
136
+ execute_metrics. sequential_execute_calls . set ( self . sequential_execute_calls as f64 ) ;
137
+ execute_metrics. total_tx_cnt . record ( self . total_tx_cnt as f64 ) ;
138
+ execute_metrics. parallel_tx_cnt . record ( self . parallel_tx_cnt as f64 ) ;
139
+ execute_metrics. sequential_tx_cnt . record ( self . sequential_tx_cnt as f64 ) ;
140
+ execute_metrics. conflict_tx_cnt . record ( self . conflict_tx_cnt as f64 ) ;
141
+ execute_metrics. finality_tx_cnt . record ( self . finality_tx_cnt as f64 ) ;
142
+ execute_metrics. unconfirmed_tx_cnt . record ( self . unconfirmed_tx_cnt as f64 ) ;
143
+ execute_metrics. reusable_tx_cnt . record ( self . reusable_tx_cnt as f64 ) ;
144
+ execute_metrics. skip_validation_cnt . record ( self . skip_validation_cnt as f64 ) ;
145
+ execute_metrics. concurrent_partition_num . record ( self . concurrent_partition_num as f64 ) ;
146
+ execute_metrics. partition_et_diff . record ( self . partition_et_diff as f64 ) ;
147
+ execute_metrics. partition_tx_diff . record ( self . partition_tx_diff as f64 ) ;
148
+ execute_metrics. parse_hints_time . record ( self . parse_hints_time as f64 ) ;
149
+ execute_metrics. partition_tx_time . record ( self . partition_tx_time as f64 ) ;
150
+ execute_metrics. parallel_execute_time . record ( self . parallel_execute_time as f64 ) ;
151
+ execute_metrics. validate_time . record ( self . validate_time as f64 ) ;
152
+ execute_metrics. merge_write_set_time . record ( self . merge_write_set_time as f64 ) ;
153
+ execute_metrics. commit_transition_time . record ( self . commit_transition_time as f64 ) ;
154
+ execute_metrics. sequential_execute_time . record ( self . sequential_execute_time as f64 ) ;
155
+ }
156
+ }
157
+
106
158
/// The output of the execution of a block.
107
159
#[ derive( Debug ) ]
108
160
pub struct ExecuteOutput {
@@ -203,7 +255,7 @@ where
203
255
204
256
rewards_accumulators : Arc < RewardsAccumulators > ,
205
257
206
- metrics : ExecuteMetrics ,
258
+ metrics : ExecuteMetricsCollector ,
207
259
}
208
260
209
261
/// A wrapper for DatabaseRef.
@@ -311,15 +363,15 @@ where
311
363
min = partition. len ( ) ;
312
364
}
313
365
}
314
- self . metrics . partition_tx_diff . set ( ( max - min) as f64 ) ;
315
- self . metrics . concurrent_partition_num . set ( self . num_partitions as f64 ) ;
316
- self . metrics . partition_tx_time . increment ( start. elapsed ( ) . as_nanos ( ) as u64 ) ;
366
+ self . metrics . partition_tx_diff += ( max - min) as u64 ;
367
+ self . metrics . concurrent_partition_num = self . num_partitions as u64 ;
368
+ self . metrics . partition_tx_time += start. elapsed ( ) . as_nanos ( ) as u64 ;
317
369
}
318
370
319
371
/// Execute transactions in parallel.
320
372
#[ fastrace:: trace]
321
373
fn round_execute ( & mut self ) -> Result < ( ) , GrevmError < DB :: Error > > {
322
- self . metrics . parallel_execute_calls . increment ( 1 ) ;
374
+ self . metrics . parallel_execute_calls += 1 ;
323
375
self . partition_executors . clear ( ) ;
324
376
for partition_id in 0 ..self . num_partitions {
325
377
let executor = PartitionExecutor :: new (
@@ -349,7 +401,7 @@ where
349
401
futures:: future:: join_all ( tasks) . await ;
350
402
} )
351
403
} ) ;
352
- self . metrics . parallel_execute_time . increment ( start. elapsed ( ) . as_nanos ( ) as u64 ) ;
404
+ self . metrics . parallel_execute_time += start. elapsed ( ) . as_nanos ( ) as u64 ;
353
405
354
406
self . validate_transactions ( )
355
407
}
@@ -377,7 +429,7 @@ where
377
429
}
378
430
}
379
431
}
380
- self . metrics . merge_write_set_time . increment ( start. elapsed ( ) . as_nanos ( ) as u64 ) ;
432
+ self . metrics . merge_write_set_time += start. elapsed ( ) . as_nanos ( ) as u64 ;
381
433
( end_skip_id, merged_write_set)
382
434
}
383
435
@@ -417,7 +469,7 @@ where
417
469
fn generate_unconfirmed_txs ( & mut self ) -> Vec < TxId > {
418
470
let num_partitions = self . num_partitions ;
419
471
let ( end_skip_id, merged_write_set) = self . merge_write_set ( ) ;
420
- self . metrics . skip_validation_cnt . increment ( ( end_skip_id - self . num_finality_txs ) as u64 ) ;
472
+ self . metrics . skip_validation_cnt += ( end_skip_id - self . num_finality_txs ) as u64 ;
421
473
let miner_location = LocationAndType :: Basic ( self . coinbase ) ;
422
474
let miner_involved_txs = DashSet :: new ( ) ;
423
475
fork_join_util ( num_partitions, Some ( num_partitions) , |_, _, part| {
@@ -483,7 +535,7 @@ where
483
535
let mut max_execute_time = Duration :: from_secs ( 0 ) ;
484
536
for executor in & self . partition_executors {
485
537
let mut executor = executor. write ( ) . unwrap ( ) ;
486
- self . metrics . reusable_tx_cnt . increment ( executor. metrics . reusable_tx_cnt ) ;
538
+ self . metrics . reusable_tx_cnt += executor. metrics . reusable_tx_cnt ;
487
539
min_execute_time = min_execute_time. min ( executor. metrics . execute_time ) ;
488
540
max_execute_time = max_execute_time. max ( executor. metrics . execute_time ) ;
489
541
if executor. assigned_txs [ 0 ] == self . num_finality_txs &&
@@ -497,7 +549,7 @@ where
497
549
let mut conflict_tx_cnt = 0 ;
498
550
let mut unconfirmed_tx_cnt = 0 ;
499
551
let mut finality_tx_cnt = 0 ;
500
- self . metrics . partition_et_diff . set ( ( max_execute_time - min_execute_time) . as_nanos ( ) as f64 ) ;
552
+ self . metrics . partition_et_diff += ( max_execute_time - min_execute_time) . as_nanos ( ) as u64 ;
501
553
#[ allow( invalid_reference_casting) ]
502
554
let tx_states =
503
555
unsafe { & mut * ( & ( * self . tx_states ) as * const Vec < TxState > as * mut Vec < TxState > ) } ;
@@ -523,14 +575,14 @@ where
523
575
}
524
576
}
525
577
}
526
- self . metrics . conflict_tx_cnt . increment ( conflict_tx_cnt as u64 ) ;
527
- self . metrics . unconfirmed_tx_cnt . increment ( unconfirmed_tx_cnt as u64 ) ;
528
- self . metrics . finality_tx_cnt . increment ( finality_tx_cnt as u64 ) ;
578
+ self . metrics . conflict_tx_cnt += conflict_tx_cnt ;
579
+ self . metrics . unconfirmed_tx_cnt += unconfirmed_tx_cnt ;
580
+ self . metrics . finality_tx_cnt += finality_tx_cnt ;
529
581
info ! (
530
582
"Find continuous finality txs: conflict({}), unconfirmed({}), finality({})" ,
531
583
conflict_tx_cnt, unconfirmed_tx_cnt, finality_tx_cnt
532
584
) ;
533
- return Ok ( finality_tx_cnt) ;
585
+ return Ok ( finality_tx_cnt as usize ) ;
534
586
}
535
587
536
588
/// Commit the transition of the finality transactions, and update the minner's rewards.
@@ -586,7 +638,7 @@ where
586
638
database
587
639
. increment_balances ( vec ! [ ( self . coinbase, rewards) ] )
588
640
. map_err ( |err| GrevmError :: EvmError ( EVMError :: Database ( err) ) ) ?;
589
- self . metrics . commit_transition_time . increment ( start. elapsed ( ) . as_nanos ( ) as u64 ) ;
641
+ self . metrics . commit_transition_time += start. elapsed ( ) . as_nanos ( ) as u64 ;
590
642
Ok ( ( ) )
591
643
}
592
644
@@ -609,7 +661,7 @@ where
609
661
}
610
662
}
611
663
self . rewards_accumulators = Arc :: new ( rewards_accumulators) ;
612
- self . metrics . validate_time . increment ( start. elapsed ( ) . as_nanos ( ) as u64 ) ;
664
+ self . metrics . validate_time += start. elapsed ( ) . as_nanos ( ) as u64 ;
613
665
Ok ( ( ) )
614
666
}
615
667
@@ -638,8 +690,8 @@ where
638
690
#[ fastrace:: trace]
639
691
fn execute_remaining_sequential ( & mut self ) -> Result < ( ) , GrevmError < DB :: Error > > {
640
692
let start = Instant :: now ( ) ;
641
- self . metrics . sequential_execute_calls . increment ( 1 ) ;
642
- self . metrics . sequential_tx_cnt . increment ( ( self . txs . len ( ) - self . num_finality_txs ) as u64 ) ;
693
+ self . metrics . sequential_execute_calls += 1 ;
694
+ self . metrics . sequential_tx_cnt += ( self . txs . len ( ) - self . num_finality_txs ) as u64 ;
643
695
// MUST drop the `PartitionExecutor::scheduler_db` before get mut
644
696
self . partition_executors . clear ( ) ;
645
697
let database = Arc :: get_mut ( & mut self . database ) . unwrap ( ) ;
@@ -662,7 +714,7 @@ where
662
714
Err ( err) => return Err ( GrevmError :: EvmError ( err) ) ,
663
715
}
664
716
}
665
- self . metrics . sequential_execute_time . increment ( start. elapsed ( ) . as_nanos ( ) as u64 ) ;
717
+ self . metrics . sequential_execute_time += start. elapsed ( ) . as_nanos ( ) as u64 ;
666
718
Ok ( ( ) )
667
719
}
668
720
@@ -672,7 +724,7 @@ where
672
724
let hints = ParallelExecutionHints :: new ( self . tx_states . clone ( ) ) ;
673
725
hints. parse_hints ( self . txs . clone ( ) ) ;
674
726
self . tx_dependencies . init_tx_dependency ( self . tx_states . clone ( ) ) ;
675
- self . metrics . parse_hints_time . increment ( start. elapsed ( ) . as_nanos ( ) as u64 ) ;
727
+ self . metrics . parse_hints_time += start. elapsed ( ) . as_nanos ( ) as u64 ;
676
728
}
677
729
678
730
#[ fastrace:: trace]
@@ -689,7 +741,7 @@ where
689
741
self . num_partitions = num_partitions;
690
742
}
691
743
692
- self . metrics . total_tx_cnt . increment ( self . txs . len ( ) as u64 ) ;
744
+ self . metrics . total_tx_cnt += self . txs . len ( ) as u64 ;
693
745
let force_parallel = !force_sequential. unwrap_or ( true ) ; // adaptive false
694
746
let force_sequential = force_sequential. unwrap_or ( false ) ; // adaptive false
695
747
@@ -712,13 +764,14 @@ where
712
764
break ;
713
765
}
714
766
}
715
- self . metrics . parallel_tx_cnt . increment ( self . num_finality_txs as u64 ) ;
767
+ self . metrics . parallel_tx_cnt += self . num_finality_txs as u64 ;
716
768
}
717
769
718
770
if self . num_finality_txs < self . txs . len ( ) {
719
771
info ! ( "Sequential execute {} remaining txs" , self . txs. len( ) - self . num_finality_txs) ;
720
772
self . execute_remaining_sequential ( ) ?;
721
773
}
774
+ self . metrics . report ( ) ;
722
775
723
776
Ok ( ExecuteOutput { results : std:: mem:: take ( & mut self . results ) } )
724
777
}
0 commit comments