diff --git a/consensus/src/consensus_observer/common/metrics.rs b/consensus/src/consensus_observer/common/metrics.rs index 250a8034f18b8..e15eea03a1bfe 100644 --- a/consensus/src/consensus_observer/common/metrics.rs +++ b/consensus/src/consensus_observer/common/metrics.rs @@ -18,7 +18,9 @@ pub const CREATED_SUBSCRIPTION_LABEL: &str = "created_subscription"; pub const ORDERED_BLOCK_ENTRIES_LABEL: &str = "ordered_block_entries"; pub const ORDERED_BLOCK_LABEL: &str = "ordered_block"; pub const ORDERED_BLOCK_WITH_WINDOW_LABEL: &str = "ordered_block_with_window"; +pub const PENDING_BLOCK_ENTRIES_BY_HASH_LABEL: &str = "pending_block_by_hash_entries"; pub const PENDING_BLOCK_ENTRIES_LABEL: &str = "pending_block_entries"; +pub const PENDING_BLOCKS_BY_HASH_LABEL: &str = "pending_blocks_by_hash"; pub const PENDING_BLOCKS_LABEL: &str = "pending_blocks"; pub const STORED_PAYLOADS_LABEL: &str = "stored_payloads"; diff --git a/consensus/src/consensus_observer/observer/consensus_observer.rs b/consensus/src/consensus_observer/observer/consensus_observer.rs index ee1ae756c8715..3f51b9e551b41 100644 --- a/consensus/src/consensus_observer/observer/consensus_observer.rs +++ b/consensus/src/consensus_observer/observer/consensus_observer.rs @@ -734,7 +734,7 @@ impl ConsensusObserver { // Create a new pending block with metadata let observed_ordered_block = ObservedOrderedBlock::new(ordered_block); - let pending_block_with_metadata = PendingBlockWithMetadata::new( + let pending_block_with_metadata = PendingBlockWithMetadata::new_with_arc( peer_network_id, message_received_time, observed_ordered_block, @@ -756,11 +756,11 @@ impl ConsensusObserver { /// has been sanity checked and that all payloads exist. 
async fn process_ordered_block( &mut self, - pending_block_with_metadata: PendingBlockWithMetadata, + pending_block_with_metadata: Arc<PendingBlockWithMetadata>, ) { // Unpack the pending block let (peer_network_id, message_received_time, observed_ordered_block) = - pending_block_with_metadata.into_parts(); + pending_block_with_metadata.unpack(); let ordered_block = observed_ordered_block.ordered_block().clone(); // Verify the ordered block proof diff --git a/consensus/src/consensus_observer/observer/pending_blocks.rs b/consensus/src/consensus_observer/observer/pending_blocks.rs index 978c2baab290a..657d6e441d3cc 100644 --- a/consensus/src/consensus_observer/observer/pending_blocks.rs +++ b/consensus/src/consensus_observer/observer/pending_blocks.rs @@ -10,8 +10,9 @@ use crate::consensus_observer::{ observer::{execution_pool::ObservedOrderedBlock, payload_store::BlockPayloadStore}, }; use aptos_config::{config::ConsensusObserverConfig, network_id::PeerNetworkId}; +use aptos_crypto::HashValue; use aptos_infallible::Mutex; -use aptos_logger::{info, warn}; +use aptos_logger::{error, info, warn}; use aptos_types::block_info::Round; use std::{ collections::{btree_map::Entry, BTreeMap}, @@ -28,24 +29,26 @@ pub struct PendingBlockWithMetadata { } impl PendingBlockWithMetadata { - pub fn new( + pub fn new_with_arc( peer_network_id: PeerNetworkId, block_receipt_time: Instant, observed_ordered_block: ObservedOrderedBlock, - ) -> Self { - Self { + ) -> Arc<Self> { + let pending_block_with_metadata = Self { peer_network_id, block_receipt_time, observed_ordered_block, - } + }; + Arc::new(pending_block_with_metadata) } - /// Unpacks the block with metadata into its components - pub fn into_parts(self) -> (PeerNetworkId, Instant, ObservedOrderedBlock) { + /// Unpacks the block with metadata into its components. + /// Note: this will copy/clone all components. 
+ pub fn unpack(&self) -> (PeerNetworkId, Instant, ObservedOrderedBlock) { ( self.peer_network_id, self.block_receipt_time, - self.observed_ordered_block, + self.observed_ordered_block.clone(), ) } @@ -62,7 +65,12 @@ pub struct PendingBlockStore { // A map of ordered blocks that are without payloads. The key is // the (epoch, round) of the first block in the ordered block. - blocks_without_payloads: BTreeMap<(u64, Round), PendingBlockWithMetadata>, + blocks_without_payloads: BTreeMap<(u64, Round), Arc<PendingBlockWithMetadata>>, + + // A map of ordered blocks that are without payloads. The key is + // the hash of the first block in the ordered block. + // Note: this is the same as blocks_without_payloads, but with a different key. + blocks_without_payloads_by_hash: BTreeMap<HashValue, Arc<PendingBlockWithMetadata>>, } impl PendingBlockStore { @@ -70,12 +78,14 @@ impl PendingBlockStore { Self { consensus_observer_config, blocks_without_payloads: BTreeMap::new(), + blocks_without_payloads_by_hash: BTreeMap::new(), } } /// Clears all missing blocks from the store pub fn clear_missing_blocks(&mut self) { self.blocks_without_payloads.clear(); + self.blocks_without_payloads_by_hash.clear(); } /// Returns true iff the store contains an entry for the given ordered block @@ -84,18 +94,28 @@ impl PendingBlockStore { let first_block = ordered_block.first_block(); let first_block_epoch_round = (first_block.epoch(), first_block.round()); - // Check if the block is already in the store + // Check if the block is already in the store by epoch and round self.blocks_without_payloads .contains_key(&first_block_epoch_round) } + /// Returns the pending block with the given hash (if it exists) + pub fn get_pending_block_by_hash( + &self, + block_hash: HashValue, + ) -> Option<Arc<PendingBlockWithMetadata>> { + self.blocks_without_payloads_by_hash + .get(&block_hash) + .cloned() + } + /// Inserts a pending block (without payloads) into the store - pub fn insert_pending_block(&mut self, pending_block_with_metadata: PendingBlockWithMetadata) { - // Get the epoch and round of the 
first block - let first_block = pending_block_with_metadata.ordered_block().first_block(); - let first_block_epoch_round = (first_block.epoch(), first_block.round()); + pub fn insert_pending_block(&mut self, pending_block: Arc<PendingBlockWithMetadata>) { + // Get the first block in the ordered blocks + let first_block = pending_block.ordered_block().first_block(); - // Insert the block into the store using the round of the first block + // Insert the block into the store using the epoch round of the first block + let first_block_epoch_round = (first_block.epoch(), first_block.round()); match self.blocks_without_payloads.entry(first_block_epoch_round) { Entry::Occupied(_) => { // The block is already in the store @@ -108,7 +128,25 @@ impl PendingBlockStore { }, Entry::Vacant(entry) => { // Insert the block into the store - entry.insert(pending_block_with_metadata); + entry.insert(pending_block.clone()); }, } + + // Insert the block into the hash store using the hash of the first block + let first_block_hash = first_block.id(); + match self.blocks_without_payloads_by_hash.entry(first_block_hash) { + Entry::Occupied(_) => { + // The block is already in the hash store + warn!( + LogSchema::new(LogEntry::ConsensusObserver).message(&format!( + "A pending block was already found for the given block hash: {:?}", + first_block_hash + )) + ); + }, + Entry::Vacant(entry) => { + // Insert the block into the hash store + entry.insert(pending_block); }, } @@ -119,32 +157,53 @@ impl PendingBlockStore { /// Garbage collects the pending blocks store by removing /// the oldest blocks if the store is too large. fn garbage_collect_pending_blocks(&mut self) { - // Calculate the number of blocks to remove + // Verify that both stores have the same number of entries. + // If not, log an error as this should never happen. 
let num_pending_blocks = self.blocks_without_payloads.len() as u64; + let num_pending_blocks_by_hash = self.blocks_without_payloads_by_hash.len() as u64; + if num_pending_blocks != num_pending_blocks_by_hash { + error!( + LogSchema::new(LogEntry::ConsensusObserver).message(&format!( + "The pending block stores have different numbers of entries: {:?} and {:?} (by hash)", + num_pending_blocks, num_pending_blocks_by_hash + )) + ); + } + + // Calculate the number of blocks to remove let max_pending_blocks = self.consensus_observer_config.max_num_pending_blocks; let num_blocks_to_remove = num_pending_blocks.saturating_sub(max_pending_blocks); // Remove the oldest blocks if the store is too large for _ in 0..num_blocks_to_remove { - if let Some((oldest_epoch_round, _)) = self.blocks_without_payloads.pop_first() { + if let Some((oldest_epoch_round, pending_block)) = + self.blocks_without_payloads.pop_first() + { + // Log a warning message for the removed block warn!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( "The pending block store is too large: {:?} blocks. Removing the block for the oldest epoch and round: {:?}", num_pending_blocks, oldest_epoch_round )) ); + + // Remove the block from the hash store + let first_block = pending_block.ordered_block().first_block(); + self.blocks_without_payloads_by_hash + .remove(&first_block.id()); } } } /// Removes and returns the block from the store that is now ready /// to be processed (after the new payload has been received). + // TODO: identify how this will work with execution pool blocks! pub fn remove_ready_block( &mut self, received_payload_epoch: u64, received_payload_round: Round, block_payload_store: Arc<Mutex<BlockPayloadStore>>, - ) -> Option<PendingBlockWithMetadata> { + ) -> Option<Arc<PendingBlockWithMetadata>> { // Calculate the round at which to split the blocks let split_round = received_payload_round.saturating_add(1); @@ -156,28 +215,23 @@ impl PendingBlockStore { // Check if the last block is ready (this should be the only ready block). 
// Any earlier blocks are considered out-of-date and will be dropped. let mut ready_block = None; - if let Some((epoch_and_round, pending_block_with_metadata)) = - self.blocks_without_payloads.pop_last() - { + if let Some((epoch_and_round, pending_block)) = self.blocks_without_payloads.pop_last() { // If all payloads exist for the block, then the block is ready if block_payload_store .lock() - .all_payloads_exist(pending_block_with_metadata.ordered_block().blocks()) + .all_payloads_exist(pending_block.ordered_block().blocks()) { - ready_block = Some(pending_block_with_metadata); + ready_block = Some(pending_block); } else { // Otherwise, check if we're still waiting for higher payloads for the block - let last_pending_block_round = pending_block_with_metadata - .ordered_block() - .last_block() - .round(); + let last_pending_block_round = pending_block.ordered_block().last_block().round(); if last_pending_block_round > received_payload_round { - blocks_at_higher_rounds.insert(epoch_and_round, pending_block_with_metadata); + blocks_at_higher_rounds.insert(epoch_and_round, pending_block); } } } - // Check if any out-of-date blocks were dropped + // Check if any out-of-date blocks are going to be dropped if !self.blocks_without_payloads.is_empty() { info!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( @@ -188,8 +242,16 @@ impl PendingBlockStore { ); } - // Update the pending blocks to only include the blocks at higher rounds + // Clear all blocks from the pending block stores + self.clear_missing_blocks(); + + // Update the pending block stores to only include the blocks at higher rounds self.blocks_without_payloads = blocks_at_higher_rounds; + for pending_block in self.blocks_without_payloads.values() { + let first_block = pending_block.ordered_block().first_block(); + self.blocks_without_payloads_by_hash + .insert(first_block.id(), pending_block.clone()); + } // Return the ready block (if one exists) ready_block @@ -205,6 +267,14 @@ impl 
PendingBlockStore { num_entries, ); + // Update the number of pending block by hash entries + let num_entries_by_hash = self.blocks_without_payloads_by_hash.len() as u64; + metrics::set_gauge_with_label( + &metrics::OBSERVER_NUM_PROCESSED_BLOCKS, + metrics::PENDING_BLOCK_ENTRIES_BY_HASH_LABEL, + num_entries_by_hash, + ); + // Update the total number of pending blocks let num_pending_blocks = self .blocks_without_payloads @@ -217,6 +287,18 @@ impl PendingBlockStore { num_pending_blocks, ); + // Update the total number of pending blocks by hash + let num_pending_blocks_by_hash = self + .blocks_without_payloads_by_hash + .values() + .map(|block| block.ordered_block().blocks().len() as u64) + .sum(); + metrics::set_gauge_with_label( + &metrics::OBSERVER_NUM_PROCESSED_BLOCKS, + metrics::PENDING_BLOCKS_BY_HASH_LABEL, + num_pending_blocks_by_hash, + ); + // Update the highest round for the pending blocks let highest_pending_round = self .blocks_without_payloads @@ -290,6 +372,12 @@ mod test { .lock() .blocks_without_payloads .is_empty()); + + // Verify that the hash store is now empty + assert!(pending_block_store + .lock() + .blocks_without_payloads_by_hash + .is_empty()); } #[test] @@ -317,9 +405,17 @@ mod test { // Verify that all blocks were inserted correctly for pending_block in &pending_blocks { + // Verify that the block is in the store assert!(pending_block_store .lock() .existing_pending_block(pending_block)); + + // Verify that the block is in the store by hash + let block_hash = pending_block.first_block().id(); + assert!(pending_block_store + .lock() + .get_pending_block_by_hash(block_hash) + .is_some()); } // Create a new block payload store and insert payloads for the second block @@ -346,9 +442,17 @@ mod test { // Verify that the first and second blocks are no longer in the store for pending_block in &pending_blocks[..2] { + // Verify that the block is not in the store assert!(!pending_block_store .lock() .existing_pending_block(pending_block)); + + // 
Verify that the block is not in the store by hash + let block_hash = pending_block.first_block().id(); + assert!(pending_block_store + .lock() + .get_pending_block_by_hash(block_hash) + .is_none()); } } @@ -468,14 +572,22 @@ mod test { &new_pending_block, ); - // Get the round of the oldest block (that was garbage collected) + // Get the oldest block (that was garbage collected) let oldest_block = pending_blocks.remove(0); - let oldest_block_round = oldest_block.first_block().round(); // Verify that the oldest block was garbage collected + let oldest_block_round = oldest_block.first_block().round(); let blocks_without_payloads = pending_block_store.lock().blocks_without_payloads.clone(); assert!(!blocks_without_payloads.contains_key(&(current_epoch, oldest_block_round))); + + // Verify that the oldest block was garbage collected by hash + let oldest_block_hash = oldest_block.first_block().id(); + let blocks_without_payloads_by_hash = pending_block_store + .lock() + .blocks_without_payloads_by_hash + .clone(); + assert!(!blocks_without_payloads_by_hash.contains_key(&oldest_block_hash)); } // Insert multiple blocks into the store (for the next epoch) and @@ -499,14 +611,22 @@ mod test { &new_pending_block, ); - // Get the round of the oldest block (that was garbage collected) + // Get the oldest block (that was garbage collected) let oldest_block = pending_blocks.remove(0); - let oldest_block_round = oldest_block.first_block().round(); // Verify that the oldest block was garbage collected + let oldest_block_round = oldest_block.first_block().round(); let blocks_without_payloads = pending_block_store.lock().blocks_without_payloads.clone(); assert!(!blocks_without_payloads.contains_key(&(current_epoch, oldest_block_round))); + + // Verify that the oldest block was garbage collected by hash + let oldest_block_hash = oldest_block.first_block().id(); + let blocks_without_payloads_by_hash = pending_block_store + .lock() + .blocks_without_payloads_by_hash + .clone(); + 
assert!(!blocks_without_payloads_by_hash.contains_key(&oldest_block_hash)); } } @@ -530,7 +650,7 @@ mod test { let observed_ordered_block = ObservedOrderedBlock::new(ordered_block.clone()); // Create a pending block with metadata - let pending_block_with_metadata = PendingBlockWithMetadata::new( + let pending_block_with_metadata = PendingBlockWithMetadata::new_with_arc( PeerNetworkId::random(), Instant::now(), observed_ordered_block.clone(), @@ -560,7 +680,7 @@ mod test { for expected_block_with_metadata in pending_blocks_with_metadata { // Unpack the expected block with metadata into its components let (expected_peer_network_id, expected_block_receipt_time, expected_ordered_block) = - expected_block_with_metadata.into_parts(); + expected_block_with_metadata.unpack(); // Remove the pending block from the store let first_block = expected_ordered_block.ordered_block().first_block(); @@ -919,7 +1039,7 @@ mod test { ObservedOrderedBlock::new_for_testing(ordered_block.clone()); // Create a pending block with metadata - let pending_block_with_metadata = PendingBlockWithMetadata::new( + let pending_block_with_metadata = PendingBlockWithMetadata::new_with_arc( PeerNetworkId::random(), Instant::now(), observed_ordered_block, @@ -1014,7 +1134,16 @@ mod test { num_expected_blocks ); - // Check that all pending blocks are in the store + // Check the number of pending blocks by hash + assert_eq!( + pending_block_store + .lock() + .blocks_without_payloads_by_hash + .len(), + num_expected_blocks + ); + + // Check that all pending blocks are in the stores for pending_block in pending_blocks { // Lock the pending block store let pending_block_store = pending_block_store.lock(); @@ -1028,6 +1157,16 @@ mod test { // Verify that the pending block is in the store assert_eq!(block_in_store.ordered_block(), pending_block); + + // Get the pending block in the store by hash + let first_block_hash = first_block.id(); + let block_in_store = pending_block_store + 
.blocks_without_payloads_by_hash + .get(&first_block_hash) + .unwrap(); + + // Verify that the pending block is in the store by hash + assert_eq!(block_in_store.ordered_block(), pending_block); } } }