44
55use super :: Consensus ;
66use crate :: poll_nested_stream_with_budget;
7+ use :: metrics:: counter;
78use alloy_provider:: Provider ;
89use futures:: StreamExt ;
910use reth_chainspec:: EthChainSpec ;
@@ -54,6 +55,9 @@ mod event;
5455pub use event:: RollupManagerEvent ;
5556
5657mod handle;
58+ mod metrics;
59+
60+ use crate :: manager:: metrics:: RollupNodeManagerMetrics ;
5761pub use handle:: RollupManagerHandle ;
5862
5963/// The size of the event channel.
@@ -114,6 +118,8 @@ pub struct RollupNodeManager<
114118 block_building_trigger : Option < Interval > ,
115119 /// The original block time configuration for restoring automatic sequencing.
116120 block_time_config : Option < u64 > ,
121+ // metrics for the rollup node manager.
122+ metrics : RollupNodeManagerMetrics ,
117123}
118124
119125/// The current status of the rollup manager.
@@ -200,6 +206,7 @@ where
200206 None
201207 } ,
202208 block_time_config : block_time,
209+ metrics : RollupNodeManagerMetrics :: default ( ) ,
203210 } ;
204211 ( rnm, RollupManagerHandle :: new ( handle_tx) )
205212 }
@@ -283,10 +290,12 @@ where
283290 // Remove once we implement issue #273.
284291 // Update the derivation pipeline on new finalized batch.
285292 for batch_info in finalized_batches {
293+ self . metrics . handle_finalized_batch_index . set ( batch_info. index as f64 ) ;
286294 self . derivation_pipeline . push_batch ( batch_info, block_number) ;
287295 }
288296 }
289297 ChainOrchestratorEvent :: L1BlockFinalized ( l1_block_number, finalized_batches, ..) => {
298+ self . metrics . handle_l1_finalized_block_number . set ( l1_block_number as f64 ) ;
290299 // update the sequencer's l1 finalized block number.
291300 if let Some ( sequencer) = self . sequencer . as_mut ( ) {
292301 sequencer. set_l1_finalized_block_number ( l1_block_number) ;
@@ -308,6 +317,14 @@ where
308317 l2_head_block_info,
309318 l2_safe_block_info,
310319 } => {
320+ self . metrics . handle_l1_reorg_l1_block_number . set ( l1_block_number as f64 ) ;
321+ self . metrics
322+ . handle_l1_reorg_l2_head_block_number
323+ . set ( l2_head_block_info. as_ref ( ) . map_or ( 0 , |info| info. number ) as f64 ) ;
324+ self . metrics
325+ . handle_l1_reorg_l2_safe_block_number
326+ . set ( l2_safe_block_info. as_ref ( ) . map_or ( 0 , |info| info. number ) as f64 ) ;
327+
311328 // Handle the reorg in the engine driver.
312329 self . engine . handle_l1_reorg (
313330 l1_block_number,
@@ -328,11 +345,17 @@ where
328345 }
329346 }
330347 ChainOrchestratorEvent :: ChainExtended ( chain_import) => {
348+ self . metrics
349+ . handle_chain_import_block_number
350+ . set ( chain_import. chain . last ( ) . unwrap ( ) . number as f64 ) ;
331351 trace ! ( target: "scroll::node::manager" , head = ?chain_import. chain. last( ) . unwrap( ) . header. clone( ) , peer_id = ?chain_import. peer_id. clone( ) , "Received chain extension from peer" ) ;
332352 // Issue the new chain to the engine driver for processing.
333353 self . engine . handle_chain_import ( chain_import)
334354 }
335355 ChainOrchestratorEvent :: ChainReorged ( chain_import) => {
356+ self . metrics
357+ . handle_chain_import_block_number
358+ . set ( chain_import. chain . last ( ) . unwrap ( ) . number as f64 ) ;
336359 trace ! ( target: "scroll::node::manager" , head = ?chain_import. chain. last( ) . unwrap( ) . header, ?chain_import. peer_id, "Received chain reorg from peer" ) ;
337360
338361 // Issue the new chain to the engine driver for processing.
@@ -342,6 +365,8 @@ where
342365 let block_info: BlockInfo = ( & block) . into ( ) ;
343366 trace ! ( target: "scroll::node::manager" , ?block_info, "Received optimistic sync from peer" ) ;
344367
368+ self . metrics . handle_optimistic_syncing_block_number . set ( block_info. number as f64 ) ;
369+
345370 // Issue the new block info to the engine driver for processing.
346371 self . engine . handle_optimistic_sync ( block_info)
347372 }
@@ -360,6 +385,12 @@ where
360385
361386 match err {
362387 ChainOrchestratorError :: L1MessageMismatch { expected, actual } => {
388+ counter ! (
389+ "manager_handle_chain_orchestrator_event_failed" ,
390+ "type" => "l1_message_mismatch" ,
391+ )
392+ . increment ( 1 ) ;
393+
363394 if let Some ( event_sender) = self . event_sender . as_ref ( ) {
364395 event_sender. notify ( RollupManagerEvent :: L1MessageConsolidationError {
365396 expected : * expected,
@@ -368,6 +399,12 @@ where
368399 }
369400 }
370401 ChainOrchestratorError :: DatabaseError ( DatabaseError :: L1MessageNotFound ( start) ) => {
402+ counter ! (
403+ "manager_handle_chain_orchestrator_event_failed" ,
404+ "type" => "l1_message_not_found" ,
405+ )
406+ . increment ( 1 ) ;
407+
371408 if let Some ( event_sender) = self . event_sender . as_ref ( ) {
372409 event_sender. notify ( RollupManagerEvent :: L1MessageMissingInDatabase {
373410 start : start. clone ( ) ,
@@ -428,6 +465,7 @@ where
428465
429466 /// Handles an [`L1Notification`] from the L1 watcher.
430467 fn handle_l1_notification ( & mut self , notification : L1Notification ) {
468+ self . metrics . handle_l1_notification . increment ( 1 ) ;
431469 if let Some ( event_sender) = self . event_sender . as_ref ( ) {
432470 event_sender. notify ( RollupManagerEvent :: L1NotificationEvent ( notification. clone ( ) ) ) ;
433471 }
@@ -499,6 +537,7 @@ where
499537
500538 // Poll the handle receiver for commands.
501539 while let Poll :: Ready ( Some ( command) ) = this. handle_rx . poll_recv ( cx) {
540+ this. metrics . handle_rollup_manager_command . increment ( 1 ) ;
502541 match command {
503542 RollupManagerCommand :: BuildBlock => {
504543 proceed_if ! (
@@ -550,6 +589,7 @@ where
550589
551590 // Drain all EngineDriver events.
552591 while let Poll :: Ready ( Some ( event) ) = this. engine . poll_next_unpin ( cx) {
592+ this. metrics . handle_engine_driver_event . increment ( 1 ) ;
553593 this. handle_engine_driver_event ( event) ;
554594 }
555595
@@ -559,6 +599,7 @@ where
559599 if let Some ( Poll :: Ready ( Some ( attributes) ) ) =
560600 this. sequencer. as_mut( ) . map( |x| x. poll_next_unpin( cx) )
561601 {
602+ this. metrics. handle_new_block_produced. increment( 1 ) ;
562603 this. engine. handle_build_new_payload( attributes) ;
563604 }
564605 ) ;
@@ -580,6 +621,7 @@ where
580621
581622 // Drain all chain orchestrator events.
582623 while let Poll :: Ready ( Some ( result) ) = this. chain . poll_next_unpin ( cx) {
624+ this. metrics . handle_chain_orchestrator_event . increment ( 1 ) ;
583625 match result {
584626 Ok ( event) => this. handle_chain_orchestrator_event ( event) ,
585627 Err ( err) => {
@@ -592,6 +634,7 @@ where
592634 while let Some ( Poll :: Ready ( Some ( event) ) ) =
593635 this. signer . as_mut ( ) . map ( |s| s. poll_next_unpin ( cx) )
594636 {
637+ this. metrics . handle_signer_event . increment ( 1 ) ;
595638 match event {
596639 SignerEvent :: SignedBlock { block, signature } => {
597640 trace ! ( target: "scroll::node::manager" , ?block, ?signature, "Received signed block from signer, announcing to the network" ) ;
@@ -619,6 +662,7 @@ where
619662 this. block_building_trigger. as_mut( ) . map( |trigger| trigger. poll_tick( cx) ) ,
620663 this. sequencer. as_mut( )
621664 ) {
665+ this. metrics. handle_build_new_payload. increment( 1 ) ;
622666 if !this. consensus. should_sequence_block(
623667 this. signer
624668 . as_ref( )
@@ -636,11 +680,13 @@ where
636680
637681 // Poll Derivation Pipeline and push attribute in queue if any.
638682 while let Poll :: Ready ( Some ( attributes) ) = this. derivation_pipeline . poll_next_unpin ( cx) {
683+ this. metrics . handle_l1_consolidation . increment ( 1 ) ;
639684 this. engine . handle_l1_consolidation ( attributes)
640685 }
641686
642687 // Handle network manager events.
643688 while let Poll :: Ready ( Some ( event) ) = this. network . poll_next_unpin ( cx) {
689+ this. metrics . handle_network_manager_event . increment ( 1 ) ;
644690 this. handle_network_manager_event ( event) ;
645691 }
646692
0 commit comments