Skip to content

Commit 734bb79

Browse files
committed
fix(transport): sample RTT from ping/pong keep-alive cycle
## Problem `TransportMetrics::record_rtt_sample` was only reachable from `record_transfer_completed`, which fires on stream-transfer completion. Connections that exchange only keep-alive / control / small-contract traffic for hours never complete a stream transfer, so they contributed zero RTT samples. The dashboard / telemetry RTT statistics (`min_rtt_us`, `max_rtt_us`, `avg_rtt_us`) under-represented quiet connections. Same class of undercounting as the per-peer SENT/RECV bug fixed in #3996. ## Approach The ping/pong keep-alive cycle already records each Ping's send timestamp in `pending_pings` (sequence -> nanos). The Pong handler already removed the matching entry but discarded the timestamp. Capture it, compute the round-trip against the same `time_source`, and record an RTT sample. The keep-alive cycle runs on every connection regardless of stream traffic, so RTT statistics now stay populated for quiet, long-lived connections. - `record_rtt_sample` widened from private to `pub(crate)` so the connection layer can call it (it was already the shared sink for the stream-completion path). - The `rtt_us > 0` guard mirrors the existing stream-completion path. - The write lock on `pending_pings` is released before recording the sample. This is observation-only: it does not feed congestion control (the shadow-RTT system in #4074 remains the path for that). ## Testing New regression test `pong_records_rtt_sample_from_keepalive_cycle` drives an encrypted Pong through the real `recv()` path after seeding a pending ping, and asserts the global RTT sample counter advances and the ping is consumed. Verified it FAILS without the fix (records zero samples) and PASSES with it. Full transport lib suite (607 tests) green; `cargo clippy --locked -- -D warnings` clean. Closes #4000 [AI-assisted - Claude]
1 parent eda67ac commit 734bb79

2 files changed

Lines changed: 183 additions & 11 deletions

File tree

crates/core/src/transport/metrics.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,13 @@ impl TransportMetrics {
255255
}
256256

257257
/// Record an RTT sample in microseconds.
258-
fn record_rtt_sample(&self, rtt_us: u64) {
258+
///
259+
/// Called both from [`Self::record_transfer_completed`] (LEDBAT
260+
/// `base_delay` at stream completion) and from the ping/pong keep-alive
261+
/// cycle in `peer_connection.rs` (round-trip of a keep-alive Ping). The
262+
/// keep-alive path is what keeps RTT statistics populated for quiet,
263+
/// long-lived connections that rarely complete a stream transfer (#4000).
264+
pub(crate) fn record_rtt_sample(&self, rtt_us: u64) {
259265
self.rtt_sum_us
260266
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
261267
Some(v.saturating_add(rtt_us))
@@ -290,6 +296,16 @@ impl TransportMetrics {
290296
self.cumulative_bytes_sent.load(Ordering::Relaxed)
291297
}
292298

299+
/// Number of RTT samples accumulated in the current snapshot window.
300+
///
301+
/// Reset to 0 by `take_snapshot`. Exposed so tests can assert that a
302+
/// code path (e.g. the ping/pong keep-alive cycle) actually recorded a
303+
/// sample without consuming the snapshot.
304+
#[cfg(test)]
305+
pub(crate) fn rtt_sample_count(&self) -> u32 {
306+
self.rtt_samples.load(Ordering::Relaxed)
307+
}
308+
293309
/// Record a completed inbound stream transfer.
294310
///
295311
/// Aggregates stream-payload bytes into the per-snapshot `bytes_received`

crates/core/src/transport/peer_connection.rs

Lines changed: 166 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -972,16 +972,42 @@ impl<S: super::Socket, T: TimeSource> PeerConnection<S, T> {
972972
pong_sequence = sequence,
973973
"Received Pong, confirming bidirectional liveness"
974974
);
975-
// Remove the corresponding ping from pending set
976-
let mut pending = self.pending_pings.write();
977-
if pending.remove(sequence).is_some() {
978-
tracing::trace!(
979-
target: "freenet_core::transport::keepalive_received",
980-
remote = ?self.remote_conn.remote_addr,
981-
pong_sequence = sequence,
982-
remaining_pending = pending.len(),
983-
"Removed acknowledged ping from pending set"
984-
);
975+
// Remove the corresponding ping from pending set and,
976+
// if found, sample the round-trip time. The keep-alive
977+
// cycle runs on every connection regardless of stream
978+
// traffic, so this keeps RTT statistics populated for
979+
// quiet, long-lived connections that rarely (or never)
980+
// complete a stream transfer (#4000). Without it,
981+
// RTT was only sampled at stream completion and quiet
982+
// connections contributed zero samples.
983+
let removed_send_nanos = {
984+
let mut pending = self.pending_pings.write();
985+
let removed = pending.remove(sequence);
986+
if removed.is_some() {
987+
tracing::trace!(
988+
target: "freenet_core::transport::keepalive_received",
989+
remote = ?self.remote_conn.remote_addr,
990+
pong_sequence = sequence,
991+
remaining_pending = pending.len(),
992+
"Removed acknowledged ping from pending set"
993+
);
994+
}
995+
removed
996+
};
997+
if let Some(send_nanos) = removed_send_nanos {
998+
// The ping timestamp was recorded with this same
999+
// `time_source` (the keep-alive task clones it), so
1000+
// the subtraction is over one monotonic clock.
1001+
// `saturating_sub` is defensive against any skew
1002+
// yielding a 0 sample rather than underflowing.
1003+
let rtt_nanos = self
1004+
.time_source
1005+
.now_nanos()
1006+
.saturating_sub(send_nanos);
1007+
let rtt_us = rtt_nanos / 1_000;
1008+
if rtt_us > 0 {
1009+
super::TRANSPORT_METRICS.record_rtt_sample(rtt_us);
1010+
}
9851011
}
9861012
}
9871013
SymmetricMessagePayload::AckConnection { .. } | SymmetricMessagePayload::ShortMessage { .. } | SymmetricMessagePayload::StreamFragment { .. } => {}
@@ -3032,6 +3058,136 @@ mod tests {
30323058
crate::transport::TRANSPORT_METRICS.remove_peer(remote_addr);
30333059
}
30343060

3061+
/// Regression test for #4000: a Pong received for a pending keep-alive
3062+
/// Ping must record an RTT sample, so quiet connections that never
3063+
/// complete a stream transfer still contribute to the RTT statistics.
3064+
///
3065+
/// Before the fix, the Pong handler removed the matching ping timestamp
3066+
/// from `pending_pings` and discarded it — `record_rtt_sample` was only
3067+
/// reachable from `record_transfer_completed` (stream completion), so a
3068+
/// connection exchanging only keep-alive traffic recorded zero RTT
3069+
/// samples. This test seeds a pending ping, advances the (mock) clock,
3070+
/// feeds an encrypted Pong, and asserts the global RTT sample counter
3071+
/// moved by at least one.
3072+
#[tokio::test]
3073+
async fn pong_records_rtt_sample_from_keepalive_cycle() {
3074+
use crate::transport::crypto::TransportKeypair;
3075+
use crate::transport::packet_data::PacketData;
3076+
use crate::transport::symmetric_message::SymmetricMessagePayload;
3077+
use crate::util::time_source::SharedMockTimeSource;
3078+
use bytes::Bytes;
3079+
3080+
let time_source = SharedMockTimeSource::new();
3081+
let (inbound_tx, inbound_rx) = mpsc::channel(16);
3082+
let remote_addr = SocketAddr::new(Ipv4Addr::new(10, 99, 99, 2).into(), 50002);
3083+
3084+
let mut key = [0u8; 16];
3085+
crate::config::GlobalRng::fill_bytes(&mut key);
3086+
let cipher = Aes128Gcm::new(&key.into());
3087+
let keypair = TransportKeypair::new();
3088+
3089+
let sent_tracker = Arc::new(parking_lot::Mutex::new(
3090+
SentPacketTracker::new_with_time_source(time_source.clone()),
3091+
));
3092+
let congestion_controller =
3093+
crate::transport::congestion_control::CongestionControlConfig::default()
3094+
.build_arc_with_time_source(time_source.clone());
3095+
let token_bucket = Arc::new(TokenBucket::new_with_time_source(
3096+
10_000,
3097+
10_000_000,
3098+
time_source.clone(),
3099+
));
3100+
let socket = Arc::new(TestSocket::new(
3101+
mpsc::channel::<(SocketAddr, Arc<[u8]>)>(16).0,
3102+
));
3103+
3104+
let rolling_rtt_stats = crate::transport::rolling_rtt_stats::RollingRttStatsHandle::new(
3105+
remote_addr,
3106+
time_source.clone(),
3107+
);
3108+
let remote_conn = RemoteConnection {
3109+
outbound_symmetric_key: cipher.clone(),
3110+
remote_addr,
3111+
sent_tracker,
3112+
last_packet_id: Arc::new(AtomicU32::new(0)),
3113+
inbound_packet_recv: inbound_rx,
3114+
inbound_symmetric_key: cipher.clone(),
3115+
inbound_symmetric_key_bytes: key,
3116+
my_address: None,
3117+
transport_secret_key: keypair.secret,
3118+
congestion_controller,
3119+
token_bucket,
3120+
socket,
3121+
global_bandwidth: None,
3122+
rolling_rtt_stats,
3123+
time_source: time_source.clone(),
3124+
};
3125+
3126+
// Build the encrypted Pong (sequence 7) and a trailing ShortMessage.
3127+
// The Pong handler does not make `recv()` return (it produces no
3128+
// message), so the ShortMessage gives the loop a value to yield once
3129+
// the Pong has been processed.
3130+
let pong_seq = 7u64;
3131+
let pong = SymmetricMessage::serialize_msg_to_packet_data(
3132+
1,
3133+
SymmetricMessagePayload::Pong { sequence: pong_seq },
3134+
&cipher,
3135+
vec![],
3136+
)
3137+
.expect("encrypt pong");
3138+
let pong_packet =
3139+
PacketData::<crate::transport::packet_data::UnknownEncryption>::from_buf(pong.data());
3140+
3141+
let short = SymmetricMessage::serialize_msg_to_packet_data(
3142+
2,
3143+
SymmetricMessagePayload::ShortMessage {
3144+
payload: Bytes::from_static(b"done"),
3145+
},
3146+
&cipher,
3147+
vec![],
3148+
)
3149+
.expect("encrypt short");
3150+
let short_packet =
3151+
PacketData::<crate::transport::packet_data::UnknownEncryption>::from_buf(short.data());
3152+
3153+
let mut conn = PeerConnection::new(remote_conn);
3154+
3155+
// Advance past zero before seeding so the ping's send timestamp is
3156+
// strictly positive. The recv loop's stale-ping cleanup retains only
3157+
// pings with `sent_at_nanos > now_nanos - idle_timeout`; at small mock
3158+
// times that threshold saturates to 0, so a ping stamped at exactly 0
3159+
// would be pruned before the Pong arrives. Stamp at 10 ms, then
3160+
// advance another 50 ms so the round-trip is a deterministic 50 ms.
3161+
time_source.advance_time(Duration::from_millis(10));
3162+
let send_nanos = time_source.now_nanos();
3163+
conn.pending_pings.write().insert(pong_seq, send_nanos);
3164+
time_source.advance_time(Duration::from_millis(50));
3165+
3166+
let samples_before = crate::transport::TRANSPORT_METRICS.rtt_sample_count();
3167+
3168+
inbound_tx.send(pong_packet).await.expect("send pong");
3169+
inbound_tx.send(short_packet).await.expect("send short");
3170+
3171+
let _msg = conn.recv().await.expect("recv");
3172+
3173+
let samples_after = crate::transport::TRANSPORT_METRICS.rtt_sample_count();
3174+
assert!(
3175+
samples_after >= samples_before + 1,
3176+
"Pong for a pending ping must record at least one RTT sample \
3177+
(before={samples_before}, after={samples_after}). \
3178+
The keep-alive RTT path (#4000) is not wired up."
3179+
);
3180+
3181+
// The matching ping must have been consumed so it isn't sampled twice
3182+
// or treated as still-unanswered by the keep-alive backoff.
3183+
assert!(
3184+
!conn.pending_pings.read().contains_key(&pong_seq),
3185+
"Pong must remove the matching pending ping"
3186+
);
3187+
3188+
crate::transport::TRANSPORT_METRICS.remove_peer(remote_addr);
3189+
}
3190+
30353191
/// Regression test for #4079.
30363192
///
30373193
/// Reproduces both leak scenarios called out in the issue:

0 commit comments

Comments
 (0)