Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/manager/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ where
/// Returns a new event listener for the rollup node manager.
pub fn event_listener(&mut self) -> EventStream<RollupManagerEvent> {
if let Some(event_sender) = &self.event_sender {
return event_sender.new_listener()
return event_sender.new_listener();
};

let event_sender = EventSender::new(EVENT_CHANNEL_SIZE);
Expand Down
69 changes: 49 additions & 20 deletions crates/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::{
BlockImportOutcome, BlockValidation, NetworkHandleMessage, NetworkManagerEvent,
NewBlockWithPeer, ScrollNetworkHandle,
};
use alloy_primitives::{FixedBytes, Signature, U128};
use alloy_primitives::{FixedBytes, Signature, B256, U128};
use futures::{FutureExt, Stream, StreamExt};
use reth_chainspec::EthChainSpec;
use reth_eth_wire_types::NewBlock as EthWireNewBlock;
Expand Down Expand Up @@ -54,7 +54,9 @@ pub struct ScrollNetworkManager<N, CS> {
/// The receiver for new blocks received from the network (used to bridge from eth-wire).
eth_wire_listener: Option<EventStream<RethNewBlockWithPeer<ScrollBlock>>>,
/// The scroll wire protocol manager.
scroll_wire: ScrollWireManager,
pub scroll_wire: ScrollWireManager,
/// The LRU cache used to track already seen (block,signature) pair.
pub blocks_seen: LruCache<(B256, Signature)>,
/// The constant value that must be added to the block number to get the total difficulty.
td_constant: U128,
}
Expand Down Expand Up @@ -90,6 +92,8 @@ impl<CS: ScrollHardforks + EthChainSpec + Send + Sync + 'static>
// Create the scroll-wire protocol manager.
let scroll_wire = ScrollWireManager::new(events);

let blocks_seen = LruCache::new(LRU_CACHE_SIZE);

// Spawn the inner network manager.
tokio::spawn(inner_network_manager);

Expand All @@ -98,6 +102,7 @@ impl<CS: ScrollHardforks + EthChainSpec + Send + Sync + 'static>
handle,
from_handle_rx: from_handle_rx.into(),
scroll_wire,
blocks_seen,
eth_wire_listener,
td_constant,
}
Expand Down Expand Up @@ -128,11 +133,14 @@ impl<

let handle = ScrollNetworkHandle::new(to_manager_tx, inner_network_handle);

let blocks_seen = LruCache::new(LRU_CACHE_SIZE);

Self {
chain_spec,
handle,
from_handle_rx: from_handle_rx.into(),
scroll_wire,
blocks_seen,
eth_wire_listener,
td_constant,
}
Expand Down Expand Up @@ -177,11 +185,29 @@ impl<
}

/// Handler for received events from the [`ScrollWireManager`].
fn on_scroll_wire_event(&mut self, event: ScrollWireEvent) -> NetworkManagerEvent {
fn on_scroll_wire_event(&mut self, event: ScrollWireEvent) -> Option<NetworkManagerEvent> {
match event {
ScrollWireEvent::NewBlock { peer_id, block, signature } => {
trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block.hash_slow(), signature = ?signature, "Received new block");
NetworkManagerEvent::NewBlock(NewBlockWithPeer { peer_id, block, signature })
let block_hash = block.hash_slow();
trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, signature = ?signature, "Received new block");
if self.blocks_seen.contains(&(block_hash, signature)) {
None
} else {
// Update the state of the peer cache i.e. peer has seen this block.
self.scroll_wire
.state_mut()
.entry(peer_id)
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
.insert(block_hash);
// Update the state of the block cache i.e. we have seen this block.
self.blocks_seen.insert((block.hash_slow(), signature));

Some(NetworkManagerEvent::NewBlock(NewBlockWithPeer {
peer_id,
block,
signature,
}))
}
}
// Only `NewBlock` events are expected from the scroll-wire protocol.
_ => {
Expand Down Expand Up @@ -214,12 +240,6 @@ impl<
Ok(BlockValidation::ValidBlock { new_block: msg }) |
Ok(BlockValidation::ValidHeader { new_block: msg }) => {
trace!(target: "scroll::network::manager", peer_id = ?peer, block = ?msg.block, "Block import successful - announcing block to network");
let hash = msg.block.hash_slow();
self.scroll_wire
.state_mut()
.entry(peer)
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
.insert(hash);
self.announce_block(msg);
}
Err(BlockImportError::Consensus(err)) => {
Expand All @@ -241,14 +261,6 @@ impl<
block: reth_network_api::block::NewBlockWithPeer<ScrollBlock>,
) -> Option<NetworkManagerEvent> {
let reth_network_api::block::NewBlockWithPeer { peer_id, mut block } = block;
let block_hash = block.hash_slow();
self.scroll_wire
.state_mut()
.entry(peer_id)
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
.insert(block_hash);

trace!(target: "scroll::bridge::import", peer_id = %peer_id, block_hash = %block_hash, "Received new block from eth-wire protocol");

// We purge the extra data field post euclid v2 to align with protocol specification.
let extra_data = if self.chain_spec.is_euclid_v2_active_at_timestamp(block.timestamp) {
Expand All @@ -268,6 +280,21 @@ impl<
.checked_sub(ECDSA_SIGNATURE_LEN)
.and_then(|i| Signature::from_raw(&extra_data[i..]).ok())
{
let block_hash = block.hash_slow();
if self.blocks_seen.contains(&(block_hash, signature)) {
return None;
}
trace!(target: "scroll::bridge::import", peer_id = %peer_id, block_hash = %block_hash, "Received new block from eth-wire protocol");

// Update the state of the peer cache i.e. peer has seen this block.
self.scroll_wire
.state_mut()
.entry(peer_id)
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
.insert(block_hash);

// Update the state of the block cache i.e. we have seen this block.
self.blocks_seen.insert((block_hash, signature));
Some(NetworkManagerEvent::NewBlock(NewBlockWithPeer { peer_id, block, signature }))
} else {
tracing::warn!(target: "scroll::bridge::import", peer_id = %peer_id, "Failed to extract signature from block extra data, penalizing peer");
Expand Down Expand Up @@ -306,7 +333,9 @@ impl<

// Next we handle the scroll-wire events.
if let Poll::Ready(event) = this.scroll_wire.poll_unpin(cx) {
return Poll::Ready(Some(this.on_scroll_wire_event(event)));
if let Some(event) = this.on_scroll_wire_event(event) {
return Poll::Ready(Some(event));
}
}

// Handle blocks received from the eth-wire protocol.
Expand Down
Loading