diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs index bb0a77f755..82e14bb105 100644 --- a/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs +++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs @@ -174,12 +174,14 @@ impl SequencerAppender { .next() .unwrap_or(with_jitter(DEFAULT_BACKOFF_TIME, 0.5)); if self.current_wave >= TONE_ESCALATION_THRESHOLD { + let estimated_bytes = self.records.estimated_encode_size(); warn!( wave = %self.current_wave, loglet_id=%self.sequencer_shared_state.loglet_id(), first_offset=%self.first_offset, to_offset=%self.records.last_offset(self.first_offset).unwrap(), length=%self.records.len(), + estimated_bytes=%estimated_bytes, otel.name="replicated_loglet::sequencer::appender: run", "Append wave failed, retrying with a new wave after {:?}. Status is {}", delay, self.nodeset_status ); @@ -353,18 +355,6 @@ impl SequencerAppender { StoreTaskStatus::Shutdown => { return State::Cancelled; } - StoreTaskStatus::Error(RpcError::Timeout(spent)) => { - // Yes, I know those checks are ugly, but it's a quick and dirty way until we - // have a nice macro for it. - if self.current_wave >= TONE_ESCALATION_THRESHOLD { - debug!(peer = %node_id, "Timeout waiting for node {} to commit a batch, spent={:?}", node_id, spent); - } else { - trace!(peer = %node_id, "Timeout waiting for node {} to commit a batch, spent={:?}", node_id, spent); - } - self.nodeset_status.merge(node_id, PerNodeStatus::timeout()); - self.graylist.insert(node_id); - continue; - } StoreTaskStatus::Error(err) => { // couldn't send store command to remote server if self.current_wave >= TONE_ESCALATION_THRESHOLD { @@ -372,7 +362,8 @@ impl SequencerAppender { } else { trace!(peer = %node_id, %err, "Failed to send batch to node"); } - self.nodeset_status.merge(node_id, PerNodeStatus::failed()); + self.nodeset_status + .merge(node_id, PerNodeStatus::failed(err)); self.graylist.insert(node_id); continue; } @@ -445,17 +436,13 @@ impl SequencerAppender { } } -#[derive(Default, Debug, PartialEq, Clone, Copy)] +#[derive(Default, Debug)] enum PerNodeStatus { #[default] NotAttempted, - // todo: the distinction between timeout and failed might not be worth the hassle. - // consider only doing failed if in practice it wasn't as useful to keep both variants. Failed { attempts: usize, - }, - Timeout { - attempts: usize, + last_err: RpcError, }, Committed, Sealed, @@ -465,27 +452,28 @@ impl Display for PerNodeStatus { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { PerNodeStatus::NotAttempted => write!(f, ""), - PerNodeStatus::Failed { attempts } => write!(f, "ERROR({attempts})"), + PerNodeStatus::Failed { attempts, last_err } => { + write!(f, "ERROR(attempts={attempts}, last_err='{last_err}')") + } PerNodeStatus::Committed => write!(f, "COMMITTED"), - PerNodeStatus::Timeout { attempts } => write!(f, "TIMEDOUT({attempts})"), PerNodeStatus::Sealed => write!(f, "SEALED"), } } } impl PerNodeStatus { - fn timeout() -> Self { - Self::Timeout { attempts: 1 } - } - fn failed() -> Self { - Self::Failed { attempts: 1 } + fn failed(err: RpcError) -> Self { + Self::Failed { + attempts: 1, + last_err: err, + } } } impl Merge for PerNodeStatus { fn merge(&mut self, other: Self) -> bool { use PerNodeStatus::*; - match (*self, other) { + match (&self, other) { (NotAttempted, NotAttempted) => false, (Committed, Committed) => false, (NotAttempted, e) => { @@ -496,20 +484,17 @@ impl Merge for PerNodeStatus { // committed is more important for showing where did we write. Not that this is likely // to ever happen though. (Committed, _) => false, - (Failed { attempts: a1 }, Failed { attempts: a2 }) => { - *self = Failed { attempts: a1 + a2 }; - true - } - (Failed { attempts: a1 }, Timeout { attempts: a2 }) => { - *self = Timeout { attempts: a1 + a2 }; - true - } - (Timeout { attempts: a1 }, Failed { attempts: a2 }) => { - *self = Failed { attempts: a1 + a2 }; - true - } - (Timeout { attempts: a1 }, Timeout { attempts: a2 }) => { - *self = Timeout { attempts: a1 + a2 }; + ( + Failed { attempts: a1, .. }, + Failed { + attempts: a2, + last_err, + }, + ) => { + *self = Failed { + attempts: *a1 + a2, + last_err, + }; true } (_, Committed) => { diff --git a/crates/core/src/network/grpc/connector.rs b/crates/core/src/network/grpc/connector.rs index 0828a28b62..cba4c1cf26 100644 --- a/crates/core/src/network/grpc/connector.rs +++ b/crates/core/src/network/grpc/connector.rs @@ -17,7 +17,7 @@ use tokio_stream::StreamExt; use tonic::codec::CompressionEncoding; use tonic::transport::Endpoint; use tonic::transport::channel::Channel; -use tracing::debug; +use tracing::{debug, warn}; use restate_types::config::{Configuration, NetworkingOptions}; use restate_types::net::address::{AdvertisedAddress, GrpcPort, ListenerPort, PeerNetAddress}; @@ -71,7 +71,13 @@ impl TransportConnect for GrpcConnector { }; let incoming = client.create_connection(output_stream).await?.into_inner(); - Ok(incoming.map_while(|x| x.ok())) + Ok(incoming.map_while(|x| match x { + Ok(msg) => Some(msg), + Err(err) => { + warn!(%err, "Error while receiving network message from peer, connection will be dropped"); + None + } + })) } } diff --git a/crates/core/src/network/grpc/svc_handler.rs b/crates/core/src/network/grpc/svc_handler.rs index eb821e9c9a..e5c5d8adfa 100644 --- a/crates/core/src/network/grpc/svc_handler.rs +++ b/crates/core/src/network/grpc/svc_handler.rs @@ -12,6 +12,7 @@ use futures::stream::BoxStream; use tokio_stream::StreamExt; use tonic::codec::CompressionEncoding; use tonic::{Request, Response, Status, Streaming}; +use tracing::warn; use restate_types::config::NetworkingOptions; @@ -70,7 +71,13 @@ impl CoreNodeSvc for CoreNodeSvcHandler { request: Request>, ) -> Result, Status> { let incoming = request.into_inner(); - let transformed = incoming.map_while(|x| x.ok()); + let transformed = incoming.map_while(|x| match x { + Ok(msg) => Some(msg), + Err(err) => { + warn!(%err, "Error while receiving network message from peer, connection will be dropped"); + None + } + }); let output_stream = self .connections .accept_incoming_connection(transformed) diff --git a/crates/core/src/network/types.rs b/crates/core/src/network/types.rs index 6786d0f55a..f450462faa 100644 --- a/crates/core/src/network/types.rs +++ b/crates/core/src/network/types.rs @@ -17,6 +17,7 @@ use bytes::Bytes; use futures::FutureExt; use tokio::sync::oneshot; +use restate_time_util::DurationExt; use restate_types::net::address::AdvertisedAddress; use restate_types::net::address::FabricPort; use restate_types::net::metadata::MetadataKind; @@ -176,7 +177,7 @@ pub enum RpcError { Send(#[from] MessageSendError), #[error(transparent)] Receive(#[from] RpcReplyError), - #[error("timed out")] + #[error("timed out after {}", .0.friendly())] Timeout(Duration), }