diff --git a/protocol/flows/src/flow_context.rs b/protocol/flows/src/flow_context.rs index c757f53fd6..cafe38e3f3 100644 --- a/protocol/flows/src/flow_context.rs +++ b/protocol/flows/src/flow_context.rs @@ -502,7 +502,7 @@ impl FlowContext { return Err(err)?; } // Broadcast as soon as the block has been validated and inserted into the DAG - self.hub.broadcast(make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(hash.into()) })).await; + self.hub.broadcast(make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(hash.into()) }), None).await; self.on_new_block(consensus, Default::default(), block, virtual_state_task).await; self.log_block_event(BlockLogEvent::Submit(hash)); @@ -544,7 +544,7 @@ impl FlowContext { .iter() .map(|(b, _)| make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(b.hash().into()) })) .collect(); - self.hub.broadcast_many(msgs).await; + self.hub.broadcast_many(msgs, None).await; // Process blocks in topological order blocks.sort_by(|a, b| a.0.header.blue_work.partial_cmp(&b.0.header.blue_work).unwrap()); diff --git a/protocol/flows/src/flowcontext/transactions.rs b/protocol/flows/src/flowcontext/transactions.rs index 5fe1bb5939..998b76415a 100644 --- a/protocol/flows/src/flowcontext/transactions.rs +++ b/protocol/flows/src/flowcontext/transactions.rs @@ -99,7 +99,7 @@ impl TransactionsSpread { // TODO: Figure out a better number self.hub.broadcast_to_some_peers(msg, 8).await } else { - self.hub.broadcast(msg).await + self.hub.broadcast(msg, None).await } } } diff --git a/protocol/flows/src/v7/blockrelay/flow.rs b/protocol/flows/src/v7/blockrelay/flow.rs index 12175f6625..8d398ca01c 100644 --- a/protocol/flows/src/v7/blockrelay/flow.rs +++ b/protocol/flows/src/v7/blockrelay/flow.rs @@ -207,11 +207,16 @@ impl HandleRelayInvsFlow { .iter() .map(|b| make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(b.hash().into()) })) .collect(); - self.ctx.hub().broadcast_many(msgs).await; + // we filter out the current peer to avoid sending it back invs we know it already has + self.ctx.hub().broadcast_many(msgs, Some(self.router.key())).await; + // we filter out the current peer to avoid sending it back the same invs self.ctx .hub() - .broadcast(make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(inv.hash.into()) })) + .broadcast( + make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(inv.hash.into()) }), + Some(self.router.key()), + ) .await; } diff --git a/protocol/p2p/src/core/hub.rs b/protocol/p2p/src/core/hub.rs index eec622ffbc..2ac0f03405 100644 --- a/protocol/p2p/src/core/hub.rs +++ b/protocol/p2p/src/core/hub.rs @@ -130,9 +130,15 @@ impl Hub { } } - /// Broadcast a message to all peers - pub async fn broadcast(&self, msg: KaspadMessage) { - let peers = self.peers.read().values().cloned().collect::>(); + /// Broadcast a message to all peers (except an optional filtered peer) + pub async fn broadcast(&self, msg: KaspadMessage, filter_peer: Option) { + let peers = self + .peers + .read() + .values() + .filter(|&r| filter_peer.is_none_or(|filter_peer| r.key() != filter_peer)) + .cloned() + .collect::>(); for router in peers { let _ = router.enqueue(msg.clone()).await; } @@ -149,12 +155,18 @@ impl Hub { } } - /// Broadcast a vector of messages to all peers - pub async fn broadcast_many(&self, msgs: Vec) { + /// Broadcast a vector of messages to all peers (except an optional filtered peer) + pub async fn broadcast_many(&self, msgs: Vec, filter_peer: Option) { if msgs.is_empty() { return; } - let peers = self.peers.read().values().cloned().collect::>(); + let peers = self + .peers + .read() + .values() + .filter(|&r| filter_peer.is_none_or(|filter_peer| r.key() != filter_peer)) + .cloned() + .collect::>(); for router in peers { for msg in msgs.iter().cloned() { let _ = router.enqueue(msg).await;