Skip to content

Commit d594306

Browse files
committed
automatically attempt to reconnect to peers if a message was received successfully
1 parent cef17d6 commit d594306

File tree

9 files changed

+216
-107
lines changed

9 files changed

+216
-107
lines changed

lib/net/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::{
2323

2424
mod peer;
2525

26+
pub(crate) use peer::error::mailbox::Error as PeerConnectionMailboxError;
2627
use peer::{
2728
Connection, ConnectionContext as PeerConnectionCtxt,
2829
ConnectionHandle as PeerConnectionHandle,
@@ -256,6 +257,20 @@ impl Net {
256257
}
257258
}
258259

260+
/// Apply the provided function to the peer connection handle,
261+
/// if it exists.
262+
pub fn try_with_active_peer_connection<F, T>(
263+
&self,
264+
addr: SocketAddr,
265+
f: F,
266+
) -> Option<T>
267+
where
268+
F: FnMut(&PeerConnectionHandle) -> T,
269+
{
270+
let active_peers_read = self.active_peers.read();
271+
active_peers_read.get(&addr).map(f)
272+
}
273+
259274
// TODO: This should have more context.
260275
// Last received message, connection state, etc.
261276
pub fn get_active_peers(&self) -> Vec<Peer> {

lib/net/peer/channel_pool.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@ use async_lock::{Semaphore, SemaphoreGuardArc};
1010
use futures::{FutureExt as _, Stream, StreamExt as _, future::Either, stream};
1111
use tokio::task::AbortHandle;
1212

13-
use crate::net::peer::{
14-
Connection, PeerResponseItem, error, join_set,
15-
message::{Heartbeat, Request},
13+
use crate::{
14+
net::peer::{
15+
Connection, PeerResponseItem, error,
16+
message::{Heartbeat, Request},
17+
},
18+
util::join_set,
1619
};
1720

1821
/// Type tags for channel limiters

lib/net/peer/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ pub(in crate::net::peer) mod request_queue {
188188
}
189189
}
190190

191-
pub(in crate::net::peer) mod mailbox {
191+
pub mod mailbox {
192192
#[derive(thiserror::Error, Debug)]
193193
pub enum Error {
194194
#[error("Heartbeat timeout")]

lib/net/peer/join_set.rs

Lines changed: 0 additions & 94 deletions
This file was deleted.

lib/net/peer/mailbox.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
//! Mailbox for a peer connection task
22
3+
use std::sync::{
4+
Arc,
5+
atomic::{self, AtomicBool},
6+
};
7+
38
use futures::{
49
Stream, StreamExt as _, TryFutureExt as _, channel::mpsc, stream,
510
};
@@ -65,9 +70,12 @@ pub struct Receiver {
6570
}
6671

6772
impl Receiver {
73+
/// `received_msg_successfully` is set to `True` if a valid message is
74+
/// received successfully.
6875
pub fn into_stream(
6976
self,
7077
connection: Connection,
78+
received_msg_successfully: &Arc<AtomicBool>,
7179
) -> impl Stream<Item = MailboxItem> + Unpin {
7280
let (peer_response_tx, peer_response_rx) = mpsc::unbounded();
7381
let internal_message_stream =
@@ -81,10 +89,14 @@ impl Receiver {
8189
.map(|err| MailboxItem::Error(err.into()));
8290
let peer_request_stream = stream::try_unfold((), move |()| {
8391
let conn = connection.clone();
92+
let received_msg_successfully = received_msg_successfully.clone();
8493
let fut = async move {
8594
let item = timeout(
8695
Connection::HEARTBEAT_TIMEOUT_INTERVAL,
87-
conn.receive_request(),
96+
conn.receive_request().inspect_ok(|_| {
97+
received_msg_successfully
98+
.store(true, atomic::Ordering::SeqCst);
99+
}),
88100
)
89101
.map_err(|_| Error::HeartbeatTimeout)
90102
.await??;
@@ -96,8 +108,12 @@ impl Receiver {
96108
Ok(peer_request) => MailboxItem::PeerRequest(peer_request),
97109
Err(err) => MailboxItem::Error(err),
98110
});
99-
let peer_response_stream =
100-
peer_response_rx.map(MailboxItem::PeerResponse);
111+
let peer_response_stream = peer_response_rx.map(|resp| {
112+
if resp.response.is_ok() {
113+
received_msg_successfully.store(true, atomic::Ordering::SeqCst);
114+
}
115+
MailboxItem::PeerResponse(resp)
116+
});
101117
stream::select_all([
102118
internal_message_stream.boxed(),
103119
heartbeat_stream.boxed(),

lib/net/peer/mod.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@ use crate::{
2222
};
2323

2424
mod channel_pool;
25-
mod error;
26-
mod join_set;
27-
mod mailbox;
25+
pub(crate) mod error;
26+
pub(crate) mod mailbox;
2827
pub mod message;
2928
mod request_queue;
3029
mod task;
@@ -316,6 +315,8 @@ impl PeerConnectionStatus {
316315
/// Connection killed on drop
317316
pub struct ConnectionHandle {
318317
task: JoinHandle<()>,
318+
/// Indicates that at least one message has been received successfully
319+
pub(in crate::net) received_msg_successfully: Arc<AtomicBool>,
319320
/// Representation of [`PeerConnectionStatus`]
320321
pub(in crate::net) status_repr: Arc<AtomicBool>,
321322
/// Push messages from connection task / net task / node
@@ -328,6 +329,12 @@ impl ConnectionHandle {
328329
self.status_repr.load(atomic::Ordering::SeqCst),
329330
)
330331
}
332+
333+
/// Indicates that at least one message has been received successfully
334+
pub fn received_msg_successfully(&self) -> bool {
335+
self.received_msg_successfully
336+
.load(atomic::Ordering::SeqCst)
337+
}
331338
}
332339

333340
impl Drop for ConnectionHandle {
@@ -346,15 +353,18 @@ pub fn handle(
346353
let (info_tx, info_rx) = mpsc::unbounded();
347354
let (mailbox_tx, mailbox_rx) = mailbox::new();
348355
let internal_message_tx = mailbox_tx.internal_message_tx.clone();
356+
let received_msg_successfully = Arc::new(AtomicBool::new(false));
349357
let connection_task = {
350358
let info_tx = info_tx.clone();
359+
let received_msg_successfully = received_msg_successfully.clone();
351360
move || async move {
352361
let connection_task = ConnectionTask {
353362
connection,
354363
ctxt,
355364
info_tx,
356365
mailbox_rx,
357366
mailbox_tx,
367+
received_msg_successfully,
358368
};
359369
connection_task.run().await
360370
}
@@ -373,6 +383,7 @@ pub fn handle(
373383
let status = PeerConnectionStatus::Connected;
374384
let connection_handle = ConnectionHandle {
375385
task,
386+
received_msg_successfully,
376387
status_repr: Arc::new(AtomicBool::new(status.as_repr())),
377388
internal_message_tx,
378389
};
@@ -385,10 +396,12 @@ pub fn connect(
385396
) -> (ConnectionHandle, mpsc::UnboundedReceiver<Info>) {
386397
let connection_status = PeerConnectionStatus::Connecting;
387398
let status_repr = Arc::new(AtomicBool::new(connection_status.as_repr()));
399+
let received_msg_successfully = Arc::new(AtomicBool::new(false));
388400
let (info_tx, info_rx) = mpsc::unbounded();
389401
let (mailbox_tx, mailbox_rx) = mailbox::new();
390402
let internal_message_tx = mailbox_tx.internal_message_tx.clone();
391403
let connection_task = {
404+
let received_msg_successfully = received_msg_successfully.clone();
392405
let status_repr = status_repr.clone();
393406
let info_tx = info_tx.clone();
394407
move || async move {
@@ -404,6 +417,7 @@ pub fn connect(
404417
info_tx,
405418
mailbox_rx,
406419
mailbox_tx,
420+
received_msg_successfully,
407421
};
408422
connection_task.run().await
409423
}
@@ -419,6 +433,7 @@ pub fn connect(
419433
});
420434
let connection_handle = ConnectionHandle {
421435
task,
436+
received_msg_successfully,
422437
status_repr,
423438
internal_message_tx,
424439
};

lib/net/peer/task.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use std::{
44
cmp::Ordering,
55
collections::{HashMap, HashSet},
6+
sync::{Arc, atomic::AtomicBool},
67
};
78

89
use fallible_iterator::FallibleIterator;
@@ -32,6 +33,8 @@ pub(in crate::net::peer) struct ConnectionTask {
3233
pub mailbox_rx: mailbox::Receiver,
3334
/// Receiver for the task's mailbox
3435
pub mailbox_tx: mailbox::Sender,
36+
/// `True` if a valid message has been received successfully
37+
pub received_msg_successfully: Arc<AtomicBool>,
3538
}
3639

3740
impl ConnectionTask {
@@ -766,7 +769,9 @@ impl ConnectionTask {
766769
let mut peer_state = Option::<PeerStateId>::None;
767770
// known peer states
768771
let mut peer_states = HashMap::<PeerStateId, PeerState>::new();
769-
let mut mailbox_stream = self.mailbox_rx.into_stream(self.connection);
772+
let mut mailbox_stream = self
773+
.mailbox_rx
774+
.into_stream(self.connection, &self.received_msg_successfully);
770775
while let Some(mailbox_item) = mailbox_stream.next().await {
771776
match mailbox_item {
772777
MailboxItem::Error(err) => return Err(err.into()),

0 commit comments

Comments
 (0)