diff --git a/Cargo.lock b/Cargo.lock index 7ec3531bb6d..391735e4de6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6432,6 +6432,7 @@ dependencies = [ "futures", "hkdf", "human-repr", + "humantime", "humantime-serde", "indicatif", "ipnetwork", diff --git a/gateway/src/node/internal_service_providers/authenticator/config/mod.rs b/gateway/src/node/internal_service_providers/authenticator/config/mod.rs index 1a1e5423687..b05be7d6de8 100644 --- a/gateway/src/node/internal_service_providers/authenticator/config/mod.rs +++ b/gateway/src/node/internal_service_providers/authenticator/config/mod.rs @@ -7,6 +7,7 @@ use nym_network_defaults::{ }; use serde::{Deserialize, Serialize}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::time::Duration; pub use nym_client_core::config::Config as BaseClientConfig; pub use persistence::AuthenticatorPaths; @@ -26,7 +27,6 @@ pub struct Config { impl Config { pub fn validate(&self) -> bool { - // no other sections have explicit requirements (yet) self.base.validate() } } @@ -57,6 +57,12 @@ pub struct Authenticator { /// The prefix denoting the maximum number of the clients that can be connected via Wireguard using IPv6. /// The maximum value for IPv6 is 128 pub private_network_prefix_v6: u8, + + /// Timeout to wait for responses from the peer controller before failing. + /// Helps the authenticator recover from suspend/resume scenarios where the peer controller + /// process/task can get stuck and never respond to oneshot RPC responses, which previously + /// caused the authenticator to block forever waiting on the oneshot channel. + pub peer_interaction_timeout: Duration, } impl Default for Authenticator { @@ -68,6 +74,7 @@ impl Default for Authenticator { tunnel_announced_port: WG_TUNNEL_PORT, private_network_prefix_v4: WG_TUN_DEVICE_NETMASK_V4, private_network_prefix_v6: WG_TUN_DEVICE_NETMASK_V6, + peer_interaction_timeout: default_peer_interaction_timeout(), } } } @@ -85,3 +92,7 @@ impl From for nym_wireguard_types::Config { } } } + +pub fn default_peer_interaction_timeout() -> Duration { + Duration::from_millis(5_000) +} diff --git a/gateway/src/node/internal_service_providers/authenticator/error.rs b/gateway/src/node/internal_service_providers/authenticator/error.rs index 5bdde159194..8f5de74508b 100644 --- a/gateway/src/node/internal_service_providers/authenticator/error.rs +++ b/gateway/src/node/internal_service_providers/authenticator/error.rs @@ -85,6 +85,9 @@ pub enum AuthenticatorError { #[error("peers can't be interacted with anymore")] PeerInteractionStopped, + #[error("peers interaction timed out while attempting to {operation}")] + PeerInteractionTimeout { operation: &'static str }, + #[error("unknown version number")] UnknownVersion, diff --git a/gateway/src/node/internal_service_providers/authenticator/mixnet_listener.rs b/gateway/src/node/internal_service_providers/authenticator/mixnet_listener.rs index c05d9d8cd6f..77a7d38d063 100644 --- a/gateway/src/node/internal_service_providers/authenticator/mixnet_listener.rs +++ b/gateway/src/node/internal_service_providers/authenticator/mixnet_listener.rs @@ -42,7 +42,6 @@ use std::{ sync::Arc, time::{Duration, SystemTime}, }; -use tokio::sync::RwLock; use tokio_stream::wrappers::IntervalStream; type AuthenticatorHandleResult = Result<(Vec, Option), AuthenticatorError>; @@ -74,7 +73,7 @@ pub(crate) struct MixnetListener { pub(crate) mixnet_client: nym_sdk::mixnet::MixnetClient, // Registrations awaiting confirmation - pub(crate) registered_and_free: RwLock, + pub(crate) registered_and_free: RegisteredAndFree, pub(crate) peer_manager: PeerManager, @@ -95,14 +94,15 @@ impl MixnetListener { mixnet_client: nym_sdk::mixnet::MixnetClient, upgrade_mode: UpgradeModeDetails, ecash_verifier: Arc, + peer_interaction_timeout: Duration, ) -> Self { let timeout_check_interval = IntervalStream::new(tokio::time::interval(DEFAULT_REGISTRATION_TIMEOUT_CHECK)); MixnetListener { config, mixnet_client, - registered_and_free: RwLock::new(RegisteredAndFree::new(free_private_network_ips)), - peer_manager: PeerManager::new(wireguard_gateway_data), + registered_and_free: RegisteredAndFree::new(free_private_network_ips), + peer_manager: PeerManager::new(wireguard_gateway_data, peer_interaction_timeout), upgrade_mode, ecash_verifier, timeout_check_interval, @@ -131,8 +131,8 @@ impl MixnetListener { )) } - async fn remove_stale_registrations(&self) -> Result<(), AuthenticatorError> { - let mut registered_and_free = self.registered_and_free.write().await; + async fn remove_stale_registrations(&mut self) -> Result<(), AuthenticatorError> { + let registered_and_free = &mut self.registered_and_free; let registered_values: Vec<_> = registered_and_free .registration_in_progres .values() @@ -185,8 +185,9 @@ impl MixnetListener { ) -> AuthenticatorHandleResult { let remote_public = init_message.pub_key(); let nonce: u64 = fastrand::u64(..); - let mut registered_and_free = self.registered_and_free.write().await; - if let Some(registration_data) = registered_and_free + + if let Some(registration_data) = self + .registered_and_free .registration_in_progres .get(&remote_public) { @@ -292,7 +293,17 @@ impl MixnetListener { return Ok((bytes, reply_to)); } - let peer = self.peer_manager.query_peer(remote_public).await?; + let peer = match self.peer_manager.query_peer(remote_public).await { + Ok(peer) => peer, + Err(err) => { + tracing::warn!( + "Failed to query peer {}: {err}. Continuing with fresh registration", + remote_public + ); + None + } + }; + if let Some(peer) = peer { let allowed_ipv4 = peer .allowed_ips @@ -383,19 +394,21 @@ impl MixnetListener { return Ok((bytes, reply_to)); } - let private_ip_ref = registered_and_free + let private_ip = self + .registered_and_free .free_private_network_ips .iter_mut() .filter(|r| r.1.is_none()) .choose(&mut thread_rng()) .ok_or(AuthenticatorError::NoFreeIp)?; - let private_ips = *private_ip_ref.0; + let private_ips = *private_ip.0; // mark it as used, even though it's not final - *private_ip_ref.1 = Some(SystemTime::now()); + *private_ip.1 = Some(SystemTime::now()); + let gateway_data = GatewayClient::new( self.keypair().private_key(), remote_public.inner(), - *private_ip_ref.0, + private_ips, nonce, ); let registration_data = latest::registration::RegistrationData { @@ -403,7 +416,8 @@ impl MixnetListener { gateway_data: gateway_data.clone(), wg_port: self.config.authenticator.tunnel_announced_port, }; - registered_and_free + + self.registered_and_free .registration_in_progres .insert(remote_public, registration_data.clone()); let bytes = match AuthenticatorVersion::from(protocol) { @@ -539,12 +553,12 @@ impl MixnetListener { request_id: u64, reply_to: Option, ) -> AuthenticatorHandleResult { - let mut registered_and_free = self.registered_and_free.write().await; - let registration_data = registered_and_free + let registration_data = self + .registered_and_free .registration_in_progres .get(&final_message.gateway_client_pub_key()) - .ok_or(AuthenticatorError::RegistrationNotInProgress)? - .clone(); + .cloned() + .ok_or(AuthenticatorError::RegistrationNotInProgress)?; if final_message .verify(self.keypair().private_key(), registration_data.nonce) @@ -595,7 +609,7 @@ impl MixnetListener { return Err(e); } - registered_and_free + self.registered_and_free .registration_in_progres .remove(&final_message.gateway_client_pub_key()); @@ -818,7 +832,7 @@ impl MixnetListener { .to_bytes() .map_err(AuthenticatorError::response_serialisation)?, AuthenticatorVersion::V1 | AuthenticatorVersion::V2 | AuthenticatorVersion::UNKNOWN => { - return Err(AuthenticatorError::UnknownVersion) + return Err(AuthenticatorError::UnknownVersion); } }; diff --git a/gateway/src/node/internal_service_providers/authenticator/mod.rs b/gateway/src/node/internal_service_providers/authenticator/mod.rs index f63a86fcc2d..bac338400c8 100644 --- a/gateway/src/node/internal_service_providers/authenticator/mod.rs +++ b/gateway/src/node/internal_service_providers/authenticator/mod.rs @@ -151,6 +151,7 @@ impl Authenticator { } }) .collect(); + let peer_timeout = self.config.authenticator.peer_interaction_timeout; let mixnet_listener = crate::node::internal_service_providers::authenticator::mixnet_listener::MixnetListener::new( self.config, free_private_network_ips, @@ -158,6 +159,7 @@ impl Authenticator { mixnet_client, self.upgrade_mode_state, self.ecash_verifier, + peer_timeout, ); tracing::info!("The address of this client is: {self_address}"); diff --git a/gateway/src/node/internal_service_providers/authenticator/peer_manager.rs b/gateway/src/node/internal_service_providers/authenticator/peer_manager.rs index c057dfa57cc..85130380fe3 100644 --- a/gateway/src/node/internal_service_providers/authenticator/peer_manager.rs +++ b/gateway/src/node/internal_service_providers/authenticator/peer_manager.rs @@ -8,15 +8,19 @@ use nym_credential_verification::{ClientBandwidth, TicketVerifier}; use nym_credentials_interface::CredentialSpendingData; use nym_wireguard::{peer_controller::PeerControlRequest, WireguardGatewayData}; use nym_wireguard_types::PeerPublicKey; +use std::time::Duration; +use tokio::time::timeout; pub struct PeerManager { pub(crate) wireguard_gateway_data: WireguardGatewayData, + response_timeout: Duration, } impl PeerManager { - pub fn new(wireguard_gateway_data: WireguardGatewayData) -> Self { + pub fn new(wireguard_gateway_data: WireguardGatewayData, response_timeout: Duration) -> Self { PeerManager { wireguard_gateway_data, + response_timeout, } } pub async fn add_peer(&self, peer: Peer) -> Result<(), AuthenticatorError> { @@ -28,9 +32,8 @@ impl PeerManager { .await .map_err(|_| AuthenticatorError::PeerInteractionStopped)?; - response_rx - .await - .map_err(|_| AuthenticatorError::InternalError("no response for add peer".to_string()))? + recv_with_timeout(response_rx, "add peer", self.response_timeout) + .await? .map_err(|err| { AuthenticatorError::InternalError(format!( "adding peer could not be performed: {err:?}" @@ -48,11 +51,8 @@ impl PeerManager { .await .map_err(|_| AuthenticatorError::PeerInteractionStopped)?; - response_rx - .await - .map_err(|_| { - AuthenticatorError::InternalError("no response for remove peer".to_string()) - })? + recv_with_timeout(response_rx, "remove peer", self.response_timeout) + .await? .map_err(|err| { AuthenticatorError::InternalError(format!( "removing peer could not be performed: {err:?}" @@ -73,11 +73,8 @@ impl PeerManager { .await .map_err(|_| AuthenticatorError::PeerInteractionStopped)?; - response_rx - .await - .map_err(|_| { - AuthenticatorError::InternalError("no response for query peer".to_string()) - })? + recv_with_timeout(response_rx, "query peer", self.response_timeout) + .await? .map_err(|err| { AuthenticatorError::InternalError(format!( "querying peer could not be performed: {err:?}" @@ -106,13 +103,8 @@ impl PeerManager { .await .map_err(|_| AuthenticatorError::PeerInteractionStopped)?; - response_rx - .await - .map_err(|_| { - AuthenticatorError::InternalError( - "no response for query client bandwidth".to_string(), - ) - })? + recv_with_timeout(response_rx, "query client bandwidth", self.response_timeout) + .await? .map_err(|err| { AuthenticatorError::InternalError(format!( "querying client bandwidth could not be performed: {err:?}" @@ -138,11 +130,8 @@ impl PeerManager { .await .map_err(|_| AuthenticatorError::PeerInteractionStopped)?; - response_rx - .await - .map_err(|_| { - AuthenticatorError::InternalError("no response for query verifier".to_string()) - })? + recv_with_timeout(response_rx, "query verifier", self.response_timeout) + .await? .map_err(|err| { AuthenticatorError::InternalError(format!( "querying verifier could not be performed: {err:?}" @@ -151,10 +140,31 @@ impl PeerManager { } } +async fn recv_with_timeout( + response_rx: oneshot::Receiver, + operation: &'static str, + timeout_duration: Duration, +) -> Result { + // Suspend/resume can wedge the peer controller, so we bound the wait to avoid deadlocking + // authenticator responses on a stuck oneshot channel. + match timeout(timeout_duration, response_rx).await { + Ok(Ok(value)) => Ok(value), + Ok(Err(_)) => Err(AuthenticatorError::PeerInteractionStopped), + Err(_) => { + tracing::warn!( + "peer controller response timed out while attempting to {operation} after {:?}", + timeout_duration + ); + Err(AuthenticatorError::PeerInteractionTimeout { operation }) + } + } +} + #[cfg(test)] mod tests { use std::{str::FromStr, sync::Arc}; + use futures::channel::oneshot; use nym_credential_verification::{ bandwidth_storage_manager::BandwidthStorageManager, ecash::MockEcashManager, }; @@ -163,7 +173,8 @@ mod tests { use nym_gateway_storage::traits::{mock::MockGatewayStorage, BandwidthGatewayStorage}; use nym_wireguard::peer_controller::{start_controller, stop_controller}; use rand::rngs::OsRng; - use time::{Duration, OffsetDateTime}; + use std::time::Duration; + use time::{Duration as TimeDuration, OffsetDateTime}; use tokio::sync::RwLock; use crate::nym_authenticator::{ @@ -243,7 +254,7 @@ mod tests { Authenticator::default().into(), Arc::new(KeyPair::new(&mut OsRng)), ); - let peer_manager = PeerManager::new(wireguard_data); + let peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5)); let (storage, task_manager) = start_controller( peer_manager.wireguard_gateway_data.peer_tx().clone(), request_rx, @@ -291,7 +302,7 @@ mod tests { Authenticator::default().into(), Arc::new(KeyPair::new(&mut OsRng)), ); - let mut peer_manager = PeerManager::new(wireguard_data); + let mut peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5)); let key = Key::default(); let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap(); let (storage, task_manager) = start_controller( @@ -311,7 +322,7 @@ mod tests { Authenticator::default().into(), Arc::new(KeyPair::new(&mut OsRng)), ); - let mut peer_manager = PeerManager::new(wireguard_data); + let mut peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5)); let key = Key::default(); let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap(); let (storage, task_manager) = start_controller( @@ -334,7 +345,7 @@ mod tests { Authenticator::default().into(), Arc::new(KeyPair::new(&mut OsRng)), ); - let mut peer_manager = PeerManager::new(wireguard_data); + let mut peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5)); let key = Key::default(); let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap(); let (storage, task_manager) = start_controller( @@ -357,7 +368,7 @@ mod tests { Authenticator::default().into(), Arc::new(KeyPair::new(&mut OsRng)), ); - let mut peer_manager = PeerManager::new(wireguard_data); + let mut peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5)); let key = Key::default(); let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap(); let (storage, task_manager) = start_controller( @@ -388,7 +399,7 @@ mod tests { Authenticator::default().into(), Arc::new(KeyPair::new(&mut OsRng)), ); - let mut peer_manager = PeerManager::new(wireguard_data); + let mut peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5)); let key = Key::default(); let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap(); let (storage, task_manager) = start_controller( @@ -417,7 +428,7 @@ mod tests { Authenticator::default().into(), Arc::new(KeyPair::new(&mut OsRng)), ); - let mut peer_manager = PeerManager::new(wireguard_data); + let mut peer_manager = PeerManager::new(wireguard_data, Duration::from_secs(5)); let key = Key::default(); let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap(); let top_up = 42; @@ -444,7 +455,7 @@ mod tests { .increase_bandwidth( Bandwidth::new_unchecked(top_up as u64), OffsetDateTime::now_utc() - .checked_add(Duration::minutes(1)) + .checked_add(TimeDuration::minutes(1)) .unwrap(), ) .await @@ -466,4 +477,28 @@ mod tests { stop_controller(task_manager).await; } + + #[tokio::test] + async fn recv_with_timeout_errors_after_deadline() { + let (_tx, rx) = oneshot::channel::<()>(); + let err = super::recv_with_timeout(rx, "unit-test", Duration::from_millis(10)) + .await + .unwrap_err(); + assert!(matches!( + err, + AuthenticatorError::PeerInteractionTimeout { + operation: "unit-test" + } + )); + } + + #[tokio::test] + async fn recv_with_timeout_succeeds_before_deadline() { + let (tx, rx) = oneshot::channel::(); + tx.send(42).unwrap(); + let value = super::recv_with_timeout(rx, "unit-test", Duration::from_secs(1)) + .await + .unwrap(); + assert_eq!(value, 42); + } } diff --git a/nym-node/Cargo.toml b/nym-node/Cargo.toml index f431886ebfd..6ce6e796b1f 100644 --- a/nym-node/Cargo.toml +++ b/nym-node/Cargo.toml @@ -27,6 +27,7 @@ console-subscriber = { workspace = true, optional = true } csv = { workspace = true } clap = { workspace = true, features = ["cargo", "env"] } futures = { workspace = true } +humantime = { workspace = true } humantime-serde = { workspace = true } human-repr = { workspace = true } ipnetwork = { workspace = true } diff --git a/nym-node/src/config/mod.rs b/nym-node/src/config/mod.rs index 08b578760ef..16225fb2cc2 100644 --- a/nym-node/src/config/mod.rs +++ b/nym-node/src/config/mod.rs @@ -944,6 +944,7 @@ pub struct Wireguard { /// Tunnel port announced to external clients wishing to connect to the wireguard interface. /// Useful in the instances where the node is behind a proxy. + #[serde(alias = "announced_port")] pub announced_tunnel_port: u16, /// Metadata port announced to external clients wishing to connect to the metadata endpoint. @@ -1001,6 +1002,7 @@ impl From for nym_authenticator::config::Authenticator { tunnel_announced_port: value.announced_tunnel_port, private_network_prefix_v4: value.private_network_prefix_v4, private_network_prefix_v6: value.private_network_prefix_v6, + peer_interaction_timeout: nym_authenticator::config::default_peer_interaction_timeout(), } } } diff --git a/nym-node/src/config/old_configs/old_config_v10.rs b/nym-node/src/config/old_configs/old_config_v10.rs index e45cca8dd21..7fa7c9a2695 100644 --- a/nym-node/src/config/old_configs/old_config_v10.rs +++ b/nym-node/src/config/old_configs/old_config_v10.rs @@ -63,6 +63,7 @@ pub struct WireguardV10 { /// Port announced to external clients wishing to connect to the wireguard interface. /// Useful in the instances where the node is behind a proxy. + #[serde(alias = "announced_tunnel_port")] pub announced_port: u16, /// The prefix denoting the maximum number of the clients that can be connected via Wireguard using IPv4.