Skip to content

Commit ed41dd4

Browse files
authored
Merge pull request #570 from kinode-dao/release-candidate
hotfix: in net, use AtomicU64 in Peers to respect fd_manager limits
2 parents 76c312a + 2c6c3e2 commit ed41dd4

File tree

7 files changed

+64
-36
lines changed

7 files changed

+64
-36
lines changed

Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "kinode_lib"
33
authors = ["KinodeDAO"]
4-
version = "0.9.5"
4+
version = "0.9.6"
55
edition = "2021"
66
description = "A general-purpose sovereign cloud computing platform"
77
homepage = "https://kinode.org"

kinode/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "kinode"
33
authors = ["KinodeDAO"]
4-
version = "0.9.5"
4+
version = "0.9.6"
55
edition = "2021"
66
description = "A general-purpose sovereign cloud computing platform"
77
homepage = "https://kinode.org"

kinode/src/net/connect.rs

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,40 @@ use tokio::sync::mpsc;
66

77
/// if target is a peer, queue to be routed
88
/// otherwise, create peer and initiate routing
9-
pub async fn send_to_peer(ext: &IdentityExt, data: &NetData, km: KernelMessage) {
9+
pub async fn send_to_peer(ext: &IdentityExt, data: &NetData, mut km: KernelMessage) {
1010
if let Some(mut peer) = data.peers.get_mut(&km.target.node) {
11-
peer.sender.send(km).expect("net: peer sender was dropped");
12-
peer.set_last_message();
13-
} else {
14-
let Some(peer_id) = data.pki.get(&km.target.node) else {
15-
return utils::error_offline(km, &ext.network_error_tx).await;
16-
};
17-
let (mut peer, peer_rx) = Peer::new(peer_id.clone(), false);
18-
// send message to be routed
19-
peer.send(km);
20-
data.peers.insert(peer_id.name.clone(), peer).await;
21-
tokio::spawn(connect_to_peer(
22-
ext.clone(),
23-
data.clone(),
24-
peer_id.clone(),
25-
peer_rx,
26-
));
11+
match peer.send(km) {
12+
Ok(()) => {
13+
peer.set_last_message();
14+
return;
15+
}
16+
Err(e_km) => {
17+
// peer connection was closed, remove it and try to reconnect
18+
data.peers.remove(&peer.identity.name).await;
19+
km = e_km.0;
20+
}
21+
}
2722
}
23+
let Some(peer_id) = data.pki.get(&km.target.node) else {
24+
return utils::error_offline(km, &ext.network_error_tx).await;
25+
};
26+
let (mut peer, peer_rx) = Peer::new(peer_id.clone(), false);
27+
// send message to be routed
28+
match peer.send(km) {
29+
Ok(()) => {
30+
peer.set_last_message();
31+
}
32+
Err(e_km) => {
33+
return utils::error_offline(e_km.0, &ext.network_error_tx).await;
34+
}
35+
};
36+
data.peers.insert(peer_id.name.clone(), peer).await;
37+
tokio::spawn(connect_to_peer(
38+
ext.clone(),
39+
data.clone(),
40+
peer_id.clone(),
41+
peer_rx,
42+
));
2843
}
2944

3045
/// based on peer's identity, either use one of their

kinode/src/net/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ pub async fn networking(
6969
peers,
7070
pending_passthroughs,
7171
active_passthroughs,
72-
max_peers,
7372
max_passthroughs,
7473
fds_limit: 10, // small hardcoded limit that gets replaced by fd_manager soon after boot
7574
};
@@ -212,7 +211,7 @@ async fn handle_local_request(
212211
printout.push_str(&format!(
213212
"we have connections with {} peers ({} max):\r\n",
214213
data.peers.peers().len(),
215-
data.max_peers,
214+
data.peers.max_peers(),
216215
));
217216

218217
let now = std::time::SystemTime::now()
@@ -342,16 +341,17 @@ async fn handle_fdman(km: &KernelMessage, request_body: &[u8], data: &mut NetDat
342341
match req {
343342
lib::core::FdManagerRequest::FdsLimit(fds_limit) => {
344343
data.fds_limit = fds_limit;
345-
if data.max_peers > fds_limit {
346-
data.max_peers = fds_limit;
347-
}
344+
data.peers.set_max_peers(fds_limit);
348345
// TODO combine with max_peers check
346+
// only update passthrough limit if it's higher than the new fds limit
347+
// most nodes have passthroughs disabled, meaning this will keep it at 0
349348
if data.max_passthroughs > fds_limit {
350349
data.max_passthroughs = fds_limit;
351350
}
352351
// TODO cull passthroughs too
353352
if data.peers.peers().len() >= data.fds_limit as usize {
354353
let diff = data.peers.peers().len() - data.fds_limit as usize;
354+
println!("net: culling {diff} peer(s)\r\n");
355355
data.peers.cull(diff).await;
356356
}
357357
}

kinode/src/net/types.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use {
66
dashmap::DashMap,
77
ring::signature::Ed25519KeyPair,
88
serde::{Deserialize, Serialize},
9+
std::sync::atomic::AtomicU64,
910
std::sync::Arc,
1011
tokio::net::TcpStream,
1112
tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender},
@@ -57,15 +58,15 @@ pub struct RoutingRequest {
5758

5859
#[derive(Clone)]
5960
pub struct Peers {
60-
max_peers: u64,
61+
max_peers: Arc<AtomicU64>,
6162
send_to_loop: MessageSender,
6263
peers: Arc<DashMap<String, Peer>>,
6364
}
6465

6566
impl Peers {
6667
pub fn new(max_peers: u64, send_to_loop: MessageSender) -> Self {
6768
Self {
68-
max_peers,
69+
max_peers: Arc::new(max_peers.into()),
6970
send_to_loop,
7071
peers: Arc::new(DashMap::new()),
7172
}
@@ -75,6 +76,15 @@ impl Peers {
7576
&self.peers
7677
}
7778

79+
pub fn max_peers(&self) -> u64 {
80+
self.max_peers.load(std::sync::atomic::Ordering::Relaxed)
81+
}
82+
83+
pub fn set_max_peers(&self, max_peers: u64) {
84+
self.max_peers
85+
.store(max_peers, std::sync::atomic::Ordering::Relaxed);
86+
}
87+
7888
pub fn get(&self, name: &str) -> Option<dashmap::mapref::one::Ref<'_, String, Peer>> {
7989
self.peers.get(name)
8090
}
@@ -94,15 +104,15 @@ impl Peers {
94104
/// remove the one with the oldest last_message.
95105
pub async fn insert(&self, name: String, peer: Peer) {
96106
self.peers.insert(name, peer);
97-
if self.peers.len() > self.max_peers as usize {
107+
if self.peers.len() as u64 > self.max_peers.load(std::sync::atomic::Ordering::Relaxed) {
98108
let oldest = self
99109
.peers
100110
.iter()
101111
.min_by_key(|p| p.last_message)
102112
.unwrap()
103113
.key()
104114
.clone();
105-
self.peers.remove(&oldest);
115+
self.remove(&oldest).await;
106116
crate::fd_manager::send_fd_manager_hit_fds_limit(
107117
&Address::new("our", NET_PROCESS_ID.clone()),
108118
&self.send_to_loop,
@@ -122,7 +132,7 @@ impl Peers {
122132
sorted_peers.sort_by_key(|p| p.last_message);
123133
to_remove.extend(sorted_peers.iter().take(n));
124134
for peer in to_remove {
125-
self.peers.remove(&peer.identity.name);
135+
self.remove(&peer.identity.name).await;
126136
}
127137
crate::fd_manager::send_fd_manager_hit_fds_limit(
128138
&Address::new("our", NET_PROCESS_ID.clone()),
@@ -189,9 +199,13 @@ impl Peer {
189199
}
190200

191201
/// Send a message to the peer.
192-
pub fn send(&mut self, km: KernelMessage) {
193-
self.sender.send(km).expect("net: peer sender was dropped");
202+
pub fn send(
203+
&mut self,
204+
km: KernelMessage,
205+
) -> Result<(), tokio::sync::mpsc::error::SendError<KernelMessage>> {
206+
self.sender.send(km)?;
194207
self.set_last_message();
208+
Ok(())
195209
}
196210

197211
/// Update the last message time to now.
@@ -222,7 +236,6 @@ pub struct NetData {
222236
pub pending_passthroughs: PendingPassthroughs,
223237
/// only used by routers
224238
pub active_passthroughs: ActivePassthroughs,
225-
pub max_peers: u64,
226239
pub max_passthroughs: u64,
227240
pub fds_limit: u64,
228241
}

lib/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "lib"
33
authors = ["KinodeDAO"]
4-
version = "0.9.5"
4+
version = "0.9.6"
55
edition = "2021"
66
description = "A general-purpose sovereign cloud computing platform"
77
homepage = "https://kinode.org"

0 commit comments

Comments
 (0)