Skip to content

Commit a42dc4b

Browse files
feat(blend): calculate topology diameter (#66)
1 parent be5df9e commit a42dc4b

File tree

2 files changed

+163
-77
lines changed

2 files changed

+163
-77
lines changed

simlib/blendnet-sims/src/main.rs

+19-3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::HashMap;
12
// std
23
use std::fs::File;
34
use std::path::{Path, PathBuf};
@@ -16,7 +17,7 @@ use netrunner::node::{NodeId, NodeIdExt};
1617
use netrunner::output_processors::Record;
1718
use netrunner::runner::{BoxedNode, SimulationRunnerHandle};
1819
use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType};
19-
use node::blend::topology::build_topology;
20+
use node::blend::topology::Topology;
2021
use nomos_blend::cover_traffic::CoverTrafficSettings;
2122
use nomos_blend::message_blend::{
2223
CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings,
@@ -26,7 +27,7 @@ use rand::seq::SliceRandom;
2627
use rand::{RngCore, SeedableRng};
2728
use rand_chacha::ChaCha12Rng;
2829
use serde::de::DeserializeOwned;
29-
use serde::Serialize;
30+
use serde::{Deserialize, Serialize};
3031
// internal
3132
use crate::node::blend::BlendNode;
3233
use crate::settings::SimSettings;
@@ -89,7 +90,8 @@ impl SimulationApp {
8990

9091
let network = Arc::new(Mutex::new(Network::<BlendMessage>::new(regions_data, seed)));
9192

92-
let topology = build_topology(&node_ids, settings.connected_peers_count, &mut rng);
93+
let topology = Topology::new(&node_ids, settings.connected_peers_count, &mut rng);
94+
log_topology(&topology);
9395

9496
let nodes: Vec<_> = node_ids
9597
.iter()
@@ -243,6 +245,20 @@ fn load_json_from_file<T: DeserializeOwned>(path: &Path) -> anyhow::Result<T> {
243245
Ok(serde_json::from_reader(f)?)
244246
}
245247

248+
fn log_topology(topology: &Topology) {
249+
let log = TopologyLog {
250+
topology: topology.to_node_indices(),
251+
diameter: topology.diameter(),
252+
};
253+
tracing::info!("Topology: {}", serde_json::to_string(&log).unwrap());
254+
}
255+
256+
#[derive(Debug, Serialize, Deserialize)]
257+
struct TopologyLog {
258+
topology: HashMap<usize, Vec<usize>>,
259+
diameter: usize,
260+
}
261+
246262
fn main() -> anyhow::Result<()> {
247263
let app: SimulationApp = SimulationApp::parse();
248264
let maybe_guard = log::config_tracing(app.log_format, &app.log_to, app.with_metrics);
+144-74
Original file line numberDiff line numberDiff line change
@@ -1,91 +1,149 @@
11
use std::collections::{HashMap, HashSet};
22

3-
use netrunner::node::NodeId;
3+
use netrunner::node::{NodeId, NodeIdExt};
44
use rand::{seq::SliceRandom, RngCore};
55

6-
pub type Topology = HashMap<NodeId, HashSet<NodeId>>;
6+
#[derive(Clone)]
7+
pub struct Topology(HashMap<NodeId, HashSet<NodeId>>);
78

8-
/// Builds a topology with the given nodes and peering degree
9-
/// by ensuring that all nodes are connected (no partition)
10-
/// and all nodes have the same number of connections (only if possible).
11-
pub fn build_topology<R: RngCore>(nodes: &[NodeId], peering_degree: usize, mut rng: R) -> Topology {
12-
tracing::info!("Building topology: peering_degree:{}", peering_degree);
13-
loop {
14-
let mut topology = nodes
15-
.iter()
16-
.map(|&node| (node, HashSet::new()))
17-
.collect::<HashMap<_, _>>();
18-
19-
for node in nodes.iter() {
20-
// Collect peer candidates
21-
let mut others = nodes
9+
impl Topology {
10+
/// Builds a topology with the given nodes and peering degree
11+
/// by ensuring that all nodes are connected (no partition)
12+
/// and all nodes have the same number of connections (only if possible).
13+
pub fn new<R: RngCore>(nodes: &[NodeId], peering_degree: usize, mut rng: R) -> Self {
14+
tracing::info!("Building topology: peering_degree:{}", peering_degree);
15+
loop {
16+
let mut topology = nodes
2217
.iter()
23-
.filter(|&other| {
24-
// Check if the other node is not already connected to the current node
25-
// and the other node has not reached the peering degree.
26-
other != node
27-
&& !topology.get(node).unwrap().contains(other)
28-
&& topology.get(other).unwrap().len() < peering_degree
29-
})
30-
.copied()
31-
.collect::<Vec<_>>();
32-
33-
// How many more connections the current node needs
34-
let num_needs = peering_degree - topology.get(node).unwrap().len();
35-
// Sample peers as many as possible and connect them to the current node
36-
let k = std::cmp::min(num_needs, others.len());
37-
others.as_mut_slice().shuffle(&mut rng);
38-
others.into_iter().take(k).for_each(|peer| {
39-
topology.get_mut(node).unwrap().insert(peer);
40-
topology.get_mut(&peer).unwrap().insert(*node);
41-
});
42-
}
18+
.map(|&node| (node, HashSet::new()))
19+
.collect::<HashMap<_, _>>();
4320

44-
// Check constraints:
45-
// - All nodes are connected (no partition)
46-
// - All nodes have the same number of connections (if possible)
47-
let can_have_equal_conns = (nodes.len() * peering_degree) % 2 == 0;
48-
if check_all_connected(&topology)
49-
&& (!can_have_equal_conns || check_equal_conns(&topology, peering_degree))
50-
{
51-
return topology;
21+
for node in nodes.iter() {
22+
// Collect peer candidates
23+
let mut others = nodes
24+
.iter()
25+
.filter(|&other| {
26+
// Check if the other node is not already connected to the current node
27+
// and the other node has not reached the peering degree.
28+
other != node
29+
&& !topology.get(node).unwrap().contains(other)
30+
&& topology.get(other).unwrap().len() < peering_degree
31+
})
32+
.copied()
33+
.collect::<Vec<_>>();
34+
35+
// How many more connections the current node needs
36+
let num_needs = peering_degree - topology.get(node).unwrap().len();
37+
// Sample peers as many as possible and connect them to the current node
38+
let k = std::cmp::min(num_needs, others.len());
39+
others.as_mut_slice().shuffle(&mut rng);
40+
others.into_iter().take(k).for_each(|peer| {
41+
topology.get_mut(node).unwrap().insert(peer);
42+
topology.get_mut(&peer).unwrap().insert(*node);
43+
});
44+
}
45+
46+
// Check constraints:
47+
// - All nodes are connected (no partition)
48+
// - All nodes have the same number of connections (if possible)
49+
let topology = Self(topology);
50+
let can_have_equal_conns = (nodes.len() * peering_degree) % 2 == 0;
51+
if topology.check_all_connected()
52+
&& (!can_have_equal_conns || topology.check_equal_conns(peering_degree))
53+
{
54+
return topology;
55+
}
56+
tracing::info!("Topology doesn't meet constraints. Retrying...");
5257
}
53-
tracing::info!("Topology doesn't meet constraints. Retrying...");
5458
}
55-
}
5659

57-
/// Checks if all nodes are connected (no partition) in the topology.
58-
fn check_all_connected(topology: &Topology) -> bool {
59-
let visited = dfs(topology, *topology.keys().next().unwrap());
60-
visited.len() == topology.len()
61-
}
60+
/// Checks if all nodes are connected (no partition) in the topology.
61+
fn check_all_connected(&self) -> bool {
62+
let visited = self.dfs(*self.0.keys().next().unwrap());
63+
visited.len() == self.0.len()
64+
}
6265

63-
/// Depth-first search to visit nodes in the topology.
64-
fn dfs(topology: &Topology, start_node: NodeId) -> HashSet<NodeId> {
65-
let mut visited: HashSet<NodeId> = HashSet::new();
66-
let mut stack: Vec<NodeId> = Vec::new();
67-
stack.push(start_node);
68-
while let Some(node) = stack.pop() {
69-
visited.insert(node);
70-
for peer in topology.get(&node).unwrap().iter() {
71-
if !visited.contains(peer) {
72-
stack.push(*peer);
66+
/// Depth-first search to visit nodes in the topology.
67+
fn dfs(&self, start_node: NodeId) -> HashSet<NodeId> {
68+
let mut visited: HashSet<NodeId> = HashSet::new();
69+
let mut stack: Vec<NodeId> = Vec::new();
70+
stack.push(start_node);
71+
while let Some(node) = stack.pop() {
72+
visited.insert(node);
73+
for peer in self.0.get(&node).unwrap().iter() {
74+
if !visited.contains(peer) {
75+
stack.push(*peer);
76+
}
7377
}
7478
}
79+
visited
7580
}
76-
visited
77-
}
7881

79-
/// Checks if all nodes have the same number of connections.
80-
fn check_equal_conns(topology: &Topology, peering_degree: usize) -> bool {
81-
topology
82-
.iter()
83-
.all(|(_, peers)| peers.len() == peering_degree)
82+
/// Checks if all nodes have the same number of connections.
83+
fn check_equal_conns(&self, peering_degree: usize) -> bool {
84+
self.0
85+
.iter()
86+
.all(|(_, peers)| peers.len() == peering_degree)
87+
}
88+
89+
/// Calculate the diameter (longest path length) of the topology.
90+
pub fn diameter(&self) -> usize {
91+
// Calculate a diameter from each node and take the maximum
92+
self.0
93+
.keys()
94+
.map(|&node| self.diameter_from(node))
95+
.fold(0, usize::max)
96+
}
97+
98+
/// Calculate a diameter (longest path length) of the topology from the start_node.
99+
fn diameter_from(&self, start_node: NodeId) -> usize {
100+
// start_node is visited at the beginning
101+
let mut visited: HashSet<NodeId> = HashSet::from([start_node]);
102+
103+
// Count the number of hops to visit all nodes
104+
let mut hop_count = 0;
105+
let mut next_hop: HashSet<NodeId> = self.0.get(&start_node).unwrap().clone();
106+
while !next_hop.is_empty() {
107+
// First, visit all nodes in the next hop and increase the hop count
108+
next_hop.iter().for_each(|&node| {
109+
assert!(visited.insert(node));
110+
});
111+
hop_count += 1;
112+
// Then, build the new next hop by collecting all peers of the current next hop
113+
// except peers already visited
114+
next_hop = next_hop
115+
.iter()
116+
.flat_map(|node| self.0.get(node).unwrap())
117+
.filter(|&peer| !visited.contains(peer))
118+
.copied()
119+
.collect();
120+
}
121+
hop_count
122+
}
123+
124+
pub fn get(&self, node: &NodeId) -> Option<&HashSet<NodeId>> {
125+
self.0.get(node)
126+
}
127+
128+
/// Converts all [`NodeId`]s in the topology to their indices.
129+
pub fn to_node_indices(&self) -> HashMap<usize, Vec<usize>> {
130+
self.0
131+
.iter()
132+
.map(|(node, peers)| {
133+
(
134+
node.index(),
135+
peers.iter().map(|peer| peer.index()).collect(),
136+
)
137+
})
138+
.collect()
139+
}
84140
}
85141

86142
#[cfg(test)]
87143
mod tests {
88144
use netrunner::node::NodeIdExt;
145+
use rand::SeedableRng;
146+
use rand_chacha::ChaCha8Rng;
89147

90148
use super::*;
91149

@@ -97,9 +155,9 @@ mod tests {
97155
let peering_degree = 4;
98156

99157
let mut rng = rand::rngs::OsRng;
100-
let topology = build_topology(&nodes, peering_degree, &mut rng);
101-
assert_eq!(topology.len(), nodes.len());
102-
for (node, peers) in topology.iter() {
158+
let topology = Topology::new(&nodes, peering_degree, &mut rng);
159+
assert_eq!(topology.0.len(), nodes.len());
160+
for (node, peers) in topology.0.iter() {
103161
assert!(peers.len() == peering_degree);
104162
for peer in peers.iter() {
105163
assert!(topology.get(peer).unwrap().contains(node));
@@ -115,13 +173,25 @@ mod tests {
115173
let peering_degree = 3;
116174

117175
let mut rng = rand::rngs::OsRng;
118-
let topology = build_topology(&nodes, peering_degree, &mut rng);
119-
assert_eq!(topology.len(), nodes.len());
120-
for (node, peers) in topology.iter() {
176+
let topology = Topology::new(&nodes, peering_degree, &mut rng);
177+
assert_eq!(topology.0.len(), nodes.len());
178+
for (node, peers) in topology.0.iter() {
121179
assert!(peers.len() <= peering_degree);
122180
for peer in peers.iter() {
123181
assert!(topology.get(peer).unwrap().contains(node));
124182
}
125183
}
126184
}
185+
186+
#[test]
187+
fn test_diameter() {
188+
let nodes = (0..100).map(NodeId::from_index).collect::<Vec<_>>();
189+
let peering_degree = 4;
190+
let mut rng = ChaCha8Rng::seed_from_u64(0);
191+
let topology = Topology::new(&nodes, peering_degree, &mut rng);
192+
let diameter = topology.diameter();
193+
println!("diameter: {}", diameter);
194+
assert!(diameter > 0);
195+
assert!(diameter <= nodes.len());
196+
}
127197
}

0 commit comments

Comments
 (0)