Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
911 changes: 714 additions & 197 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion examples/chat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ release = false
[dependencies]
tokio = { workspace = true, features = ["full"] }
futures = { workspace = true }
libp2p = { path = "../../libp2p", features = [ "tokio", "gossipsub", "mdns", "noise", "macros", "tcp", "yamux", "quic"] }
libp2p = { version = "0.56", features = ["tokio", "gossipsub", "mdns", "noise", "macros", "tcp", "yamux", "quic"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }

[lints]
Expand Down
2 changes: 1 addition & 1 deletion examples/ipfs-private/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ release = false
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "io-std"] }
either = "1.12"
futures = { workspace = true }
libp2p = { path = "../../libp2p", features = [ "tokio", "gossipsub", "dns", "identify", "kad", "macros", "noise", "ping", "pnet", "tcp", "websocket", "yamux"] }
libp2p = { version = "0.56", features = ["tokio", "gossipsub", "dns", "identify", "kad", "macros", "noise", "ping", "pnet", "tcp", "websocket", "yamux"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }

[lints]
Expand Down
3 changes: 3 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
- Remove `Rpc` from the public API.
See [PR 6091](https://github.com/libp2p/rust-libp2p/pull/6091)

- Report peers with Low scores.
See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/6091)

## 0.49.2

- Relax `Behaviour::with_metrics` requirements, do not require DataTransform and TopicSubscriptionFilter to also impl Default
Expand Down
16 changes: 8 additions & 8 deletions protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,23 @@ getrandom = { workspace = true }
hashlink = { workspace = true }
hex_fmt = "0.3.0"
web-time = { workspace = true }
libp2p-core = { workspace = true }
libp2p-identity = { workspace = true, features = ["rand"] }
libp2p-swarm = { workspace = true }
# Libp2p crates, updated to use crates.io versions so that we can use this gossipsub fork with
# crates.io libp2p
libp2p-core = "0.43"
libp2p-identity = { version = "0.2", features = ["rand"] }
libp2p-swarm = "0.47"
quick-protobuf = "0.8"
quick-protobuf-codec = { workspace = true }
quick-protobuf-codec = "0.3.1"
rand = "0.8"
regex = "1.10.5"
serde = { version = "1", optional = true, features = ["derive"] }
sha2 = "0.10.8"
tracing = { workspace = true }

# Metrics dependencies
prometheus-client = { workspace = true, optional = true }
prometheus-client = { version = "0.23", optional = true }

[dev-dependencies]
libp2p-core = { workspace = true }
libp2p-swarm-test = { path = "../../swarm-test" }
libp2p-swarm-test = { version = "0.6.0", features = ["tokio"] }
quickcheck = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "time", "macros"] }
Expand Down
35 changes: 24 additions & 11 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ pub enum Event {
/// The types and amounts of failed messages that are occurring for this peer.
failed_messages: FailedMessages,
},
/// A Peer is below the score threshold.
BelowThresholdPeers { peer_ids: Vec<PeerId> },
}

/// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`]
Expand Down Expand Up @@ -2139,18 +2141,23 @@ where
let mesh_outbound_min = self.config.mesh_outbound_min_for_topic(topic_hash);

// drop all peers with negative score, without PX
// if there is at some point a stable retain method for BTreeSet the following can be
// written more efficiently with retain.
let mut to_remove_peers = Vec::new();
for peer_id in peers.iter() {
// report peers below the score report threshold.
let mut removed_peers = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a good idea to change to_remove_peer in this PR? It doesn't seem related

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not directly related, but since I am touching this code might as well take the opportunity to cleanup this part

let mut peers_to_report = vec![];
peers.retain(|peer_id| {
let peer_score = scores.get(peer_id).map(|r| r.score).unwrap_or_default();

// Record the score per mesh
#[cfg(feature = "metrics")]
if let Some(metrics) = self.metrics.as_mut() {
metrics.observe_mesh_peers_score(topic_hash, peer_score);
}

if let Some(threshold) = self.config.score_report_threshold() {
if peer_score < threshold {
peers_to_report.push(*peer_id);
}
}

if peer_score < 0.0 {
tracing::debug!(
peer=%peer_id,
Expand All @@ -2162,17 +2169,23 @@ where
let current_topic = to_prune.entry(*peer_id).or_insert_with(Vec::new);
current_topic.push(topic_hash.clone());
no_px.insert(*peer_id);
to_remove_peers.push(*peer_id);
removed_peers += 1;
return false;
}

true
});

if !peers_to_report.is_empty() {
self.events
.push_back(ToSwarm::GenerateEvent(Event::BelowThresholdPeers {
peer_ids: peers_to_report,
}));
}

#[cfg(feature = "metrics")]
if let Some(m) = self.metrics.as_mut() {
m.peers_removed(topic_hash, Churn::BadScore, to_remove_peers.len())
}

for peer_id in to_remove_peers {
peers.remove(&peer_id);
m.peers_removed(topic_hash, Churn::BadScore, removed_peers)
}

// too little peers - add some
Expand Down
57 changes: 57 additions & 0 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6787,3 +6787,60 @@ fn test_validation_message_size_within_topic_specific() {
_ => panic!("Unexpected event"),
}
}

#[test]
fn test_low_score_peer_is_reported() {
let score_report_threshold = -5.0;

let config = ConfigBuilder::default()
.score_report_threshold(score_report_threshold)
.build()
.expect("valid config");

let (mut gs, peers, _receivers, _topics) = inject_nodes1()
.peer_no(3)
.topics(vec!["test".into()])
.to_subscribe(true)
.gs_config(config)
.scoring(Some((
PeerScoreParams::default(),
PeerScoreThresholds::default(),
)))
.create_network();

// Reduce the score of the first peer below the threshold
gs.as_peer_score_mut().add_penalty(&peers[0], 10);

// Reduce the score of the second peer below the threshold
gs.as_peer_score_mut().add_penalty(&peers[1], 8);

// Verify initial scores are below threshold
assert!(gs.as_peer_score_mut().score_report(&peers[0]).score < score_report_threshold);
assert!(gs.as_peer_score_mut().score_report(&peers[1]).score < score_report_threshold);
assert!(gs.as_peer_score_mut().score_report(&peers[2]).score >= score_report_threshold);

// Trigger a heartbeat which should generate the LowScorePeers event
gs.heartbeat();

// Check for the LowScorePeers event
let low_score_event = gs
.events
.iter()
.find(|event| {
matches!(
event,
ToSwarm::GenerateEvent(Event::BelowThresholdPeers { .. })
)
})
.unwrap();

match low_score_event {
ToSwarm::GenerateEvent(Event::BelowThresholdPeers { peer_ids }) => {
assert_eq!(peer_ids.len(), 2);
assert!(peer_ids.contains(&peers[0]));
assert!(peer_ids.contains(&peers[1]));
assert!(!peer_ids.contains(&peers[2]));
}
_ => unreachable!(),
}
}
16 changes: 16 additions & 0 deletions protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub struct Config {
connection_handler_forward_duration: Duration,
idontwant_message_size_threshold: usize,
idontwant_on_publish: bool,
score_report_threshold: Option<f64>,
topic_configuration: TopicConfigs,
}

Expand Down Expand Up @@ -476,6 +477,12 @@ impl Config {
pub fn idontwant_on_publish(&self) -> bool {
self.idontwant_on_publish
}

/// Score threshold below which peers should be reported. If set to None, no peer reporting
/// based on score will occur. Default is None.
pub fn score_report_threshold(&self) -> Option<f64> {
self.score_report_threshold
}
}

impl Default for Config {
Expand Down Expand Up @@ -545,6 +552,7 @@ impl Default for ConfigBuilder {
connection_handler_forward_duration: Duration::from_secs(1),
idontwant_message_size_threshold: 1000,
idontwant_on_publish: false,
score_report_threshold: None,
topic_configuration: TopicConfigs::default(),
},
invalid_protocol: false,
Expand Down Expand Up @@ -1040,6 +1048,14 @@ impl ConfigBuilder {
self
}

/// Sets the score threshold below which peers should be reported.
/// When set, the behaviour will report peers whose score falls below this threshold.
/// Default is None (no reporting).
pub fn score_report_threshold(&mut self, threshold: f64) -> &mut Self {
self.config.score_report_threshold = Some(threshold);
self
}

/// The topic configuration sets mesh parameter sizes for a given topic. Notes on default
/// below.
///
Expand Down
Loading