Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions protocol/flows/src/flow_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ impl FlowContext {
return Err(err)?;
}
// Broadcast as soon as the block has been validated and inserted into the DAG
self.hub.broadcast(make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(hash.into()) })).await;
self.hub.broadcast(make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(hash.into()) }), None).await;

self.on_new_block(consensus, Default::default(), block, virtual_state_task).await;
self.log_block_event(BlockLogEvent::Submit(hash));
Expand Down Expand Up @@ -544,7 +544,7 @@ impl FlowContext {
.iter()
.map(|(b, _)| make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(b.hash().into()) }))
.collect();
self.hub.broadcast_many(msgs).await;
self.hub.broadcast_many(msgs, None).await;

// Process blocks in topological order
blocks.sort_by(|a, b| a.0.header.blue_work.partial_cmp(&b.0.header.blue_work).unwrap());
Expand Down
2 changes: 1 addition & 1 deletion protocol/flows/src/flowcontext/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl TransactionsSpread {
// TODO: Figure out a better number
self.hub.broadcast_to_some_peers(msg, 8).await
} else {
self.hub.broadcast(msg).await
self.hub.broadcast(msg, None).await
}
}
}
9 changes: 7 additions & 2 deletions protocol/flows/src/v7/blockrelay/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,16 @@ impl HandleRelayInvsFlow {
.iter()
.map(|b| make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(b.hash().into()) }))
.collect();
self.ctx.hub().broadcast_many(msgs).await;
// we filter out the current peer to avoid sending it back invs we know it already has
self.ctx.hub().broadcast_many(msgs, Some(self.router.key())).await;

// we filter out the current peer to avoid sending it back the same invs
self.ctx
.hub()
.broadcast(make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(inv.hash.into()) }))
.broadcast(
make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(inv.hash.into()) }),
Some(self.router.key()),
)
.await;
}

Expand Down
24 changes: 18 additions & 6 deletions protocol/p2p/src/core/hub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,15 @@ impl Hub {
}
}

/// Broadcast a message to all peers
pub async fn broadcast(&self, msg: KaspadMessage) {
let peers = self.peers.read().values().cloned().collect::<Vec<_>>();
/// Broadcast a message to all peers (except an optional filtered peer)
pub async fn broadcast(&self, msg: KaspadMessage, filter_peer: Option<PeerKey>) {
let peers = self
.peers
.read()
.values()
.filter(|&r| filter_peer.is_none_or(|filter_peer| r.key() != filter_peer))
.cloned()
.collect::<Vec<_>>();
for router in peers {
let _ = router.enqueue(msg.clone()).await;
}
Expand All @@ -149,12 +155,18 @@ impl Hub {
}
}

/// Broadcast a vector of messages to all peers
pub async fn broadcast_many(&self, msgs: Vec<KaspadMessage>) {
/// Broadcast a vector of messages to all peers (except an optional filtered peer)
pub async fn broadcast_many(&self, msgs: Vec<KaspadMessage>, filter_peer: Option<PeerKey>) {
if msgs.is_empty() {
return;
}
let peers = self.peers.read().values().cloned().collect::<Vec<_>>();
let peers = self
.peers
.read()
.values()
.filter(|&r| filter_peer.is_none_or(|filter_peer| r.key() != filter_peer))
.cloned()
.collect::<Vec<_>>();
for router in peers {
for msg in msgs.iter().cloned() {
let _ = router.enqueue(msg).await;
Expand Down