diff --git a/Cargo.lock b/Cargo.lock index b0fb072f47..974b5f214d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2426,6 +2426,7 @@ dependencies = [ "kaspa-addressmanager", "kaspa-core", "kaspa-p2p-lib", + "kaspa-perigeemanager", "kaspa-utils", "log", "parking_lot", @@ -2996,6 +2997,7 @@ dependencies = [ "kaspa-notify", "kaspa-p2p-lib", "kaspa-p2p-mining", + "kaspa-perigeemanager", "kaspa-utils", "kaspa-utils-tower", "log", @@ -3067,6 +3069,22 @@ dependencies = [ "workflow-perf-monitor", ] +[[package]] +name = "kaspa-perigeemanager" +version = "1.1.0-rc.2" +dependencies = [ + "itertools 0.13.0", + "kaspa-consensus-core", + "kaspa-core", + "kaspa-hashes", + "kaspa-p2p-lib", + "kaspa-utils", + "log", + "parking_lot", + "rand 0.8.5", + "uuid 1.18.1", +] + [[package]] name = "kaspa-pow" version = "1.1.0-rc.2" @@ -3770,6 +3788,7 @@ dependencies = [ "kaspa-addresses", "kaspa-addressmanager", "kaspa-alloc", + "kaspa-connectionmanager", "kaspa-consensus", "kaspa-consensus-core", "kaspa-consensus-notify", @@ -3785,6 +3804,7 @@ dependencies = [ "kaspa-p2p-lib", "kaspa-p2p-mining", "kaspa-perf-monitor", + "kaspa-perigeemanager", "kaspa-rpc-core", "kaspa-rpc-service", "kaspa-txscript", diff --git a/Cargo.toml b/Cargo.toml index 65503b6ce7..aa9810ce4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -115,6 +115,7 @@ kaspa-p2p-flows = { version = "1.1.0-rc.2", path = "protocol/flows" } kaspa-p2p-lib = { version = "1.1.0-rc.2", path = "protocol/p2p" } kaspa-p2p-mining = { version = "1.1.0-rc.2", path = "protocol/mining" } kaspa-perf-monitor = { version = "1.1.0-rc.2", path = "metrics/perf_monitor" } +kaspa-perigeemanager = {version = "1.1.0-rc.2", path = "components/perigeemanager" } kaspa-pow = { version = "1.1.0-rc.2", path = "consensus/pow" } kaspa-rpc-core = { version = "1.1.0-rc.2", path = "rpc/core" } kaspa-rpc-macros = { version = "1.1.0-rc.2", path = "rpc/macros" } diff --git a/components/addressmanager/src/lib.rs b/components/addressmanager/src/lib.rs index 3e7dc3d2af..1ab13ed87a 100644 --- a/components/addressmanager/src/lib.rs +++ b/components/addressmanager/src/lib.rs @@ -290,8 +290,12 @@ impl AddressManager { self.address_store.iterate_addresses() } - pub fn iterate_prioritized_random_addresses(&self, exceptions: HashSet) -> impl ExactSizeIterator { - self.address_store.iterate_prioritized_random_addresses(exceptions) + pub fn iterate_prioritized_random_addresses( + &self, + priorities: Vec, + exceptions: HashSet, + ) -> impl ExactSizeIterator { + self.address_store.iterate_prioritized_random_addresses(priorities, exceptions) } pub fn ban(&mut self, ip: IpAddress) { @@ -325,6 +329,18 @@ impl AddressManager { pub fn get_all_banned_addresses(&self) -> Vec { self.banned_address_store.iterator().map(|x| IpAddress::from(x.unwrap().0)).collect_vec() } + + pub fn set_new_perigee_addresses(&mut self, addresses: Vec) { + self.address_store.set_new_perigee_addresses(addresses); + } + + pub fn get_perigee_addresses(&mut self) -> Vec { + self.address_store.get_perigee_addresses().unwrap() + } + + pub fn reset_perigee_data(&mut self) { + self.address_store.reset_perigee_data(); + } } mod address_store_with_cache { @@ -346,7 +362,7 @@ mod address_store_with_cache { use crate::{ stores::{ - address_store::{AddressesStore, DbAddressesStore, Entry}, + address_store::{AddressesStore, AddressesStoreReader, DbAddressesStore, Entry}, AddressKey, }, NetAddress, MAX_ADDRESSES, MAX_CONNECTION_FAILED_COUNT, @@ -369,6 +385,18 @@ mod address_store_with_cache { Self { db_store, addresses } } + pub fn 
set_new_perigee_addresses(&mut self, addresses: Vec) { + self.db_store.set_new_perigee_addresses(addresses).unwrap(); + } + + pub fn get_perigee_addresses(&mut self) -> Option> { + self.db_store.get_perigee_addresses().ok() + } + + pub fn reset_perigee_data(&mut self) { + self.db_store.reset_perigee_data().unwrap(); + } + pub fn has(&mut self, address: NetAddress) -> bool { self.addresses.contains_key(&address.into()) } @@ -427,9 +455,11 @@ mod address_store_with_cache { ///``` pub fn iterate_prioritized_random_addresses( &self, + priorities: Vec, exceptions: HashSet, ) -> impl ExactSizeIterator { - let exceptions: HashSet = exceptions.into_iter().map(|addr| addr.into()).collect(); + let exceptions: HashSet = + exceptions.into_iter().chain(priorities.iter().cloned()).map(|addr| addr.into()).collect(); let mut prefix_counter: HashMap = HashMap::new(); let (mut weights, filtered_addresses): (Vec, Vec) = self .addresses @@ -447,7 +477,7 @@ mod address_store_with_cache { *weights.get_mut(i).unwrap() /= *prefix_counter.get(&address.prefix_bucket()).unwrap() as f64; } - RandomWeightedIterator::new(weights, filtered_addresses) + RandomWeightedIteratorWithPriorities::new(weights, filtered_addresses, priorities) } pub fn remove_by_ip(&mut self, ip: IpAddr) { @@ -461,30 +491,34 @@ mod address_store_with_cache { Store::new(db) } - pub struct RandomWeightedIterator { + pub struct RandomWeightedIteratorWithPriorities { weighted_index: Option>, remaining: usize, addresses: Vec, + priorities: Vec, } - impl RandomWeightedIterator { - pub fn new(weights: Vec, addresses: Vec) -> Self { + impl RandomWeightedIteratorWithPriorities { + pub fn new(weights: Vec, addresses: Vec, priorities: Vec) -> Self { assert_eq!(weights.len(), addresses.len()); - let remaining = weights.iter().filter(|&&w| w > 0.0).count(); + let remaining = weights.iter().filter(|&&w| w > 0.0).count() + priorities.len(); let weighted_index = match WeightedIndex::new(weights) { Ok(index) => Some(index), Err(WeightedError::NoItem) => None, Err(e) => panic!("{e}"), }; - Self { weighted_index, remaining, addresses } + Self { weighted_index, remaining, addresses, priorities: priorities.into_iter().rev().collect() } } } - impl Iterator for RandomWeightedIterator { + impl Iterator for RandomWeightedIteratorWithPriorities { type Item = NetAddress; fn next(&mut self) -> Option { - if let Some(weighted_index) = self.weighted_index.as_mut() { + if !self.priorities.is_empty() { + self.remaining -= 1; + Some(self.priorities.pop().unwrap()) + } else if let Some(weighted_index) = self.weighted_index.as_mut() { let i = weighted_index.sample(&mut rand::thread_rng()); // Zero the selected address entry match weighted_index.update_weights(&[(i, &0f64)]) { @@ -507,7 +541,7 @@ mod address_store_with_cache { } } - impl ExactSizeIterator for RandomWeightedIterator {} + impl ExactSizeIterator for RandomWeightedIteratorWithPriorities {} #[cfg(test)] mod tests { @@ -526,11 +560,11 @@ mod address_store_with_cache { #[test] fn test_weighted_iterator() { let address = NetAddress::new(IpAddr::V6(Ipv6Addr::LOCALHOST).into(), 1); - let iter = RandomWeightedIterator::new(vec![0.2, 0.3, 0.0], vec![address, address, address]); + let iter = RandomWeightedIteratorWithPriorities::new(vec![0.2, 0.3, 0.0], vec![address, address, address], vec![]); assert_eq!(iter.len(), 2); assert_eq!(iter.count(), 2); - let iter = RandomWeightedIterator::new(vec![], vec![]); + let iter = RandomWeightedIteratorWithPriorities::new(vec![], vec![], vec![]); assert_eq!(iter.len(), 0); 
assert_eq!(iter.count(), 0); } @@ -603,7 +637,7 @@ mod address_store_with_cache { // The weight sampled expected uniform distribution let prioritized_address_distribution = am .lock() - .iterate_prioritized_random_addresses(HashSet::new()) + .iterate_prioritized_random_addresses(vec![], HashSet::new()) .take(num_of_buckets) .map(|addr| addr.prefix_bucket().as_u64() as f64) .collect_vec(); @@ -624,5 +658,32 @@ mod address_store_with_cache { ); assert!(adjusted_p <= significance); } + + #[test] + fn test_perigee_data() { + let db = create_temp_db!(ConnBuilder::default().with_files_limit(10)); + let config = Config::new(SIMNET_PARAMS); + let (am, _) = AddressManager::new(Arc::new(config), db.1, Arc::new(TickService::default())); + + let mut am_guard = am.lock(); + + let perigee_addresses: Vec = (0..10) + .map(|i| { + NetAddress::new( + IpAddress::from_str(&format!("{}.{}.{}.{}", i, i, i, i)).unwrap(), + SIMNET_PARAMS.default_p2p_port(), + ) + }) + .collect(); + am_guard.set_new_perigee_addresses(perigee_addresses.clone()); + + let fetched_perigee_addresses = am_guard.get_perigee_addresses(); + assert_eq!(fetched_perigee_addresses, perigee_addresses); + + am_guard.reset_perigee_data(); + + let fetched_perigee_addresses_after_reset = am_guard.get_perigee_addresses(); + assert!(fetched_perigee_addresses_after_reset.is_empty()); + } } } diff --git a/components/addressmanager/src/stores/address_store.rs b/components/addressmanager/src/stores/address_store.rs index fe4ddb244b..9ea3f71201 100644 --- a/components/addressmanager/src/stores/address_store.rs +++ b/components/addressmanager/src/stores/address_store.rs @@ -1,13 +1,11 @@ use kaspa_database::{ - prelude::DB, - prelude::{CachePolicy, StoreError, StoreResult}, - prelude::{CachedDbAccess, DirectDbWriter}, + prelude::{CachePolicy, CachedDbAccess, DirectDbWriter, StoreError, StoreResult, DB}, registry::DatabaseStorePrefixes, }; use kaspa_utils::mem_size::MemSizeEstimator; use serde::{Deserialize, Serialize}; -use std::net::Ipv6Addr; use std::{error::Error, fmt::Display, sync::Arc}; +use std::{mem, net::Ipv6Addr}; use super::AddressKey; use crate::NetAddress; @@ -20,16 +18,29 @@ pub struct Entry { impl MemSizeEstimator for Entry {} +/// Address entry for persisted leveraged perigee addresses +/// the rank indicates the quality of the address (lower is better) +#[derive(Clone, Copy, Serialize, Deserialize)] +pub struct PerigeeEntry { + pub rank: u16, + pub address: NetAddress, +} + +impl MemSizeEstimator for PerigeeEntry {} + pub trait AddressesStoreReader { #[allow(dead_code)] fn get(&self, key: AddressKey) -> Result; + fn get_perigee_addresses(&self) -> Result, StoreError>; } pub trait AddressesStore: AddressesStoreReader { fn set(&mut self, key: AddressKey, entry: Entry) -> StoreResult<()>; + fn set_new_perigee_addresses(&mut self, entries: Vec) -> StoreResult<()>; #[allow(dead_code)] fn set_failed_count(&mut self, key: AddressKey, connection_failed_count: u64) -> StoreResult<()>; fn remove(&mut self, key: AddressKey) -> StoreResult<()>; + fn reset_perigee_data(&mut self) -> StoreResult<()>; } const IPV6_LEN: usize = 16; @@ -72,15 +83,55 @@ impl From for AddressKey { } } +impl From for DbAddressKey { + fn from(address: NetAddress) -> Self { + AddressKey::from(address).into() + } +} + +#[derive(Eq, Hash, PartialEq, Debug, Copy, Clone)] +struct DbPerigeeRankedAddressKey([u8; mem::size_of::() + ADDRESS_KEY_SIZE]); + +impl From for DbPerigeeRankedAddressKey { + fn from(perigee_entry: PerigeeEntry) -> Self { + let mut bytes = [0; mem::size_of::()]; 
+ bytes[..mem::size_of::<u16>()].copy_from_slice(&perigee_entry.rank.to_be_bytes()); // big-endian so that lexicographic key ordering in RocksDB matches ascending rank order + bytes[mem::size_of::<u16>()..].copy_from_slice(DbAddressKey::from(perigee_entry.address).as_ref()); + Self(bytes) + } +} + +impl From<DbPerigeeRankedAddressKey> for PerigeeEntry { + fn from(db_key: DbPerigeeRankedAddressKey) -> Self { + let rank_byte_array = db_key.0[..mem::size_of::<u16>()].try_into().unwrap(); + let rank = u16::from_be_bytes(rank_byte_array); + let address_key_bytes: [u8; ADDRESS_KEY_SIZE] = db_key.0[mem::size_of::<u16>()..].try_into().unwrap(); + let address_key = DbAddressKey(address_key_bytes); + let address = AddressKey::from(address_key).into(); + PerigeeEntry { rank, address } + } +} + +impl AsRef<[u8]> for DbPerigeeRankedAddressKey { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + #[derive(Clone)] pub struct DbAddressesStore { db: Arc, access: CachedDbAccess, + perigee_access: CachedDbAccess, } impl DbAddressesStore { pub fn new(db: Arc, cache_policy: CachePolicy) -> Self { - Self { db: Arc::clone(&db), access: CachedDbAccess::new(db, cache_policy, DatabaseStorePrefixes::Addresses.into()) } + Self { + db: Arc::clone(&db), + access: CachedDbAccess::new(db.clone(), cache_policy, DatabaseStorePrefixes::Addresses.into()), + perigee_access: CachedDbAccess::new(db.clone(), cache_policy, DatabaseStorePrefixes::PerigeeAddresses.into()), + } } pub fn iterator(&self) -> impl Iterator>> + '_ { @@ -102,6 +153,16 @@ impl AddressesStoreReader for DbAddressesStore { fn get(&self, key: AddressKey) -> Result { self.access.read(key.into()) } + + /// Get persisted leveraged perigee addresses, ordered by their ascending perigee rank (low is better). + fn get_perigee_addresses(&self) -> StoreResult> { + self.perigee_access + .iterator() + .map(|res| { + res.map(|(_, perigee_entry)| perigee_entry.address).map_err(|err| StoreError::DataInconsistency(err.to_string())) + }) + .collect::>>() + } } impl AddressesStore for DbAddressesStore { @@ -109,6 +170,23 @@ impl AddressesStore for DbAddressesStore { fn set(&mut self, key: AddressKey, entry: Entry) -> StoreResult<()> { self.access.write(DirectDbWriter::new(&self.db), key.into(), entry) } + + /// Replaces all existing persisted perigee addresses with the given set of addresses. + /// Note: the order of the given addresses determines their perigee rank (low is better). + /// This is important for the order of retrieval of leveraged perigee addresses between restarts. + /// In cases where the number of required addresses decreases, only the top addresses are chosen.
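+ /// The rank is encoded big-endian into the RocksDB key (see `DbPerigeeRankedAddressKey`), so iteration order matches ascending rank.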
+ fn set_new_perigee_addresses(&mut self, entries: Vec) -> StoreResult<()> { + // First, clear existing perigee addresses + self.perigee_access.delete_all(DirectDbWriter::new(&self.db))?; + + let mut key_iter = entries.iter().enumerate().map(|(rank, address)| { + let perigee_entry = PerigeeEntry { rank: rank as u16, address: *address }; + let db_key = DbPerigeeRankedAddressKey::from(perigee_entry); + (db_key, perigee_entry) + }); + + self.perigee_access.write_many(DirectDbWriter::new(&self.db), &mut key_iter) + } + fn remove(&mut self, key: AddressKey) -> StoreResult<()> { self.access.delete(DirectDbWriter::new(&self.db), key.into()) } @@ -117,4 +195,8 @@ impl AddressesStore for DbAddressesStore { let entry = self.get(key)?; self.set(key, Entry { connection_failed_count, address: entry.address }) } + + fn reset_perigee_data(&mut self) -> StoreResult<()> { + self.perigee_access.delete_all(DirectDbWriter::new(&self.db)) + } } diff --git a/components/addressmanager/src/stores/mod.rs b/components/addressmanager/src/stores/mod.rs index 11721d2ce7..0cd50077a8 100644 --- a/components/addressmanager/src/stores/mod.rs +++ b/components/addressmanager/src/stores/mod.rs @@ -32,3 +32,10 @@ impl From for AddressKey { ) } } + +impl From for NetAddress { + fn from(value: AddressKey) -> Self { + let ip = if let Some(ipv4) = value.0.to_ipv4() { IpAddr::V4(ipv4) } else { IpAddr::V6(value.0) }; + NetAddress { ip: ip.into(), port: value.1 } + } +} diff --git a/components/connectionmanager/Cargo.toml b/components/connectionmanager/Cargo.toml index 8fd76303fe..68aea172b9 100644 --- a/components/connectionmanager/Cargo.toml +++ b/components/connectionmanager/Cargo.toml @@ -14,6 +14,7 @@ duration-string.workspace = true futures-util.workspace = true itertools.workspace = true kaspa-addressmanager.workspace = true +kaspa-perigeemanager.workspace = true kaspa-core.workspace = true kaspa-p2p-lib.workspace = true kaspa-utils.workspace = true diff --git a/components/connectionmanager/src/lib.rs b/components/connectionmanager/src/lib.rs index 2146ec62d1..1ef8ab0622 100644 --- a/components/connectionmanager/src/lib.rs +++ b/components/connectionmanager/src/lib.rs @@ -2,7 +2,10 @@ use std::{ cmp::min, collections::{HashMap, HashSet}, net::{IpAddr, SocketAddr, ToSocketAddrs}, - sync::Arc, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, time::{Duration, SystemTime}, }; @@ -11,29 +14,35 @@ use futures_util::future::{join_all, try_join_all}; use itertools::Itertools; use kaspa_addressmanager::{AddressManager, NetAddress}; use kaspa_core::{debug, info, warn}; -use kaspa_p2p_lib::{common::ProtocolError, ConnectionError, Peer}; +use kaspa_p2p_lib::{common::ProtocolError, ConnectionError, Peer, PeerKey, PeerOutboundType}; +use kaspa_perigeemanager::{PerigeeConfig, PerigeeManager}; use kaspa_utils::triggers::SingleTrigger; use parking_lot::Mutex as ParkingLotMutex; -use rand::{seq::SliceRandom, thread_rng}; -use tokio::{ - select, - sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - Mutex as TokioMutex, - }, - time::{interval, MissedTickBehavior}, +use rand::{ + seq::{IteratorRandom, SliceRandom}, + thread_rng, }; +use tokio::{select, sync::Mutex as TokioMutex, task::JoinHandle, try_join}; + +pub const EVENT_LOOP_TIMER: Duration = Duration::from_secs(30); + +pub enum ConnectionManagerEvent { + Tick(usize), + AddPeer, +} pub struct ConnectionManager { p2p_adaptor: Arc, - outbound_target: usize, + random_graph_target: usize, inbound_limit: usize, dns_seeders: &'static [&'static str], default_port: 
u16, address_manager: Arc>, - connection_requests: TokioMutex>, - force_next_iteration: UnboundedSender<()>, + connection_requests: Arc>>, shutdown_signal: SingleTrigger, + tick_counter: Arc, + perigee_manager: Option>>, + perigee_config: Option, } #[derive(Clone, Debug)] @@ -52,40 +61,57 @@ impl ConnectionRequest { impl ConnectionManager { pub fn new( p2p_adaptor: Arc, - outbound_target: usize, + random_graph_target: usize, + + // perigee parameters + perigee_manager: Option>>, inbound_limit: usize, dns_seeders: &'static [&'static str], default_port: u16, address_manager: Arc>, ) -> Arc { - let (tx, rx) = unbounded_channel::<()>(); + let perigee_config = perigee_manager.as_ref().map(|pm| pm.clone().lock().config()); let manager = Arc::new(Self { p2p_adaptor, - outbound_target, + random_graph_target, inbound_limit, address_manager, - connection_requests: Default::default(), - force_next_iteration: tx, + connection_requests: Arc::new(Default::default()), shutdown_signal: SingleTrigger::new(), + tick_counter: Arc::new(AtomicUsize::new(0)), dns_seeders, default_port, + perigee_config, + perigee_manager, }); - manager.clone().start_event_loop(rx); - manager.force_next_iteration.send(()).unwrap(); + + manager.clone().start_event_loop(); manager } - fn start_event_loop(self: Arc, mut rx: UnboundedReceiver<()>) { - let mut ticker = interval(Duration::from_secs(30)); - ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + fn start_event_loop(self: Arc) { tokio::spawn(async move { + let mut next_tick = tokio::time::Instant::now(); + loop { if self.shutdown_signal.trigger.is_triggered() { break; } select! { - _ = rx.recv() => self.clone().handle_event().await, - _ = ticker.tick() => self.clone().handle_event().await, + _ = tokio::time::sleep_until(next_tick) => { + debug!("Connection manager handling connections"); + + let tick_start = tokio::time::Instant::now(); + + self.clone().handle_event(ConnectionManagerEvent::Tick( + self.tick_counter.fetch_add(1, Ordering::SeqCst) + )).await; + + // Calculate next tick deadline + next_tick = tick_start + EVENT_LOOP_TIMER; + debug!("Connection manager event loop tick completed in {}", DurationString::from(tokio::time::Instant::now().duration_since(tick_start))); + debug!("Next connection manager event loop tick scheduled in {}", DurationString::from(next_tick.duration_since(tokio::time::Instant::now()))); + }, _ = self.shutdown_signal.listener.clone() => break, } } @@ -93,27 +119,168 @@ impl ConnectionManager { }); } - async fn handle_event(self: Arc) { - debug!("Starting connection loop iteration"); - let peers = self.p2p_adaptor.active_peers(); - let peer_by_address: HashMap = peers.into_iter().map(|peer| (peer.net_address(), peer)).collect(); + fn spawn_initiate_perigee(self: &Arc, peer_by_address: &HashMap) -> JoinHandle<()> { + let cmgr = self.clone(); + let peer_by_address = peer_by_address.clone(); + tokio::spawn(async move { cmgr.initiate_perigee(&peer_by_address) }) + } + + fn initiate_perigee(self: &Arc, peer_by_address: &HashMap) { + // Initiate the perigee manager with persisted peers. 
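+ // Only persisted addresses that map to currently connected peers are handed over, capped at the configured perigee outbound target.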
+ let perigee_manager = self.perigee_manager.as_ref().unwrap(); + let mut perigee_manager_guard = perigee_manager.lock(); + let init = self.address_manager.lock().get_perigee_addresses(); + info!( + "Connection manager: Initiating perigee with db persisted peers: {:?}", + init.iter() + .map(|addr| SocketAddr::new(addr.ip.into(), addr.port).to_string()) + .collect::>() + .join(", ") + .trim_end_matches(", "), + ); + perigee_manager_guard.set_initial_persistent_peers( + init.iter() + .filter_map(|addr| peer_by_address.get(&SocketAddr::new(addr.ip.into(), addr.port))) + .map(|p| p.key()) + .take(self.perigee_config.as_ref().unwrap().perigee_outbound_target) + .collect(), + ); + } + + async fn evaluate_perigee_round(self: &Arc, peer_by_address: Arc>) -> HashSet { + debug!("Evaluating perigee round..."); + let (to_leverage, to_evict, has_leveraged_changed) = { + let mut perigee_manager_guard = self.perigee_manager.as_ref().unwrap().lock(); + if perigee_manager_guard.config().statistics { + perigee_manager_guard.log_statistics(&peer_by_address); + } + + perigee_manager_guard.evaluate_round(&peer_by_address) + }; + + // save leveraged peers to db if persistence is enabled and there was a change in leveraged peers. + if has_leveraged_changed && self.perigee_config.as_ref().unwrap().persistence { + let am = &mut self.address_manager.lock(); + + // Update persisted perigee addresses + am.set_new_perigee_addresses(to_leverage.iter().map(|pk| pk.sock_addr().into()).collect()); + } + + // Log the results of the perigee round + if has_leveraged_changed { + info!( + "Connection manager: Leveraging perigee peers \n {}", + to_leverage.iter().map(|pk| pk.sock_addr().to_string()).collect::>().join(", ").trim_end_matches(", "), + ); + } else { + debug!("Connection manager: No changes in leveraged perigee peers"); + } + if !to_evict.is_empty() { + info!( + "Connection manager: Evicting perigee peers: {}", + to_evict.iter().map(|pk| pk.sock_addr().to_string()).collect::>().join(", ").trim_end_matches(", "), + ); + } else { + debug!("Connection manager: No perigee peers to evict"); + } + + to_evict + } + + async fn reset_perigee_round(self: &Arc) { + if let Some(perigee_manager) = &self.perigee_manager { + let mut perigee_manager_guard = perigee_manager.lock(); + perigee_manager_guard.start_new_round(); + } + // This causes potentially a minor lag between the perigee manager round reset and the peer perigee timestamp resets, + // better to clear this after the perigee manager reset to avoid penalizing fast peers sending data in this short lag. 
+ self.p2p_adaptor.clear_perigee_timestamps().await; + debug!("Connection manager: Reset perigee round"); + } - self.handle_connection_requests(&peer_by_address).await; - self.handle_outbound_connections(&peer_by_address).await; - self.handle_inbound_connections(&peer_by_address).await; + fn get_peers_by_address(self: &Arc, include_perigee_data: bool) -> Arc> { + debug!("Getting peers by addresses (include_perigee_data={})", include_perigee_data); + let peers = self.p2p_adaptor.active_peers(include_perigee_data); + Arc::new(peers.into_iter().map(|peer| (peer.net_address(), peer)).collect()) } - pub async fn add_connection_request(&self, address: SocketAddr, is_permanent: bool) { + async fn handle_event(self: Arc, event: ConnectionManagerEvent) { + match event { + ConnectionManagerEvent::Tick(tick_count) => { + let should_initiate_perigee = self.perigee_config.as_ref().is_some_and(|pc| pc.persistence && tick_count == 0); + let should_activate_perigee = + self.perigee_config.as_ref().is_some_and(|pc| tick_count % pc.round_frequency == 0 && tick_count != 0); + if should_initiate_perigee { + let mut peer_by_address = self.get_peers_by_address(false); + // First, we await populating outbound connections. + self.handle_outbound_connections(peer_by_address.clone(), HashSet::new()).await; + + // Now, we can reinstate peer_by_address to include the newly connected peers + peer_by_address = self.get_peers_by_address(false); // don't need the data to init + + // Continue with the congruent connection handling. + try_join!( + self.spawn_initiate_perigee(&peer_by_address), + self.spawn_handle_connection_requests(peer_by_address.clone()), + self.spawn_handle_inbound_connections(peer_by_address.clone()), + ) + .unwrap(); + } else if should_activate_perigee { + let peer_by_address = self.get_peers_by_address(true); + + // This is a round where perigee should be evaluated and processed. + + // We await this (not spawn), so that `spawn_handle_outbound_connections` is called after the perigee round evaluation is executed, + let peers_evicted = self.evaluate_perigee_round(peer_by_address.clone()).await; + + // Reset the perigee round state. + self.reset_perigee_round().await; + + // Continue with the regular congruent connection handling. + try_join!( + self.spawn_handle_outbound_connections(peer_by_address.clone(), peers_evicted), + self.spawn_handle_inbound_connections(peer_by_address.clone()), + self.spawn_handle_connection_requests(peer_by_address.clone()), + ) + .unwrap(); + } else { + let peer_by_address = self.get_peers_by_address(false); + try_join!( + self.spawn_handle_outbound_connections(peer_by_address.clone(), HashSet::new()), + self.spawn_handle_inbound_connections(peer_by_address.clone()), + self.spawn_handle_connection_requests(peer_by_address.clone()), + ) + .unwrap(); + } + } + ConnectionManagerEvent::AddPeer => { + let peer_by_address = self.get_peers_by_address(false); + // We only need to handle connection requests for this event. + self.spawn_handle_connection_requests(peer_by_address).await.unwrap(); + } + } + } + + pub async fn add_connection_requests(self: Arc, requests: Vec<(SocketAddr, bool)>) { // If the request already exists, it resets the attempts count and overrides the `is_permanent` setting. - self.connection_requests.lock().await.insert(address, ConnectionRequest::new(is_permanent)); - self.force_next_iteration.send(()).unwrap(); // We force the next iteration of the connection loop. 
+ let mut connection_requests = self.connection_requests.lock().await; + for (address, is_permanent) in requests { + connection_requests.insert(address, ConnectionRequest::new(is_permanent)); + } + drop(connection_requests); + self.handle_event(ConnectionManagerEvent::AddPeer).await; } pub async fn stop(&self) { self.shutdown_signal.trigger.trigger() } - async fn handle_connection_requests(self: &Arc, peer_by_address: &HashMap) { + fn spawn_handle_connection_requests(self: &Arc, peer_by_address: Arc>) -> JoinHandle<()> { + let cmgr = self.clone(); + tokio::spawn(async move { cmgr.handle_connection_requests(peer_by_address).await }) + } + + async fn handle_connection_requests(self: &Arc, peer_by_address: Arc>) { let mut requests = self.connection_requests.lock().await; let mut new_requests = HashMap::with_capacity(requests.len()); for (address, request) in requests.iter() { @@ -127,13 +294,14 @@ impl ConnectionManager { if !is_connected && request.next_attempt <= SystemTime::now() { debug!("Connecting to peer request {}", address); - match self.p2p_adaptor.connect_peer(address.to_string()).await { + match self.p2p_adaptor.connect_peer(address.to_string(), PeerOutboundType::UserSupplied).await { Err(err) => { debug!("Failed connecting to peer request: {}, {}", address, err); if request.is_permanent { const MAX_ACCOUNTABLE_ATTEMPTS: u32 = 4; - let retry_duration = - Duration::from_secs(30u64 * 2u64.pow(min(request.attempts, MAX_ACCOUNTABLE_ATTEMPTS))); + let retry_duration = Duration::from_secs( + EVENT_LOOP_TIMER.as_secs() * 2u64.pow(min(request.attempts, MAX_ACCOUNTABLE_ATTEMPTS)), + ); debug!("Will retry peer request {} in {}", address, DurationString::from(retry_duration)); new_requests.insert( address, @@ -159,15 +327,130 @@ impl ConnectionManager { *requests = new_requests; } - async fn handle_outbound_connections(self: &Arc, peer_by_address: &HashMap) { - let active_outbound: HashSet = - peer_by_address.values().filter(|peer| peer.is_outbound()).map(|peer| peer.net_address().into()).collect(); - if active_outbound.len() >= self.outbound_target { - return; + fn spawn_handle_outbound_connections( + self: &Arc, + peer_by_address: Arc>, + to_terminate: HashSet, + ) -> JoinHandle<()> { + let cmgr = self.clone(); + tokio::spawn(async move { cmgr.handle_outbound_connections(peer_by_address, to_terminate).await }) + } + + async fn handle_outbound_connections( + self: &Arc, + peer_by_address: Arc>, + to_terminate: HashSet, + ) { + debug!("Handling outbound connections..."); + + let mut active_outbound = HashSet::new(); + let mut num_active_perigee_outbound = 0usize; + let mut num_active_random_graph_outbound = 0usize; + + let peers_by_address = if !to_terminate.is_empty() { + // Create a filtered view of peer_by_address without the terminated peers. + let filtered_peer_by_address = Arc::new( + peer_by_address + .iter() + // Filter out peers that were just terminated. + .filter(|(_, peer)| !to_terminate.contains(&peer.key())) + .map(|(addr, peer)| (*addr, peer.clone())) + .collect::>(), + ); + + // Terminate peers passed explicitly. 
+ self.terminate_peers(to_terminate.into_iter()).await; + + filtered_peer_by_address + } else { + peer_by_address.clone() + }; + + for peer in peers_by_address.values() { + match peer.outbound_type() { + Some(obt) => { + let net_addr = NetAddress::new(peer.net_address().ip().into(), peer.net_address().port()); + active_outbound.insert(net_addr); + match obt { + PeerOutboundType::Perigee => num_active_perigee_outbound += 1, + PeerOutboundType::RandomGraph => num_active_random_graph_outbound += 1, + _ => continue, + }; + } + None => continue, + }; } - let mut missing_connections = self.outbound_target - active_outbound.len(); - let mut addr_iter = self.address_manager.lock().iterate_prioritized_random_addresses(active_outbound); + let num_active_outbound_respecting_peers = num_active_perigee_outbound + num_active_random_graph_outbound; + + info!( + "Connection manager: outbound respecting connections: {}/{} (Perigee: {}/{}, RandomGraph: {}/{}); Others: {} )", + num_active_outbound_respecting_peers, + self.outbound_target(), + num_active_perigee_outbound, + self.perigee_outbound_target(), + num_active_random_graph_outbound, + self.random_graph_target, + active_outbound.len().saturating_sub(num_active_outbound_respecting_peers) + ); + + let mut missing_connections = self.outbound_target().saturating_sub(num_active_outbound_respecting_peers); + + if missing_connections == 0 { + let random_graph_overflow = num_active_random_graph_outbound.saturating_sub(self.random_graph_target); + if random_graph_overflow > 0 { + let to_terminate_keys = active_outbound + .iter() + .filter_map(|addr| match peer_by_address.get(&SocketAddr::new(addr.ip.into(), addr.port)) { + Some(peer) if peer.is_random_graph() => Some(peer.key()), + _ => None, + }) + .choose_multiple(&mut thread_rng(), random_graph_overflow); + + info!( + "Connection manager: terminating {} excess random graph outbound connections to respect the target of {}", + random_graph_overflow, self.random_graph_target + ); + self.terminate_peers(to_terminate_keys.into_iter()).await; + }; + let perigee_overflow = num_active_perigee_outbound.saturating_sub(self.perigee_outbound_target()); + if perigee_overflow > 0 { + let to_terminate_keys = { + let mut pm = self.perigee_manager.as_ref().unwrap().lock(); + pm.trim_peers(peers_by_address).into_iter().collect::>() + }; + + info!( + "Connection manager: terminating {} excess perigee outbound connections to respect the target of {}", + perigee_overflow, + self.perigee_outbound_target() + ); + self.terminate_peers(to_terminate_keys.into_iter()).await; + } + } + + let mut missing_random_graph_connections = self.random_graph_target.saturating_sub(num_active_random_graph_outbound); + + let mut missing_perigee_connections = missing_connections.saturating_sub(missing_random_graph_connections); + + // Use a boxed ExactSizeIterator so the `else` branch can return the + // address-manager iterator directly (no extra collect). Only the + // perigee branch builds a Vec which is necessary to prepend persisted + // addresses. + let mut addr_iter = if active_outbound.is_empty() && self.perigee_config.as_ref().is_some_and(|pc| pc.persistence) { + // On fresh start-up (or some other full peer clearing event), and with perigee persistence, + // we prioritize perigee peers saved to the DB from some previous round. 
+ let persistent_perigee_addresses = self.address_manager.lock().get_perigee_addresses(); + + let leverage_target = self.perigee_config.as_ref().unwrap().leverage_target; + + // Collect the persisted perigee addresses first. + let priorities = persistent_perigee_addresses.into_iter().take(leverage_target).collect(); + + self.address_manager.lock().iterate_prioritized_random_addresses(priorities, active_outbound) + } else { + self.address_manager.lock().iterate_prioritized_random_addresses(vec![], active_outbound) + }; let mut progressing = true; let mut connecting = true; @@ -175,9 +458,14 @@ impl ConnectionManager { if self.shutdown_signal.trigger.is_triggered() { return; } + let mut addrs_to_connect = Vec::with_capacity(missing_connections); let mut jobs = Vec::with_capacity(missing_connections); - for _ in 0..missing_connections { + let mut random_graph_addrs = HashSet::new(); + let mut perigee_addrs = HashSet::new(); + + // Because we potentially prioritized perigee connections to the start of addr_iter, we should start with perigee peers. + for _ in 0..missing_perigee_connections { let Some(net_addr) = addr_iter.next() else { connecting = false; break; @@ -185,23 +473,36 @@ impl ConnectionManager { let socket_addr = SocketAddr::new(net_addr.ip.into(), net_addr.port).to_string(); debug!("Connecting to {}", &socket_addr); addrs_to_connect.push(net_addr); - jobs.push(self.p2p_adaptor.connect_peer(socket_addr.clone())); + perigee_addrs.insert(net_addr); + jobs.push(self.p2p_adaptor.connect_peer(socket_addr.clone(), PeerOutboundType::Perigee)); + } + + for _ in 0..missing_random_graph_connections { + let Some(net_addr) = addr_iter.next() else { + connecting = false; + break; + }; + let socket_addr = SocketAddr::new(net_addr.ip.into(), net_addr.port).to_string(); + debug!("Connecting to {}", &socket_addr); + addrs_to_connect.push(net_addr); + random_graph_addrs.insert(net_addr); + jobs.push(self.p2p_adaptor.connect_peer(socket_addr.clone(), PeerOutboundType::RandomGraph)); } if progressing && !jobs.is_empty() { // Log only if progress was made info!( - "Connection manager: has {}/{} outgoing P2P connections, trying to obtain {} additional connection(s)...", - self.outbound_target - missing_connections, - self.outbound_target, + "Connection manager: trying to obtain {} additional outbound connection(s) ({}/{}).", jobs.len(), + self.outbound_target() - missing_connections, + self.outbound_target(), ); progressing = false; } else { debug!( "Connection manager: outgoing: {}/{} , connecting: {}, iterator: {}", - self.outbound_target - missing_connections, - self.outbound_target, + self.outbound_target() - missing_connections, + self.outbound_target(), jobs.len(), addr_iter.len(), ); @@ -211,6 +512,11 @@ impl ConnectionManager { match res { Ok(_) => { self.address_manager.lock().mark_connection_success(net_addr); + if perigee_addrs.contains(&net_addr) { + missing_perigee_connections -= 1; + } else { + missing_random_graph_connections -= 1; + } missing_connections -= 1; progressing = true; } @@ -227,7 +533,7 @@ impl ConnectionManager { } if missing_connections > 0 && !self.dns_seeders.is_empty() { - if missing_connections > self.outbound_target / 2 { + if missing_connections > self.outbound_target() / 2 { // If we are missing more than half of our target, query all in parallel. // This will always be the case on new node start-up and is the most resilient strategy in such a case. 
self.dns_seed_many(self.dns_seeders.len()).await; @@ -238,19 +544,26 @@ impl ConnectionManager { } } - async fn handle_inbound_connections(self: &Arc, peer_by_address: &HashMap) { + fn spawn_handle_inbound_connections(self: &Arc, peer_by_address: Arc>) -> JoinHandle<()> { + let cmgr = self.clone(); + tokio::spawn(async move { cmgr.handle_inbound_connections(peer_by_address).await }) + } + + async fn handle_inbound_connections(self: &Arc, peer_by_address: Arc>) { let active_inbound = peer_by_address.values().filter(|peer| !peer.is_outbound()).collect_vec(); let active_inbound_len = active_inbound.len(); + + info!("Connection manager: inbound connections: {}/{}", active_inbound_len, self.inbound_limit,); + if self.inbound_limit >= active_inbound_len { return; } - let mut futures = Vec::with_capacity(active_inbound_len - self.inbound_limit); - for peer in active_inbound.choose_multiple(&mut thread_rng(), active_inbound_len - self.inbound_limit) { - debug!("Disconnecting from {} because we're above the inbound limit", peer.net_address()); - futures.push(self.p2p_adaptor.terminate(peer.key())); - } - join_all(futures).await; + let to_terminate = active_inbound.choose_multiple(&mut thread_rng(), active_inbound_len - self.inbound_limit); + + info!("Connection manager: terminating {} inbound peers", to_terminate.len()); + + self.terminate_peers(to_terminate.into_iter().map(|peer| peer.key())).await; } /// Queries DNS seeders in random order, one after the other, until obtaining `min_addresses_to_fetch` addresses @@ -315,7 +628,7 @@ impl ConnectionManager { if self.ip_has_permanent_connection(ip).await { return; } - for peer in self.p2p_adaptor.active_peers() { + for peer in self.p2p_adaptor.active_peers(false) { if peer.net_address().ip() == ip { self.p2p_adaptor.terminate(peer.key()).await; } @@ -337,4 +650,21 @@ impl ConnectionManager { pub async fn ip_has_permanent_connection(&self, ip: IpAddr) -> bool { self.connection_requests.lock().await.iter().any(|(address, request)| request.is_permanent && address.ip() == ip) } + + pub fn outbound_target(&self) -> usize { + self.random_graph_target + self.perigee_outbound_target() + } + + pub fn perigee_outbound_target(&self) -> usize { + self.perigee_config.as_ref().map_or(0, |config| config.perigee_outbound_target) + } + + async fn terminate_peers(&self, peer_keys: impl IntoIterator) { + let mut futures = Vec::new(); + for peer_key in peer_keys.into_iter() { + debug!("Terminating peer: {}", peer_key.sock_addr()); + futures.push(self.p2p_adaptor.terminate(peer_key)); + } + join_all(futures).await; + } } diff --git a/components/perigeemanager/Cargo.toml b/components/perigeemanager/Cargo.toml new file mode 100644 index 0000000000..9465907204 --- /dev/null +++ b/components/perigeemanager/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "kaspa-perigeemanager" +description = "Kaspa perigee manager" +rust-version.workspace = true +version.workspace = true +edition.workspace = true +authors.workspace = true +include.workspace = true +license.workspace = true +repository.workspace = true + +[dependencies] +kaspa-consensus-core.workspace = true +kaspa-p2p-lib.workspace = true +kaspa-hashes.workspace = true +kaspa-core.workspace = true + +log.workspace = true +itertools.workspace = true +parking_lot.workspace = true +rand.workspace = true + +[dev-dependencies] +kaspa-p2p-lib = { workspace = true, features = ["test-utils"] } +kaspa-utils.workspace = true +uuid.workspace = true + + +[lints] +workspace = true diff --git a/components/perigeemanager/src/lib.rs 
b/components/perigeemanager/src/lib.rs new file mode 100644 index 0000000000..128dfb491b --- /dev/null +++ b/components/perigeemanager/src/lib.rs @@ -0,0 +1,977 @@ +use std::{ + cmp::{min, Ordering}, + collections::{hash_map::Entry, HashMap, HashSet}, + fmt::Display, + net::SocketAddr, + sync::{atomic::AtomicBool, Arc}, + time::{Duration, Instant}, +}; + +use itertools::Itertools; +use kaspa_consensus_core::{BlockHashSet, Hash, HashMapCustomHasher}; +use kaspa_core::{debug, info, trace}; +use kaspa_p2p_lib::{Peer, PeerKey, Router}; +use parking_lot::Mutex; +use rand::{seq::IteratorRandom, thread_rng, Rng}; + +// Tolerance for the number of blocks verified in a round to trigger evaluation. +// For example, at 0.175, if we expect to see 200 blocks verified in a round, but fewer than +// 165 or more than 235 are verified, we skip the leverage evaluation for this round. +// The reasoning is that network conditions are not considered stable enough to make a good decision, +// and we would rather skip and wait for the next round. +// Note that exploration can still happen even if this threshold is not met. +// This ensures that we continue to explore in case network conditions are the fault of the connected peers, not network-wide. +const BLOCKS_VERIFIED_FAULT_TOLERANCE: f64 = 0.175; +const IDENT: &str = "PerigeeManager"; + +/// Holds the score for a peer. +#[derive(Debug)] +pub struct PeerScore { + p90: u64, + p95: u64, + p97_5: u64, +} + +impl PeerScore { + const MAX: PeerScore = PeerScore { p90: u64::MAX, p95: u64::MAX, p97_5: u64::MAX }; + + #[inline(always)] + fn new(p90: u64, p95: u64, p97_5: u64) -> Self { + PeerScore { p90, p95, p97_5 } + } +} + +impl PartialEq for PeerScore { + #[inline(always)] + fn eq(&self, other: &Self) -> bool { + (self.p90, self.p95, self.p97_5) == (other.p90, other.p95, other.p97_5) + } +} + +impl Eq for PeerScore {} + +impl PartialOrd for PeerScore { + #[inline(always)] + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for PeerScore { + #[inline(always)] + fn cmp(&self, other: &Self) -> Ordering { + (self.p90, self.p95, self.p97_5).cmp(&(other.p90, other.p95, other.p97_5)) + } +} + +/// Configuration for the perigee manager.
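+/// The derived fields are computed in `PerigeeConfig::new` below: `round_frequency` is the number of connection-manager ticks per perigee round (`round_duration / connection_manager_tick_duration`), and `expected_blocks_per_round` is `bps * round_duration` (with `round_duration` given in seconds).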
+#[derive(Debug, Clone)] +pub struct PerigeeConfig { + pub perigee_outbound_target: usize, + pub leverage_target: usize, + pub exploration_target: usize, + pub round_frequency: usize, + pub round_duration: Duration, + pub expected_blocks_per_round: u64, + pub statistics: bool, + pub persistence: bool, +} + +impl PerigeeConfig { + pub fn new( + perigee_outbound_target: usize, + leverage_target: usize, + exploration_target: usize, + round_duration: usize, + connection_manager_tick_duration: Duration, + statistics: bool, + persistence: bool, + bps: u64, + ) -> Self { + let expected_blocks_per_round = bps * round_duration as u64; + let round_duration = Duration::from_secs(round_duration as u64); + Self { + perigee_outbound_target, + leverage_target, + exploration_target, + round_frequency: round_duration.as_secs() as usize / connection_manager_tick_duration.as_secs() as usize, + round_duration, + expected_blocks_per_round, + statistics, + persistence, + } + } +} + +impl Display for PerigeeConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Perigee outbound target: {}, Leverage target: {}, Exploration target: {}, Round duration: {:2} secs, Expected blocks per round: {}, Statistics: {}, Persistence: {}", + self.perigee_outbound_target, + self.leverage_target, + self.exploration_target, + self.round_duration.as_secs(), + self.expected_blocks_per_round, + self.statistics, + self.persistence + ) + } +} + +#[derive(Debug)] + +/// Manages peer selection and scoring. +pub struct PerigeeManager { + verified_blocks: BlockHashSet, // holds blocks that are consensus verified. + first_seen: HashMap, + last_round_leveraged_peers: Vec, + round_start: Instant, + round_counter: u64, + config: PerigeeConfig, + is_ibd_running: Arc, +} + +impl PerigeeManager { + pub fn new(config: PerigeeConfig, is_ibd_running: Arc) -> Mutex { + Mutex::new(Self { + verified_blocks: BlockHashSet::new(), + first_seen: HashMap::new(), + last_round_leveraged_peers: Vec::new(), + round_start: Instant::now(), + round_counter: 0, + config, + is_ibd_running, + }) + } + + pub fn insert_perigee_timestamp(&mut self, router: &Arc, hash: Hash, timestamp: Instant, verify: bool) { + // Inserts and updates the perigee timestamp for the given router + // and into the local state. + if router.is_perigee() || (self.config.statistics && router.is_random_graph()) { + router.add_perigee_timestamp(hash, timestamp); + } + if verify { + self.verify_block(hash); + } + self.maybe_insert_first_seen(hash, timestamp); + } + + pub fn set_initial_persistent_peers(&mut self, peer_keys: Vec) { + debug!("PerigeeManager: Setting initial persistent perigee peers for first round"); + self.last_round_leveraged_peers = peer_keys + } + + pub fn is_first_round(&self) -> bool { + self.round_counter == 0 + } + + pub fn trim_peers(&mut self, peers_by_address: Arc>) -> Vec { + // Contains logic to trim excess perigee peers beyond the configured target + // without executing a full evaluation. 
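+ // Currently leveraged peers and excused (recently connected) peers are protected; excess peers are picked at random from the rest, falling back to excused peers only when there are not enough other candidates.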
+ + debug!("PerigeeManager: Trimming excess peers from perigee"); + let perigee_peers = peers_by_address.values().filter(|p| p.is_perigee()).cloned().collect::>(); + let to_remove_amount = perigee_peers.len().saturating_sub(self.config.perigee_outbound_target); + let excused_peers = self.get_excused_peers(&perigee_peers); + + perigee_peers + .iter() + // Ensure we do not remove leveraged or excused peers + .filter(|r| !self.last_round_leveraged_peers.contains(&r.key()) && !excused_peers.contains(&r.key())) + .map(|r| r.key()) + .choose_multiple(&mut thread_rng(), to_remove_amount) + .iter() + // In cases where we do not have enough non-excused/non-leveraged peers to remove, + // we fill the remaining slots with excused peers. + // Note: We do not expect to ever need to chain with last round's leveraged peers. + .chain(excused_peers.iter()) + .take(to_remove_amount) + .cloned() + .collect() + } + + pub fn evaluate_round(&mut self, peer_by_address: &HashMap) -> (Vec, HashSet, bool) { + self.round_counter += 1; + debug!("[{}]: evaluating round: {}", IDENT, self.round_counter); + + let (mut peer_table, perigee_peers) = self.build_table(peer_by_address); + + let is_ibd_running = self.is_ibd_running(); + + // First, we excuse all peers with insufficient data this round + self.excuse(&mut peer_table, &perigee_peers); + + // This excludes peers that have been excused, as well as those that have not provided any data this round. + let amount_of_contributing_perigee_peers = peer_table.len(); + // In contrast, this is the total number of perigee peers registered in the hub. + let amount_of_perigee_peers = perigee_peers.len(); + debug!( + "[{}]: amount_of_perigee_peers: {}, amount_of_contributing_perigee_peers: {}", + IDENT, amount_of_perigee_peers, amount_of_contributing_perigee_peers + ); + // For should_leverage, we are conservative and require that we have enough contributing peers for sufficient data. + let should_leverage = self.should_leverage(is_ibd_running, amount_of_contributing_perigee_peers); + // For should_explore, we are more aggressive and only require that we have enough total perigee peers. + // As insufficient data may be malicious behavior by some peers, we prefer to continue churning peers. + let should_explore = self.should_explore(is_ibd_running, amount_of_perigee_peers); + + let mut has_leveraged_changed = false; + + if !should_leverage && !should_explore { + // In this case we skip leveraging and exploration this round. + // We maintain the last round leveraged peers as-is. + debug!("[{}]: skipping leveraging and exploration this round", IDENT); + return (self.last_round_leveraged_peers.clone(), HashSet::new(), has_leveraged_changed); + } + + // i.e. the peers that we mark as "to leverage" this round. 
+ let selected_peers = if should_leverage { + let selected_peers = self.leverage(&mut peer_table); + debug!( + "[{}]: Selected peers for leveraging this round: {:?}", + IDENT, + selected_peers.iter().map(|pk| pk.to_string()).collect_vec() + ); + // We consider rank changes as well as peer changes here, + if self.last_round_leveraged_peers != selected_peers { + // Leveraged peers has changed + debug!("[{}]: Leveraged peers have changed this round", IDENT); + has_leveraged_changed = true; + // Update last round's leveraged peers to the newly selected peers + self.last_round_leveraged_peers = selected_peers.clone(); + } + // Return the newly selected peers + selected_peers + } else { + debug!("[{}]: skipping leveraging this round", IDENT); + // Remove all previously leveraged peers from the peer table to avoid eviction + for pk in self.last_round_leveraged_peers.iter() { + peer_table.remove(pk); + } + // Return the previous set + self.last_round_leveraged_peers.clone() + }; + + // i.e. the peers that we mark as "to evict" this round. + let deselected_peers = if should_explore { + debug!("[{}]: exploring peers this round", IDENT); + self.explore(&mut peer_table, amount_of_perigee_peers) + } else { + debug!("[{}]: skipping exploration this round", IDENT); + HashSet::new() + }; + + (selected_peers, deselected_peers, has_leveraged_changed) + } + + fn leverage(&self, peer_table: &mut HashMap>) -> Vec { + // This is a greedy algorithm, and does not guarantee a globally optimal set of peers. + + // Sanity check + assert!(peer_table.len() >= self.config.leverage_target, "Potentially entering an endless loop"); + + // We use this Vec to maintain track and ordering of selected peers + let mut selected_peers: Vec> = Vec::new(); + let mut num_peers_selected = 0; + let mut remaining_table; + + // Counts the outer loop only + let mut i = 0; + + // Outer loop: (re)starts the building of an optimal set of peers from scratch, based on a joint subset scoring mechanism. + // Note: This potential repetition is not defined in the original Perigee paper, but even with extensive tie-breaking, + // and with large numbers of perigee peers (i.e., a leverage target > 16), building a single optimal set of peers quickly runs out of peers to select. + // As such, to ensure we utilize the full leveraging space, we re-run this outer loop + // to build additional independent complementary sets of peers, thereby reducing reliance on a single such set of peers. + 'outer: while num_peers_selected < self.config.leverage_target { + debug!( + "[{}]: Starting new outer loop iteration for leveraging peers, currently selected {} peers", + IDENT, num_peers_selected + ); + + selected_peers.push(Vec::new()); + + // First, we create a new empty selected peer table for this iteration + let mut selected_table = HashMap::new(); + + // We redefine the remaining table for this iteration as a clone of the original peer table + // Note: If we knew that we would not be re-entering this outer loop, we could avoid this clone. + remaining_table = peer_table.clone(); + + // Start with the last best score as max + let mut last_score = PeerScore::MAX; + + // Inner loop: This loop selects peers one by one and rates them based on contributions to advancing the current set's joint score, + // it does this until we reach the leverage target, the available peers are exhausted, or until a local optimum is reached. 
+ 'inner: while num_peers_selected < self.config.leverage_target { + trace!( + "[{}]: New inner loop iteration for leveraging peers, currently selected {} peers", + IDENT, + selected_peers.get(i).map(|current_set| current_set.len()).unwrap_or(0) + ); + + // Get the top ranked peer from the remaining table + let (top_ranked, top_ranked_score) = match self.get_top_ranked_peer(&remaining_table) { + (Some(peer), score) => (peer, score), + _ => { + break 'outer; // no more peers to select from + } + }; + + if top_ranked_score == last_score { + // Break condition: local optimum reached. + if top_ranked_score == PeerScore::MAX { + // All remaining peers are unrankable; we cannot proceed further. + break 'outer; + } else { + // We have reached a local optimum; + if num_peers_selected < self.config.leverage_target { + // Build additional sets of leveraged peers + break 'inner; + } else { + break 'outer; + } + } + } + + selected_table.insert(top_ranked, remaining_table.remove(&top_ranked).unwrap()); + selected_peers[i].push(top_ranked); + num_peers_selected += 1; + + if num_peers_selected == self.config.leverage_target { + // Reached our target + break 'outer; + } else { + // Transform the remaining table accounting also for the newly selected peer + self.transform_peer_table(&mut selected_table, &mut remaining_table); + } + last_score = top_ranked_score; + } + + // Remove already selected peers from the global peer table + for already_selected in selected_peers[i].iter() { + peer_table.remove(already_selected); + } + + i += 1; + } + + for already_selected in selected_peers[i].iter() { + peer_table.remove(already_selected); + } + + if num_peers_selected < self.config.leverage_target { + // choose randomly from remaining peers to fill the gap + let to_choose = self.config.leverage_target - num_peers_selected; + debug!("[{}]: Leveraging did not reach intended target, randomly selecting {} remaining peers", IDENT, to_choose); + let random_keys: Vec = + peer_table.keys().choose_multiple(&mut thread_rng(), to_choose).into_iter().copied().collect(); + + for pk in random_keys { + selected_peers[i].push(pk); + peer_table.remove(&pk); + } + } + + selected_peers.into_iter().flatten().collect() + } + + fn excuse(&self, peer_table: &mut HashMap>, perigee_peers: &[Peer]) { + // Removes excused peers from the peer table so they are not considered for eviction. + for k in self.get_excused_peers(perigee_peers) { + peer_table.remove(&k); + } + } + + fn explore(&self, peer_table: &mut HashMap>, amount_of_active_perigee: usize) -> HashSet { + // This is conceptually simple: we randomly choose peers to evict from the passed peer table. + // It is expected that other logic, such as leveraging and excusing peers, has already been applied to the peer table. + let to_remove_target = std::cmp::min( + self.config.exploration_target, + amount_of_active_perigee.saturating_sub(self.config.perigee_outbound_target - self.config.exploration_target), + ); + + peer_table.keys().choose_multiple(&mut thread_rng(), to_remove_target).into_iter().cloned().collect() + } + + pub fn start_new_round(&mut self) { + // Clears state and starts a new round timer + self.clear(); + self.round_start = Instant::now(); + } + + pub fn config(&self) -> PerigeeConfig { + self.config.clone() + } + + fn maybe_insert_first_seen(&mut self, hash: Hash, timestamp: Instant) { + // Inserts the first-seen timestamp for a block if it is earlier than the existing one + // or if it does not exist yet. 
+ match self.first_seen.entry(hash) { + Entry::Occupied(mut o) => { + let current = o.get_mut(); + if timestamp.lt(current) { + *current = timestamp; + } + } + Entry::Vacant(v) => { + v.insert(timestamp); + } + } + } + + fn verify_block(&mut self, hash: Hash) { + // Marks a block as verified for this round. + // I.e., this block will be considered in the current round's evaluation. + self.verified_blocks.insert(hash); + } + + fn clear(&mut self) { + // Resets state for a new round + debug!("[{}]: Clearing state for new round", IDENT); + self.verified_blocks.clear(); + self.first_seen.clear(); + } + + fn get_excused_peers(&self, perigee_peers: &[Peer]) -> Vec { + // Define excused peers as those that joined perigee after the round started. + // They should not be penalized for not having enough data in this round. + // We also sort them by connection time to give more trimming security to the longest connected peers first, + // This allows them more time to complete a full round. + perigee_peers + .iter() + .sorted_by_key(|p| p.connection_started()) + .filter(|p| p.connection_started() > self.round_start) + .map(|p| p.key()) + .collect() + } + + fn rate_peer(&self, values: &[u64]) -> PeerScore { + // Rates a peer based on its transformed delay values + + if values.is_empty() { + return PeerScore::MAX; + } + + // Sort values for percentile calculations + let sorted_values = { + let mut sv = values.to_owned(); + sv.sort_unstable(); + sv + }; + + let len = sorted_values.len(); + + // This is defined as the scoring mechanism in the corresponding original perigee paper. + // It favors good connectivity to the bulk of the network while still considering tail-end delays. + let p90 = sorted_values[((0.90 * len as f64) as usize).min(len - 1)]; + + // This is a deviation from the paper; + // We rate beyond the p90 to tie-break + // Testing has shown that full coverage of the p90 range often only requires ~4-6 perigee peers. + // This leaves remaining perigee peers without contribution to latency reduction. + // As such, we rate these even deeper into the tail-end delays to try to increase coverage of outlier blocks. + let p95 = sorted_values[((0.95 * len as f64) as usize).min(len - 1)]; + let p97_5 = sorted_values[((0.975 * len as f64) as usize).min(len - 1)]; + // Beyond p97_5 might be too sensitive to noise. + + PeerScore::new(p90, p95, p97_5) + } + + fn get_top_ranked_peer(&self, peer_table: &HashMap>) -> (Option, PeerScore) { + // Finds the peer with the best score in the given peer table + let mut best_peer: Option = None; + let mut best_score = PeerScore::MAX; + let mut tied_count = 0; + + for (peer, delays) in peer_table.iter() { + let score = self.rate_peer(delays); + if score < best_score { + best_score = score; + best_peer = Some(*peer); + } else if score == best_score { + tied_count += 1; + // Randomly replace with probability 1/tied_count + // This ensures we don't choose peers based on iteration / HashMap order + if thread_rng().gen_ratio(1, tied_count) { + best_peer = Some(*peer); + } + } + } + + debug!( + "[{}]: Top ranked peer from current peer table is {:?} with score p90: {}, p95: {}, p97.5: {}", + IDENT, best_peer, best_score.p90, best_score.p95, best_score.p97_5, + ); + (best_peer, best_score) + } + + fn transform_peer_table(&self, selected_peers: &mut HashMap>, candidates: &mut HashMap>) { + // Transforms the candidate peer table to min(selected peers' delay scores, candidate delay scores) + // for each delay score. 
This is one of the key components of the Perigee algorithm for joint subset selection. + + debug!("[{}]: Transforming peer table", IDENT); + + for j in 0..self.verified_blocks.len() { + let selected_min_j = selected_peers.values().map(|vec| vec[j]).min().unwrap(); + for candidate in candidates.values_mut() { + // We transform the delay of candidate at position j to min(candidate_delay_score[j], min(selected_peers_delay_score_at_pos[j])). + candidate[j] = min(candidate[j], selected_min_j); + } + } + } + + fn should_leverage(&self, is_ibd_running: bool, amount_of_contributing_perigee_peers: usize) -> bool { + // Conditions that need to be met to trigger leveraging: + + // 1. IBD is not running + !is_ibd_running && + // 2. Sufficient blocks have been verified this round + self.block_threshold_reached() && + // 3. We have enough contributing perigee peers to choose from + amount_of_contributing_perigee_peers >= self.config.leverage_target + } + + fn should_explore(&self, is_ibd_running: bool, amount_of_perigee_peers: usize) -> bool { + // Conditions that should trigger exploration: + + // 1. IBD is not running + !is_ibd_running && + // 2. We are within bounds to evict at least one peer - else we prefer to wait on more peers joining perigee first. + amount_of_perigee_peers > (self.config.perigee_outbound_target - self.config.exploration_target) + } + + fn block_threshold_reached(&self) -> bool { + // Checks whether the amount of verified blocks this round is within the expected bounds to consider leveraging. + // If this is not the case, the node is likely experiencing network issues, and we rather skip leveraging this round. + let verified_count = self.verified_blocks.len(); + let expected_count = self.config.expected_blocks_per_round; + let lower_bound = (expected_count as f64 * (1.0 - BLOCKS_VERIFIED_FAULT_TOLERANCE)) as usize; + let upper_bound = (expected_count as f64 * (1.0 + BLOCKS_VERIFIED_FAULT_TOLERANCE)) as usize; + debug!( + "[{}]: block_threshold_reached: verified_count={}, expected_count={}, lower_bound={}, upper_bound={}", + IDENT, verified_count, expected_count, lower_bound, upper_bound + ); + verified_count >= lower_bound && verified_count <= upper_bound + } + + fn is_ibd_running(&self) -> bool { + self.is_ibd_running.load(std::sync::atomic::Ordering::SeqCst) + } + + fn iterate_verified_first_seen(&self) -> impl Iterator { + // Iterates over first_seen entries that correspond to verified blocks only. + self.first_seen.iter().filter(move |(hash, _)| self.verified_blocks.contains(hash)) + } + + fn build_table(&self, peer_by_address: &HashMap) -> (HashMap>, Vec) { + // Builds the peer delay table for all perigee peers. + debug!("[{}]: Building peer table", IDENT); + let mut peer_table: HashMap> = HashMap::new(); + + // Pre-fetch perigee timestamps for all perigee peers. + // Calling the .perigee_timestamps() method in the loop would become expensive. 
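+ // (For example, with a perigee outbound target of 8 peers and a few hundred verified blocks per round, the nested loop below would otherwise repeat that call thousands of times.)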
+ let mut perigee_timestamps = HashMap::new(); + let mut perigee_peers = Vec::new(); + for p in peer_by_address.values() { + if p.is_perigee() { + perigee_timestamps.insert(p.key(), p.perigee_timestamps()); + perigee_peers.push(p.clone()); + } + } + + for (hash, first_ts) in self.iterate_verified_first_seen() { + for (peer_key, peer_timestamps) in perigee_timestamps.iter_mut() { + let mut timestamps = peer_timestamps.as_ref().clone(); + match timestamps.entry(*hash) { + Entry::Occupied(o) => { + let delay = o.get().duration_since(*first_ts).as_millis() as u64; + peer_table.entry(*peer_key).or_default().push(delay); + } + Entry::Vacant(_) => { + // Peer did not report this block this round; assign max delay + peer_table.entry(*peer_key).or_default().push(u64::MAX); + } + } + } + } + (peer_table, perigee_peers) + } + + pub fn log_statistics(&self, peer_by_address: &HashMap) { + // Note: this function has been artificially compressed for code-sparsity, as it is not mission critical, but is rather verbose. + let (perigee_ts, rg_ts): (Vec<_>, Vec<_>) = + peer_by_address.values().filter(|p| p.is_perigee() || p.is_random_graph()).partition_map(|p| { + if p.is_perigee() { + itertools::Either::Left((p.key(), p.perigee_timestamps())) + } else { + itertools::Either::Right((p.key(), p.perigee_timestamps())) + } + }); + + let (mut p_delays, mut rg_delays, mut p_wins, mut rg_wins, mut ties) = (vec![], vec![], 0usize, 0usize, 0usize); + + for (hash, ts) in self.iterate_verified_first_seen() { + let p_d = perigee_ts.iter().filter_map(|(_, hm)| hm.get(hash).map(|t| t.duration_since(*ts).as_millis() as u64)).min(); + let rg_d = rg_ts.iter().filter_map(|(_, hm)| hm.get(hash).map(|t| t.duration_since(*ts).as_millis() as u64)).min(); + match (p_d, rg_d) { + (Some(p), Some(rg)) => { + p_delays.push(p); + rg_delays.push(rg); + match p.cmp(&rg) { + Ordering::Less => p_wins += 1, + Ordering::Greater => rg_wins += 1, + Ordering::Equal => ties += 1, + } + } + (Some(p), None) => { + p_delays.push(p); + p_wins += 1; + } + (None, Some(rg)) => { + rg_delays.push(rg); + rg_wins += 1; + } + _ => {} + } + } + + if p_delays.is_empty() && rg_delays.is_empty() { + debug!("PerigeeManager Statistics: No data available for this round"); + return; + } + + let stats = |d: &mut [u64]| -> (usize, f64, u64, u64, u64, u64, u64, u64) { + if d.is_empty() { + return (0, 0.0, 0, 0, 0, 0, 0, 0); + } + d.sort_unstable(); + let n = d.len(); + let pct = |p: f64| d[((n as f64 * p) as usize).min(n - 1)]; + (n, d.iter().sum::() as f64 / n as f64, d[n / 2], d[0], d[n - 1], pct(0.90), pct(0.95), pct(0.99)) + }; + + let (pc, pm, pmed, pmin, pmax, p90, p95, p99) = stats(&mut p_delays); + let (rc, rm, rmed, rmin, rmax, r90, r95, r99) = stats(&mut rg_delays); + let total = p_wins + rg_wins + ties; + let pct = |p, t| if t == 0 { 0.0 } else { p as f64 / t as f64 * 100.0 }; + let imp = |p: f64, r: f64| if r == 0.0 { 0.0 } else { (r - p) / r * 100.0 }; + + info!( + "[{}]\n\ + ════════════════════════════════════════════════════════════════════════════ \n\ + PERIGEE STATISTICS - Round {:4} \n\ + ════════════════════════════════════════════════════════════════════════════ \n\ + Config: Out={:<2} Leverage={:<2} Explore={:<2} Duration={:<5}s \n\ + Peers: Perigee={:<2} ({:<5} blks) | Random={:<2} ({:<5} blks) \n\ + Blocks: Verified={:<5} | Seen={:<5} \n\ + ════════════════════════════════════════════════════════════════════════════ \n\ + BLOCK DELIVERY RACE \n\ + Perigee Wins: {:5} ({:5.1}%) \n\ + Random Graph Wins: {:5} ({:5.1}%) \n\ + Ties: {:5} ({:5.1}%) 
\n\ + ════════════════════════════════════════════════════════════════════════════ \n\ + DELAY STATISTICS (ms) │ Perigee │ Random Graph │ Improvement \n\ + ─────────────────────────────┼───────────┼──────────────┼────────────────── \n\ + Count │ {:9} │ {:12} │ \n\ + Mean │ {:9.2} │ {:12.2} │ {:7.2} ({:5.1}%) \n\ + Median │ {:9} │ {:12} │ {:7} ({:5.1}%) \n\ + Min │ {:9} │ {:12} │ \n\ + Max │ {:9} │ {:12} │ \n\ + P90 │ {:9} │ {:12} │ {:7} ({:5.1}%) \n\ + P95 │ {:9} │ {:12} │ \n\ + P99 │ {:9} │ {:12} │ \n\ + ════════════════════════════════════════════════════════════════════════════ ", + IDENT, + self.round_counter, + self.config.perigee_outbound_target, + self.config.leverage_target, + self.config.exploration_target, + self.config.round_duration.as_secs(), + perigee_ts.len(), + perigee_ts.iter().map(|(_, hm)| hm.len()).sum::(), + rg_ts.len(), + rg_ts.iter().map(|(_, hm)| hm.len()).sum::(), + self.verified_blocks.len(), + self.first_seen.len(), + p_wins, + pct(p_wins, total), + rg_wins, + pct(rg_wins, total), + ties, + pct(ties, total), + pc, + rc, + pm, + rm, + rm - pm, + imp(pm, rm), + pmed, + rmed, + rmed as i64 - pmed as i64, + imp(pmed as f64, rmed as f64), + pmin, + rmin, + pmax, + rmax, + p90, + r90, + r90 as i64 - p90 as i64, + imp(p90 as f64, r90 as f64), + p95, + r95, + p99, + r99 + ); + } +} +#[cfg(test)] +mod tests { + use super::*; + use kaspa_consensus_core::config::params::TESTNET_PARAMS; + use kaspa_hashes::Hash; + use kaspa_p2p_lib::test_utils::RouterTestExt; + use kaspa_p2p_lib::PeerOutboundType; + use kaspa_utils::networking::PeerId; + + use std::collections::HashMap; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::sync::atomic::{AtomicU64, Ordering}; + use std::sync::Arc; + use std::time::Instant; + use uuid::Uuid; + + /// Generates a unique Router wit incremental IPv4 SocketAddr and PeerId for testing purposes. 
+ fn generate_unique_router(time_connected: Instant) -> std::sync::Arc { + static ROUTER_COUNTER: AtomicU64 = AtomicU64::new(1); + + let id = ROUTER_COUNTER.fetch_add(1, Ordering::Relaxed); + let ip_seed = id; + let octet1 = ((ip_seed >> 24) & 0xFF) as u8; + let octet2 = ((ip_seed >> 16) & 0xFF) as u8; + let octet3 = ((ip_seed >> 8) & 0xFF) as u8; + let octet4 = (ip_seed & 0xFF) as u8; + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(octet1, octet2, octet3, octet4)), TESTNET_PARAMS.default_p2p_port()); + let peer_id = PeerId::new(Uuid::from_u128(id as u128)); + RouterTestExt::test_new(peer_id, addr, Some(PeerOutboundType::Perigee), time_connected) + } + + // Helper to generate a default PerigeeConfig for testing purposes + fn generate_config() -> PerigeeConfig { + PerigeeConfig::new(8, 4, 2, 30, std::time::Duration::from_secs(30), true, true, TESTNET_PARAMS.bps()) + } + + // Helper to generate a globally unique block hash + fn generate_unique_block_hash() -> Hash { + static HASH_COUNTER: AtomicU64 = AtomicU64::new(1); + + Hash::from_u64_word(HASH_COUNTER.fetch_add(1, Ordering::Relaxed)) + } + + #[test] + fn test_insertions() { + let routers = (0..2).map(|_| generate_unique_router(Instant::now())).collect::>(); + let manager = PerigeeManager::new(generate_config(), Arc::new(std::sync::atomic::AtomicBool::new(false))); + let now: Vec<_> = (0..4).map(|_| Instant::now()).collect(); + let block_hashes: Vec<_> = (0..4).map(|_| generate_unique_block_hash()).collect(); + for (i, (now, block_hash)) in now.iter().zip(block_hashes.iter()).enumerate() { + manager.lock().insert_perigee_timestamp(&routers[(i + 1) % 2].clone(), *block_hash, *now, (i + 1) % 2 == 0); + } + + let manager = manager.lock(); + // Check first_seen and router timestamps + for (i, (now, block_hash)) in now.iter().zip(block_hashes.iter()).enumerate() { + let ts = manager.first_seen.get(block_hash).unwrap(); + assert_eq!(ts, now); + // Only the router that received the block should have it, and timestamp should match + let idx = (i + 1) % 2; + if idx == 0 { + assert!(manager.verified_blocks.contains(block_hash), "Block should be verified for even indices"); + } else { + assert!(!manager.verified_blocks.contains(block_hash), "Block should not be verified for odd indices"); + } + let perigee_timestamps = &routers[idx].perigee_timestamps(); + let router_ts = perigee_timestamps.get(block_hash).unwrap(); + assert_eq!(router_ts, now, "Router's perigee_timestamps should match inserted timestamp"); + assert_eq!(router_ts, ts, "Router's perigee_timestamps should match manager's first_seen"); + // The other router should NOT have this block_hash + let other_perigee_timestamps = &routers[1 - idx].perigee_timestamps(); + assert!(!other_perigee_timestamps.contains_key(block_hash), "Other router should not have this block hash"); + } + + // Check lengths + assert_eq!(manager.first_seen.len(), block_hashes.len(), "first_seen should have all inserted blocks"); + assert_eq!(manager.verified_blocks.len(), block_hashes.len().div_ceil(2), "verified_blocks should have half the blocks"); + for router in &routers { + let perigee_timestamps = router.perigee_timestamps(); + assert_eq!(perigee_timestamps.len(), block_hashes.len() / 2, "Each router should have half the block hashes"); + } + } + + #[test] + fn test_trim_peers() { + // Set-up environment + let config = generate_config(); + let manager = PerigeeManager::new(config.clone(), Arc::new(std::sync::atomic::AtomicBool::new(false))); + let leverage_target = config.leverage_target; + let 
perigee_outbound_target = config.perigee_outbound_target; + let excused_count = perigee_outbound_target + 1 - leverage_target; + // Set up so that all non-leveraged, non-excused peers are needed to fill the outbound target, so only one excused peer can be trimmed + let total_peers = leverage_target + excused_count; + + // Create leveraged peers (should not be trimmed) + let now = Instant::now() - std::time::Duration::from_secs(3600); + let mut routers = Vec::new(); + for _ in 0..leverage_target { + routers.push(generate_unique_router(now)); + } + + // Create excused peers that joined after round start; they should be excused and only trimmed as a last resort (ordered by connection time) + let mut excused_routers = Vec::new(); + for i in 0..excused_count { + let t = now + std::time::Duration::from_secs(10 + i as u64); + excused_routers.push(generate_unique_router(t)); + } + + // Build all peers + let mut peers = HashMap::new(); + for router in routers.iter().chain(excused_routers.iter()) { + let peer = Peer::from((&**router, true)); + peers.insert(router.key(), peer.clone()); + } + + // Build peer_by_addr + let mut peer_by_addr = HashMap::new(); + for peer in peers.values() { + peer_by_addr.insert(peer.net_address(), peer.clone()); + } + + // Set leveraged and excused peers in manager + manager.lock().set_initial_persistent_peers(routers.iter().map(|r| r.key()).collect()); + manager.lock().round_start = now; // Set round start to 'now' for excused logic + + // Call trim_peers + let to_remove = manager.lock().trim_peers(Arc::new(peer_by_addr)); + + // Assert correct number trimmed + let expected_trim = total_peers - perigee_outbound_target; + assert_eq!(to_remove.len(), expected_trim, "Should trim down to perigee_outbound_target"); + + // Assert no leveraged peer is trimmed + let leveraged_keys: Vec<_> = routers.iter().map(|r| r.key()).collect(); + for k in &to_remove { + assert!(!leveraged_keys.contains(k), "Leveraged peer should not be trimmed"); + } + + // Assert that exactly one excused peer is trimmed (evicted), and the rest are not + let excused_keys: Vec<_> = excused_routers.iter().map(|r| r.key()).collect(); + let excused_trimmed: Vec<_> = excused_keys.iter().filter(|k| to_remove.contains(k)).collect(); + assert_eq!(excused_trimmed.len(), 1, "Exactly one excused peer should be trimmed as a last resort"); + // The rest of the excused peers should not be trimmed + let excused_not_trimmed: Vec<_> = excused_keys.iter().filter(|k| !to_remove.contains(k)).collect(); + assert_eq!(excused_not_trimmed.len(), excused_count - 1, "All but one excused peer should remain"); + // Check excused ordering by connection time (still valid for remaining excused) + let mut excused_peers: Vec<_> = excused_routers.iter().map(|r| peers.get(&r.key()).unwrap()).collect(); + excused_peers.sort_by_key(|p| p.connection_started()); + for w in excused_peers.windows(2) { + assert!(w[0].connection_started() <= w[1].connection_started(), "Excused peers should be ordered by connection time"); + } + } + + #[test] + fn test_peer_rating() { + let score = (0..1000).collect::>(); + let manager = PerigeeManager::new(generate_config(), Arc::new(std::sync::atomic::AtomicBool::new(false))); + let peer_score = manager.lock().rate_peer(&score); + let expected_peer_score = PeerScore::new(900, 950, 975); + assert_eq!(peer_score, expected_peer_score); + } + + #[test] + fn test_perigee_round_leverage_and_eviction() { + run_round(false); + } + + #[test] + fn test_perigee_round_skips_while_ibd_running() { + run_round(true); + } + + fn 
run_round(ibd_running: bool) { + kaspa_core::log::try_init_logger("debug"); + + // Set up environment + let is_ibd_running = Arc::new(std::sync::atomic::AtomicBool::new(ibd_running)); + let mut config = generate_config(); + let now = Instant::now() - std::time::Duration::from_secs(3600); + let peer_count = config.perigee_outbound_target; + let blocks_per_router = 300; + config.expected_blocks_per_round = blocks_per_router as u64; + let manager = PerigeeManager::new(config, is_ibd_running); + let mut routers = Vec::new(); + for _ in 0..peer_count { + let router = generate_unique_router(now); + routers.push(router); + } + + // Insert blocks using a deterministic delay pattern via bucketing ts + let leverage_target = manager.lock().config.leverage_target; + for block_idx in 0..blocks_per_router { + let block_hash = generate_unique_block_hash(); + let base_ts = now + std::time::Duration::from_millis((block_idx as u64) * 10_000); + for (i, router) in routers.iter().enumerate() { + let ts = if i < leverage_target { + let bucket_start = (i as u64) * 10; + let delay = bucket_start + (block_idx as u64 % 10); + base_ts + std::time::Duration::from_millis(delay) + } else { + base_ts + std::time::Duration::from_millis(100_000 + (i as u64) * 10) + }; + manager.lock().insert_perigee_timestamp(router, block_hash, ts, true); + } + } + + assert!(manager.lock().verified_blocks.len() == blocks_per_router); + + // Build peers and peer_by_addr after all timestamps are inserted + let mut peers = HashMap::new(); + for router in &routers { + let peer = Peer::from((&**router, true)); + peers.insert(router.key(), peer.clone()); + } + let mut peer_by_addr = HashMap::new(); + for peer in peers.values() { + peer_by_addr.insert(peer.net_address(), peer.clone()); + } + + // Execute a perigee round + let (leveraged, evicted, has_leveraged_changed) = manager.lock().evaluate_round(&peer_by_addr); + debug!("Leveraged peers: {:?}", leveraged); + debug!("Evicted peers: {:?}", evicted); + + // Perform assertions: + if ibd_running { + // While IBD is running, no leveraging or eviction should occur + assert!(!has_leveraged_changed, "Leveraging should be skipped while IBD is running"); + assert!(leveraged.is_empty(), "No peers should be leveraged while IBD is running"); + assert!(evicted.is_empty(), "No peers should be evicted while IBD is running"); + return; + }; + assert!(has_leveraged_changed, "Leveraging should not be skipped in this test"); + assert_eq!( + leveraged, + routers.iter().take(leverage_target).map(|r| r.key()).collect::>(), + "Leverage set should match actual deterministic selection (order and membership)" + ); + // No leveraged peer should be evicted + assert!(leveraged.iter().all(|p| !evicted.contains(p)), "No leveraged peer should be evicted"); + assert_eq!(evicted.len(), manager.lock().config.exploration_target); + + // Reset round. + manager.lock().start_new_round(); + // Ensure state is cleared. + assert!(manager.lock().verified_blocks.is_empty(), "Verified blocks should be cleared after starting new round"); + assert!(manager.lock().first_seen.is_empty(), "First seen timestamps should be cleared after starting new round"); + } +} diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs index 34fc530330..48fa9674f4 100644 --- a/consensus/src/pipeline/virtual_processor/processor.rs +++ b/consensus/src/pipeline/virtual_processor/processor.rs @@ -691,7 +691,7 @@ impl VirtualStateProcessor { /// Assumes: /// 1. 
`selected_parent` is a UTXO-valid block /// 2. `candidates` are an antichain ordered in descending blue work order - /// 3. `candidates` do not contain `selected_parent` and `selected_parent.blue work > max(candidates.blue_work)` + /// 3. `candidates` do not contain `selected_parent` and `selected_parent.blue work > max(candidates.blue_work)` pub(super) fn pick_virtual_parents( &self, selected_parent: Hash, diff --git a/database/src/registry.rs b/database/src/registry.rs index cd59165f5f..8f27ca904c 100644 --- a/database/src/registry.rs +++ b/database/src/registry.rs @@ -69,6 +69,7 @@ pub enum DatabaseStorePrefixes { // ---- Components ---- Addresses = 128, BannedAddresses = 129, + PerigeeAddresses = 130, // ---- Indexes ---- UtxoIndex = 192, diff --git a/kaspad/Cargo.toml b/kaspad/Cargo.toml index 91373baf50..fe74732d67 100644 --- a/kaspad/Cargo.toml +++ b/kaspad/Cargo.toml @@ -19,6 +19,7 @@ kaspa-alloc.workspace = true # This changes the global allocator for all of the kaspa-addresses.workspace = true kaspa-addressmanager.workspace = true +kaspa-connectionmanager.workspace = true kaspa-consensus-core.workspace = true kaspa-consensus-notify.workspace = true kaspa-consensus.workspace = true @@ -33,6 +34,7 @@ kaspa-notify.workspace = true kaspa-p2p-flows.workspace = true kaspa-p2p-lib.workspace = true kaspa-p2p-mining.workspace = true +kaspa-perigeemanager.workspace = true kaspa-perf-monitor.workspace = true kaspa-rpc-core.workspace = true kaspa-rpc-service.workspace = true diff --git a/kaspad/src/args.rs b/kaspad/src/args.rs index f079766831..7781dac6c6 100644 --- a/kaspad/src/args.rs +++ b/kaspad/src/args.rs @@ -56,6 +56,13 @@ pub struct Args { pub reset_db: bool, #[serde(rename = "outpeers")] pub outbound_target: usize, + pub blk_perigee_peers: usize, // total number of perigee peers, must be <= outpeers, the diff will be routed through random graph. + pub blk_perigee_exploration: usize, + pub blk_perigee_leverage: usize, + pub blk_perigee_duration: usize, // round duration in seconds (rounded to the nearest 30 seconds), and clamped between: min 30 and max 300. + pub blk_perigee_stats: bool, + pub blk_perigee_persist: bool, // whether to persist perigee peers between restarts (This saves the perigee peers to the address db for re-use) + pub blk_perigee_reset: bool, // whether to reset perigee persisted data on startup #[serde(rename = "maxinpeers")] pub inbound_limit: usize, #[serde(rename = "rpcmaxclients")] @@ -111,6 +118,13 @@ impl Default for Args { utxoindex: false, reset_db: false, outbound_target: 8, + blk_perigee_peers: 0, + blk_perigee_leverage: 0, // 0 will default to 50% of perigee peers + blk_perigee_exploration: 0, // 0 will default to 25% of perigee peers + blk_perigee_duration: 30, // Round duration will be 30 secs + blk_perigee_stats: false, + blk_perigee_persist: false, + blk_perigee_reset: false, inbound_limit: 128, rpc_max_clients: 128, max_tracked_addresses: 0, @@ -307,9 +321,62 @@ pub fn cli() -> Command { .value_parser(clap::value_parser!(usize)) .help("Target number of outbound peers (default: 8)."), ) + .arg( + Arg::new("blk-perigee-peers") + .long("blk-perigee-peers") + .env("KASPAD_BLK_PERIGEE_PEERS") + .value_name("blk-perigee-peers") + .require_equals(true) + .value_parser(clap::value_parser!(usize)) + .help("Target number of block perigee peers (default: 0, which disables block perigee). 
Note: total number of perigee peers, must be <= outpeers, the diff will be routed through random graph"), + ) + .arg( + Arg::new("blk-perigee-exploration") + .long("blk-perigee-exploration") + .env("KASPAD_BLK_PERIGEE_EXPLORATION") + .require_equals(true) + .value_parser(clap::value_parser!(usize)) + .help("Number of block perigee peers to drop per round (default: 0, [the default value will set the target to 25%, rounded down, of the block perigee target])."), + ) + .arg( + Arg::new("blk-perigee-leverage") + .long("blk-perigee-leverage") + .env("KASPAD_BLK_PERIGEE_LEVERAGE") + .require_equals(true) + .value_parser(clap::value_parser!(usize)) + .help("Number of block perigee peers to leverage per round (default: 0, [the default value will set the target to 50%, rounded down, of the block perigee target])."), + ) + .arg( + Arg::new("blk-perigee-duration") + .long("blk-perigee-duration") + .env("KASPAD_BLK_PERIGEE_DURATION") + .require_equals(true) + .value_parser(clap::value_parser!(usize)) + .help("Round duration in seconds (rounded to the nearest 30 seconds), Note: this is clamped between 30 and 300 (default: 30, min 30, max 300)."), + ) + .arg( + Arg::new( "blk-perigee-stats" ) + .long("blk-perigee-stats") + .env("KASPAD_BLK_PERIGEE_STATS") + .action(ArgAction::SetTrue) + .help("Log block perigee statistics after each round, Note: this evaluates and compares against other random graph outbound peers for testing purposes,\nas such, this requires significantly more resources.\nFor optimal comparison `blkperigeepeers` should equal `outboundpeers / 2`" + ) + ) + .arg(Arg::new("blk-perigee-persist") + .long("blk-perigee-persist") + .env("KASPAD_BLK_PERIGEE_PERSIST") + .action(ArgAction::SetTrue) + .help("Persist block perigee data between restarts."), + ) + .arg(Arg::new("blk-perigee-reset") + .long("blk-perigee-reset") + .env("KASPAD_BLK_PERIGEE_RESET") + .action(ArgAction::SetTrue) + .help("Reset block perigee persisted data on startup."), + ) .arg( Arg::new("maxinpeers") - .long("maxinpeers") + .long("maxinpeers") .env("KASPAD_MAXINPEERS") .value_name("maxinpeers") .require_equals(true) @@ -342,8 +409,8 @@ pub fn cli() -> Command { .env("KASPAD_MAX_TRACKED_ADDRESSES") .require_equals(true) .value_parser(clap::value_parser!(usize)) - .help(format!("Max (preallocated) number of addresses being tracked for UTXO changed events (default: {}, maximum: {}). -Setting to 0 prevents the preallocation and sets the maximum to {}, leading to 0 memory footprint as long as unused but to sub-optimal footprint if used.", + .help(format!("Max (preallocated) number of addresses being tracked for UTXO changed events (default: {}, maximum: {}). 
+Setting to 0 prevents the preallocation and sets the maximum to {}, leading to 0 memory footprint as long as unused but to sub-optimal footprint if used.", 0, Tracker::MAX_ADDRESS_UPPER_BOUND, Tracker::DEFAULT_MAX_ADDRESSES)), ) .arg(arg!(--testnet "Use the test network").env("KASPAD_TESTNET")) @@ -496,6 +563,13 @@ impl Args { add_peers: arg_match_many_unwrap_or::(&m, "add-peers", defaults.add_peers), listen: m.get_one::("listen").cloned().or(defaults.listen), outbound_target: arg_match_unwrap_or::(&m, "outpeers", defaults.outbound_target), + blk_perigee_peers: arg_match_unwrap_or::(&m, "blk-perigee-peers", defaults.blk_perigee_peers), + blk_perigee_exploration: arg_match_unwrap_or::(&m, "blk-perigee-exploration", defaults.blk_perigee_exploration), + blk_perigee_leverage: arg_match_unwrap_or::(&m, "blk-perigee-leverage", defaults.blk_perigee_leverage), + blk_perigee_duration: arg_match_unwrap_or::(&m, "blk-perigee-duration", defaults.blk_perigee_duration), + blk_perigee_stats: arg_match_unwrap_or::(&m, "blk-perigee-stats", defaults.blk_perigee_stats), + blk_perigee_persist: arg_match_unwrap_or::(&m, "blk-perigee-persist", defaults.blk_perigee_persist), + blk_perigee_reset: arg_match_unwrap_or::(&m, "blk-perigee-reset", defaults.blk_perigee_reset), inbound_limit: arg_match_unwrap_or::(&m, "maxinpeers", defaults.inbound_limit), rpc_max_clients: arg_match_unwrap_or::(&m, "rpcmaxclients", defaults.rpc_max_clients), max_tracked_addresses: arg_match_unwrap_or::(&m, "max-tracked-addresses", defaults.max_tracked_addresses), @@ -571,6 +645,21 @@ fn arg_match_many_unwrap_or(m: &clap::ArgMatch --listen= Add an interface/port to listen for connections (default all interfaces port: 16111, testnet: 16211) --outpeers= Target number of outbound peers (default: 8) + --blk-perigee-peers= Target number of block perigee peers (default: 0, which disables block perigee), + Note: total number of perigee peers, must be <= outpeers, + the diff will be routed through random graph` + --blk-perigee-exploration= Number of perigee peers to drop per round of perigee + (default: 0, [the default value will set the target to 25%, rounded down, of the perigee target]). + --blk-perigee-leverage= Number of perigee peers to leverage per round of perigee + (default: 0, [the default value will set the target to 50%, rounded down, of the perigee target]). + --blk-perigee-duration= Round duration in seconds (rounded to the nearest 30 seconds), + Note: this is clamped between 30 and 300 (default: 30, min 30, max 300). + --blk-perigee-stats log perigee statistics after each round. + Note: this evaluates and compares against other random graph outbound peers for testing purposes, + as such, this requires significantly more resources. + For optimal comparison `perigeepeers` should equal `outboundpeers / 2` + --blk-perigee-persist Persist perigee data between restarts. + --blk-perigee-reset Reset perigee persisted data on startup. --maxinpeers= Max number of inbound peers (default: 117) --enablebanning Enable banning of misbehaving peers --banduration= How long to ban misbehaving peers. Valid time units are {s, m, h}. 
Minimum diff --git a/kaspad/src/daemon.rs index 5f7390ef8d..193aef65e4 100644 --- a/kaspad/src/daemon.rs +++ b/kaspad/src/daemon.rs @@ -1,6 +1,7 @@ use std::{fs, path::PathBuf, process::exit, sync::Arc, time::Duration}; use async_channel::unbounded; +use kaspa_connectionmanager::EVENT_LOOP_TIMER; use kaspa_consensus_core::{ config::ConfigBuilder, constants::TRANSIENT_BYTE_TO_MASS_FACTOR, @@ -18,6 +19,7 @@ use kaspa_grpc_server::service::GrpcService; use kaspa_notify::{address::tracker::Tracker, subscription::context::SubscriptionContext}; use kaspa_p2p_lib::Hub; use kaspa_p2p_mining::rule_engine::MiningRuleEngine; +use kaspa_perigeemanager::{PerigeeConfig, PerigeeManager}; use kaspa_rpc_service::service::RpcCoreService; use kaspa_txscript::caches::TxScriptCacheCounters; use kaspa_utils::git; @@ -267,6 +269,76 @@ fn configure_rocksdb(args: &Args) -> (RocksDbPreset, Option, Option PerigeeConfig { + assert!( + perigee_target <= outbound_target, + "Perigee target of {} cannot exceed total outbound target of {}", + perigee_target, + outbound_target + ); + + // We only operate at [`EVENT_LOOP_TIMER`] granularity, + let round_granularity = EVENT_LOOP_TIMER.as_secs() as usize; + let min_duration = round_granularity; + let max_duration = 300; + + assert!(min_duration - round_granularity == 0, "Min perigee round duration must be at least the event loop timer granularity"); + assert!( + max_duration % round_granularity == 0, + "Max perigee round duration must be a multiple of the event loop timer granularity" + ); + + // Clamp to valid ranges (>300 seconds is not allowed, so as to limit excessive data accumulation; <30 seconds is below the bounds set by the [`EVENT_LOOP_TIMER`] interval) + let round_duration = round_duration.clamp(min_duration, max_duration); + // We only operate at [`EVENT_LOOP_TIMER`] granularity, so we round the duration down to the nearest multiple of it + let round_duration = (round_duration as f64 / round_granularity as f64) as usize * round_granularity; + + let leverage_target = if leverage_target == 0 { + // Apply default to 50% of total target + perigee_target / 2 // integer division rounds down by default + } else { + leverage_target + }; + let exploration_target = if exploration_target == 0 { + // Apply default to 25% of total target + perigee_target / 4 // integer division rounds down by default + } else { + exploration_target + }; + + // assert valid targets + assert!( + (leverage_target + exploration_target) <= perigee_target, + "{}", + format!( + "Leverage target of {0} plus the exploration target of {1} cannot exceed the total perigee target of {2}", + leverage_target, exploration_target, perigee_target + ) + ); + + PerigeeConfig::new( + perigee_target, + leverage_target, + exploration_target, + round_duration, + EVENT_LOOP_TIMER, + statistics, + persistence, + network_bps, + ) +} + /// Create [`Core`] instance with supplied [`Args`] and [`Runtime`]. /// /// Usage semantics: @@ -545,6 +617,33 @@ Do you confirm? 
(y/n)"; let p2p_server_addr = args.listen.unwrap_or(ContextualNetAddress::unspecified()).normalize(config.default_p2p_port()); // connect_peers means no DNS seeding and no outbound/inbound peers let outbound_target = if connect_peers.is_empty() { args.outbound_target } else { 0 }; + let mut random_graph_target = outbound_target; + + // Handle the Perigee configuration + let perigee_config = if args.blk_perigee_peers == 0 { + debug!("Perigee disabled: perigee target is set to 0 (default behavior)"); + None + } else if !connect_peers.is_empty() { + // We supply an explicit log here, as the user most have tried to enable perigee, and probably wants to know why, and that, it is disabled + info!("Perigee disabled: outbound target is set to 0 because `--connect-peers` argument was supplied"); + None + } else { + let perigee_config = create_perigee_config( + outbound_target, + args.blk_perigee_peers, + args.blk_perigee_duration, + args.blk_perigee_leverage, + args.blk_perigee_exploration, + args.blk_perigee_persist, + args.blk_perigee_stats, + config.bps(), + ); + // Reduce random graph outbound target by perigee outbound target + random_graph_target -= perigee_config.perigee_outbound_target; + info!("Perigee enabled - Perigee Configuration: {}", perigee_config); + Some(perigee_config) + }; + let inbound_limit = if connect_peers.is_empty() { args.inbound_limit } else { 0 }; let dns_seeders = if connect_peers.is_empty() && !args.disable_dns_seeding { config.dns_seeders } else { &[] }; @@ -623,6 +722,9 @@ Do you confirm? (y/n)"; }; let (address_manager, port_mapping_extender_svc) = AddressManager::new(config.clone(), meta_db, tick_service.clone()); + if args.blk_perigee_reset { + address_manager.lock().reset_perigee_data(); + } let mining_manager = MiningManagerProxy::new(Arc::new(MiningManager::new_with_extended_config( config.target_time_per_block(), @@ -644,6 +746,11 @@ Do you confirm? (y/n)"; hub.clone(), mining_rules, )); + + // Ibd running flag, is created here to be potentially shared with the perigee manager + let is_ibd_running = Arc::new(std::sync::atomic::AtomicBool::default()); + let perigee_manager = perigee_config.map(|perigee_config| Arc::new(PerigeeManager::new(perigee_config, is_ibd_running.clone()))); + let flow_context = Arc::new(FlowContext::new( consensus_manager.clone(), address_manager, @@ -653,13 +760,16 @@ Do you confirm? 
(y/n)"; notification_root, hub.clone(), mining_rule_engine.clone(), + is_ibd_running, + perigee_manager, )); + let p2p_service = Arc::new(P2pService::new( flow_context.clone(), connect_peers, add_peers, p2p_server_addr, - outbound_target, + random_graph_target, inbound_limit, dns_seeders, config.default_p2p_port(), diff --git a/protocol/flows/Cargo.toml b/protocol/flows/Cargo.toml index 0a00f1436e..f416fffd43 100644 --- a/protocol/flows/Cargo.toml +++ b/protocol/flows/Cargo.toml @@ -20,6 +20,7 @@ kaspa-utils-tower.workspace = true kaspa-hashes.workspace = true kaspa-muhash.workspace = true kaspa-connectionmanager.workspace = true +kaspa-perigeemanager.workspace = true kaspa-addressmanager.workspace = true kaspa-consensusmanager.workspace = true kaspa-mining.workspace = true diff --git a/protocol/flows/src/flow_context.rs b/protocol/flows/src/flow_context.rs index cafe38e3f3..3e39f64a36 100644 --- a/protocol/flows/src/flow_context.rs +++ b/protocol/flows/src/flow_context.rs @@ -36,6 +36,7 @@ use kaspa_p2p_lib::{ ConnectionInitializer, Hub, KaspadHandshake, PeerKey, PeerProperties, Router, }; use kaspa_p2p_mining::rule_engine::MiningRuleEngine; +use kaspa_perigeemanager::{PerigeeConfig, PerigeeManager}; use kaspa_utils::iter::IterExtensions; use kaspa_utils::networking::PeerId; use parking_lot::{Mutex, RwLock}; @@ -237,6 +238,9 @@ pub struct FlowContextInner { // Mining rule engine mining_rule_engine: Arc, + + // perigee manager + pub perigee_manager: Option>>, } #[derive(Clone)] @@ -256,7 +260,7 @@ impl Drop for IbdRunningGuard { } #[derive(Debug, Clone, Copy)] -struct IbdMetadata { +pub struct IbdMetadata { /// The peer from which current IBD is syncing from peer: PeerKey, /// The DAA score of the relay block which triggered the current IBD @@ -311,6 +315,8 @@ impl FlowContext { notification_root: Arc, hub: Hub, mining_rule_engine: Arc, + is_ibd_running: Arc, + perigee_manager: Option>>, ) -> Self { let bps = config.bps() as usize; let orphan_resolution_range = BASELINE_ORPHAN_RESOLUTION_RANGE + (bps as f64).log2().ceil() as u32; @@ -326,7 +332,7 @@ impl FlowContext { shared_block_requests: Arc::new(Mutex::new(HashMap::new())), transactions_spread: AsyncRwLock::new(TransactionsSpread::new(hub.clone())), shared_transaction_requests: Arc::new(Mutex::new(HashMap::new())), - is_ibd_running: Default::default(), + is_ibd_running, ibd_metadata: Default::default(), hub, address_manager, @@ -340,6 +346,7 @@ impl FlowContext { max_orphans, config, mining_rule_engine, + perigee_manager, }), } } @@ -705,6 +712,27 @@ impl FlowContext { pub async fn broadcast_transactions>(&self, transaction_ids: I, should_throttle: bool) { self.transactions_spread.write().await.broadcast_transactions(transaction_ids, should_throttle).await } + + pub fn is_perigee_active(&self) -> bool { + self.perigee_manager.is_some() + } + + pub async fn maybe_add_perigee_timestamp(&self, router: Arc, hash: Hash, timestamp: Instant, verify: bool) { + if let Some(ref manager) = self.perigee_manager { + let mut manager = manager.lock(); + manager.insert_perigee_timestamp(&router, hash, timestamp, verify); + } + } + + pub fn perigee_config(&self) -> Option { + match self.perigee_manager { + Some(ref manager) => { + let manager = manager.lock(); + Some(manager.config().clone()) + } + None => None, + } + } } #[async_trait] @@ -771,7 +799,7 @@ impl ConnectionInitializer for FlowContext { // Send and receive the ready signal handshake.exchange_ready_messages().await?; - info!("Registering p2p flows for peer {} for protocol version {}", 
router, applied_protocol_version); // Launch all flows. Note we launch only after the ready signal was exchanged for flow in flows { diff --git a/protocol/flows/src/service.rs b/protocol/flows/src/service.rs index 1633e26db0..b984b8d325 100644 --- a/protocol/flows/src/service.rs +++ b/protocol/flows/src/service.rs @@ -19,7 +19,7 @@ pub struct P2pService { connect_peers: Vec, add_peers: Vec, listen: NetAddress, - outbound_target: usize, + random_graph_target: usize, inbound_limit: usize, dns_seeders: &'static [&'static str], default_port: u16, @@ -33,7 +33,7 @@ impl P2pService { connect_peers: Vec, add_peers: Vec, listen: NetAddress, - outbound_target: usize, + random_graph_target: usize, inbound_limit: usize, dns_seeders: &'static [&'static str], default_port: u16, @@ -45,7 +45,7 @@ impl P2pService { add_peers, shutdown: SingleTrigger::default(), listen, - outbound_target, + random_graph_target, inbound_limit, dns_seeders, default_port, @@ -71,9 +71,11 @@ impl AsyncService for P2pService { Adaptor::bidirectional(self.listen, self.flow_context.hub().clone(), self.flow_context.clone(), self.counters.clone()) .unwrap() }; + let connection_manager = ConnectionManager::new( p2p_adaptor.clone(), - self.outbound_target, + self.random_graph_target, + self.flow_context.perigee_manager.clone(), self.inbound_limit, self.dns_seeders, self.default_port, @@ -85,9 +87,17 @@ impl AsyncService for P2pService { // Launch the service and wait for a shutdown signal Box::pin(async move { - for peer_address in self.connect_peers.iter().cloned().chain(self.add_peers.iter().cloned()) { - connection_manager.add_connection_request(peer_address.into(), true).await; - } + connection_manager + .clone() + .add_connection_requests( + self.connect_peers + .iter() + .cloned() + .chain(self.add_peers.iter().cloned()) + .map(|addr| (core::net::SocketAddr::new(*addr.ip, addr.port), true)) + .collect(), + ) + .await; // Keep the P2P server running until a service shutdown signal is received shutdown_signal.await; diff --git a/protocol/flows/src/v7/blockrelay/flow.rs b/protocol/flows/src/v7/blockrelay/flow.rs index 8d398ca01c..4376c6b642 100644 --- a/protocol/flows/src/v7/blockrelay/flow.rs +++ b/protocol/flows/src/v7/blockrelay/flow.rs @@ -10,12 +10,12 @@ use kaspa_hashes::Hash; use kaspa_p2p_lib::{ common::ProtocolError, convert::header::{HeaderFormat, Versioned}, - dequeue, dequeue_with_timeout, make_message, make_request, + dequeue_with_timeout, dequeue_with_timestamp, make_message, make_request, pb::{kaspad_message::Payload, InvRelayBlockMessage, RequestBlockLocatorMessage, RequestRelayBlocksMessage}, IncomingRoute, Router, SharedIncomingRoute, }; use kaspa_utils::channel::{JobSender, JobTrySendError as TrySendError}; -use std::{collections::VecDeque, sync::Arc}; +use std::{collections::VecDeque, sync::Arc, time::Instant}; pub struct RelayInvMessage { hash: Hash, @@ -26,6 +26,9 @@ pub struct RelayInvMessage { /// Indicates whether this inv is already known to be within orphan resolution range known_within_range: bool, + + // Time when this message was first dequeued; of interest only for direct invs in conjunction with perigee + timestamp: Option, } /// Encapsulates an incoming invs route which also receives data locally @@ -41,16 +44,21 @@ impl TwoWayIncomingRoute { pub fn enqueue_indirect_invs>(&mut self, iter: I, known_within_range: bool) { // All indirect invs are orphan roots; not all are known to be within 
orphan resolution range - self.indirect_invs.extend(iter.into_iter().map(|h| RelayInvMessage { hash: h, is_orphan_root: true, known_within_range })) + self.indirect_invs.extend(iter.into_iter().map(|h| RelayInvMessage { + hash: h, + is_orphan_root: true, + known_within_range, + timestamp: None, + })) } pub async fn dequeue(&mut self) -> Result { if let Some(inv) = self.indirect_invs.pop_front() { Ok(inv) } else { - let msg = dequeue!(self.incoming_route, Payload::InvRelayBlock)?; + let (msg, ts) = dequeue_with_timestamp!(self.incoming_route, Payload::InvRelayBlock)?; let inv = msg.try_into()?; - Ok(RelayInvMessage { hash: inv, is_orphan_root: false, known_within_range: false }) + Ok(RelayInvMessage { hash: inv, is_orphan_root: false, known_within_range: false, timestamp: Some(ts) }) } } } @@ -105,18 +113,27 @@ impl HandleRelayInvsFlow { return Err(ProtocolError::OtherOwned(format!("sent inv of an invalid block {}", inv.hash))); } _ => { - // Block is already known, skip to next inv debug!("Relay block {} already exists, continuing...", inv.hash); + if should_signal_perigee(&self.ctx, &inv, self.ctx.is_ibd_running()) { + self.spawn_perigee_timestamp_signal(inv.hash, inv.timestamp.unwrap(), false); + } continue; } } match self.ctx.get_orphan_roots_if_known(&session, inv.hash).await { - OrphanOutput::Unknown => {} // Keep processing this inv - OrphanOutput::NoRoots(_) => continue, // Existing orphan w/o missing roots + OrphanOutput::Unknown => {} // Keep processing this inv + OrphanOutput::NoRoots(_) => { + if should_signal_perigee(&self.ctx, &inv, self.ctx.is_ibd_running()) { + self.spawn_perigee_timestamp_signal(inv.hash, inv.timestamp.unwrap(), false); + } + } // Existing orphan w/o missing roots OrphanOutput::Roots(roots) => { // Known orphan with roots to enqueue self.enqueue_orphan_roots(inv.hash, roots, inv.known_within_range); + if should_signal_perigee(&self.ctx, &inv, self.ctx.is_ibd_running()) { + self.spawn_perigee_timestamp_signal(inv.hash, inv.timestamp.unwrap(), false); + } continue; } } @@ -131,8 +148,12 @@ impl HandleRelayInvsFlow { // We keep the request scope alive until consensus processes the block let Some((block, request_scope)) = self.request_block(inv.hash, self.msg_route.id(), self.header_format).await? 
else { debug!("Relay block {} was already requested from another peer, continuing...", inv.hash); + if should_signal_perigee(&self.ctx, &inv, self.ctx.is_ibd_running()) { + self.spawn_perigee_timestamp_signal(inv.hash, inv.timestamp.unwrap(), false); + } continue; }; + request_scope.report_obtained(); if block.is_header_only() { @@ -192,6 +213,9 @@ impl HandleRelayInvsFlow { } ancestor_batch } else { + if should_signal_perigee(&self.ctx, &inv, self.ctx.is_ibd_running()) { + self.spawn_perigee_timestamp_signal(inv.hash, inv.timestamp.unwrap(), false); + } continue; } } @@ -223,13 +247,26 @@ impl HandleRelayInvsFlow { // We spawn post-processing as a separate task so that this loop // can continue processing the following relay blocks let ctx = self.ctx.clone(); + let router = self.router.clone(); tokio::spawn(async move { ctx.on_new_block(&session, ancestor_batch, block, virtual_state_task).await; + if should_signal_perigee(&ctx, &inv, ctx.is_ibd_running()) { + ctx.maybe_add_perigee_timestamp(router, inv.hash, inv.timestamp.unwrap(), true).await; + } ctx.log_block_event(BlockLogEvent::Relay(inv.hash)); }); } } + fn spawn_perigee_timestamp_signal(&self, hash: Hash, timestamp: Instant, verify: bool) { + let ctx = self.ctx.clone(); + let router = self.router.clone(); + + tokio::spawn(async move { + ctx.maybe_add_perigee_timestamp(router, hash, timestamp, verify).await; + }); + } + fn enqueue_orphan_roots(&mut self, _orphan: Hash, roots: Vec, known_within_range: bool) { self.invs_route.enqueue_indirect_invs(roots, known_within_range) } @@ -376,3 +413,7 @@ impl HandleRelayInvsFlow { } } } + +fn should_signal_perigee(ctx: &FlowContext, inv: &RelayInvMessage, is_ibd_running: bool) -> bool { + !inv.is_orphan_root && ctx.is_perigee_active() && !is_ibd_running +} diff --git a/protocol/p2p/Cargo.toml b/protocol/p2p/Cargo.toml index b03a941b07..78fabb13e8 100644 --- a/protocol/p2p/Cargo.toml +++ b/protocol/p2p/Cargo.toml @@ -1,3 +1,6 @@ +[features] +test-utils = [] + [package] name = "kaspa-p2p-lib" description = "Kaspa p2p library" @@ -41,7 +44,7 @@ rand.workspace = true seqlock.workspace = true serde.workspace = true thiserror.workspace = true -tokio = { workspace = true, features = [ "rt-multi-thread", "macros", "signal" ] } +tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] } tokio-stream = { workspace = true, features = ["net"] } tonic = { workspace = true, features = ["tls", "gzip"] } uuid.workspace = true diff --git a/protocol/p2p/src/bin/client.rs b/protocol/p2p/src/bin/client.rs index b5cee2d38c..1865f9b33a 100644 --- a/protocol/p2p/src/bin/client.rs +++ b/protocol/p2p/src/bin/client.rs @@ -1,5 +1,5 @@ use kaspa_core::debug; -use kaspa_p2p_lib::echo::EchoFlowInitializer; +use kaspa_p2p_lib::{echo::EchoFlowInitializer, PeerOutboundType}; use std::{sync::Arc, time::Duration}; #[tokio::main] @@ -13,7 +13,8 @@ async fn main() { let ip_port = String::from("[::1]:50051"); for i in 0..1 { debug!("P2P, p2p_client::main - starting peer:{}", i); - let _peer_key = adaptor.connect_peer_with_retries(ip_port.clone(), 16, Duration::from_secs(1)).await; + let _peer_key = + adaptor.connect_peer_with_retries(ip_port.clone(), 16, Duration::from_secs(1), PeerOutboundType::UserSupplied).await; } // [2] - wait a few seconds and terminate tokio::time::sleep(Duration::from_secs(5)).await; diff --git a/protocol/p2p/src/bin/server.rs b/protocol/p2p/src/bin/server.rs index 9ce333911c..eb289b081f 100644 --- a/protocol/p2p/src/bin/server.rs +++ b/protocol/p2p/src/bin/server.rs @@ -1,5 +1,5 @@ 
use kaspa_core::debug; -use kaspa_p2p_lib::echo::EchoFlowInitializer; +use kaspa_p2p_lib::{echo::EchoFlowInitializer, PeerOutboundType}; use kaspa_utils::networking::NetAddress; use std::{str::FromStr, sync::Arc, time::Duration}; @@ -15,7 +15,8 @@ async fn main() { let ip_port = String::from("[::1]:16111"); for i in 0..1 { debug!("P2P, p2p_client::main - starting peer:{}", i); - let _peer_key = adaptor.connect_peer_with_retries(ip_port.clone(), 16, Duration::from_secs(1)).await; + let _peer_key = + adaptor.connect_peer_with_retries(ip_port.clone(), 16, Duration::from_secs(1), PeerOutboundType::UserSupplied).await; } // [2] - wait for ~60 sec and terminate tokio::time::sleep(Duration::from_secs(64)).await; diff --git a/protocol/p2p/src/common.rs b/protocol/p2p/src/common.rs index a0b314fb6d..6e538ddfad 100644 --- a/protocol/p2p/src/common.rs +++ b/protocol/p2p/src/common.rs @@ -179,6 +179,21 @@ macro_rules! unwrap_message_with_request_id { }}; } +#[macro_export] +macro_rules! unwrap_message_with_timestamp { + ($op:expr, $pattern:path) => {{ + if let Some(msg) = $op { + if let Some($pattern(inner_msg)) = msg.payload { + Ok((inner_msg, Instant::now())) + } else { + Err($crate::common::ProtocolError::UnexpectedMessage(stringify!($pattern), msg.payload.as_ref().map(|v| v.into()))) + } + } else { + Err($crate::common::ProtocolError::ConnectionClosed) + } + }}; +} + /// Macro to await a channel `Receiver::recv` call with a default/specified timeout and expect a specific payload type. /// Usage: /// ```ignore @@ -218,6 +233,13 @@ macro_rules! dequeue { }}; } +#[macro_export] +macro_rules! dequeue_with_timestamp { + ($receiver:expr, $pattern:path) => {{ + $crate::unwrap_message_with_timestamp!($receiver.recv().await, $pattern) + }}; +} + #[macro_export] macro_rules! 
dequeue_with_request_id { ($receiver:expr, $pattern:path) => {{ diff --git a/protocol/p2p/src/core/adaptor.rs b/protocol/p2p/src/core/adaptor.rs index 4c3872702d..dd4138855d 100644 --- a/protocol/p2p/src/core/adaptor.rs +++ b/protocol/p2p/src/core/adaptor.rs @@ -1,5 +1,6 @@ use crate::common::ProtocolError; use crate::core::hub::Hub; +use crate::core::peer::PeerOutboundType; use crate::ConnectionError; use crate::{core::connection_handler::ConnectionHandler, Router}; use kaspa_utils::networking::NetAddress; @@ -70,8 +71,8 @@ impl Adaptor { } /// Connect to a new peer (no retries) - pub async fn connect_peer(&self, peer_address: String) -> Result { - self.connection_handler.connect_with_retry(peer_address, 1, Default::default()).await.map(|r| r.key()) + pub async fn connect_peer(&self, peer_address: String, outbound_type: PeerOutboundType) -> Result { + self.connection_handler.connect_with_retry(peer_address, 1, Default::default(), outbound_type).await.map(|r| r.key()) } /// Connect to a new peer (with params controlling retry behavior) @@ -80,8 +81,9 @@ impl Adaptor { peer_address: String, retry_attempts: u8, retry_interval: Duration, + outbound_type: PeerOutboundType, ) -> Result { - self.connection_handler.connect_with_retry(peer_address, retry_attempts, retry_interval).await.map(|r| r.key()) + self.connection_handler.connect_with_retry(peer_address, retry_attempts, retry_interval, outbound_type).await.map(|r| r.key()) } /// Terminates all peers and cleans up any additional async resources diff --git a/protocol/p2p/src/core/connection_handler.rs b/protocol/p2p/src/core/connection_handler.rs index 54d387043c..7dc15dd4a7 100644 --- a/protocol/p2p/src/core/connection_handler.rs +++ b/protocol/p2p/src/core/connection_handler.rs @@ -1,5 +1,6 @@ use crate::common::ProtocolError; use crate::core::hub::HubEvent; +use crate::core::peer::PeerOutboundType; use crate::pb::{ p2p_client::P2pClient as ProtoP2pClient, p2p_server::P2p as ProtoP2p, p2p_server::P2pServer as ProtoP2pServer, KaspadMessage, }; @@ -94,7 +95,7 @@ impl ConnectionHandler { } /// Connect to a new peer - pub(crate) async fn connect(&self, peer_address: String) -> Result, ConnectionError> { + pub(crate) async fn connect(&self, peer_address: String, outbound_type: PeerOutboundType) -> Result, ConnectionError> { let Some(socket_address) = peer_address.to_socket_addrs()?.next() else { return Err(ConnectionError::NoAddress); }; @@ -120,7 +121,7 @@ impl ConnectionHandler { let (outgoing_route, outgoing_receiver) = mpsc_channel(Self::outgoing_network_channel_size()); let incoming_stream = client.message_stream(ReceiverStream::new(outgoing_receiver)).await?.into_inner(); - let router = Router::new(socket_address, true, self.hub_sender.clone(), incoming_stream, outgoing_route).await; + let router = Router::new(socket_address, Some(outbound_type), self.hub_sender.clone(), incoming_stream, outgoing_route).await; // For outbound peers, we perform the initialization as part of the connect logic match self.initializer.initialize_connection(router.clone()).await { @@ -147,22 +148,29 @@ impl ConnectionHandler { address: String, retry_attempts: u8, retry_interval: Duration, + outbound_type: PeerOutboundType, ) -> Result, ConnectionError> { let mut counter = 0; loop { counter += 1; - match self.connect(address.clone()).await { + match self.connect(address.clone(), outbound_type).await { Ok(router) => { - debug!("P2P, Client connected, peer: {:?}", address); + debug!("P2P, Client connected, peer: {:?}, outbound type: {} ", address, outbound_type); 
return Ok(router); } Err(ConnectionError::ProtocolError(err)) => { // On protocol errors we avoid retrying - debug!("P2P, connect retry #{} failed with error {:?}, peer: {:?}, aborting retries", counter, err, address); + debug!( + "P2P, connect retry #{} failed with error {:?}, peer: {:?}, outbound type: {}", + counter, err, address, outbound_type + ); return Err(ConnectionError::ProtocolError(err)); } Err(err) => { - debug!("P2P, connect retry #{} failed with error {:?}, peer: {:?}", counter, err, address); + debug!( + "P2P, connect retry #{} failed with error {:?}, peer: {:?}, outbound type: {}", + counter, err, address, outbound_type + ); if counter < retry_attempts { // Await `retry_interval` time before retrying tokio::time::sleep(retry_interval).await; @@ -212,7 +220,7 @@ impl ProtoP2p for ConnectionHandler { let incoming_stream = request.into_inner(); // Build the router object - let router = Router::new(remote_address, false, self.hub_sender.clone(), incoming_stream, outgoing_route).await; + let router = Router::new(remote_address, None, self.hub_sender.clone(), incoming_stream, outgoing_route).await; // Notify the central Hub about the new peer self.hub_sender.send(HubEvent::NewPeer(router)).await.expect("hub receiver should never drop before senders"); diff --git a/protocol/p2p/src/core/hub.rs b/protocol/p2p/src/core/hub.rs index 2ac0f03405..2c62c0c0a0 100644 --- a/protocol/p2p/src/core/hub.rs +++ b/protocol/p2p/src/core/hub.rs @@ -39,12 +39,22 @@ impl Hub { HubEvent::NewPeer(new_router) => { // If peer is outbound then connection initialization was already performed as part of the connect logic if new_router.is_outbound() { - info!("P2P Connected to outgoing peer {} (outbound: {})", new_router, self.peers_query(true) + 1); + info!( + "P2P Connected to outgoing peer {} (protocol ver: {}, outbound: {})", + new_router, + new_router.protocol_version(), + self.peers_query(true) + 1 + ); self.insert_new_router(new_router).await; } else { match initializer.initialize_connection(new_router.clone()).await { Ok(()) => { - info!("P2P Connected to incoming peer {} (inbound: {})", new_router, self.peers_query(false) + 1); + info!( + "P2P Connected to incoming peer {} (protocol ver: {}, inbound: {})", + new_router, + new_router.protocol_version(), + self.peers_query(false) + 1 + ); self.insert_new_router(new_router).await; } Err(err) => { @@ -192,8 +202,22 @@ impl Hub { } /// Returns a list of all currently active peers - pub fn active_peers(&self) -> Vec { - self.peers.read().values().map(|r| r.as_ref().into()).collect() + pub fn active_peers(&self, include_perigee_data: bool) -> Vec { + self.peers.read().values().map(|r| (r.as_ref(), include_perigee_data).into()).collect() + } + + pub fn random_graph_routers(&self) -> Vec> { + self.peers.read().values().filter(|r| r.is_random_graph()).cloned().collect() + } + + pub fn perigee_routers(&self) -> Vec> { + self.peers.read().values().filter(|r| r.is_perigee()).cloned().collect() + } + + pub async fn clear_perigee_timestamps(&self) { + for router in self.peers.read().values().filter(|r| r.is_outbound()) { + router.clear_perigee_timestamps(); + } } /// Returns the number of currently active peers diff --git a/protocol/p2p/src/core/peer.rs b/protocol/p2p/src/core/peer.rs index 90b52e9ee3..914704857d 100644 --- a/protocol/p2p/src/core/peer.rs +++ b/protocol/p2p/src/core/peer.rs @@ -1,6 +1,23 @@ -use kaspa_consensus_core::subnets::SubnetworkId; +use kaspa_consensus_core::{subnets::SubnetworkId, Hash as BlockHash}; use 
kaspa_utils::networking::{IpAddress, PeerId}; -use std::{fmt::Display, net::SocketAddr, sync::Arc, time::Instant}; +use std::{collections::HashMap, fmt::Display, hash::Hash, net::SocketAddr, sync::Arc, time::Instant}; + +#[derive(Copy, Debug, Clone)] +pub enum PeerOutboundType { + Perigee, + RandomGraph, + UserSupplied, +} + +impl Display for PeerOutboundType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PeerOutboundType::Perigee => write!(f, "perigee"), + PeerOutboundType::RandomGraph => write!(f, "random graph"), + PeerOutboundType::UserSupplied => write!(f, "user supplied"), + } + } +} #[derive(Debug, Clone, Default)] pub struct PeerProperties { @@ -13,26 +30,28 @@ pub struct PeerProperties { pub time_offset: i64, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Peer { identity: PeerId, net_address: SocketAddr, - is_outbound: bool, + outbound_type: Option, connection_started: Instant, properties: Arc, last_ping_duration: u64, + perigee_timestamps: Arc>, } impl Peer { pub fn new( identity: PeerId, net_address: SocketAddr, - is_outbound: bool, + outbound_type: Option, connection_started: Instant, properties: Arc, last_ping_duration: u64, + perigee_timestamps: Arc>, ) -> Self { - Self { identity, net_address, is_outbound, connection_started, properties, last_ping_duration } + Self { identity, net_address, outbound_type, connection_started, properties, last_ping_duration, perigee_timestamps } } /// Internal identity of this peer @@ -49,9 +68,29 @@ impl Peer { self.into() } + pub fn outbound_type(&self) -> Option { + self.outbound_type + } + /// Indicates whether this connection is an outbound connection pub fn is_outbound(&self) -> bool { - self.is_outbound + self.outbound_type.is_some() + } + + pub fn is_user_supplied(&self) -> bool { + matches!(self.outbound_type, Some(PeerOutboundType::UserSupplied)) + } + + pub fn is_perigee(&self) -> bool { + matches!(self.outbound_type, Some(PeerOutboundType::Perigee)) + } + + pub fn is_random_graph(&self) -> bool { + matches!(self.outbound_type, Some(PeerOutboundType::RandomGraph)) + } + + pub fn connection_started(&self) -> Instant { + self.connection_started } pub fn time_connected(&self) -> u64 { @@ -65,23 +104,49 @@ impl Peer { pub fn last_ping_duration(&self) -> u64 { self.last_ping_duration } + + pub fn perigee_timestamps(&self) -> Arc> { + self.perigee_timestamps.clone() + } } -#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] +#[derive(Debug, Copy, Clone)] pub struct PeerKey { identity: PeerId, ip: IpAddress, + /// port is ignored for equality and hashing, but useful for reconstructing the socket address from the key only. 
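+ /// For example, two keys sharing the same PeerId and IP but built with ports 8080 and 9090 compare equal and hash to the same value (see the unit tests below).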
+ port: u16, } impl PeerKey { - pub fn new(identity: PeerId, ip: IpAddress) -> Self { - Self { identity, ip } + pub fn new(identity: PeerId, ip: IpAddress, port: u16) -> Self { + Self { identity, ip, port } + } + + pub fn sock_addr(&self) -> SocketAddr { + SocketAddr::new(self.ip.into(), self.port) + } +} + +impl Hash for PeerKey { + // Custom hash implementation that ignores port + fn hash(&self, state: &mut H) { + self.identity.hash(state); + self.ip.hash(state); + } +} + +impl PartialEq for PeerKey { + fn eq(&self, other: &Self) -> bool { + self.identity == other.identity && self.ip == other.ip } } +impl Eq for PeerKey {} + impl From<&Peer> for PeerKey { fn from(value: &Peer) -> Self { - Self::new(value.identity, value.net_address.ip().into()) + Self::new(value.identity, value.net_address.ip().into(), value.net_address.port()) } } @@ -90,3 +155,39 @@ impl Display for PeerKey { write!(f, "{}+{}", self.identity, self.ip) } } + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + use std::net::IpAddr; + use uuid::Uuid; + + #[test] + fn test_peer_key_equality() { + let peer1 = PeerKey::new(PeerId::new(Uuid::from_u128(1u128)), IpAddr::V4([192, 168, 1, 1].into()).into(), 8080); + let peer2 = PeerKey::new(PeerId::new(Uuid::from_u128(1u128)), IpAddr::V4([192, 168, 1, 1].into()).into(), 9090); + let peer3 = PeerKey::new(PeerId::new(Uuid::from_u128(2u128)), IpAddr::V4([192, 168, 1, 1].into()).into(), 8080); + + assert_eq!(peer1, peer2); + assert_ne!(peer1, peer3); + } + + #[test] + fn test_peer_key_hashing() { + let peer1 = PeerKey::new(PeerId::new(Uuid::from_u128(1u128)), IpAddr::V4([192, 168, 1, 1].into()).into(), 8080); + + let peer2 = PeerKey::new(PeerId::new(Uuid::from_u128(1u128)), IpAddr::V4([192, 168, 1, 1].into()).into(), 9090); + + let mut hasher1 = DefaultHasher::new(); + peer1.hash(&mut hasher1); + let hash1 = hasher1.finish(); + + let mut hasher2 = DefaultHasher::new(); + peer2.hash(&mut hasher2); + let hash2 = hasher2.finish(); + + assert_eq!(hash1, hash2); + } +} diff --git a/protocol/p2p/src/core/router.rs b/protocol/p2p/src/core/router.rs index 55caf26c43..68d34310fb 100644 --- a/protocol/p2p/src/core/router.rs +++ b/protocol/p2p/src/core/router.rs @@ -1,18 +1,21 @@ use crate::core::hub::HubEvent; +use crate::core::peer::PeerOutboundType; use crate::pb::RejectMessage; use crate::pb::{kaspad_message::Payload as KaspadMessagePayload, KaspadMessage}; use crate::{common::ProtocolError, KaspadMessagePayloadType}; use crate::{make_message, Peer}; +use kaspa_consensus_core::Hash; use kaspa_core::{debug, error, info, trace, warn}; use kaspa_utils::networking::PeerId; use parking_lot::{Mutex, RwLock}; use seqlock::SeqLock; +use std::collections::HashMap; use std::fmt::{Debug, Display}; use std::net::SocketAddr; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; use std::time::Instant; -use std::{collections::HashMap, sync::Arc}; use tokio::select; use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::{channel as mpsc_channel, Receiver as MpscReceiver, Sender as MpscSender}; @@ -102,6 +105,8 @@ struct RouterMutableState { /// Duration of the last ping to this peer last_ping_duration: u64, + + perigee_timestamps: HashMap, } impl RouterMutableState { @@ -120,8 +125,8 @@ pub struct Router { /// The socket address of this peer net_address: SocketAddr, - /// Indicates whether this connection is an outbound connection - is_outbound: bool, + /// Indicates whether 
this connection is an outbound connection, and if so under which outbound type + outbound_type: Option, /// Time of creation of this object and the connection it holds connection_started: Instant, @@ -149,19 +154,27 @@ impl Display for Router { impl From<&Router> for PeerKey { fn from(value: &Router) -> Self { - Self::new(value.identity.read(), value.net_address.ip().into()) + Self::new(value.identity.read(), value.net_address.ip().into(), value.net_address.port()) } } -impl From<&Router> for Peer { - fn from(router: &Router) -> Self { +impl From<(&Router, bool)> for Peer { + /// the bool indicates whether to include perigee data + fn from(item: (&Router, bool)) -> Self { + let (router, include_perigee_data) = item; Self::new( router.identity(), router.net_address, - router.is_outbound, + router.outbound_type, router.connection_started, router.properties(), router.last_ping_duration(), + if include_perigee_data { + let perigee_timestamps = router.perigee_timestamps(); + Arc::new(perigee_timestamps) + } else { + Arc::new(HashMap::new()) + }, ) } } @@ -175,7 +188,7 @@ fn message_summary(msg: &KaspadMessage) -> impl Debug { impl Router { pub(crate) async fn new( net_address: SocketAddr, - is_outbound: bool, + outbound_type: Option, hub_sender: MpscSender, mut incoming_stream: Streaming, outgoing_route: MpscSender, @@ -186,7 +199,7 @@ impl Router { let router = Arc::new(Router { identity: Default::default(), net_address, - is_outbound, + outbound_type, connection_started: Instant::now(), routing_map_by_type: RwLock::new(HashMap::new()), routing_map_by_id: RwLock::new(HashMap::new()), @@ -266,7 +279,19 @@ impl Router { /// Indicates whether this connection is an outbound connection pub fn is_outbound(&self) -> bool { - self.is_outbound + self.outbound_type.is_some() + } + + pub fn is_user_supplied(&self) -> bool { + matches!(self.outbound_type, Some(PeerOutboundType::UserSupplied)) + } + + pub fn is_perigee(&self) -> bool { + matches!(self.outbound_type, Some(PeerOutboundType::Perigee)) + } + + pub fn is_random_graph(&self) -> bool { + matches!(self.outbound_type, Some(PeerOutboundType::RandomGraph)) } pub fn connection_started(&self) -> Instant { @@ -290,6 +315,18 @@ impl Router { self.mutable_state.lock().last_ping_duration = last_ping_duration; } + pub fn add_perigee_timestamp(&self, hash: Hash, timestamp: Instant) { + self.mutable_state.lock().perigee_timestamps.insert(hash, timestamp); + } + + pub fn clear_perigee_timestamps(&self) { + self.mutable_state.lock().perigee_timestamps.clear(); + } + + pub fn perigee_timestamps(&self) -> HashMap { + self.mutable_state.lock().perigee_timestamps.clone() + } + pub fn last_ping_duration(&self) -> u64 { self.mutable_state.lock().last_ping_duration } @@ -298,6 +335,10 @@ impl Router { 256 } + pub fn protocol_version(&self) -> u32 { + self.mutable_state.lock().properties.protocol_version + } + /// Send a signal to start this router's receive loop pub fn start(&self) { // Acquire state mutex and send the start signal @@ -470,3 +511,43 @@ fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> { err = err.source()?; } } + +// --- TEST UTILS --- +#[cfg(feature = "test-utils")] +pub trait RouterTestExt { + fn test_new( + identity: PeerId, + net_address: std::net::SocketAddr, + outbound_type: Option, + connection_started: std::time::Instant, + ) -> std::sync::Arc + where + Self: Sized; +} + +#[cfg(any(test, feature = "test-utils"))] +impl RouterTestExt for Router { + fn test_new( + identity: PeerId, + net_address: std::net::SocketAddr, 
+ outbound_type: Option, + connection_started: std::time::Instant, + ) -> std::sync::Arc { + use tokio::sync::mpsc; + let (hub_sender, _hub_receiver) = mpsc::channel(1); + let (outgoing_route, _outgoing_receiver) = mpsc::channel(1); + // Create a dummy streaming object (not actually used in this test context) + // let dummy_stream = Streaming::::new_empty(...); // not needed for struct + std::sync::Arc::new(Router { + identity: seqlock::SeqLock::new(identity), + net_address, + outbound_type, + connection_started, + routing_map_by_type: parking_lot::RwLock::new(std::collections::HashMap::new()), + routing_map_by_id: parking_lot::RwLock::new(std::collections::HashMap::new()), + outgoing_route, + hub_sender, + mutable_state: parking_lot::Mutex::new(RouterMutableState::new(None, None)), + }) + } +} diff --git a/protocol/p2p/src/echo.rs b/protocol/p2p/src/echo.rs index 07a26aac6b..78ea88caf1 100644 --- a/protocol/p2p/src/echo.rs +++ b/protocol/p2p/src/echo.rs @@ -148,7 +148,7 @@ mod tests { use std::{str::FromStr, time::Duration}; use super::*; - use crate::{Adaptor, Hub}; + use crate::{Adaptor, Hub, PeerOutboundType}; use kaspa_core::debug; use kaspa_utils::networking::NetAddress; @@ -164,15 +164,15 @@ mod tests { // Initiate the connection from `adaptor1` (outbound) to `adaptor2` (inbound) let peer2_id = adaptor1 - .connect_peer_with_retries(String::from("[::1]:50054"), 16, Duration::from_secs(1)) + .connect_peer_with_retries(String::from("[::1]:50054"), 16, Duration::from_secs(1), PeerOutboundType::UserSupplied) .await .expect("peer connection failed"); // Wait for handshake completion tokio::time::sleep(std::time::Duration::from_secs(2)).await; - let adaptor1_initial_peers = adaptor1.active_peers(); - let adaptor2_initial_peers = adaptor2.active_peers(); + let adaptor1_initial_peers = adaptor1.active_peers(false); + let adaptor2_initial_peers = adaptor2.active_peers(false); // For now assert the handshake by checking the peer exists (since peer is removed on handshake error) assert_eq!(adaptor1_initial_peers.len(), 1, "handshake failed -- outbound peer is missing"); @@ -185,8 +185,8 @@ mod tests { tokio::time::sleep(std::time::Duration::from_secs(2)).await; // Make sure the peers are cleaned-up on both sides - assert_eq!(adaptor1.active_peers().len(), 0, "peer termination failed -- outbound peer was not removed"); - assert_eq!(adaptor2.active_peers().len(), 0, "peer termination failed -- inbound peer was not removed"); + assert_eq!(adaptor1.active_peers(false).len(), 0, "peer termination failed -- outbound peer was not removed"); + assert_eq!(adaptor2.active_peers(false).len(), 0, "peer termination failed -- inbound peer was not removed"); adaptor1.close().await; adaptor2.close().await; diff --git a/protocol/p2p/src/lib.rs b/protocol/p2p/src/lib.rs index 6fde0d48fb..b4f87d610b 100644 --- a/protocol/p2p/src/lib.rs +++ b/protocol/p2p/src/lib.rs @@ -1,3 +1,5 @@ +#[cfg(any(test, feature = "test-utils"))] +pub mod test_utils; pub mod pb { // this one includes messages.proto + p2p.proto + rcp.proto tonic::include_proto!("protowire"); @@ -14,6 +16,6 @@ pub use crate::core::adaptor::{Adaptor, ConnectionInitializer}; pub use crate::core::connection_handler::ConnectionError; pub use crate::core::hub::Hub; pub use crate::core::payload_type::KaspadMessagePayloadType; -pub use crate::core::peer::{Peer, PeerKey, PeerProperties}; +pub use crate::core::peer::{Peer, PeerKey, PeerOutboundType, PeerProperties}; pub use crate::core::router::{IncomingRoute, Router, SharedIncomingRoute, BLANK_ROUTE_ID}; pub use 
handshake::KaspadHandshake;
diff --git a/protocol/p2p/src/test_utils.rs b/protocol/p2p/src/test_utils.rs
new file mode 100644
index 0000000000..44afbd76d5
--- /dev/null
+++ b/protocol/p2p/src/test_utils.rs
@@ -0,0 +1,5 @@
+//! Test utilities for kaspa_p2p_lib, only available with the "test-utils" feature.
+#![allow(dead_code)]
+
+#[cfg(any(test, feature = "test-utils"))]
+pub use crate::core::router::RouterTestExt;
diff --git a/rpc/service/src/service.rs b/rpc/service/src/service.rs
index 248336f49c..a2633bceb6 100644
--- a/rpc/service/src/service.rs
+++ b/rpc/service/src/service.rs
@@ -988,7 +988,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and
         }
         let peer_address = request.peer_address.normalize(self.config.net.default_p2p_port());
         if let Some(connection_manager) = self.flow_context.connection_manager() {
-            connection_manager.add_connection_request(peer_address.into(), request.is_permanent).await;
+            connection_manager.add_connection_requests(vec![(peer_address.into(), request.is_permanent)]).await;
         } else {
             return Err(RpcError::NoConnectionManager);
         }
@@ -1040,7 +1040,7 @@ NOTE: This error usually indicates an RPC conversion error between the node and
         _connection: Option<&DynRpcConnection>,
         _: GetConnectedPeerInfoRequest,
     ) -> RpcResult<GetConnectedPeerInfoResponse> {
-        let peers = self.flow_context.hub().active_peers();
+        let peers = self.flow_context.hub().active_peers(false);
        let peer_info = self.protocol_converter.get_peers_info(&peers);
        Ok(GetConnectedPeerInfoResponse::new(peer_info))
     }
diff --git a/utils/src/networking.rs b/utils/src/networking.rs
index b7a3397780..456321fb27 100644
--- a/utils/src/networking.rs
+++ b/utils/src/networking.rs
@@ -16,7 +16,7 @@ use wasm_bindgen::prelude::*;
 const TS_IP_ADDRESS: &'static str = r#"
 /**
  * Generic network address representation.
- * 
+ *
  * @category General
  */
export interface INetworkAddress {
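
The patch replaces the boolean `is_outbound` with an `Option<PeerOutboundType>` (`None` meaning inbound) and adds hub queries such as `perigee_routers()` and `random_graph_routers()`. The following is a minimal, self-contained sketch, not the crate's code: `Conn` is a hypothetical stand-in for `Router`, used only to show how the `Option` encoding yields both the old outbound boolean and the new per-class filters.

use std::fmt::{self, Display};

// Mirrors the enum added in protocol/p2p/src/core/peer.rs.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum PeerOutboundType {
    Perigee,
    RandomGraph,
    UserSupplied,
}

impl Display for PeerOutboundType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PeerOutboundType::Perigee => write!(f, "perigee"),
            PeerOutboundType::RandomGraph => write!(f, "random graph"),
            PeerOutboundType::UserSupplied => write!(f, "user supplied"),
        }
    }
}

// Hypothetical stand-in for a connection entry: `None` marks an inbound connection.
struct Conn {
    name: &'static str,
    outbound_type: Option<PeerOutboundType>,
}

impl Conn {
    fn is_outbound(&self) -> bool {
        self.outbound_type.is_some()
    }
    fn is_perigee(&self) -> bool {
        matches!(self.outbound_type, Some(PeerOutboundType::Perigee))
    }
    fn is_random_graph(&self) -> bool {
        matches!(self.outbound_type, Some(PeerOutboundType::RandomGraph))
    }
}

fn main() {
    let conns = vec![
        Conn { name: "a", outbound_type: None },
        Conn { name: "b", outbound_type: Some(PeerOutboundType::Perigee) },
        Conn { name: "c", outbound_type: Some(PeerOutboundType::RandomGraph) },
        Conn { name: "d", outbound_type: Some(PeerOutboundType::UserSupplied) },
    ];

    // Analogous to Hub::perigee_routers / Hub::random_graph_routers: filter by outbound class.
    let perigee: Vec<_> = conns.iter().filter(|c| c.is_perigee()).map(|c| c.name).collect();
    let random_graph: Vec<_> = conns.iter().filter(|c| c.is_random_graph()).map(|c| c.name).collect();
    let outbound_total = conns.iter().filter(|c| c.is_outbound()).count();

    assert_eq!(perigee, vec!["b"]);
    assert_eq!(random_graph, vec!["c"]);
    assert_eq!(outbound_total, 3);
    // The Display impl is what feeds the "outbound type: {}" log lines.
    println!("connection b is logged as: {}", PeerOutboundType::Perigee);
}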
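
`PeerKey` now carries the port so a socket address can be rebuilt from the key alone, while equality and hashing deliberately ignore it. A standalone sketch of that pattern and its practical effect on a `HashSet`; `Key` is a hypothetical stand-in, not the crate's `PeerKey`.

use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

// Identity + ip participate in equality/hashing; port does not.
#[derive(Copy, Clone, Debug)]
struct Key {
    identity: u128,
    ip: IpAddr,
    // Ignored for equality and hashing, kept only so the socket address can be reconstructed.
    port: u16,
}

impl Key {
    fn sock_addr(&self) -> SocketAddr {
        SocketAddr::new(self.ip, self.port)
    }
}

impl Hash for Key {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.identity.hash(state);
        self.ip.hash(state);
    }
}

impl PartialEq for Key {
    fn eq(&self, other: &Self) -> bool {
        self.identity == other.identity && self.ip == other.ip
    }
}

impl Eq for Key {}

fn main() {
    let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1));
    let a = Key { identity: 1, ip, port: 8080 };
    let b = Key { identity: 1, ip, port: 9090 }; // same peer identity and ip, different port

    let mut set = HashSet::new();
    set.insert(a);
    // `b` hashes and compares equal to `a`, so the set does not grow.
    assert!(!set.insert(b));
    assert_eq!(set.len(), 1);

    // The port is still available for rebuilding a dialable address from the key alone.
    assert_eq!(a.sock_addr(), "192.168.1.1:8080".parse::<SocketAddr>().unwrap());
}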
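
`Router` now keeps a map of perigee timestamps in its mutable state (`add_perigee_timestamp`, `perigee_timestamps`, `clear_perigee_timestamps`), and the hub wipes those maps on all outbound routers between measurement rounds. A standalone sketch of that bookkeeping, using a `u64` stand-in for the block hash and `std::sync::Mutex` in place of the crate's `parking_lot` lock.

use std::collections::HashMap;
use std::sync::Mutex;
use std::time::Instant;

// Hypothetical stand-in for the router's mutable state: arrival times per block hash.
struct PeerTimings {
    perigee_timestamps: Mutex<HashMap<u64, Instant>>,
}

impl PeerTimings {
    fn new() -> Self {
        Self { perigee_timestamps: Mutex::new(HashMap::new()) }
    }

    // Record when a block (identified here by a u64 stand-in hash) was seen from this peer.
    fn add_perigee_timestamp(&self, hash: u64, timestamp: Instant) {
        self.perigee_timestamps.lock().unwrap().insert(hash, timestamp);
    }

    // Snapshot for scoring; cloning keeps the lock hold-time short.
    fn perigee_timestamps(&self) -> HashMap<u64, Instant> {
        self.perigee_timestamps.lock().unwrap().clone()
    }

    // Reset between measurement rounds.
    fn clear_perigee_timestamps(&self) {
        self.perigee_timestamps.lock().unwrap().clear();
    }
}

fn main() {
    let peer = PeerTimings::new();
    let t0 = Instant::now();
    peer.add_perigee_timestamp(0xabc, t0);
    peer.add_perigee_timestamp(0xdef, Instant::now());
    assert_eq!(peer.perigee_timestamps().len(), 2);
    assert_eq!(peer.perigee_timestamps()[&0xabc], t0);

    // End of a round: wipe the data, as Hub::clear_perigee_timestamps does for outbound peers.
    peer.clear_perigee_timestamps();
    assert!(peer.perigee_timestamps().is_empty());
}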
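
`Hub::active_peers` and the `(&Router, bool)` → `Peer` conversion take an `include_perigee_data` flag, and RPC paths such as `get_connected_peer_info` pass `false`. A plausible reading is that this avoids cloning each peer's timestamp map when the caller does not need it; the sketch below illustrates that choice with a hypothetical `Live`/`Snapshot` pair, not the crate's types.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

struct Live {
    perigee_timestamps: HashMap<u64, Instant>,
}

// Hypothetical read-only snapshot handed out to callers (akin to `Peer`).
struct Snapshot {
    perigee_timestamps: Arc<HashMap<u64, Instant>>,
}

impl Snapshot {
    // `include_perigee_data == false` hands out an empty shared map instead of cloning the live one.
    fn from_live(live: &Live, include_perigee_data: bool) -> Self {
        let perigee_timestamps = if include_perigee_data {
            Arc::new(live.perigee_timestamps.clone())
        } else {
            Arc::new(HashMap::new())
        };
        Self { perigee_timestamps }
    }
}

fn main() {
    let mut live = Live { perigee_timestamps: HashMap::new() };
    live.perigee_timestamps.insert(1, Instant::now());

    // e.g. the perigee scoring path asks for the data, the RPC peer-info path does not.
    assert_eq!(Snapshot::from_live(&live, true).perigee_timestamps.len(), 1);
    assert!(Snapshot::from_live(&live, false).perigee_timestamps.is_empty());
}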
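
The new `test-utils` feature re-exports `RouterTestExt`, whose `test_new` builds a `Router` without a live gRPC stream. The sketch below shows how a downstream test might use it; the dev-dependency line, feature wiring, and availability of `uuid` in the consuming crate are assumptions, only the `test_new` signature and the `is_outbound`/`is_perigee` predicates come from this patch.

// Cargo.toml of the consuming crate (assumed dev-dependency):
// kaspa-p2p-lib = { path = "../protocol/p2p", features = ["test-utils"] }

#[cfg(test)]
mod tests {
    use kaspa_p2p_lib::test_utils::RouterTestExt;
    use kaspa_p2p_lib::{PeerOutboundType, Router};
    use kaspa_utils::networking::PeerId;
    use std::time::Instant;
    use uuid::Uuid;

    #[test]
    fn build_dummy_perigee_router() {
        // Constructs a Router with no peers hub and no outgoing route actually wired up.
        let router = Router::test_new(
            PeerId::new(Uuid::from_u128(7)),
            "127.0.0.1:16111".parse().unwrap(),
            Some(PeerOutboundType::Perigee),
            Instant::now(),
        );
        assert!(router.is_outbound());
        assert!(router.is_perigee());
    }
}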