Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use nym_network_defaults::{
};
use serde::{Deserialize, Serialize};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::time::Duration;

pub use nym_client_core::config::Config as BaseClientConfig;
pub use persistence::AuthenticatorPaths;
Expand All @@ -26,7 +27,6 @@ pub struct Config {

impl Config {
pub fn validate(&self) -> bool {
// no other sections have explicit requirements (yet)
self.base.validate()
}
}
Expand Down Expand Up @@ -57,6 +57,12 @@ pub struct Authenticator {
/// The prefix denoting the maximum number of the clients that can be connected via Wireguard using IPv6.
/// The maximum value for IPv6 is 128
pub private_network_prefix_v6: u8,

/// Timeout to wait for responses from the peer controller before failing.
/// Helps the authenticator recover from suspend/resume scenarios where the peer controller
/// process/task can get stuck and never respond to oneshot RPC responses, which previously
/// caused the authenticator to block forever waiting on the oneshot channel.
pub peer_interaction_timeout: Duration,
}

impl Default for Authenticator {
Expand All @@ -68,6 +74,7 @@ impl Default for Authenticator {
tunnel_announced_port: WG_TUNNEL_PORT,
private_network_prefix_v4: WG_TUN_DEVICE_NETMASK_V4,
private_network_prefix_v6: WG_TUN_DEVICE_NETMASK_V6,
peer_interaction_timeout: default_peer_interaction_timeout(),
}
}
}
Expand All @@ -85,3 +92,7 @@ impl From<Authenticator> for nym_wireguard_types::Config {
}
}
}

pub fn default_peer_interaction_timeout() -> Duration {
Duration::from_millis(5_000)
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ pub enum AuthenticatorError {
#[error("peers can't be interacted with anymore")]
PeerInteractionStopped,

#[error("peers interaction timed out while attempting to {operation}")]
PeerInteractionTimeout { operation: &'static str },

#[error("unknown version number")]
UnknownVersion,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use tokio::sync::RwLock;
use tokio_stream::wrappers::IntervalStream;

type AuthenticatorHandleResult = Result<(Vec<u8>, Option<Recipient>), AuthenticatorError>;
Expand Down Expand Up @@ -74,7 +73,7 @@ pub(crate) struct MixnetListener {
pub(crate) mixnet_client: nym_sdk::mixnet::MixnetClient,

// Registrations awaiting confirmation
pub(crate) registered_and_free: RwLock<RegisteredAndFree>,
pub(crate) registered_and_free: RegisteredAndFree,

pub(crate) peer_manager: PeerManager,

Expand All @@ -95,14 +94,15 @@ impl MixnetListener {
mixnet_client: nym_sdk::mixnet::MixnetClient,
upgrade_mode: UpgradeModeDetails,
ecash_verifier: Arc<dyn EcashManager + Send + Sync>,
peer_interaction_timeout: Duration,
) -> Self {
let timeout_check_interval =
IntervalStream::new(tokio::time::interval(DEFAULT_REGISTRATION_TIMEOUT_CHECK));
MixnetListener {
config,
mixnet_client,
registered_and_free: RwLock::new(RegisteredAndFree::new(free_private_network_ips)),
peer_manager: PeerManager::new(wireguard_gateway_data),
registered_and_free: RegisteredAndFree::new(free_private_network_ips),
peer_manager: PeerManager::new(wireguard_gateway_data, peer_interaction_timeout),
upgrade_mode,
ecash_verifier,
timeout_check_interval,
Expand Down Expand Up @@ -131,8 +131,8 @@ impl MixnetListener {
))
}

async fn remove_stale_registrations(&self) -> Result<(), AuthenticatorError> {
let mut registered_and_free = self.registered_and_free.write().await;
async fn remove_stale_registrations(&mut self) -> Result<(), AuthenticatorError> {
let registered_and_free = &mut self.registered_and_free;
let registered_values: Vec<_> = registered_and_free
.registration_in_progres
.values()
Expand Down Expand Up @@ -185,8 +185,9 @@ impl MixnetListener {
) -> AuthenticatorHandleResult {
let remote_public = init_message.pub_key();
let nonce: u64 = fastrand::u64(..);
let mut registered_and_free = self.registered_and_free.write().await;
if let Some(registration_data) = registered_and_free

if let Some(registration_data) = self
.registered_and_free
.registration_in_progres
.get(&remote_public)
{
Expand Down Expand Up @@ -292,7 +293,17 @@ impl MixnetListener {
return Ok((bytes, reply_to));
}

let peer = self.peer_manager.query_peer(remote_public).await?;
let peer = match self.peer_manager.query_peer(remote_public).await {
Ok(peer) => peer,
Err(err) => {
tracing::warn!(
"Failed to query peer {}: {err}. Continuing with fresh registration",
remote_public
);
None
}
};

if let Some(peer) = peer {
let allowed_ipv4 = peer
.allowed_ips
Expand Down Expand Up @@ -383,27 +394,30 @@ impl MixnetListener {
return Ok((bytes, reply_to));
}

let private_ip_ref = registered_and_free
let private_ip = self
.registered_and_free
.free_private_network_ips
.iter_mut()
.filter(|r| r.1.is_none())
.choose(&mut thread_rng())
.ok_or(AuthenticatorError::NoFreeIp)?;
let private_ips = *private_ip_ref.0;
let private_ips = *private_ip.0;
// mark it as used, even though it's not final
*private_ip_ref.1 = Some(SystemTime::now());
*private_ip.1 = Some(SystemTime::now());

let gateway_data = GatewayClient::new(
self.keypair().private_key(),
remote_public.inner(),
*private_ip_ref.0,
private_ips,
nonce,
);
let registration_data = latest::registration::RegistrationData {
nonce,
gateway_data: gateway_data.clone(),
wg_port: self.config.authenticator.tunnel_announced_port,
};
registered_and_free

self.registered_and_free
.registration_in_progres
.insert(remote_public, registration_data.clone());
let bytes = match AuthenticatorVersion::from(protocol) {
Expand Down Expand Up @@ -539,12 +553,12 @@ impl MixnetListener {
request_id: u64,
reply_to: Option<Recipient>,
) -> AuthenticatorHandleResult {
let mut registered_and_free = self.registered_and_free.write().await;
let registration_data = registered_and_free
let registration_data = self
.registered_and_free
.registration_in_progres
.get(&final_message.gateway_client_pub_key())
.ok_or(AuthenticatorError::RegistrationNotInProgress)?
.clone();
.cloned()
.ok_or(AuthenticatorError::RegistrationNotInProgress)?;

if final_message
.verify(self.keypair().private_key(), registration_data.nonce)
Expand Down Expand Up @@ -595,7 +609,7 @@ impl MixnetListener {
return Err(e);
}

registered_and_free
self.registered_and_free
.registration_in_progres
.remove(&final_message.gateway_client_pub_key());

Expand Down Expand Up @@ -818,7 +832,7 @@ impl MixnetListener {
.to_bytes()
.map_err(AuthenticatorError::response_serialisation)?,
AuthenticatorVersion::V1 | AuthenticatorVersion::V2 | AuthenticatorVersion::UNKNOWN => {
return Err(AuthenticatorError::UnknownVersion)
return Err(AuthenticatorError::UnknownVersion);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,15 @@ impl Authenticator {
}
})
.collect();
let peer_timeout = self.config.authenticator.peer_interaction_timeout;
let mixnet_listener = crate::node::internal_service_providers::authenticator::mixnet_listener::MixnetListener::new(
self.config,
free_private_network_ips,
self.wireguard_gateway_data,
mixnet_client,
self.upgrade_mode_state,
self.ecash_verifier,
peer_timeout,
);

tracing::info!("The address of this client is: {self_address}");
Expand Down
Loading
Loading