Skip to content

Commit 7655eb1

Browse files
sanity and claude authored
fix(transport): drive gateway admission rate limiter by VirtualTime in sim (#4383)
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 7e09c1e commit 7655eb1

3 files changed

Lines changed: 184 additions & 23 deletions

File tree

crates/core/src/transport/connection_handler.rs

Lines changed: 168 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ use super::{
4141
symmetric_message::{SymmetricMessage, SymmetricMessagePayload},
4242
token_bucket::TokenBucket,
4343
};
44-
use crate::simulation::{RealTime, TimeSource};
44+
use crate::simulation::{RealTime, TimeSource, VirtualTime};
45+
use crate::transport::in_memory_socket::get_socket_virtual_time;
4546

4647
// Constants for interval increase
4748
const INITIAL_INTERVAL: Duration = Duration::from_millis(50);
@@ -296,7 +297,16 @@ impl<S: Socket> OutboundConnectionHandler<S> {
296297
global_bandwidth,
297298
ledbat_min_ssthresh,
298299
expected_non_gateway: expected_non_gateway.clone(),
299-
gw_rate_limiter: GatewayConnectionRateLimiter::new(time_source.clone()),
300+
// In simulation the admission ramp must advance with the network's
301+
// VirtualTime, not only the RealTime source this handler is built with
302+
// (see `GatewayConnectionRateLimiter::effective_elapsed_nanos`).
303+
// `get_socket_virtual_time` returns `Some` only when a SimNetwork has
304+
// registered a VirtualTime for this address, and `None` in production
305+
// (where the ramp stays on real wall-clock exactly as before).
306+
gw_rate_limiter: GatewayConnectionRateLimiter::new(
307+
time_source.clone(),
308+
get_socket_virtual_time(&socket_addr),
309+
),
300310
time_source,
301311
congestion_config: Some(congestion_config.unwrap_or_default()),
302312
};
@@ -495,7 +505,9 @@ impl<S: Socket> OutboundConnectionHandler<S, crate::simulation::VirtualTime> {
495505
global_bandwidth,
496506
ledbat_min_ssthresh,
497507
expected_non_gateway: expected_non_gateway.clone(),
498-
gw_rate_limiter: GatewayConnectionRateLimiter::new(time_source.clone()),
508+
// This listener's `time_source` is already VirtualTime, so the admission
509+
// clock needs no override.
510+
gw_rate_limiter: GatewayConnectionRateLimiter::new(time_source.clone(), None),
499511
time_source,
500512
congestion_config,
501513
};
@@ -1124,32 +1136,74 @@ impl<S, T: TimeSource> ConnectionStateManager<S, T> {
11241136
/// simultaneously. Uses a 1-second tumbling window to count accepted connections
11251137
/// and compares against a rate that ramps up over time since gateway start.
11261138
struct GatewayConnectionRateLimiter<T: TimeSource> {
1127-
/// Nanos timestamp when the gateway started listening.
1128-
start_nanos: u64,
1129-
/// Nanos timestamp of the current 1-second window start.
1130-
window_start_nanos: u64,
1139+
/// Real-clock timestamp (`time_source`) captured when the gateway started listening.
1140+
real_start_nanos: u64,
1141+
/// Simulation `VirtualTime` timestamp at gateway start, when a sim override clock is present.
1142+
virtual_start_nanos: Option<u64>,
1143+
/// Start of the current 1-second window, in effective-elapsed nanos
1144+
/// (see [`Self::effective_elapsed_nanos`]).
1145+
window_start_effective_nanos: u64,
11311146
/// Number of connections accepted in the current window.
11321147
window_count: u64,
11331148
time_source: T,
1149+
/// Simulation override clock. See [`Self::effective_elapsed_nanos`] for why the
1150+
/// admission ramp uses `max(real_elapsed, virtual_elapsed)` rather than either clock alone.
1151+
///
1152+
/// `Some` only in simulation (a `VirtualTime` is registered for this socket's
1153+
/// network — note `VirtualTime is now always enabled` for SimNetworks). `None` in
1154+
/// production, where the real `time_source` is the only and correct admission clock.
1155+
virtual_clock: Option<VirtualTime>,
11341156
}
11351157

11361158
impl<T: TimeSource> GatewayConnectionRateLimiter<T> {
1137-
fn new(time_source: T) -> Self {
1138-
let now = time_source.now_nanos();
1159+
fn new(time_source: T, virtual_clock: Option<VirtualTime>) -> Self {
1160+
let real_start_nanos = time_source.now_nanos();
1161+
let virtual_start_nanos = virtual_clock.as_ref().map(|v| v.now_nanos());
11391162
Self {
1140-
start_nanos: now,
1141-
window_start_nanos: now,
1163+
real_start_nanos,
1164+
virtual_start_nanos,
1165+
window_start_effective_nanos: 0,
11421166
window_count: 0,
11431167
time_source,
1168+
virtual_clock,
11441169
}
11451170
}
11461171

1147-
/// Returns the maximum connections/second allowed based on time since gateway start.
1148-
fn current_rate_limit(&self) -> Option<u64> {
1149-
let elapsed = self
1172+
/// Nanoseconds of *simulation progress* since gateway start, defined as
1173+
/// `max(real_elapsed, virtual_elapsed)`.
1174+
///
1175+
/// The admission ramp (5/s for 30s, 20/s to 2min, then unlimited) must advance with
1176+
/// whatever clock the simulation is actually progressing on, and the two SimNetwork
1177+
/// execution modes use different clocks:
1178+
/// - **Virtual-time-driven** (e.g. `test_thundering_herd_connect_storm` advances
1179+
/// `VirtualTime` explicitly while spending only ~10ms real per 100ms virtual): the
1180+
/// ramp must follow the virtual clock or it never progresses, leaving admission
1181+
/// pinned at the throttled initial rate and nondeterministic in real-scheduler order
1182+
/// (the root cause of that test's flakiness).
1183+
/// - **Real-time-driven** (the virtual clock is not advanced): the ramp must follow the
1184+
/// real clock or the 1-second window never resets and the gateway permanently blocks
1185+
/// new peers after the first burst, so the network never forms.
1186+
///
1187+
/// `max()` advances at least as fast as either clock alone, so it is correct for both
1188+
/// modes and is **never more restrictive than the pre-existing real-clock behavior**
1189+
/// (it can only let the ramp complete sooner, never later). Both elapsed terms are
1190+
/// monotonic, so their max is monotonic. In production `virtual_clock` is `None`, so
1191+
/// this is exactly real-clock elapsed and behavior is unchanged.
1192+
fn effective_elapsed_nanos(&self) -> u64 {
1193+
let real_elapsed = self
11501194
.time_source
11511195
.now_nanos()
1152-
.saturating_sub(self.start_nanos);
1196+
.saturating_sub(self.real_start_nanos);
1197+
let virtual_elapsed = match (&self.virtual_clock, self.virtual_start_nanos) {
1198+
(Some(virtual_time), Some(start)) => virtual_time.now_nanos().saturating_sub(start),
1199+
_ => 0,
1200+
};
1201+
real_elapsed.max(virtual_elapsed)
1202+
}
1203+
1204+
/// Returns the maximum connections/second allowed based on time since gateway start.
1205+
fn current_rate_limit(&self) -> Option<u64> {
1206+
let elapsed = self.effective_elapsed_nanos();
11531207
if elapsed < GW_RAMP_PHASE1_DURATION.as_nanos() as u64 {
11541208
Some(GW_RAMP_PHASE1_RATE)
11551209
} else if elapsed < GW_RAMP_PHASE2_DURATION.as_nanos() as u64 {
@@ -1165,12 +1219,12 @@ impl<T: TimeSource> GatewayConnectionRateLimiter<T> {
11651219
return true; // Unlimited phase
11661220
};
11671221

1168-
let now = self.time_source.now_nanos();
1169-
let window_elapsed = now.saturating_sub(self.window_start_nanos);
1222+
let now = self.effective_elapsed_nanos();
1223+
let window_elapsed = now.saturating_sub(self.window_start_effective_nanos);
11701224

11711225
// Reset window every second
11721226
if window_elapsed >= Duration::from_secs(1).as_nanos() as u64 {
1173-
self.window_start_nanos = now;
1227+
self.window_start_effective_nanos = now;
11741228
self.window_count = 0;
11751229
}
11761230

@@ -1444,7 +1498,11 @@ impl<S: Socket, T: TimeSource> UdpPacketsListener<S, T> {
14441498
continue;
14451499
}
14461500

1447-
// Rate limit intro attempts (per-IP)
1501+
// Rate limit intro attempts (per-IP).
1502+
// NOTE: this per-IP throttle is still measured on the real
1503+
// wall-clock `time_source`, unlike the gateway admission ramp
1504+
// below which uses the simulation clock. It is deliberately left
1505+
// on RealTime here (out of scope for the #4383 fix); see #4384.
14481506
if self.connections.is_rate_limited(&remote_addr) {
14491507
tracing::trace!(peer_addr = %remote_addr, "Rate limiting gateway intro attempt");
14501508
continue;
@@ -3209,7 +3267,8 @@ mod version_cmp {
32093267
use std::time::Duration;
32103268

32113269
let time = VirtualTime::new();
3212-
let mut limiter = GatewayConnectionRateLimiter::new(time.clone());
3270+
// `time_source` is itself VirtualTime here, so no separate override is needed.
3271+
let mut limiter = GatewayConnectionRateLimiter::new(time.clone(), None);
32133272

32143273
// Phase 1: should allow GW_RAMP_PHASE1_RATE connections per second
32153274
for _ in 0..GW_RAMP_PHASE1_RATE {
@@ -3260,6 +3319,95 @@ mod version_cmp {
32603319
assert!(limiter.try_accept(), "Should accept unlimited in phase 3");
32613320
}
32623321
}
3322+
3323+
/// Regression test for the gateway admission clock used by
3324+
/// `test_thundering_herd_connect_storm` and every other SimNetwork test.
3325+
///
3326+
/// In a SimNetwork the connection handler is built with a `RealTime` source
3327+
/// (`config_listener`) plus a simulation `VirtualTime` override. The admission
3328+
/// ramp must advance on `max(real_elapsed, virtual_elapsed)` so it is correct in
3329+
/// both execution modes:
3330+
/// - **Virtual-time-driven** (the thundering-herd test advances `VirtualTime`
3331+
/// while real time barely moves): the ramp must follow the virtual clock or it
3332+
/// never progresses and admission stalls nondeterministically.
3333+
/// - **Real-time-driven** (the virtual clock is never advanced): the ramp must
3334+
/// follow the real clock or the 1-second window never resets and the gateway
3335+
/// blocks every peer after the first burst, so the network never forms.
3336+
///
3337+
/// The two halves below drive exactly one clock each (using two independently
3338+
/// controllable `VirtualTime` instances, one standing in for the real source).
3339+
/// A revert to either single clock fails one half: pure-virtual fails the
3340+
/// real-driven half, pure-real fails the virtual-driven half.
3341+
#[test]
3342+
fn test_gateway_rate_limiter_admission_clock_is_max_of_real_and_virtual() {
3343+
use super::{GW_RAMP_PHASE1_RATE, GW_RAMP_PHASE2_RATE, GatewayConnectionRateLimiter};
3344+
use crate::simulation::VirtualTime;
3345+
use std::time::Duration;
3346+
3347+
// Builds a limiter over two fresh clocks, exhausts the phase-1 rate with neither
3348+
// clock advanced, then hands the limiter to `advance` for the per-scenario checks.
3349+
fn drive(advance: impl Fn(&mut GatewayConnectionRateLimiter<VirtualTime>)) {
3350+
let real = VirtualTime::new();
3351+
let virtual_clock = VirtualTime::new();
3352+
let mut limiter = GatewayConnectionRateLimiter::new(real, Some(virtual_clock));
3353+
// Phase 1: exactly GW_RAMP_PHASE1_RATE admits, then throttled.
3354+
for _ in 0..GW_RAMP_PHASE1_RATE {
3355+
assert!(limiter.try_accept(), "phase 1 admits within rate");
3356+
}
3357+
assert!(
3358+
!limiter.try_accept(),
3359+
"phase 1 rate exhausted (no clock advanced)"
3360+
);
3361+
advance(&mut limiter);
3362+
}
3363+
3364+
// Helpers to advance one clock of the limiter under test.
3365+
fn adv_virtual(l: &mut GatewayConnectionRateLimiter<VirtualTime>, d: Duration) {
3366+
l.virtual_clock.as_ref().unwrap().advance(d);
3367+
}
3368+
fn adv_real(l: &mut GatewayConnectionRateLimiter<VirtualTime>, d: Duration) {
3369+
l.time_source.advance(d);
3370+
}
3371+
3372+
// Virtual-time-driven: advancing ONLY the virtual clock ramps the limiter.
3373+
drive(|l| {
3374+
adv_virtual(l, Duration::from_millis(1001));
3375+
assert!(l.try_accept(), "virtual-clock advance resets the window");
3376+
adv_virtual(l, Duration::from_secs(31));
3377+
assert_eq!(
3378+
l.current_rate_limit(),
3379+
Some(GW_RAMP_PHASE2_RATE),
3380+
"virtual-clock advance ramps into phase 2"
3381+
);
3382+
});
3383+
3384+
// Real-time-driven (the case that regressed under a virtual-only clock):
3385+
// advancing ONLY the real clock, with the virtual clock frozen, ramps it too.
3386+
drive(|l| {
3387+
adv_real(l, Duration::from_millis(1001));
3388+
assert!(l.try_accept(), "real-clock advance resets the window");
3389+
adv_real(l, Duration::from_secs(31));
3390+
assert_eq!(
3391+
l.current_rate_limit(),
3392+
Some(GW_RAMP_PHASE2_RATE),
3393+
"real-clock advance ramps into phase 2"
3394+
);
3395+
});
3396+
3397+
// Pin max() vs a hypothetical sum: advancing BOTH clocks by 20s leaves
3398+
// effective elapsed at max(20s, 20s) = 20s, still phase 1. A sum would reach
3399+
// 40s and report phase 2. A min would also report 20s (phase 1) here, so min is
3400+
// ruled out by the single-clock halves above; this case distinguishes max from sum.
3401+
drive(|l| {
3402+
adv_real(l, Duration::from_secs(20));
3403+
adv_virtual(l, Duration::from_secs(20));
3404+
assert_eq!(
3405+
l.current_rate_limit(),
3406+
Some(GW_RAMP_PHASE1_RATE),
3407+
"effective elapsed is max(20s, 20s) = 20s (phase 1), not the 40s sum"
3408+
);
3409+
});
3410+
}
32633411
}
32643412

32653413
/// Smoke tests pinning the contract the transport layer relies on from

crates/core/src/transport/in_memory_socket.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,17 @@ fn get_network_time_source(network_name: &str) -> Option<VirtualTime> {
121121
NETWORK_TIME_SOURCES.get(network_name).map(|r| r.clone())
122122
}
123123

124+
/// Returns the simulation `VirtualTime` for the network that owns `addr`, if any.
125+
///
126+
/// Used by the transport's gateway admission rate limiter so its ramp-up advances
127+
/// with simulation time instead of real wall-clock (the connection handler is
128+
/// otherwise built with a `RealTime` source). Returns `None` outside simulation
129+
/// (no address/network registered), where real time is the correct clock.
130+
pub(crate) fn get_socket_virtual_time(addr: &SocketAddr) -> Option<VirtualTime> {
131+
let network_name = get_address_network(addr)?;
132+
get_network_time_source(&network_name)
133+
}
134+
124135
/// Result of checking packet delivery through fault injection.
125136
#[derive(Debug)]
126137
pub enum PacketDeliveryDecision {

crates/core/tests/simulation_integration.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6342,9 +6342,11 @@ async fn test_thundering_herd_connect_storm() {
63426342
);
63436343
// Note: The gateway rate limiter (GatewayConnectionRateLimiter) intentionally
63446344
// throttles the thundering herd to 5 connections/sec initially, ramping up over
6345-
// 2 minutes. In simulation, RealTime-based rate limiting means fewer connections
6346-
// complete than the 20 peers attempting to reconnect. This is the desired behavior
6347-
// — the storm is prevented, not just survived.
6345+
// 2 minutes. Its admission ramp advances on the simulation clock
6346+
// (max(real_elapsed, virtual_elapsed); see GatewayConnectionRateLimiter), so the
6347+
// throttle still applies as virtual time advances: fewer connections complete
6348+
// per window than the 20 peers attempting to reconnect. This is the desired
6349+
// behavior: the storm is prevented, not just survived.
63486350

63496351
// 6b: Verify network recovered (the actual regression check for #3207/#3208)
63506352
sim.check_partial_connectivity(Duration::from_secs(20), 0.8)

0 commit comments

Comments
 (0)