Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ async-trait = { workspace = true }
borsh = { workspace = true }
bs58 = { workspace = true }
base64 = { workspace = true }
chrono = { workspace = true }
log = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
10 changes: 10 additions & 0 deletions crates/core/src/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use solana_transaction_status::Rewards;
use {
crate::{error::CarbonResult, metrics::MetricsCollection},
async_trait::async_trait,
chrono::{DateTime, Utc},
solana_account::Account,
solana_pubkey::Pubkey,
solana_signature::Signature,
Expand All @@ -47,6 +48,15 @@ use {
tokio_util::sync::CancellationToken,
};

#[derive(Debug, Clone)]
pub struct DatasourceDisconnection {
pub source: String,
pub disconnect_time: DateTime<Utc>,
pub last_slot_before_disconnect: u64,
pub first_slot_after_reconnect: u64,
pub missed_slots: u64,
}

/// Defines the interface for data sources that produce updates for accounts,
/// transactions, and account deletions.
///
Expand Down
1 change: 1 addition & 0 deletions datasources/rpc-block-subscribe-datasource/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ solana-hash = { workspace = true }
carbon-core = { workspace = true }

async-trait = { workspace = true }
chrono = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
tokio = { workspace = true, features = ["full"] }
Expand Down
70 changes: 66 additions & 4 deletions datasources/rpc-block-subscribe-datasource/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use carbon_core::datasource::{BlockDetails, DatasourceId};
use carbon_core::datasource::{BlockDetails, DatasourceDisconnection, DatasourceId};
use chrono::Utc;
use solana_hash::Hash;
use std::str::FromStr;

Expand All @@ -18,6 +19,7 @@ use {
rpc_config::{RpcBlockSubscribeConfig, RpcBlockSubscribeFilter},
},
std::sync::Arc,
tokio::sync::mpsc,
tokio::sync::mpsc::Sender,
tokio_util::sync::CancellationToken,
};
Expand Down Expand Up @@ -46,13 +48,15 @@ impl Filters {
pub struct RpcBlockSubscribe {
pub rpc_ws_url: String,
pub filters: Filters,
pub disconnect_notifier: Option<mpsc::Sender<DatasourceDisconnection>>,
}

impl RpcBlockSubscribe {
pub const fn new(rpc_ws_url: String, filters: Filters) -> Self {
pub const fn new(rpc_ws_url: String, filters: Filters, disconnect_notifier: Option<mpsc::Sender<DatasourceDisconnection>>) -> Self {
Self {
rpc_ws_url,
filters,
disconnect_notifier,
}
}
}
Expand All @@ -67,6 +71,10 @@ impl Datasource for RpcBlockSubscribe {
metrics: Arc<MetricsCollection>,
) -> CarbonResult<()> {
let mut reconnection_attempts = 0;
let mut last_processed_slot = 0u64;
let mut last_disconnect_time = None;
let mut last_slot_before_disconnect = None;
let disconnect_tx_clone = self.disconnect_notifier.clone();

loop {
if cancellation_token.is_cancelled() {
Expand Down Expand Up @@ -119,11 +127,65 @@ impl Datasource for RpcBlockSubscribe {
log::info!("Cancellation requested, stopping subscription...");
return Ok(());
}
block_event = block_stream.next() => {
match block_event {
block_event_result = tokio::time::timeout(
Duration::from_secs(30),
block_stream.next()
) => {
let block_event = match block_event_result {
Ok(Some(event)) => event,
Ok(None) => {
log::warn!("Block stream closed");
if last_disconnect_time.is_none() {
last_disconnect_time = Some(Utc::now());
last_slot_before_disconnect = Some(last_processed_slot);
log::warn!("Disconnected at slot {}", last_processed_slot);
}
break;
}
Err(_) => {
log::warn!("Block stream timeout - no messages for 30 seconds");
if last_disconnect_time.is_none() {
last_disconnect_time = Some(Utc::now());
last_slot_before_disconnect = Some(last_processed_slot);
log::warn!("Disconnected at slot {} (timeout)", last_processed_slot);
}
break;
}
};

match Some(block_event) {
Some(tx_event) => {
let slot = tx_event.context.slot;

if last_processed_slot > 0 {
if let (Some(disconnect_time), Some(last_slot)) =
(last_disconnect_time.take(), last_slot_before_disconnect.take())
{
let missed = if slot > last_slot { slot - last_slot } else { 0 };

log::warn!("Reconnected: last_slot={}, new_slot={}, missed={}", last_slot, slot, missed);

let disconnection = DatasourceDisconnection {
source: "rpc-websocket".to_string(),
disconnect_time,
last_slot_before_disconnect: last_slot,
first_slot_after_reconnect: slot,
missed_slots: missed,
};

if let Some(tx) = &disconnect_tx_clone {
match tx.try_send(disconnection) {
Ok(_) => log::warn!("Disconnection event sent successfully"),
Err(e) => log::error!("Failed to send disconnection event: {:?}", e),
}
} else {
log::warn!("No disconnect channel configured");
}
}
}

last_processed_slot = slot;

if let Some(block) = tx_event.value.block {
let block_start_time = std::time::Instant::now();
let block_hash = Hash::from_str(&block.blockhash).ok();
Expand Down
1 change: 1 addition & 0 deletions datasources/yellowstone-grpc-datasource/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ solana-system-interface = { workspace = true }
carbon-core = { workspace = true }

async-trait = { workspace = true }
chrono = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
tokio = { workspace = true, features = ["full"] }
Expand Down
104 changes: 97 additions & 7 deletions datasources/yellowstone-grpc-datasource/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use {
async_trait::async_trait,
carbon_core::{
datasource::{
AccountDeletion, AccountUpdate, Datasource, DatasourceId, TransactionUpdate, Update,
UpdateType,
AccountDeletion, AccountUpdate, Datasource, DatasourceDisconnection, DatasourceId,
TransactionUpdate, Update, UpdateType,
},
error::CarbonResult,
metrics::MetricsCollection,
},
chrono::{DateTime, Utc},
futures::{sink::SinkExt, StreamExt},
solana_account::Account,
solana_pubkey::Pubkey,
Expand All @@ -18,7 +19,7 @@ use {
sync::Arc,
time::Duration,
},
tokio::sync::{mpsc::Sender, RwLock},
tokio::sync::{mpsc, mpsc::Sender, RwLock},
tokio_util::sync::CancellationToken,
yellowstone_grpc_client::{GeyserGrpcBuilder, GeyserGrpcBuilderResult, GeyserGrpcClient},
yellowstone_grpc_proto::{
Expand All @@ -43,6 +44,7 @@ pub struct YellowstoneGrpcGeyserClient {
pub block_filters: BlockFilters,
pub account_deletions_tracked: Arc<RwLock<HashSet<Pubkey>>>,
pub geyser_config: YellowstoneGrpcClientConfig,
pub disconnect_notifier: Option<mpsc::Sender<DatasourceDisconnection>>,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -85,6 +87,7 @@ impl YellowstoneGrpcGeyserClient {
block_filters: BlockFilters,
account_deletions_tracked: Arc<RwLock<HashSet<Pubkey>>>,
geyser_config: YellowstoneGrpcClientConfig,
disconnect_notifier: Option<mpsc::Sender<DatasourceDisconnection>>,
) -> Self {
YellowstoneGrpcGeyserClient {
endpoint,
Expand All @@ -95,6 +98,7 @@ impl YellowstoneGrpcGeyserClient {
block_filters,
account_deletions_tracked,
geyser_config,
disconnect_notifier,
}
}
}
Expand Down Expand Up @@ -181,6 +185,8 @@ impl Datasource for YellowstoneGrpcGeyserClient {
.await
.map_err(|err| carbon_core::error::Error::FailedToConsumeDatasource(err.to_string()))?;

let disconnect_tx_clone = self.disconnect_notifier.clone();

tokio::spawn(async move {
let subscribe_request = SubscribeRequest {
slots: HashMap::new(),
Expand All @@ -198,6 +204,10 @@ impl Datasource for YellowstoneGrpcGeyserClient {

let id_for_loop = id.clone();

let mut last_disconnect_time: Option<DateTime<Utc>> = None;
let mut last_slot_before_disconnect: Option<u64> = None;
let mut last_processed_slot: u64 = 0;

loop {
tokio::select! {
_ = cancellation_token.cancelled() => {
Expand All @@ -207,14 +217,78 @@ impl Datasource for YellowstoneGrpcGeyserClient {
result = geyser_client.subscribe_with_request(Some(subscribe_request.clone())) => {
match result {
Ok((mut subscribe_tx, mut stream)) => {
while let Some(message) = stream.next().await {
let mut first_message_after_reconnect = last_disconnect_time.is_some();

loop {
if cancellation_token.is_cancelled() {
break;
}

let message_result = tokio::time::timeout(
Duration::from_secs(30),
stream.next()
).await;

let message = match message_result {
Ok(Some(msg)) => msg,
Ok(None) => {
log::warn!("Stream closed");
if last_disconnect_time.is_none() {
last_disconnect_time = Some(Utc::now());
last_slot_before_disconnect = Some(last_processed_slot);
log::warn!("Disconnected at slot {}", last_processed_slot);
}
break;
}
Err(_) => {
log::warn!("Stream timeout - no messages for 30 seconds");
if last_disconnect_time.is_none() {
last_disconnect_time = Some(Utc::now());
last_slot_before_disconnect = Some(last_processed_slot);
log::warn!("Disconnected at slot {} (timeout)", last_processed_slot);
}
break;
}
};

match message {
Ok(msg) => match msg.update_oneof {
Ok(msg) => {
if first_message_after_reconnect {
first_message_after_reconnect = false;

let current_slot = match &msg.update_oneof {
Some(UpdateOneof::Account(ref update)) => Some(update.slot),
Some(UpdateOneof::Transaction(ref update)) => Some(update.slot),
Some(UpdateOneof::Block(ref update)) => Some(update.slot),
_ => None,
};

if let Some(slot) = current_slot {
if let (Some(disconnect_time), Some(last_slot)) =
(last_disconnect_time.take(), last_slot_before_disconnect.take())
{
let missed = if slot > last_slot { slot - last_slot } else { 0 };

let disconnection = DatasourceDisconnection {
source: "yellowstone-grpc".to_string(),
disconnect_time,
last_slot_before_disconnect: last_slot,
first_slot_after_reconnect: slot,
missed_slots: missed,
};

if let Some(tx) = &disconnect_tx_clone {
let _ = tx.try_send(disconnection);
}

log::info!("Reconnected. Slots: {} -> {} (missed: {})", last_slot, slot, missed);
}
}
}

match msg.update_oneof {
Some(UpdateOneof::Account(account_update)) => {
last_processed_slot = account_update.slot;
send_subscribe_account_update_info(
account_update.account,
&metrics,
Expand All @@ -227,9 +301,11 @@ impl Datasource for YellowstoneGrpcGeyserClient {
}

Some(UpdateOneof::Transaction(transaction_update)) => {
last_processed_slot = transaction_update.slot;
send_subscribe_update_transaction_info(transaction_update.transaction, &metrics, &sender, id_for_loop.clone(), transaction_update.slot, None).await
}
Some(UpdateOneof::Block(block_update)) => {
last_processed_slot = block_update.slot;
let block_time = block_update.block_time.map(|ts| ts.timestamp);

for transaction_update in block_update.transactions {
Expand Down Expand Up @@ -267,16 +343,30 @@ impl Datasource for YellowstoneGrpcGeyserClient {
}

_ => {}
},
}
}
Err(error) => {
log::error!("Geyser stream error: {error:?}");

if last_disconnect_time.is_none() {
last_disconnect_time = Some(Utc::now());
last_slot_before_disconnect = Some(last_processed_slot);
log::error!("Disconnected at slot {}", last_processed_slot);
}

break;
}
}
}
}
Err(e) => {
log::error!("Failed to subscribe: {e:?}");
log::error!("Failed to subscribe: {:?}", e);

if last_disconnect_time.is_none() {
last_disconnect_time = Some(Utc::now());
last_slot_before_disconnect = Some(last_processed_slot);
}

}
}
}
Expand Down