diff --git a/Cargo.lock b/Cargo.lock index 073165e5b..02d0c0ec1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1454,6 +1454,7 @@ dependencies = [ "carbon-macros", "carbon-proc-macros", "carbon-test-utils", + "chrono", "juniper", "juniper_axum", "log", @@ -2148,7 +2149,8 @@ version = "0.12.0" dependencies = [ "async-trait", "carbon-core", - "futures 0.3.31", + "chrono", + "futures", "log", "solana-client", "solana-hash 3.0.0", @@ -2396,8 +2398,11 @@ name = "carbon-yellowstone-grpc-datasource" version = "0.12.0" dependencies = [ "async-trait", + "carbon-core", - "futures 0.3.31", + "chrono", + "futures", + "log", "solana-account", "solana-pubkey 3.0.0", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 7f92b0aa7..04ff0930a 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -31,6 +31,7 @@ async-trait = { workspace = true } borsh = { workspace = true } bs58 = { workspace = true } base64 = { workspace = true } +chrono = { workspace = true } log = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/crates/core/src/datasource.rs b/crates/core/src/datasource.rs index e154581d9..b835cc5da 100644 --- a/crates/core/src/datasource.rs +++ b/crates/core/src/datasource.rs @@ -38,6 +38,7 @@ use solana_transaction_status::Rewards; use { crate::{error::CarbonResult, metrics::MetricsCollection}, async_trait::async_trait, + chrono::{DateTime, Utc}, solana_account::Account, solana_pubkey::Pubkey, solana_signature::Signature, @@ -47,6 +48,15 @@ use { tokio_util::sync::CancellationToken, }; +#[derive(Debug, Clone)] +pub struct DatasourceDisconnection { + pub source: String, + pub disconnect_time: DateTime, + pub last_slot_before_disconnect: u64, + pub first_slot_after_reconnect: u64, + pub missed_slots: u64, +} + /// Defines the interface for data sources that produce updates for accounts, /// transactions, and account deletions. /// diff --git a/datasources/rpc-block-subscribe-datasource/Cargo.toml b/datasources/rpc-block-subscribe-datasource/Cargo.toml index a86c7fb1d..dfb72285e 100644 --- a/datasources/rpc-block-subscribe-datasource/Cargo.toml +++ b/datasources/rpc-block-subscribe-datasource/Cargo.toml @@ -16,6 +16,7 @@ solana-hash = { workspace = true } carbon-core = { workspace = true } async-trait = { workspace = true } +chrono = { workspace = true } futures = { workspace = true } log = { workspace = true } tokio = { workspace = true, features = ["full"] } diff --git a/datasources/rpc-block-subscribe-datasource/src/lib.rs b/datasources/rpc-block-subscribe-datasource/src/lib.rs index f1db06db8..ee813759e 100644 --- a/datasources/rpc-block-subscribe-datasource/src/lib.rs +++ b/datasources/rpc-block-subscribe-datasource/src/lib.rs @@ -1,4 +1,5 @@ -use carbon_core::datasource::{BlockDetails, DatasourceId}; +use carbon_core::datasource::{BlockDetails, DatasourceDisconnection, DatasourceId}; +use chrono::Utc; use solana_hash::Hash; use std::str::FromStr; @@ -18,6 +19,7 @@ use { rpc_config::{RpcBlockSubscribeConfig, RpcBlockSubscribeFilter}, }, std::sync::Arc, + tokio::sync::mpsc, tokio::sync::mpsc::Sender, tokio_util::sync::CancellationToken, }; @@ -46,13 +48,15 @@ impl Filters { pub struct RpcBlockSubscribe { pub rpc_ws_url: String, pub filters: Filters, + pub disconnect_notifier: Option>, } impl RpcBlockSubscribe { - pub const fn new(rpc_ws_url: String, filters: Filters) -> Self { + pub const fn new(rpc_ws_url: String, filters: Filters, disconnect_notifier: Option>) -> Self { Self { rpc_ws_url, filters, + disconnect_notifier, } } } @@ -67,6 +71,10 @@ impl Datasource for RpcBlockSubscribe { metrics: Arc, ) -> CarbonResult<()> { let mut reconnection_attempts = 0; + let mut last_processed_slot = 0u64; + let mut last_disconnect_time = None; + let mut last_slot_before_disconnect = None; + let disconnect_tx_clone = self.disconnect_notifier.clone(); loop { if cancellation_token.is_cancelled() { @@ -119,11 +127,65 @@ impl Datasource for RpcBlockSubscribe { log::info!("Cancellation requested, stopping subscription..."); return Ok(()); } - block_event = block_stream.next() => { - match block_event { + block_event_result = tokio::time::timeout( + Duration::from_secs(30), + block_stream.next() + ) => { + let block_event = match block_event_result { + Ok(Some(event)) => event, + Ok(None) => { + log::warn!("Block stream closed"); + if last_disconnect_time.is_none() { + last_disconnect_time = Some(Utc::now()); + last_slot_before_disconnect = Some(last_processed_slot); + log::warn!("Disconnected at slot {}", last_processed_slot); + } + break; + } + Err(_) => { + log::warn!("Block stream timeout - no messages for 30 seconds"); + if last_disconnect_time.is_none() { + last_disconnect_time = Some(Utc::now()); + last_slot_before_disconnect = Some(last_processed_slot); + log::warn!("Disconnected at slot {} (timeout)", last_processed_slot); + } + break; + } + }; + + match Some(block_event) { Some(tx_event) => { let slot = tx_event.context.slot; + if last_processed_slot > 0 { + if let (Some(disconnect_time), Some(last_slot)) = + (last_disconnect_time.take(), last_slot_before_disconnect.take()) + { + let missed = if slot > last_slot { slot - last_slot } else { 0 }; + + log::warn!("Reconnected: last_slot={}, new_slot={}, missed={}", last_slot, slot, missed); + + let disconnection = DatasourceDisconnection { + source: "rpc-websocket".to_string(), + disconnect_time, + last_slot_before_disconnect: last_slot, + first_slot_after_reconnect: slot, + missed_slots: missed, + }; + + if let Some(tx) = &disconnect_tx_clone { + match tx.try_send(disconnection) { + Ok(_) => log::warn!("Disconnection event sent successfully"), + Err(e) => log::error!("Failed to send disconnection event: {:?}", e), + } + } else { + log::warn!("No disconnect channel configured"); + } + } + } + + last_processed_slot = slot; + if let Some(block) = tx_event.value.block { let block_start_time = std::time::Instant::now(); let block_hash = Hash::from_str(&block.blockhash).ok(); diff --git a/datasources/yellowstone-grpc-datasource/Cargo.toml b/datasources/yellowstone-grpc-datasource/Cargo.toml index d273b611c..543375448 100644 --- a/datasources/yellowstone-grpc-datasource/Cargo.toml +++ b/datasources/yellowstone-grpc-datasource/Cargo.toml @@ -21,6 +21,7 @@ solana-system-interface = { workspace = true } carbon-core = { workspace = true } async-trait = { workspace = true } +chrono = { workspace = true } futures = { workspace = true } log = { workspace = true } tokio = { workspace = true, features = ["full"] } diff --git a/datasources/yellowstone-grpc-datasource/src/lib.rs b/datasources/yellowstone-grpc-datasource/src/lib.rs index b87a6d719..9fbb620f7 100644 --- a/datasources/yellowstone-grpc-datasource/src/lib.rs +++ b/datasources/yellowstone-grpc-datasource/src/lib.rs @@ -2,12 +2,13 @@ use { async_trait::async_trait, carbon_core::{ datasource::{ - AccountDeletion, AccountUpdate, Datasource, DatasourceId, TransactionUpdate, Update, - UpdateType, + AccountDeletion, AccountUpdate, Datasource, DatasourceDisconnection, DatasourceId, + TransactionUpdate, Update, UpdateType, }, error::CarbonResult, metrics::MetricsCollection, }, + chrono::{DateTime, Utc}, futures::{sink::SinkExt, StreamExt}, solana_account::Account, solana_pubkey::Pubkey, @@ -18,7 +19,7 @@ use { sync::Arc, time::Duration, }, - tokio::sync::{mpsc::Sender, RwLock}, + tokio::sync::{mpsc, mpsc::Sender, RwLock}, tokio_util::sync::CancellationToken, yellowstone_grpc_client::{GeyserGrpcBuilder, GeyserGrpcBuilderResult, GeyserGrpcClient}, yellowstone_grpc_proto::{ @@ -43,6 +44,7 @@ pub struct YellowstoneGrpcGeyserClient { pub block_filters: BlockFilters, pub account_deletions_tracked: Arc>>, pub geyser_config: YellowstoneGrpcClientConfig, + pub disconnect_notifier: Option>, } #[derive(Debug, Clone)] @@ -85,6 +87,7 @@ impl YellowstoneGrpcGeyserClient { block_filters: BlockFilters, account_deletions_tracked: Arc>>, geyser_config: YellowstoneGrpcClientConfig, + disconnect_notifier: Option>, ) -> Self { YellowstoneGrpcGeyserClient { endpoint, @@ -95,6 +98,7 @@ impl YellowstoneGrpcGeyserClient { block_filters, account_deletions_tracked, geyser_config, + disconnect_notifier, } } } @@ -181,6 +185,8 @@ impl Datasource for YellowstoneGrpcGeyserClient { .await .map_err(|err| carbon_core::error::Error::FailedToConsumeDatasource(err.to_string()))?; + let disconnect_tx_clone = self.disconnect_notifier.clone(); + tokio::spawn(async move { let subscribe_request = SubscribeRequest { slots: HashMap::new(), @@ -198,6 +204,10 @@ impl Datasource for YellowstoneGrpcGeyserClient { let id_for_loop = id.clone(); + let mut last_disconnect_time: Option> = None; + let mut last_slot_before_disconnect: Option = None; + let mut last_processed_slot: u64 = 0; + loop { tokio::select! { _ = cancellation_token.cancelled() => { @@ -207,14 +217,78 @@ impl Datasource for YellowstoneGrpcGeyserClient { result = geyser_client.subscribe_with_request(Some(subscribe_request.clone())) => { match result { Ok((mut subscribe_tx, mut stream)) => { - while let Some(message) = stream.next().await { + let mut first_message_after_reconnect = last_disconnect_time.is_some(); + + loop { if cancellation_token.is_cancelled() { break; } + let message_result = tokio::time::timeout( + Duration::from_secs(30), + stream.next() + ).await; + + let message = match message_result { + Ok(Some(msg)) => msg, + Ok(None) => { + log::warn!("Stream closed"); + if last_disconnect_time.is_none() { + last_disconnect_time = Some(Utc::now()); + last_slot_before_disconnect = Some(last_processed_slot); + log::warn!("Disconnected at slot {}", last_processed_slot); + } + break; + } + Err(_) => { + log::warn!("Stream timeout - no messages for 30 seconds"); + if last_disconnect_time.is_none() { + last_disconnect_time = Some(Utc::now()); + last_slot_before_disconnect = Some(last_processed_slot); + log::warn!("Disconnected at slot {} (timeout)", last_processed_slot); + } + break; + } + }; + match message { - Ok(msg) => match msg.update_oneof { + Ok(msg) => { + if first_message_after_reconnect { + first_message_after_reconnect = false; + + let current_slot = match &msg.update_oneof { + Some(UpdateOneof::Account(ref update)) => Some(update.slot), + Some(UpdateOneof::Transaction(ref update)) => Some(update.slot), + Some(UpdateOneof::Block(ref update)) => Some(update.slot), + _ => None, + }; + + if let Some(slot) = current_slot { + if let (Some(disconnect_time), Some(last_slot)) = + (last_disconnect_time.take(), last_slot_before_disconnect.take()) + { + let missed = if slot > last_slot { slot - last_slot } else { 0 }; + + let disconnection = DatasourceDisconnection { + source: "yellowstone-grpc".to_string(), + disconnect_time, + last_slot_before_disconnect: last_slot, + first_slot_after_reconnect: slot, + missed_slots: missed, + }; + + if let Some(tx) = &disconnect_tx_clone { + let _ = tx.try_send(disconnection); + } + + log::info!("Reconnected. Slots: {} -> {} (missed: {})", last_slot, slot, missed); + } + } + } + + match msg.update_oneof { Some(UpdateOneof::Account(account_update)) => { + last_processed_slot = account_update.slot; send_subscribe_account_update_info( account_update.account, &metrics, @@ -227,9 +301,11 @@ impl Datasource for YellowstoneGrpcGeyserClient { } Some(UpdateOneof::Transaction(transaction_update)) => { + last_processed_slot = transaction_update.slot; send_subscribe_update_transaction_info(transaction_update.transaction, &metrics, &sender, id_for_loop.clone(), transaction_update.slot, None).await } Some(UpdateOneof::Block(block_update)) => { + last_processed_slot = block_update.slot; let block_time = block_update.block_time.map(|ts| ts.timestamp); for transaction_update in block_update.transactions { @@ -267,16 +343,30 @@ impl Datasource for YellowstoneGrpcGeyserClient { } _ => {} - }, + } + } Err(error) => { log::error!("Geyser stream error: {error:?}"); + + if last_disconnect_time.is_none() { + last_disconnect_time = Some(Utc::now()); + last_slot_before_disconnect = Some(last_processed_slot); + log::error!("Disconnected at slot {}", last_processed_slot); + } + break; } } } } Err(e) => { - log::error!("Failed to subscribe: {e:?}"); + log::error!("Failed to subscribe: {:?}", e); + + if last_disconnect_time.is_none() { + last_disconnect_time = Some(Utc::now()); + last_slot_before_disconnect = Some(last_processed_slot); + } + } } }