Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Consensus Observer] Add ObservedOrderedBlock to various stores. #15892

Open
wants to merge 3 commits into
base: co_exec_pool2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions consensus/src/consensus_observer/common/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ pub const CREATED_SUBSCRIPTION_LABEL: &str = "created_subscription";
pub const ORDERED_BLOCK_ENTRIES_LABEL: &str = "ordered_block_entries";
pub const ORDERED_BLOCK_LABEL: &str = "ordered_block";
pub const ORDERED_BLOCK_WITH_WINDOW_LABEL: &str = "ordered_block_with_window";
pub const PENDING_BLOCK_ENTRIES_BY_HASH_LABEL: &str = "pending_block_by_hash_entries";
pub const PENDING_BLOCK_ENTRIES_LABEL: &str = "pending_block_entries";
pub const PENDING_BLOCKS_BY_HASH_LABEL: &str = "pending_blocks_by_hash";
pub const PENDING_BLOCKS_LABEL: &str = "pending_blocks";
pub const STORED_PAYLOADS_LABEL: &str = "stored_payloads";

Expand Down
11 changes: 8 additions & 3 deletions consensus/src/consensus_observer/observer/active_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,9 @@ fn handle_committed_blocks(
#[cfg(test)]
mod test {
use super::*;
use crate::consensus_observer::network::observer_message::{
BlockPayload, BlockTransactionPayload, OrderedBlock,
use crate::consensus_observer::{
network::observer_message::{BlockPayload, BlockTransactionPayload, OrderedBlock},
observer::execution_pool::ObservedOrderedBlock,
};
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_consensus_types::{
Expand Down Expand Up @@ -573,10 +574,14 @@ mod test {
create_ledger_info(epoch, i as aptos_consensus_types::common::Round);
let ordered_block = OrderedBlock::new(blocks, ordered_proof);

// Create an observed ordered block
let observed_ordered_block =
ObservedOrderedBlock::new_for_testing(ordered_block.clone());

// Insert the block into the ordered block store
ordered_block_store
.lock()
.insert_ordered_block(ordered_block.clone());
.insert_ordered_block(observed_ordered_block.clone());

// Add the block to the ordered blocks
ordered_blocks.push(ordered_block);
Expand Down
21 changes: 14 additions & 7 deletions consensus/src/consensus_observer/observer/consensus_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
},
observer::{
active_state::ActiveObserverState,
execution_pool::ObservedOrderedBlock,
fallback_manager::ObserverFallbackManager,
ordered_blocks::OrderedBlockStore,
payload_store::BlockPayloadStore,
Expand Down Expand Up @@ -732,8 +733,12 @@ impl ConsensusObserver {
update_metrics_for_ordered_block_message(peer_network_id, &ordered_block);

// Create a new pending block with metadata
let pending_block_with_metadata =
PendingBlockWithMetadata::new(peer_network_id, message_received_time, ordered_block);
let observed_ordered_block = ObservedOrderedBlock::new(ordered_block);
let pending_block_with_metadata = PendingBlockWithMetadata::new_with_arc(
peer_network_id,
message_received_time,
observed_ordered_block,
);

// If all payloads exist, process the block. Otherwise, store it
// in the pending block store and wait for the payloads to arrive.
Expand All @@ -751,11 +756,12 @@ impl ConsensusObserver {
/// has been sanity checked and that all payloads exist.
async fn process_ordered_block(
&mut self,
pending_block_with_metadata: PendingBlockWithMetadata,
pending_block_with_metadata: Arc<PendingBlockWithMetadata>,
) {
// Unpack the pending block
let (peer_network_id, message_received_time, ordered_block) =
pending_block_with_metadata.into_parts();
let (peer_network_id, message_received_time, observed_ordered_block) =
pending_block_with_metadata.unpack();
let ordered_block = observed_ordered_block.ordered_block().clone();

// Verify the ordered block proof
let epoch_state = self.get_epoch_state();
Expand Down Expand Up @@ -831,7 +837,7 @@ impl ConsensusObserver {
// Insert the ordered block into the pending blocks
self.ordered_block_store
.lock()
.insert_ordered_block(ordered_block.clone());
.insert_ordered_block(observed_ordered_block.clone());

// If state sync is not syncing to a commit, finalize the ordered blocks
if !self.state_sync_manager.is_syncing_to_commit() {
Expand Down Expand Up @@ -1079,8 +1085,9 @@ impl ConsensusObserver {

// Process all the newly ordered blocks
let all_ordered_blocks = self.ordered_block_store.lock().get_all_ordered_blocks();
for (_, (ordered_block, commit_decision)) in all_ordered_blocks {
for (_, (observed_ordered_block, commit_decision)) in all_ordered_blocks {
// Finalize the ordered block
let ordered_block = observed_ordered_block.consume_ordered_block();
self.finalize_ordered_block(ordered_block).await;

// If a commit decision is available, forward it to the execution pipeline
Expand Down
64 changes: 64 additions & 0 deletions consensus/src/consensus_observer/observer/execution_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

#[cfg(test)]
use crate::consensus_observer::network::observer_message::ExecutionPoolWindow;
use crate::consensus_observer::network::observer_message::{OrderedBlock, OrderedBlockWithWindow};
#[cfg(test)]
use rand::{rngs::OsRng, Rng};

/// A simple enum wrapper that holds an observed ordered block, allowing
/// self-contained ordered blocks and ordered blocks with execution pool windows.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ObservedOrderedBlock {
Ordered(OrderedBlock),
OrderedWithWindow(OrderedBlockWithWindow),
}

impl ObservedOrderedBlock {
/// Creates a new observed ordered block
pub fn new(ordered_block: OrderedBlock) -> Self {
Self::Ordered(ordered_block)
}

/// Creates a new observed ordered block with window
pub fn new_with_window(ordered_block_with_window: OrderedBlockWithWindow) -> Self {
Self::OrderedWithWindow(ordered_block_with_window)
}

#[cfg(test)]
/// Creates a new observed ordered block for testing.
/// Note: the observed type is determined randomly.
pub fn new_for_testing(ordered_block: OrderedBlock) -> Self {
if OsRng.gen::<u8>() % 2 == 0 {
ObservedOrderedBlock::new(ordered_block.clone())
} else {
let ordered_block_with_window = OrderedBlockWithWindow::new(
ordered_block.clone(),
ExecutionPoolWindow::new(vec![]),
);
ObservedOrderedBlock::new_with_window(ordered_block_with_window)
}
}

/// Consumes the observed ordered block and returns the inner ordered block
pub fn consume_ordered_block(self) -> OrderedBlock {
match self {
Self::Ordered(ordered_block) => ordered_block,
Self::OrderedWithWindow(ordered_block_with_window) => {
let (ordered_block, _) = ordered_block_with_window.into_parts();
ordered_block
},
}
}

/// Returns a reference to the inner ordered block
pub fn ordered_block(&self) -> &OrderedBlock {
match self {
Self::Ordered(ordered_block) => ordered_block,
Self::OrderedWithWindow(ordered_block_with_window) => {
ordered_block_with_window.ordered_block()
},
}
}
}
1 change: 1 addition & 0 deletions consensus/src/consensus_observer/observer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

pub mod active_state;
pub mod consensus_observer;
pub mod execution_pool;
pub mod fallback_manager;
pub mod ordered_blocks;
pub mod payload_store;
Expand Down
39 changes: 26 additions & 13 deletions consensus/src/consensus_observer/observer/ordered_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::consensus_observer::{
metrics,
},
network::observer_message::{CommitDecision, OrderedBlock},
observer::execution_pool::ObservedOrderedBlock,
};
use aptos_config::config::ConsensusObserverConfig;
use aptos_consensus_types::{common::Round, pipelined_block::PipelinedBlock};
Expand All @@ -24,7 +25,7 @@ pub struct OrderedBlockStore {

// Ordered blocks. The key is the epoch and round of the last block in the
// ordered block. Each entry contains the block and the commit decision (if any).
ordered_blocks: BTreeMap<(u64, Round), (OrderedBlock, Option<CommitDecision>)>,
ordered_blocks: BTreeMap<(u64, Round), (ObservedOrderedBlock, Option<CommitDecision>)>,
}

impl OrderedBlockStore {
Expand All @@ -44,7 +45,7 @@ impl OrderedBlockStore {
/// Returns a copy of the ordered blocks
pub fn get_all_ordered_blocks(
&self,
) -> BTreeMap<(u64, Round), (OrderedBlock, Option<CommitDecision>)> {
) -> BTreeMap<(u64, Round), (ObservedOrderedBlock, Option<CommitDecision>)> {
self.ordered_blocks.clone()
}

Expand All @@ -57,28 +58,30 @@ impl OrderedBlockStore {
pub fn get_last_ordered_block(&self) -> Option<Arc<PipelinedBlock>> {
self.ordered_blocks
.last_key_value()
.map(|(_, (ordered_block, _))| ordered_block.last_block())
.map(|(_, (observed_ordered_block, _))| {
observed_ordered_block.ordered_block().last_block()
})
}

/// Returns the ordered block for the given epoch and round (if any)
pub fn get_ordered_block(&self, epoch: u64, round: Round) -> Option<OrderedBlock> {
self.ordered_blocks
.get(&(epoch, round))
.map(|(ordered_block, _)| ordered_block.clone())
.map(|(observed_ordered_block, _)| observed_ordered_block.ordered_block().clone())
}

/// Inserts the given ordered block into the ordered blocks. This function
/// assumes the block has already been checked to extend the current ordered
/// blocks, and that the ordered proof has been verified.
pub fn insert_ordered_block(&mut self, ordered_block: OrderedBlock) {
pub fn insert_ordered_block(&mut self, observed_ordered_block: ObservedOrderedBlock) {
// Verify that the number of ordered blocks doesn't exceed the maximum
let max_num_ordered_blocks = self.consensus_observer_config.max_num_pending_blocks as usize;
if self.ordered_blocks.len() >= max_num_ordered_blocks {
warn!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Exceeded the maximum number of ordered blocks: {:?}. Dropping block: {:?}.",
max_num_ordered_blocks,
ordered_block.proof_block_info()
observed_ordered_block.ordered_block().proof_block_info()
))
);
return; // Drop the block if we've exceeded the maximum
Expand All @@ -88,18 +91,20 @@ impl OrderedBlockStore {
debug!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Adding ordered block to the ordered blocks: {:?}",
ordered_block.proof_block_info()
observed_ordered_block.ordered_block().proof_block_info()
))
);

// Get the epoch and round of the last ordered block
let last_block = ordered_block.last_block();
let last_block = observed_ordered_block.ordered_block().last_block();
let last_block_epoch = last_block.epoch();
let last_block_round = last_block.round();

// Insert the ordered block
self.ordered_blocks
.insert((last_block_epoch, last_block_round), (ordered_block, None));
self.ordered_blocks.insert(
(last_block_epoch, last_block_round),
(observed_ordered_block, None),
);
}

/// Removes the ordered blocks for the given commit ledger info. This will
Expand Down Expand Up @@ -173,7 +178,9 @@ impl OrderedBlockStore {
let num_ordered_blocks = self
.ordered_blocks
.values()
.map(|(ordered_block, _)| ordered_block.blocks().len() as u64)
.map(|(observed_ordered_block, _)| {
observed_ordered_block.ordered_block().blocks().len() as u64
})
.sum();
metrics::set_gauge_with_label(
&metrics::OBSERVER_NUM_PROCESSED_BLOCKS,
Expand All @@ -185,7 +192,9 @@ impl OrderedBlockStore {
let highest_ordered_round = self
.ordered_blocks
.last_key_value()
.map(|(_, (ordered_block, _))| ordered_block.last_block().round())
.map(|(_, (observed_ordered_block, _))| {
observed_ordered_block.ordered_block().last_block().round()
})
.unwrap_or(0);
metrics::set_gauge_with_label(
&metrics::OBSERVER_PROCESSED_BLOCK_ROUNDS,
Expand Down Expand Up @@ -725,8 +734,12 @@ mod test {
let ordered_proof = create_ledger_info(epoch, i as Round);
let ordered_block = OrderedBlock::new(blocks, ordered_proof);

// Create an observed ordered block
let observed_ordered_block =
ObservedOrderedBlock::new_for_testing(ordered_block.clone());

// Insert the block into the ordered block store
ordered_block_store.insert_ordered_block(ordered_block.clone());
ordered_block_store.insert_ordered_block(observed_ordered_block.clone());

// Add the block to the ordered blocks
ordered_blocks.push(ordered_block);
Expand Down
Loading
Loading