diff --git a/Cargo.lock b/Cargo.lock index 073165e5b..ae96aab56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1445,6 +1445,7 @@ dependencies = [ name = "carbon-core" version = "0.12.0" dependencies = [ + "ahash 0.8.12", "async-trait", "axum 0.8.7", "base64 0.22.1", @@ -1454,10 +1455,13 @@ dependencies = [ "carbon-macros", "carbon-proc-macros", "carbon-test-utils", + "hashbrown 0.14.5", "juniper", "juniper_axum", "log", + "lru 0.12.5", "num-traits", + "once_cell", "serde", "serde_json", "solana-account", @@ -1592,14 +1596,14 @@ dependencies = [ "base64 0.22.1", "carbon-core", "log", - "reqwest", + "reqwest 0.12.24", "serde", "serde_json", "solana-account", "solana-client", "solana-pubkey 3.0.0", "tokio", - "tokio-util", + "tokio-util 0.7.17", ] [[package]] @@ -1713,11 +1717,20 @@ dependencies = [ name = "carbon-jupiter-swap-decoder" version = "0.12.0" dependencies = [ + "async-trait", + "base64 0.22.1", + "borsh", "carbon-core", + "carbon-test-utils", + "juniper", "serde", + "serde-big-array", + "serde_json", "solana-account", "solana-instruction", "solana-pubkey 3.0.0", + "sqlx", + "sqlx_migrator", ] [[package]] @@ -2169,7 +2182,7 @@ dependencies = [ "solana-commitment-config", "solana-pubkey 3.0.0", "tokio", - "tokio-util", + "tokio-util 0.7.17", ] [[package]] @@ -4017,6 +4030,10 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash 0.8.12", + "allocator-api2", +] [[package]] name = "hashbrown" @@ -5354,6 +5371,15 @@ dependencies = [ "hashbrown 0.12.3", ] +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown 0.15.2", +] + [[package]] name = "lru-slab" version = "0.1.2" @@ -7034,7 +7060,7 @@ dependencies = [ "cc", "libc", "libm", - "lru", + "lru 0.7.8", "parking_lot 0.11.2", "smallvec", "spin", @@ -9131,7 +9157,7 @@ dependencies = [ "indexmap 2.12.1", "itertools 0.12.1", "log", - "lru", + "lru 0.7.8", "num-traits", "rand 0.8.5", "rand_chacha 0.3.1", @@ -9348,7 +9374,7 @@ dependencies = [ "itertools 0.12.1", "lazy-lru", "log", - "lru", + "lru 0.7.8", "mockall", "num_cpus", "num_enum", @@ -11090,7 +11116,7 @@ checksum = "bffb1d9707e2170bdc1e2828d63398e559f77b76002b18c82fb9399fc06ab8ff" dependencies = [ "async-trait", "log", - "lru", + "lru 0.7.8", "quinn", "rustls 0.23.35", "solana-clock", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 7f92b0aa7..e9a3fa9e5 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -38,6 +38,10 @@ thiserror = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } uuid = { workspace = true, features = ["v4"] } +ahash = "0.8" +hashbrown = "0.14" +lru = "0.12" +once_cell = "1" # Optional macro dependencies carbon-macros = { workspace = true, optional = true } diff --git a/crates/core/src/account_utils.rs b/crates/core/src/account_utils.rs index 2d6cf9101..c880e004d 100644 --- a/crates/core/src/account_utils.rs +++ b/crates/core/src/account_utils.rs @@ -6,21 +6,45 @@ use solana_pubkey::Pubkey; /// # Usage /// - Use with `?` to indicate the account is required: /// ``` -/// let required = next_account(&mut iter)?; +/// use carbon_core::account_utils::next_account; +/// use solana_instruction::AccountMeta; +/// use solana_pubkey::Pubkey; +/// +/// let accounts = vec![ +/// 
AccountMeta::new(Pubkey::new_unique(), false),
+///     AccountMeta::new(Pubkey::new_unique(), false),
+/// ];
+/// let mut iter = accounts.iter();
+/// let required = next_account(&mut iter).ok_or("missing account")?; // errors if the account is missing
+/// Ok::<(), &'static str>(())
 /// ```
-/// This will propagate `None` if the account is missing.
 /// - Use without `?` to handle optional accounts:
 /// ```
-/// let optional = next_account(&mut iter);
+/// use carbon_core::account_utils::next_account;
+/// use solana_instruction::AccountMeta;
+/// use solana_pubkey::Pubkey;
+///
+/// let accounts = vec![AccountMeta::new(Pubkey::new_unique(), false)];
+/// let mut iter = accounts.iter();
+/// let optional = next_account(&mut iter); // Option<Pubkey>
+/// Ok::<(), &'static str>(())
 /// ```
-/// This returns `Option` that you can match or use directly.
 ///
 /// # Example
 /// ```
+/// use carbon_core::account_utils::next_account;
+/// use solana_instruction::AccountMeta;
+/// use solana_pubkey::Pubkey;
+///
+/// let accounts = vec![
+///     AccountMeta::new(Pubkey::new_unique(), false),
+///     AccountMeta::new(Pubkey::new_unique(), false),
+/// ];
 /// let mut iter = accounts.iter();
-/// let required = next_account(&mut iter)?; // required account
-/// let optional = next_account(&mut iter); // optional account
-/// `
+/// let required = next_account(&mut iter).ok_or("missing account")?; // required account
+/// let optional = next_account(&mut iter); // optional account
+/// Ok::<(), &'static str>(())
+/// ```
 pub fn next_account<'a>(iter: &mut impl Iterator<Item = &'a AccountMeta>) -> Option<Pubkey> {
     Some(iter.next()?.pubkey)
 }
diff --git a/crates/core/src/datasource.rs b/crates/core/src/datasource.rs
index e154581d9..242cf7303 100644
--- a/crates/core/src/datasource.rs
+++ b/crates/core/src/datasource.rs
@@ -194,6 +194,10 @@ impl DatasourceId {
     pub fn new_named(name: &str) -> Self {
         Self(name.to_string())
     }
+
+    pub fn as_str(&self) -> &str {
+        &self.0
+    }
 }
 
 /// Represents a data update in the `carbon-core` pipeline, encompassing
diff --git a/crates/core/src/filter.rs b/crates/core/src/filter.rs
index a0efe9332..7de9a3d51 100644
--- a/crates/core/src/filter.rs
+++ b/crates/core/src/filter.rs
@@ -37,9 +37,13 @@
 //! Custom filter implementation:
 //! ```
 //! use carbon_core::{
-//!     datasource::{DatasourceId, BlockDetails},
+//!     account::AccountMetadata,
+//!     datasource::{AccountDeletion, DatasourceId, BlockDetails},
 //!     filter::Filter,
+//!     instruction::{NestedInstruction, NestedInstructions},
+//!     transaction::TransactionMetadata,
 //! };
+//! use solana_account::Account;
 //!
 //! struct BlockHeightFilter {
 //!     min_height: u64,
@@ -51,14 +55,19 @@
 //!         _datasource_id: &DatasourceId,
 //!         block_details: &BlockDetails,
 //!     ) -> bool {
-//!         block_details.block_height >= self.min_height
+//!         block_details.block_height.unwrap_or(0) >= self.min_height
 //!     }
 //!
 //!     // Implement other methods with default behavior
-//!     fn filter_account(&self, _: &DatasourceId, _: &_, _: &_) -> bool { true }
-//!     fn filter_instruction(&self, _: &DatasourceId, _: &_) -> bool { true }
-//!     fn filter_transaction(&self, _: &DatasourceId, _: &_, _: &_) -> bool { true }
-//!     fn filter_account_deletion(&self, _: &DatasourceId, _: &_) -> bool { true }
+//!     fn filter_account(&self, _: &DatasourceId, _: &AccountMetadata, _: &Account) -> bool { true }
+//!     fn filter_instruction(&self, _: &DatasourceId, _: &NestedInstruction) -> bool { true }
+//!     fn filter_transaction(
+//!         &self,
+//!         _: &DatasourceId,
+//!         _: &TransactionMetadata,
+//!         _: &NestedInstructions,
+//!     ) -> bool { true }
+//!     fn filter_account_deletion(&self, _: &DatasourceId, _: &AccountDeletion) -> bool { true }
 //! }
 //! ```
@@ -96,9 +105,13 @@ use crate::{
 /// A simple datasource-based filter:
 /// ```
 /// use carbon_core::{
-///     datasource::{DatasourceId, BlockDetails},
+///     account::AccountMetadata,
+///     datasource::{AccountDeletion, DatasourceId, BlockDetails},
 ///     filter::Filter,
+///     instruction::{NestedInstruction, NestedInstructions},
+///     transaction::TransactionMetadata,
 /// };
+/// use solana_account::Account;
 ///
 /// struct MyFilter {
 ///     allowed_datasource: DatasourceId,
@@ -114,10 +127,15 @@ use crate::{
 ///     }
 ///
 ///     // Implement other methods with default behavior
-///     fn filter_account(&self, _: &DatasourceId, _: &_, _: &_) -> bool { true }
-///     fn filter_instruction(&self, _: &DatasourceId, _: &_) -> bool { true }
-///     fn filter_transaction(&self, _: &DatasourceId, _: &_, _: &_) -> bool { true }
-///     fn filter_account_deletion(&self, _: &DatasourceId, _: &_) -> bool { true }
+///     fn filter_account(&self, _: &DatasourceId, _: &AccountMetadata, _: &Account) -> bool { true }
+///     fn filter_instruction(&self, _: &DatasourceId, _: &NestedInstruction) -> bool { true }
+///     fn filter_transaction(
+///         &self,
+///         _: &DatasourceId,
+///         _: &TransactionMetadata,
+///         _: &NestedInstructions,
+///     ) -> bool { true }
+///     fn filter_account_deletion(&self, _: &DatasourceId, _: &AccountDeletion) -> bool { true }
 /// }
 /// ```
 pub trait Filter {
@@ -260,7 +278,7 @@ pub trait Filter {
 ///
 /// Using with pipeline builders:
 /// ```
-/// use carbon_core::{datasource::DatasourceId, filter::DatasourceFilter};
+/// use carbon_core::{datasource::DatasourceId, filter::{DatasourceFilter, Filter}};
 ///
 /// let filter = DatasourceFilter::new(DatasourceId::new_named("mainnet"));
 /// let filters = vec![Box::new(filter) as Box<dyn Filter>];
diff --git a/crates/core/src/pipeline.rs b/crates/core/src/pipeline.rs
index 6e853ceb4..e8d84e1a0 100644
--- a/crates/core/src/pipeline.rs
+++ b/crates/core/src/pipeline.rs
@@ -74,10 +74,89 @@ use {
     },
     core::time,
     serde::de::DeserializeOwned,
-    std::{convert::TryInto, sync::Arc, time::Instant},
+    std::{
+        collections::{HashMap, VecDeque},
+        convert::TryInto,
+        hash::Hash,
+        sync::Arc,
+        time::{Duration, Instant},
+    },
     tokio_util::sync::CancellationToken,
 };
 
+use ahash::AHasher;
+use ahash::RandomState;
+use hashbrown::HashSet;
+use lru::LruCache;
+use once_cell::sync::Lazy;
+use solana_signature::Signature;
+use std::hash::Hasher;
+use std::num::NonZeroUsize;
+use std::sync::RwLock;
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+struct AccountKey {
+    pubkey: solana_pubkey::Pubkey,
+    owner: solana_pubkey::Pubkey,
+    lamports: u64,
+    rent_epoch: u64,
+    executable: bool,
+    data_fingerprint: u64,
+    slot: u64,
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+struct DeletionKey {
+    pubkey: solana_pubkey::Pubkey,
+    slot: u64,
+}
+
+struct DedupeCache<K> {
+    hot: HashSet<K, RandomState>,
+    cold: HashSet<K, RandomState>,
+    last_rotate: Instant,
+    rotate_every: Duration,
+}
+
+impl<K: Eq + Hash> DedupeCache<K> {
+    fn new(capacity: usize, rotate_every: Duration) -> Self {
+        Self {
+            hot: HashSet::with_capacity_and_hasher(capacity, RandomState::default()),
+            cold: HashSet::with_capacity_and_hasher(capacity, RandomState::default()),
+            last_rotate: Instant::now(),
+            rotate_every,
+        }
+    }
+
+    #[inline]
+    fn insert_if_fresh(&mut self, key: K) -> bool {
+        if self.hot.contains(&key) {
+            return false;
+        }
+        if self.cold.contains(&key) {
+            return false;
+        }
+        self.hot.insert(key);
+        if self.last_rotate.elapsed() >= self.rotate_every {
+            self.rotate();
+        }
+        true
+    }
+
+    #[inline]
+    fn rotate(&mut self) {
+        std::mem::swap(&mut self.hot, &mut self.cold);
+        self.hot.clear();
+        self.last_rotate = Instant::now();
+    }
+}
+
+#[derive(Default)]
+struct DatasourceStats {
+    last_slot: u64,
+    wins_per_minute: VecDeque<(u64, u64)>,
+}
+
 /// Defines the shutdown behavior for the pipeline.
 ///
 /// `ShutdownStrategy` determines how the pipeline will behave when it receives
@@ -116,7 +195,7 @@ pub enum ShutdownStrategy {
 /// `channel_buffer_size` is not explicitly set during pipeline construction.
 ///
 /// The default size is 10,000 updates, which provides a reasonable balance
-pub const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1_000;
+pub const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 10_000;
 
 /// Represents the primary data processing pipeline in the `carbon-core`
 /// framework.
@@ -220,9 +299,185 @@ pub struct Pipeline {
     pub datasource_cancellation_token: Option<CancellationToken>,
     pub shutdown_strategy: ShutdownStrategy,
     pub channel_buffer_size: usize,
+
+    // dedupe caches
+    tx_dedupe: LruCache<Signature, ()>,
+    account_dedupe: DedupeCache<AccountKey>,
+    deletion_dedupe: DedupeCache<DeletionKey>,
+    block_dedupe: DedupeCache<u64>,
+
+    // per-datasource stats
+    datasource_stats: HashMap<DatasourceId, DatasourceStats>,
+
+    // slot -> block_time index to stabilize timestamps for early-arriving txs
+    slot_time_index: LruCache<u64, i64>,
+}
+
+static DATASOURCE_LAST_SLOTS: Lazy<RwLock<HashMap<String, u64>>> =
+    Lazy::new(|| RwLock::new(HashMap::new()));
+
+#[inline]
+fn extract_slot(update: &Update) -> Option<u64> {
+    match update {
+        Update::Transaction(tx) => Some(tx.slot),
+        Update::Account(acc) => Some(acc.slot),
+        Update::AccountDeletion(del) => Some(del.slot),
+        Update::BlockDetails(b) => Some(b.slot),
+    }
+}
+
+pub fn get_datasource_last_slots() -> HashMap<String, u64> {
+    DATASOURCE_LAST_SLOTS.read().unwrap().clone()
 }
 
 impl Pipeline {
+    fn ensure_stats_entry(&mut self, id: &DatasourceId) {
+        if !self.datasource_stats.contains_key(id) {
+            self.datasource_stats.insert(
+                id.clone(),
+                DatasourceStats {
+                    last_slot: 0,
+                    wins_per_minute: VecDeque::with_capacity(64),
+                },
+            );
+        }
+    }
+
+    fn record_win(&mut self, id: &DatasourceId) {
+        self.ensure_stats_entry(id);
+        let stats = self.datasource_stats.get_mut(id).unwrap();
+        let now_minute = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_secs()
+            / 60;
+        if let Some(back) = stats.wins_per_minute.back_mut() {
+            if back.0 == now_minute {
+                back.1 += 1;
+            } else {
+                stats.wins_per_minute.push_back((now_minute, 1));
+            }
+        } else {
+            stats.wins_per_minute.push_back((now_minute, 1));
+        }
+        while stats.wins_per_minute.len() > 1440 {
+            stats.wins_per_minute.pop_front();
+        }
+    }
+
+    fn record_last_slot(&mut self, id: &DatasourceId, slot: u64) {
+        self.ensure_stats_entry(id);
+        let stats = self.datasource_stats.get_mut(id).unwrap();
+        if slot > stats.last_slot {
+            stats.last_slot = slot;
+        }
+    }
+
+    pub async fn emit_datasource_metrics(&self) -> CarbonResult<()> {
+        let max_slot = self
+            .datasource_stats
+            .values()
+            .map(|s| s.last_slot)
+            .max()
+            .unwrap_or(0);
+        for (id, stats) in &self.datasource_stats {
+            let id_str = id.as_str();
+            let lag = if stats.last_slot > 0 {
+                max_slot.saturating_sub(stats.last_slot)
+            } else {
+                0
+            };
+            self.metrics
+                .update_gauge(
+                    &format!("datasource_last_slot_{id_str}"),
+                    stats.last_slot as f64,
+                )
+                .await?;
+            self.metrics
+                .update_gauge(&format!("datasource_lag_vs_max_{id_str}"), lag as f64)
+                .await?;
+
+            {
+                let mut guard = DATASOURCE_LAST_SLOTS.write().unwrap();
+                guard.insert(id_str.to_string(), stats.last_slot);
+            }
+
+            let 
now_minute = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + / 60; + let mut sum_5m = 0u64; + let mut sum_1h = 0u64; + let mut sum_6h = 0u64; + let mut sum_24h = 0u64; + for (minute, wins) in stats.wins_per_minute.iter().rev() { + let age = now_minute.saturating_sub(*minute); + if age < 5 { + sum_5m += *wins; + } + if age < 60 { + sum_1h += *wins; + } + if age < 360 { + sum_6h += *wins; + } + if age < 1440 { + sum_24h += *wins; + } + } + self.metrics + .update_gauge(&format!("datasource_wins_5m_{id_str}"), sum_5m as f64) + .await?; + self.metrics + .update_gauge(&format!("datasource_wins_1h_{id_str}"), sum_1h as f64) + .await?; + self.metrics + .update_gauge(&format!("datasource_wins_6h_{id_str}"), sum_6h as f64) + .await?; + self.metrics + .update_gauge(&format!("datasource_wins_24h_{id_str}"), sum_24h as f64) + .await?; + } + Ok(()) + } + + fn is_duplicate(&mut self, update: &Update) -> bool { + let fresh = match update { + Update::Transaction(tx) => { + if self.tx_dedupe.get(&tx.signature).is_some() { + false + } else { + self.tx_dedupe.put(tx.signature, ()); + true + } + } + Update::Account(acc) => { + let mut hasher = AHasher::default(); + hasher.write(&acc.account.data); + let data_fingerprint = hasher.finish(); + let key = AccountKey { + pubkey: acc.pubkey, + owner: acc.account.owner, + lamports: acc.account.lamports, + rent_epoch: acc.account.rent_epoch, + executable: acc.account.executable, + data_fingerprint, + slot: acc.slot, + }; + self.account_dedupe.insert_if_fresh(key) + } + Update::AccountDeletion(del) => { + let key = DeletionKey { + pubkey: del.pubkey, + slot: del.slot, + }; + self.deletion_dedupe.insert_if_fresh(key) + } + Update::BlockDetails(block) => self.block_dedupe.insert_if_fresh(block.slot), + }; + !fresh + } /// Creates a new `PipelineBuilder` instance for constructing a `Pipeline`. 
/// /// The `builder` method returns a `PipelineBuilder` that allows you to @@ -397,6 +652,7 @@ impl Pipeline { } } _ = interval.tick() => { + self.emit_datasource_metrics().await?; self.metrics.flush_metrics().await?; } update = update_receiver.recv() => { @@ -407,6 +663,35 @@ impl Pipeline { .await?; let start = Instant::now(); + // dedupe: if duplicate drop and continue + if self.is_duplicate(&update) { + self.metrics.increment_counter("updates_duplicate_dropped", 1).await?; + continue; + } + + // normalize timestamps and record slot->time index without delay + let mut update = update; + match &mut update { + Update::BlockDetails(block) => { + if let Some(bt) = block.block_time { + self.slot_time_index.put(block.slot, bt); + } + } + Update::Transaction(tx) => { + if tx.block_time.is_none() { + if let Some(bt) = self.slot_time_index.get(&tx.slot).copied() { + tx.block_time = Some(bt); + } + } + } + _ => {} + } + + if let Some(slot) = extract_slot(&update) { + self.record_win(&datasource_id); + self.record_last_slot(&datasource_id, slot); + } + let process_result = self.process(update.clone(), datasource_id.clone()).await; let time_taken_nanoseconds = start.elapsed().as_nanos(); let time_taken_milliseconds = time_taken_nanoseconds / 1_000_000; @@ -1409,6 +1694,12 @@ impl PipelineBuilder { metrics_flush_interval: self.metrics_flush_interval, datasource_cancellation_token: self.datasource_cancellation_token, channel_buffer_size: self.channel_buffer_size, + tx_dedupe: LruCache::new(NonZeroUsize::new(1_000_000).unwrap()), + account_dedupe: DedupeCache::new(1_000_000, Duration::from_millis(150)), + deletion_dedupe: DedupeCache::new(200_000, Duration::from_millis(150)), + block_dedupe: DedupeCache::new(200_000, Duration::from_millis(150)), + datasource_stats: HashMap::new(), + slot_time_index: LruCache::new(NonZeroUsize::new(200_000).unwrap()), }) } } diff --git a/metrics/log-metrics/src/lib.rs b/metrics/log-metrics/src/lib.rs index 52b5d8b49..2d0944f58 100644 --- a/metrics/log-metrics/src/lib.rs +++ b/metrics/log-metrics/src/lib.rs @@ -1,7 +1,10 @@ use { async_trait::async_trait, carbon_core::{error::CarbonResult, metrics::Metrics}, - std::{collections::HashMap, time::Instant}, + std::{ + collections::{BTreeSet, HashMap}, + time::Instant, + }, tokio::sync::RwLock, }; @@ -75,46 +78,130 @@ impl Metrics for LogMetrics { let updates_processing_times_min = *updates_processing_times.iter().min().unwrap_or(&0); let updates_processing_times_max = *updates_processing_times.iter().max().unwrap_or(&0); - let updates_received = self.updates_received.read().await; - let updates_queued = self.updates_queued.read().await; + let updates_queued = *self.updates_queued.read().await; + let updates_successful = *self.updates_successful.read().await; + let updates_failed = *self.updates_failed.read().await; + let updates_processed = *self.updates_processed.read().await; - let total_updates_received = *updates_received + *updates_queued; + let start = *self.start.read().await; + let mut last_flush = self.last_flush.write().await; + let last_elapsed = last_flush.elapsed(); - let updates_successful = self.updates_successful.read().await; - let updates_failed = self.updates_failed.read().await; - let updates_processed = self.updates_processed.read().await; + let counters_snapshot = self.counters.read().await.clone(); + let gauges_snapshot = self.gauges.read().await.clone(); + let histograms_snapshot = self.histograms.read().await.clone(); - let start = self.start.read().await; - let mut last_flush = 
self.last_flush.write().await;
+        let processed_den = updates_processed + updates_queued;
+        let processed_pct = if processed_den > 0 {
+            (updates_processed * 100) / processed_den
+        } else {
+            0
+        };
 
         log::info!(
             "{:02}:{:02}:{:02} (+{:?}) | {} processed ({}%), {} successful, {} failed ({}%), {} in queue, avg: {}ms, min: {}ms, max: {}ms",
             start.elapsed().as_secs() / 3600,
             (start.elapsed().as_secs() % 3600) / 60,
             start.elapsed().as_secs() % 60,
-            last_flush.elapsed(),
+            last_elapsed,
             updates_processed,
-            if total_updates_received > 0 {(*updates_processed * 100) / total_updates_received} else {0},
+            processed_pct,
             updates_successful,
             updates_failed,
-            if *updates_processed > 0 {(*updates_failed * 100) / *updates_processed} else {0},
+            if updates_processed > 0 { (updates_failed * 100) / updates_processed } else { 0 },
             updates_queued,
             updates_processing_times_avg,
             updates_processing_times_min,
             updates_processing_times_max
         );
 
-        for counter in self.counters.read().await.iter() {
-            log::info!("{}: {}", counter.0, counter.1);
+        for (k, v) in counters_snapshot.iter() {
+            log::info!("{k}: {v}");
         }
 
-        for gauge in self.gauges.read().await.iter() {
-            log::info!("{}: {}", gauge.0, gauge.1);
+        let mut datasource_ids: BTreeSet<String> = BTreeSet::new();
+        for k in gauges_snapshot.keys() {
+            if let Some(id) = k.strip_prefix("datasource_last_slot_") {
+                datasource_ids.insert(id.to_string());
+            }
+        }
+        if !datasource_ids.is_empty() {
+            let mut ids: Vec<_> = datasource_ids.into_iter().collect();
+            ids.sort();
+            let id_width = ids.iter().map(|s| s.len()).max().unwrap_or(0);
+            let total_5m: u64 = ids
+                .iter()
+                .map(|id| {
+                    *gauges_snapshot
+                        .get(&format!("datasource_wins_5m_{id}"))
+                        .unwrap_or(&0.0) as u64
+                })
+                .sum();
+            let total_1h: u64 = ids
+                .iter()
+                .map(|id| {
+                    *gauges_snapshot
+                        .get(&format!("datasource_wins_1h_{id}"))
+                        .unwrap_or(&0.0) as u64
+                })
+                .sum();
+            let total_6h: u64 = ids
+                .iter()
+                .map(|id| {
+                    *gauges_snapshot
+                        .get(&format!("datasource_wins_6h_{id}"))
+                        .unwrap_or(&0.0) as u64
+                })
+                .sum();
+
+            log::info!("datasources (share of first-arrival):");
+            for id in ids {
+                let w5m = *gauges_snapshot
+                    .get(&format!("datasource_wins_5m_{id}"))
+                    .unwrap_or(&0.0) as u64;
+                let w1h = *gauges_snapshot
+                    .get(&format!("datasource_wins_1h_{id}"))
+                    .unwrap_or(&0.0) as u64;
+                let w6h = *gauges_snapshot
+                    .get(&format!("datasource_wins_6h_{id}"))
+                    .unwrap_or(&0.0) as u64;
+
+                let p5 = if total_5m > 0 {
+                    w5m * 100 / total_5m
+                } else {
+                    0
+                };
+                let p1 = if total_1h > 0 {
+                    w1h * 100 / total_1h
+                } else {
+                    0
+                };
+                let p6 = if total_6h > 0 {
+                    w6h * 100 / total_6h
+                } else {
+                    0
+                };
+
+                log::info!("{id:id_width$} | 5m: {p5:>3}% | 1h: {p1:>3}% | 6h: {p6:>3}%");
+            }
         }
 
-        for histogram in self.histograms.read().await.iter() {
-            let histogram_values = histogram.1;
+        let mut other_gauges: Vec<_> = gauges_snapshot
+            .iter()
+            .filter(|(k, _)| !k.starts_with("datasource_"))
+            .map(|(k, v)| (k.clone(), *v))
+            .collect();
+        if !other_gauges.is_empty() {
+            other_gauges.sort_by(|a, b| a.0.cmp(&b.0));
+            let line = other_gauges
+                .into_iter()
+                .map(|(k, v)| format!("{k}={v}"))
+                .collect::<Vec<_>>()
+                .join(", ");
+            log::info!("gauges: {line}");
+        }
+        for (name, histogram_values) in histograms_snapshot.iter() {
             let avg = if !histogram_values.is_empty() {
                 histogram_values.iter().sum::<f64>() / histogram_values.len() as f64
             } else {
@@ -131,13 +218,7 @@
             .copied()
             .unwrap_or(0.0);
 
-            log::info!(
-                "{} -> avg: {}, min: {}, max: {}",
-                histogram.0,
-                avg,
-                min,
-                max
-            );
+            log::info!("hist {name}: avg={avg} min={min} max={max}");
         }
 
         self.histograms.write().await.clear();
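
Not part of the patch above: a self-contained sketch of the two-generation ("hot"/"cold") scheme that the new `DedupeCache` in crates/core/src/pipeline.rs implements. It uses `std::collections::HashSet` in place of `hashbrown` + `ahash` and omits the pre-sized capacity; the name `TwoGenCache` and the `main` driver are illustrative only. The idea: rather than tracking a TTL per key, the cache swaps generations on a timer, so a key stays deduplicated for at least one and at most two rotation periods, and expiry is a wholesale swap-and-clear instead of a per-key scan.

```rust
use std::collections::HashSet;
use std::hash::Hash;
use std::time::{Duration, Instant};

/// Illustrative stand-in for the patch's `DedupeCache` (not the actual type).
struct TwoGenCache<K> {
    hot: HashSet<K>,  // current generation: receives all new keys
    cold: HashSet<K>, // previous generation: consulted, never written
    last_rotate: Instant,
    rotate_every: Duration,
}

impl<K: Eq + Hash> TwoGenCache<K> {
    fn new(rotate_every: Duration) -> Self {
        Self {
            hot: HashSet::new(),
            cold: HashSet::new(),
            last_rotate: Instant::now(),
            rotate_every,
        }
    }

    /// Returns `true` only the first time a key is seen within the window.
    fn insert_if_fresh(&mut self, key: K) -> bool {
        if self.hot.contains(&key) || self.cold.contains(&key) {
            return false; // seen within the last one-to-two rotation periods
        }
        self.hot.insert(key);
        if self.last_rotate.elapsed() >= self.rotate_every {
            // `hot` ages into `cold`; the old `cold` generation is dropped wholesale.
            std::mem::swap(&mut self.hot, &mut self.cold);
            self.hot.clear();
            self.last_rotate = Instant::now();
        }
        true
    }
}

fn main() {
    let mut cache = TwoGenCache::new(Duration::from_millis(150));
    assert!(cache.insert_if_fresh(42u64)); // first arrival: fresh
    assert!(!cache.insert_if_fresh(42u64)); // duplicate within the window: dropped

    // A key ages out only after two rotations (each insert may trigger one).
    std::thread::sleep(Duration::from_millis(200));
    assert!(cache.insert_if_fresh(0u64)); // fresh; also triggers rotation #1
    std::thread::sleep(Duration::from_millis(200));
    assert!(cache.insert_if_fresh(1u64)); // fresh; also triggers rotation #2
    assert!(cache.insert_if_fresh(42u64)); // 42 has aged out: fresh again
}
```

This also explains why the builder wires a different shape per update type: accounts, deletions, and block details get short 150 ms rotation windows, since duplicates from racing datasources arrive within milliseconds, while transaction signatures are globally unique and only need bounded memory, so they go through a fixed-size `LruCache` instead.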