diff --git a/consensus/src/consensus_observer/common/metrics.rs b/consensus/src/consensus_observer/common/metrics.rs index 250a8034f18b8..e15eea03a1bfe 100644 --- a/consensus/src/consensus_observer/common/metrics.rs +++ b/consensus/src/consensus_observer/common/metrics.rs @@ -18,7 +18,9 @@ pub const CREATED_SUBSCRIPTION_LABEL: &str = "created_subscription"; pub const ORDERED_BLOCK_ENTRIES_LABEL: &str = "ordered_block_entries"; pub const ORDERED_BLOCK_LABEL: &str = "ordered_block"; pub const ORDERED_BLOCK_WITH_WINDOW_LABEL: &str = "ordered_block_with_window"; +pub const PENDING_BLOCK_ENTRIES_BY_HASH_LABEL: &str = "pending_block_by_hash_entries"; pub const PENDING_BLOCK_ENTRIES_LABEL: &str = "pending_block_entries"; +pub const PENDING_BLOCKS_BY_HASH_LABEL: &str = "pending_blocks_by_hash"; pub const PENDING_BLOCKS_LABEL: &str = "pending_blocks"; pub const STORED_PAYLOADS_LABEL: &str = "stored_payloads"; diff --git a/consensus/src/consensus_observer/observer/active_state.rs b/consensus/src/consensus_observer/observer/active_state.rs index e044f109a98d0..55e118c9f3a8e 100644 --- a/consensus/src/consensus_observer/observer/active_state.rs +++ b/consensus/src/consensus_observer/observer/active_state.rs @@ -353,8 +353,9 @@ fn handle_committed_blocks( #[cfg(test)] mod test { use super::*; - use crate::consensus_observer::network::observer_message::{ - BlockPayload, BlockTransactionPayload, OrderedBlock, + use crate::consensus_observer::{ + network::observer_message::{BlockPayload, BlockTransactionPayload, OrderedBlock}, + observer::execution_pool::ObservedOrderedBlock, }; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_consensus_types::{ @@ -573,10 +574,14 @@ mod test { create_ledger_info(epoch, i as aptos_consensus_types::common::Round); let ordered_block = OrderedBlock::new(blocks, ordered_proof); + // Create an observed ordered block + let observed_ordered_block = + ObservedOrderedBlock::new_for_testing(ordered_block.clone()); + // Insert the block into the ordered block store ordered_block_store .lock() - .insert_ordered_block(ordered_block.clone()); + .insert_ordered_block(observed_ordered_block.clone()); // Add the block to the ordered blocks ordered_blocks.push(ordered_block); diff --git a/consensus/src/consensus_observer/observer/consensus_observer.rs b/consensus/src/consensus_observer/observer/consensus_observer.rs index ce6f39d84d728..3f51b9e551b41 100644 --- a/consensus/src/consensus_observer/observer/consensus_observer.rs +++ b/consensus/src/consensus_observer/observer/consensus_observer.rs @@ -17,6 +17,7 @@ use crate::{ }, observer::{ active_state::ActiveObserverState, + execution_pool::ObservedOrderedBlock, fallback_manager::ObserverFallbackManager, ordered_blocks::OrderedBlockStore, payload_store::BlockPayloadStore, @@ -732,8 +733,12 @@ impl ConsensusObserver { update_metrics_for_ordered_block_message(peer_network_id, &ordered_block); // Create a new pending block with metadata - let pending_block_with_metadata = - PendingBlockWithMetadata::new(peer_network_id, message_received_time, ordered_block); + let observed_ordered_block = ObservedOrderedBlock::new(ordered_block); + let pending_block_with_metadata = PendingBlockWithMetadata::new_with_arc( + peer_network_id, + message_received_time, + observed_ordered_block, + ); // If all payloads exist, process the block. Otherwise, store it // in the pending block store and wait for the payloads to arrive. @@ -751,11 +756,12 @@ impl ConsensusObserver { /// has been sanity checked and that all payloads exist. async fn process_ordered_block( &mut self, - pending_block_with_metadata: PendingBlockWithMetadata, + pending_block_with_metadata: Arc, ) { // Unpack the pending block - let (peer_network_id, message_received_time, ordered_block) = - pending_block_with_metadata.into_parts(); + let (peer_network_id, message_received_time, observed_ordered_block) = + pending_block_with_metadata.unpack(); + let ordered_block = observed_ordered_block.ordered_block().clone(); // Verify the ordered block proof let epoch_state = self.get_epoch_state(); @@ -831,7 +837,7 @@ impl ConsensusObserver { // Insert the ordered block into the pending blocks self.ordered_block_store .lock() - .insert_ordered_block(ordered_block.clone()); + .insert_ordered_block(observed_ordered_block.clone()); // If state sync is not syncing to a commit, finalize the ordered blocks if !self.state_sync_manager.is_syncing_to_commit() { @@ -1079,8 +1085,9 @@ impl ConsensusObserver { // Process all the newly ordered blocks let all_ordered_blocks = self.ordered_block_store.lock().get_all_ordered_blocks(); - for (_, (ordered_block, commit_decision)) in all_ordered_blocks { + for (_, (observed_ordered_block, commit_decision)) in all_ordered_blocks { // Finalize the ordered block + let ordered_block = observed_ordered_block.consume_ordered_block(); self.finalize_ordered_block(ordered_block).await; // If a commit decision is available, forward it to the execution pipeline diff --git a/consensus/src/consensus_observer/observer/execution_pool.rs b/consensus/src/consensus_observer/observer/execution_pool.rs new file mode 100644 index 0000000000000..2430861e850aa --- /dev/null +++ b/consensus/src/consensus_observer/observer/execution_pool.rs @@ -0,0 +1,64 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +#[cfg(test)] +use crate::consensus_observer::network::observer_message::ExecutionPoolWindow; +use crate::consensus_observer::network::observer_message::{OrderedBlock, OrderedBlockWithWindow}; +#[cfg(test)] +use rand::{rngs::OsRng, Rng}; + +/// A simple enum wrapper that holds an observed ordered block, allowing +/// self-contained ordered blocks and ordered blocks with execution pool windows. +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum ObservedOrderedBlock { + Ordered(OrderedBlock), + OrderedWithWindow(OrderedBlockWithWindow), +} + +impl ObservedOrderedBlock { + /// Creates a new observed ordered block + pub fn new(ordered_block: OrderedBlock) -> Self { + Self::Ordered(ordered_block) + } + + /// Creates a new observed ordered block with window + pub fn new_with_window(ordered_block_with_window: OrderedBlockWithWindow) -> Self { + Self::OrderedWithWindow(ordered_block_with_window) + } + + #[cfg(test)] + /// Creates a new observed ordered block for testing. + /// Note: the observed type is determined randomly. + pub fn new_for_testing(ordered_block: OrderedBlock) -> Self { + if OsRng.gen::() % 2 == 0 { + ObservedOrderedBlock::new(ordered_block.clone()) + } else { + let ordered_block_with_window = OrderedBlockWithWindow::new( + ordered_block.clone(), + ExecutionPoolWindow::new(vec![]), + ); + ObservedOrderedBlock::new_with_window(ordered_block_with_window) + } + } + + /// Consumes the observed ordered block and returns the inner ordered block + pub fn consume_ordered_block(self) -> OrderedBlock { + match self { + Self::Ordered(ordered_block) => ordered_block, + Self::OrderedWithWindow(ordered_block_with_window) => { + let (ordered_block, _) = ordered_block_with_window.into_parts(); + ordered_block + }, + } + } + + /// Returns a reference to the inner ordered block + pub fn ordered_block(&self) -> &OrderedBlock { + match self { + Self::Ordered(ordered_block) => ordered_block, + Self::OrderedWithWindow(ordered_block_with_window) => { + ordered_block_with_window.ordered_block() + }, + } + } +} diff --git a/consensus/src/consensus_observer/observer/mod.rs b/consensus/src/consensus_observer/observer/mod.rs index 757b9dc4b1e18..e63b362fe3ffd 100644 --- a/consensus/src/consensus_observer/observer/mod.rs +++ b/consensus/src/consensus_observer/observer/mod.rs @@ -3,6 +3,7 @@ pub mod active_state; pub mod consensus_observer; +pub mod execution_pool; pub mod fallback_manager; pub mod ordered_blocks; pub mod payload_store; diff --git a/consensus/src/consensus_observer/observer/ordered_blocks.rs b/consensus/src/consensus_observer/observer/ordered_blocks.rs index 1a13d92e0a85a..fe9a27d8e4fdb 100644 --- a/consensus/src/consensus_observer/observer/ordered_blocks.rs +++ b/consensus/src/consensus_observer/observer/ordered_blocks.rs @@ -7,6 +7,7 @@ use crate::consensus_observer::{ metrics, }, network::observer_message::{CommitDecision, OrderedBlock}, + observer::execution_pool::ObservedOrderedBlock, }; use aptos_config::config::ConsensusObserverConfig; use aptos_consensus_types::{common::Round, pipelined_block::PipelinedBlock}; @@ -24,7 +25,7 @@ pub struct OrderedBlockStore { // Ordered blocks. The key is the epoch and round of the last block in the // ordered block. Each entry contains the block and the commit decision (if any). - ordered_blocks: BTreeMap<(u64, Round), (OrderedBlock, Option)>, + ordered_blocks: BTreeMap<(u64, Round), (ObservedOrderedBlock, Option)>, } impl OrderedBlockStore { @@ -44,7 +45,7 @@ impl OrderedBlockStore { /// Returns a copy of the ordered blocks pub fn get_all_ordered_blocks( &self, - ) -> BTreeMap<(u64, Round), (OrderedBlock, Option)> { + ) -> BTreeMap<(u64, Round), (ObservedOrderedBlock, Option)> { self.ordered_blocks.clone() } @@ -57,20 +58,22 @@ impl OrderedBlockStore { pub fn get_last_ordered_block(&self) -> Option> { self.ordered_blocks .last_key_value() - .map(|(_, (ordered_block, _))| ordered_block.last_block()) + .map(|(_, (observed_ordered_block, _))| { + observed_ordered_block.ordered_block().last_block() + }) } /// Returns the ordered block for the given epoch and round (if any) pub fn get_ordered_block(&self, epoch: u64, round: Round) -> Option { self.ordered_blocks .get(&(epoch, round)) - .map(|(ordered_block, _)| ordered_block.clone()) + .map(|(observed_ordered_block, _)| observed_ordered_block.ordered_block().clone()) } /// Inserts the given ordered block into the ordered blocks. This function /// assumes the block has already been checked to extend the current ordered /// blocks, and that the ordered proof has been verified. - pub fn insert_ordered_block(&mut self, ordered_block: OrderedBlock) { + pub fn insert_ordered_block(&mut self, observed_ordered_block: ObservedOrderedBlock) { // Verify that the number of ordered blocks doesn't exceed the maximum let max_num_ordered_blocks = self.consensus_observer_config.max_num_pending_blocks as usize; if self.ordered_blocks.len() >= max_num_ordered_blocks { @@ -78,7 +81,7 @@ impl OrderedBlockStore { LogSchema::new(LogEntry::ConsensusObserver).message(&format!( "Exceeded the maximum number of ordered blocks: {:?}. Dropping block: {:?}.", max_num_ordered_blocks, - ordered_block.proof_block_info() + observed_ordered_block.ordered_block().proof_block_info() )) ); return; // Drop the block if we've exceeded the maximum @@ -88,18 +91,20 @@ impl OrderedBlockStore { debug!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( "Adding ordered block to the ordered blocks: {:?}", - ordered_block.proof_block_info() + observed_ordered_block.ordered_block().proof_block_info() )) ); // Get the epoch and round of the last ordered block - let last_block = ordered_block.last_block(); + let last_block = observed_ordered_block.ordered_block().last_block(); let last_block_epoch = last_block.epoch(); let last_block_round = last_block.round(); // Insert the ordered block - self.ordered_blocks - .insert((last_block_epoch, last_block_round), (ordered_block, None)); + self.ordered_blocks.insert( + (last_block_epoch, last_block_round), + (observed_ordered_block, None), + ); } /// Removes the ordered blocks for the given commit ledger info. This will @@ -173,7 +178,9 @@ impl OrderedBlockStore { let num_ordered_blocks = self .ordered_blocks .values() - .map(|(ordered_block, _)| ordered_block.blocks().len() as u64) + .map(|(observed_ordered_block, _)| { + observed_ordered_block.ordered_block().blocks().len() as u64 + }) .sum(); metrics::set_gauge_with_label( &metrics::OBSERVER_NUM_PROCESSED_BLOCKS, @@ -185,7 +192,9 @@ impl OrderedBlockStore { let highest_ordered_round = self .ordered_blocks .last_key_value() - .map(|(_, (ordered_block, _))| ordered_block.last_block().round()) + .map(|(_, (observed_ordered_block, _))| { + observed_ordered_block.ordered_block().last_block().round() + }) .unwrap_or(0); metrics::set_gauge_with_label( &metrics::OBSERVER_PROCESSED_BLOCK_ROUNDS, @@ -725,8 +734,12 @@ mod test { let ordered_proof = create_ledger_info(epoch, i as Round); let ordered_block = OrderedBlock::new(blocks, ordered_proof); + // Create an observed ordered block + let observed_ordered_block = + ObservedOrderedBlock::new_for_testing(ordered_block.clone()); + // Insert the block into the ordered block store - ordered_block_store.insert_ordered_block(ordered_block.clone()); + ordered_block_store.insert_ordered_block(observed_ordered_block.clone()); // Add the block to the ordered blocks ordered_blocks.push(ordered_block); diff --git a/consensus/src/consensus_observer/observer/pending_blocks.rs b/consensus/src/consensus_observer/observer/pending_blocks.rs index 01b54a369cac6..3874f4b84b420 100644 --- a/consensus/src/consensus_observer/observer/pending_blocks.rs +++ b/consensus/src/consensus_observer/observer/pending_blocks.rs @@ -7,11 +7,12 @@ use crate::consensus_observer::{ metrics, }, network::observer_message::OrderedBlock, - observer::payload_store::BlockPayloadStore, + observer::{execution_pool::ObservedOrderedBlock, payload_store::BlockPayloadStore}, }; use aptos_config::{config::ConsensusObserverConfig, network_id::PeerNetworkId}; +use aptos_crypto::HashValue; use aptos_infallible::Mutex; -use aptos_logger::{info, warn}; +use aptos_logger::{error, info, warn}; use aptos_types::block_info::Round; use std::{ collections::{btree_map::Entry, BTreeMap}, @@ -24,34 +25,36 @@ use std::{ pub struct PendingBlockWithMetadata { peer_network_id: PeerNetworkId, // The peer network ID of the block sender block_receipt_time: Instant, // The time the block was received - ordered_block: OrderedBlock, // The ordered block + observed_ordered_block: ObservedOrderedBlock, // The observed ordered block } impl PendingBlockWithMetadata { - pub fn new( + pub fn new_with_arc( peer_network_id: PeerNetworkId, block_receipt_time: Instant, - ordered_block: OrderedBlock, - ) -> Self { - Self { + observed_ordered_block: ObservedOrderedBlock, + ) -> Arc { + let pending_block_with_metadata = Self { peer_network_id, block_receipt_time, - ordered_block, - } + observed_ordered_block, + }; + Arc::new(pending_block_with_metadata) } - /// Unpacks the block with metadata into its components - pub fn into_parts(self) -> (PeerNetworkId, Instant, OrderedBlock) { + /// Unpacks the block with metadata into its components. + /// Note: this will copy/clone all components. + pub fn unpack(&self) -> (PeerNetworkId, Instant, ObservedOrderedBlock) { ( self.peer_network_id, self.block_receipt_time, - self.ordered_block, + self.observed_ordered_block.clone(), ) } /// Returns a reference to the ordered block pub fn ordered_block(&self) -> &OrderedBlock { - &self.ordered_block + self.observed_ordered_block.ordered_block() } } @@ -62,7 +65,12 @@ pub struct PendingBlockStore { // A map of ordered blocks that are without payloads. The key is // the (epoch, round) of the first block in the ordered block. - blocks_without_payloads: BTreeMap<(u64, Round), PendingBlockWithMetadata>, + blocks_without_payloads: BTreeMap<(u64, Round), Arc>, + + // A map of ordered blocks that are without payloads. The key is + // the hash of the first block in the ordered block. + // Note: this is the same as blocks_without_payloads, but with a different key. + blocks_without_payloads_by_hash: BTreeMap>, } impl PendingBlockStore { @@ -70,12 +78,14 @@ impl PendingBlockStore { Self { consensus_observer_config, blocks_without_payloads: BTreeMap::new(), + blocks_without_payloads_by_hash: BTreeMap::new(), } } /// Clears all missing blocks from the store pub fn clear_missing_blocks(&mut self) { self.blocks_without_payloads.clear(); + self.blocks_without_payloads_by_hash.clear(); } /// Returns true iff the store contains an entry for the given ordered block @@ -84,18 +94,28 @@ impl PendingBlockStore { let first_block = ordered_block.first_block(); let first_block_epoch_round = (first_block.epoch(), first_block.round()); - // Check if the block is already in the store + // Check if the block is already in the store by epoch and round self.blocks_without_payloads .contains_key(&first_block_epoch_round) } + /// Returns the pending block with the given hash (if it exists) + pub fn get_pending_block_by_hash( + &self, + block_hash: HashValue, + ) -> Option> { + self.blocks_without_payloads_by_hash + .get(&block_hash) + .cloned() + } + /// Inserts a pending block (without payloads) into the store - pub fn insert_pending_block(&mut self, pending_block_with_metadata: PendingBlockWithMetadata) { - // Get the epoch and round of the first block - let first_block = pending_block_with_metadata.ordered_block().first_block(); - let first_block_epoch_round = (first_block.epoch(), first_block.round()); + pub fn insert_pending_block(&mut self, pending_block: Arc) { + // Get the first block in the ordered blocks + let first_block = pending_block.ordered_block().first_block(); - // Insert the block into the store using the round of the first block + // Insert the block into the store using the epoch round of the first block + let first_block_epoch_round = (first_block.epoch(), first_block.round()); match self.blocks_without_payloads.entry(first_block_epoch_round) { Entry::Occupied(_) => { // The block is already in the store @@ -108,7 +128,25 @@ impl PendingBlockStore { }, Entry::Vacant(entry) => { // Insert the block into the store - entry.insert(pending_block_with_metadata); + entry.insert(pending_block.clone()); + }, + } + + // Insert the block into the hash store using the hash of the first block + let first_block_hash = first_block.id(); + match self.blocks_without_payloads_by_hash.entry(first_block_hash) { + Entry::Occupied(_) => { + // The block is already in the hash store + warn!( + LogSchema::new(LogEntry::ConsensusObserver).message(&format!( + "A pending block was already found for the given block hash: {:?}", + first_block_hash + )) + ); + }, + Entry::Vacant(entry) => { + // Insert the block into the hash store + entry.insert(pending_block); }, } @@ -119,32 +157,53 @@ impl PendingBlockStore { /// Garbage collects the pending blocks store by removing /// the oldest blocks if the store is too large. fn garbage_collect_pending_blocks(&mut self) { - // Calculate the number of blocks to remove + // Verify that both stores have the same number of entries. + // If not, log an error as this should never happen. let num_pending_blocks = self.blocks_without_payloads.len() as u64; + let num_pending_blocks_by_hash = self.blocks_without_payloads_by_hash.len() as u64; + if num_pending_blocks != num_pending_blocks_by_hash { + error!( + LogSchema::new(LogEntry::ConsensusObserver).message(&format!( + "The pending block stores have different numbers of entries: {:?} and {:?} (by hash)", + num_pending_blocks, num_pending_blocks_by_hash + )) + ); + } + + // Calculate the number of blocks to remove let max_pending_blocks = self.consensus_observer_config.max_num_pending_blocks; let num_blocks_to_remove = num_pending_blocks.saturating_sub(max_pending_blocks); // Remove the oldest blocks if the store is too large for _ in 0..num_blocks_to_remove { - if let Some((oldest_epoch_round, _)) = self.blocks_without_payloads.pop_first() { + if let Some((oldest_epoch_round, pending_block)) = + self.blocks_without_payloads.pop_first() + { + // Log a warning message for the removed block warn!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( "The pending block store is too large: {:?} blocks. Removing the block for the oldest epoch and round: {:?}", num_pending_blocks, oldest_epoch_round )) ); + + // Remove the block from the hash store + let first_block = pending_block.ordered_block().first_block(); + self.blocks_without_payloads_by_hash + .remove(&first_block.id()); } } } /// Removes and returns the block from the store that is now ready /// to be processed (after the new payload has been received). + // TODO: identify how this will work with execution pool blocks! pub fn remove_ready_block( &mut self, received_payload_epoch: u64, received_payload_round: Round, block_payload_store: Arc>, - ) -> Option { + ) -> Option> { // Calculate the round at which to split the blocks let split_round = received_payload_round.saturating_add(1); @@ -156,28 +215,23 @@ impl PendingBlockStore { // Check if the last block is ready (this should be the only ready block). // Any earlier blocks are considered out-of-date and will be dropped. let mut ready_block = None; - if let Some((epoch_and_round, pending_block_with_metadata)) = - self.blocks_without_payloads.pop_last() - { + if let Some((epoch_and_round, pending_block)) = self.blocks_without_payloads.pop_last() { // If all payloads exist for the block, then the block is ready if block_payload_store .lock() - .all_payloads_exist(pending_block_with_metadata.ordered_block().blocks()) + .all_payloads_exist(pending_block.ordered_block().blocks()) { - ready_block = Some(pending_block_with_metadata); + ready_block = Some(pending_block); } else { // Otherwise, check if we're still waiting for higher payloads for the block - let last_pending_block_round = pending_block_with_metadata - .ordered_block() - .last_block() - .round(); + let last_pending_block_round = pending_block.ordered_block().last_block().round(); if last_pending_block_round > received_payload_round { - blocks_at_higher_rounds.insert(epoch_and_round, pending_block_with_metadata); + blocks_at_higher_rounds.insert(epoch_and_round, pending_block); } } } - // Check if any out-of-date blocks were dropped + // Check if any out-of-date blocks are going to be dropped if !self.blocks_without_payloads.is_empty() { info!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( @@ -188,8 +242,18 @@ impl PendingBlockStore { ); } - // Update the pending blocks to only include the blocks at higher rounds + // TODO: optimize this flow! + + // Clear all blocks from the pending block stores + self.clear_missing_blocks(); + + // Update the pending block stores to only include the blocks at higher rounds self.blocks_without_payloads = blocks_at_higher_rounds; + for pending_block in self.blocks_without_payloads.values() { + let first_block = pending_block.ordered_block().first_block(); + self.blocks_without_payloads_by_hash + .insert(first_block.id(), pending_block.clone()); + } // Return the ready block (if one exists) ready_block @@ -205,6 +269,14 @@ impl PendingBlockStore { num_entries, ); + // Update the number of pending block by hash entries + let num_entries_by_hash = self.blocks_without_payloads_by_hash.len() as u64; + metrics::set_gauge_with_label( + &metrics::OBSERVER_NUM_PROCESSED_BLOCKS, + metrics::PENDING_BLOCK_ENTRIES_BY_HASH_LABEL, + num_entries_by_hash, + ); + // Update the total number of pending blocks let num_pending_blocks = self .blocks_without_payloads @@ -217,6 +289,18 @@ impl PendingBlockStore { num_pending_blocks, ); + // Update the total number of pending blocks by hash + let num_pending_blocks_by_hash = self + .blocks_without_payloads_by_hash + .values() + .map(|block| block.ordered_block().blocks().len() as u64) + .sum(); + metrics::set_gauge_with_label( + &metrics::OBSERVER_NUM_PROCESSED_BLOCKS, + metrics::PENDING_BLOCKS_BY_HASH_LABEL, + num_pending_blocks_by_hash, + ); + // Update the highest round for the pending blocks let highest_pending_round = self .blocks_without_payloads @@ -290,6 +374,12 @@ mod test { .lock() .blocks_without_payloads .is_empty()); + + // Verify that the hash store is now empty + assert!(pending_block_store + .lock() + .blocks_without_payloads_by_hash + .is_empty()); } #[test] @@ -317,9 +407,17 @@ mod test { // Verify that all blocks were inserted correctly for pending_block in &pending_blocks { + // Verify that the block is in the store assert!(pending_block_store .lock() .existing_pending_block(pending_block)); + + // Verify that the block is in the store by hash + let block_hash = pending_block.first_block().id(); + assert!(pending_block_store + .lock() + .get_pending_block_by_hash(block_hash) + .is_some()); } // Create a new block payload store and insert payloads for the second block @@ -346,9 +444,92 @@ mod test { // Verify that the first and second blocks are no longer in the store for pending_block in &pending_blocks[..2] { + // Verify that the block is not in the store assert!(!pending_block_store .lock() .existing_pending_block(pending_block)); + + // Verify that the block is not in the store by hash + let block_hash = pending_block.first_block().id(); + assert!(pending_block_store + .lock() + .get_pending_block_by_hash(block_hash) + .is_none()); + } + } + + #[test] + fn test_get_pending_block_by_hash() { + // Create a new pending block store + let max_num_pending_blocks = 50; + let consensus_observer_config = ConsensusObserverConfig { + max_num_pending_blocks: max_num_pending_blocks as u64, + ..ConsensusObserverConfig::default() + }; + let pending_block_store = Arc::new(Mutex::new(PendingBlockStore::new( + consensus_observer_config, + ))); + + // Insert the maximum number of blocks into the store + let current_epoch = 10; + let starting_round = 100; + let pending_blocks = create_and_add_pending_blocks( + pending_block_store.clone(), + max_num_pending_blocks, + current_epoch, + starting_round, + 5, + ); + + // Verify that all blocks were inserted correctly + for pending_block in &pending_blocks { + let pending_block_by_hash = pending_block_store + .lock() + .get_pending_block_by_hash(pending_block.first_block().id()) + .unwrap(); + assert_eq!( + pending_block_by_hash.observed_ordered_block.ordered_block(), + pending_block + ); + } + + // Remove the first and second blocks manually + for block in &pending_blocks[..2] { + pending_block_store + .lock() + .blocks_without_payloads_by_hash + .remove(&block.first_block().id()); + } + + // Verify that the first and second blocks are no longer in the store + for pending_block in &pending_blocks[..2] { + assert!(pending_block_store + .lock() + .get_pending_block_by_hash(pending_block.first_block().id()) + .is_none()); + } + + // Verify that the remaining blocks are still in the store by hash + for pending_block in &pending_blocks[2..] { + let pending_block_by_hash = pending_block_store + .lock() + .get_pending_block_by_hash(pending_block.first_block().id()) + .unwrap(); + assert_eq!( + pending_block_by_hash.observed_ordered_block.ordered_block(), + pending_block + ); + } + + // Clear the blocks from the store + pending_block_store.lock().clear_missing_blocks(); + + // Verify that all blocks are no longer in the store by hash + for pending_block in &pending_blocks { + assert!(pending_block_store + .lock() + .get_pending_block_by_hash(pending_block.first_block().id()) + .is_none()); } } @@ -468,14 +649,22 @@ mod test { &new_pending_block, ); - // Get the round of the oldest block (that was garbage collected) + // Get the oldest block (that was garbage collected) let oldest_block = pending_blocks.remove(0); - let oldest_block_round = oldest_block.first_block().round(); // Verify that the oldest block was garbage collected + let oldest_block_round = oldest_block.first_block().round(); let blocks_without_payloads = pending_block_store.lock().blocks_without_payloads.clone(); assert!(!blocks_without_payloads.contains_key(&(current_epoch, oldest_block_round))); + + // Verify that the oldest block was garbage collected by hash + let oldest_block_hash = oldest_block.first_block().id(); + let blocks_without_payloads_by_hash = pending_block_store + .lock() + .blocks_without_payloads_by_hash + .clone(); + assert!(!blocks_without_payloads_by_hash.contains_key(&oldest_block_hash)); } // Insert multiple blocks into the store (for the next epoch) and @@ -499,14 +688,22 @@ mod test { &new_pending_block, ); - // Get the round of the oldest block (that was garbage collected) + // Get the oldest block (that was garbage collected) let oldest_block = pending_blocks.remove(0); - let oldest_block_round = oldest_block.first_block().round(); // Verify that the oldest block was garbage collected + let oldest_block_round = oldest_block.first_block().round(); let blocks_without_payloads = pending_block_store.lock().blocks_without_payloads.clone(); assert!(!blocks_without_payloads.contains_key(&(current_epoch, oldest_block_round))); + + // Verify that the oldest block was garbage collected by hash + let oldest_block_hash = oldest_block.first_block().id(); + let blocks_without_payloads_by_hash = pending_block_store + .lock() + .blocks_without_payloads_by_hash + .clone(); + assert!(!blocks_without_payloads_by_hash.contains_key(&oldest_block_hash)); } } @@ -525,14 +722,15 @@ mod test { // Insert the maximum number of pending blocks into the store let mut pending_blocks_with_metadata = vec![]; for i in 0..max_num_pending_blocks { - // Create an ordered block + // Create an observed ordered block let ordered_block = create_ordered_block(0, 0, 1, i); + let observed_ordered_block = ObservedOrderedBlock::new(ordered_block.clone()); // Create a pending block with metadata - let pending_block_with_metadata = PendingBlockWithMetadata::new( + let pending_block_with_metadata = PendingBlockWithMetadata::new_with_arc( PeerNetworkId::random(), Instant::now(), - ordered_block.clone(), + observed_ordered_block.clone(), ); // Insert the ordered block into the pending block store @@ -559,14 +757,15 @@ mod test { for expected_block_with_metadata in pending_blocks_with_metadata { // Unpack the expected block with metadata into its components let (expected_peer_network_id, expected_block_receipt_time, expected_ordered_block) = - expected_block_with_metadata.into_parts(); + expected_block_with_metadata.unpack(); // Remove the pending block from the store + let first_block = expected_ordered_block.ordered_block().first_block(); let removed_block_with_metadata = pending_block_store .lock() .remove_ready_block( - expected_ordered_block.first_block().epoch(), - expected_ordered_block.first_block().round(), + first_block.epoch(), + first_block.round(), block_payload_store.clone(), ) .unwrap(); @@ -581,8 +780,8 @@ mod test { expected_block_receipt_time ); assert_eq!( - removed_block_with_metadata.ordered_block().clone(), - expected_ordered_block + removed_block_with_metadata.ordered_block(), + expected_ordered_block.ordered_block() ); } } @@ -912,11 +1111,15 @@ mod test { let ordered_block = create_ordered_block(epoch, starting_round, max_pipelined_blocks, i); + // Create an observed ordered block + let observed_ordered_block = + ObservedOrderedBlock::new_for_testing(ordered_block.clone()); + // Create a pending block with metadata - let pending_block_with_metadata = PendingBlockWithMetadata::new( + let pending_block_with_metadata = PendingBlockWithMetadata::new_with_arc( PeerNetworkId::random(), Instant::now(), - ordered_block.clone(), + observed_ordered_block, ); // Insert the ordered block into the pending block store @@ -1008,7 +1211,16 @@ mod test { num_expected_blocks ); - // Check that all pending blocks are in the store + // Check the number of pending blocks by hash + assert_eq!( + pending_block_store + .lock() + .blocks_without_payloads_by_hash + .len(), + num_expected_blocks + ); + + // Check that all pending blocks are in the stores for pending_block in pending_blocks { // Lock the pending block store let pending_block_store = pending_block_store.lock(); @@ -1022,6 +1234,16 @@ mod test { // Verify that the pending block is in the store assert_eq!(block_in_store.ordered_block(), pending_block); + + // Get the pending block in the store by hash + let first_block_hash = first_block.id(); + let block_in_store = pending_block_store + .blocks_without_payloads_by_hash + .get(&first_block_hash) + .unwrap(); + + // Verify that the pending block is in the store by hash + assert_eq!(block_in_store.ordered_block(), pending_block); } } }