Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 222 additions & 2 deletions crates/core/src/rpc/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ use jsonrpc_pubsub::{
};
use solana_account_decoder::{UiAccount, UiAccountEncoding};
use solana_client::{
rpc_config::{RpcSignatureSubscribeConfig, RpcTransactionConfig},
rpc_config::{RpcSignatureSubscribeConfig, RpcTransactionConfig, RpcTransactionLogsFilter},
rpc_response::{
ProcessedSignatureResult, ReceivedSignatureResult, RpcResponseContext, RpcSignatureResult,
ProcessedSignatureResult, ReceivedSignatureResult, RpcLogsResponse, RpcResponseContext,
RpcSignatureResult,
},
};
use solana_commitment_config::{CommitmentConfig, CommitmentLevel};
Expand Down Expand Up @@ -450,6 +451,131 @@ pub trait Rpc {
meta: Option<Self::Metadata>,
subscription: SubscriptionId,
) -> Result<bool>;

/// Subscribe to logs notifications.
///
/// This method allows clients to subscribe to transaction log messages
/// emitted during transaction execution. It supports filtering by signature,
/// account mentions, or all transactions.
///
/// ## Parameters
/// - `meta`: WebSocket metadata containing RPC context and connection information.
/// - `subscriber`: The subscription sink for sending log notifications to the client.
/// - `mentions`: Optional filter for the subscription: can be a specific signature, account, or `"all"`.
/// - `commitment`: Optional commitment level for filtering logs by block finality.
///
/// ## Returns
/// This method establishes a continuous WebSocket subscription that streams
/// `RpcLogsResponse` notifications as new transactions are processed.
///
/// ## Example WebSocket Request
/// ```json
/// {
/// "jsonrpc": "2.0",
/// "id": 1,
/// "method": "logsSubscribe",
/// "params": [
/// {
/// "mentions": ["11111111111111111111111111111111"]
/// },
/// {
/// "commitment": "finalized"
/// }
/// ]
/// }
/// ```
///
/// ## Example WebSocket Response (Subscription Confirmation)
/// ```json
/// {
/// "jsonrpc": "2.0",
/// "result": 42,
/// "id": 1
/// }
/// ```
///
/// ## Example WebSocket Notification
/// ```json
/// {
/// "jsonrpc": "2.0",
/// "method": "logsNotification",
/// "params": {
/// "result": {
/// "signature": "3s6n...",
/// "err": null,
/// "logs": ["Program 111111... invoke [1]", "Program 111111... success"]
/// },
/// "subscription": 42
/// }
/// }
/// ```
///
/// ## Notes
/// - The subscription remains active until explicitly unsubscribed or the connection is closed.
/// - Each log subscription runs independently and supports filtering.
/// - Log messages may be truncated depending on cluster configuration.
///
/// ## See Also
/// - `logsUnsubscribe`: Remove an active logs subscription.
#[pubsub(subscription = "logsNotification", subscribe, name = "logsSubscribe")]
fn logs_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<RpcLogsResponse>>,
mentions: Option<RpcTransactionLogsFilter>,
commitment: Option<CommitmentLevel>,
);

/// Unsubscribe from logs notifications.
///
/// This method removes an active logs subscription, stopping further notifications
/// for the specified subscription ID.
///
/// ## Parameters
/// - `meta`: Optional WebSocket metadata containing connection information.
/// - `subscription`: The subscription ID to remove, as returned by `logsSubscribe`.
///
/// ## Returns
/// A `Result<bool>` indicating whether the unsubscription was successful:
/// - `Ok(true)` if the subscription was successfully removed.
/// - `Err(Error)` with `InvalidParams` if the subscription ID is not recognized.
///
/// ## Example WebSocket Request
/// ```json
/// {
/// "jsonrpc": "2.0",
/// "id": 1,
/// "method": "logsUnsubscribe",
/// "params": [42]
/// }
/// ```
///
/// ## Example WebSocket Response
/// ```json
/// {
/// "jsonrpc": "2.0",
/// "result": true,
/// "id": 1
/// }
/// ```
///
/// ## Notes
/// - Unsubscribing from a non-existent subscription ID returns an error.
/// - Successfully unsubscribed clients will no longer receive logs notifications.
/// - This method is thread-safe and may be called concurrently.
///
/// ## See Also
/// - `logsSubscribe`: Create a logs subscription.
#[pubsub(
subscription = "logsNotification",
unsubscribe,
name = "logsUnsubscribe"
)]
fn logs_unsubscribe(
&self,
meta: Option<Self::Metadata>,
subscription: SubscriptionId,
) -> Result<bool>;
}

/// WebSocket RPC server implementation for Surfpool.
Expand Down Expand Up @@ -492,6 +618,7 @@ pub struct SurfpoolWsRpc {
pub account_subscription_map:
Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<UiAccount>>>>>,
pub slot_subscription_map: Arc<RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>>,
pub logs_subscription_map: Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<RpcLogsResponse>>>>>,
pub tokio_handle: tokio::runtime::Handle,
}

Expand Down Expand Up @@ -936,4 +1063,97 @@ impl Rpc for SurfpoolWsRpc {
})
}
}

fn logs_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<RpcLogsResponse>>,
mentions: Option<RpcTransactionLogsFilter>,
commitment: Option<CommitmentLevel>,
) {
let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
let sub_id = SubscriptionId::Number(id as u64);
let sink = match subscriber.assign_id(sub_id.clone()) {
Ok(sink) => sink,
Err(e) => {
log::error!("Failed to assign subscription ID: {:?}", e);
return;
}
};

let mentions = mentions.unwrap_or(RpcTransactionLogsFilter::All);
let commitment = commitment.unwrap_or(CommitmentLevel::Confirmed);

let logs_active = Arc::clone(&self.logs_subscription_map);
let meta = meta.clone();

let svm_locker = match meta.get_svm_locker() {
Ok(locker) => locker,
Err(e) => {
log::error!("Failed to get SVM locker for slot subscription: {e}");
if let Err(e) = sink.notify(Err(e.into())) {
log::error!(
"Failed to send error notification to client for SVM locker failure: {e}"
);
}
return;
}
};

self.tokio_handle.spawn(async move {
if let Ok(mut guard) = logs_active.write() {
guard.insert(sub_id.clone(), sink);
} else {
log::error!("Failed to acquire write lock on slot_subscription_map");
return;
}

let rx = svm_locker.subscribe_for_logs_updates(&commitment, &mentions);

loop {
// if the subscription has been removed, break the loop
if let Ok(guard) = logs_active.read() {
if guard.get(&sub_id).is_none() {
break;
}
} else {
log::error!("Failed to acquire read lock on slot_subscription_map");
break;
}

if let Ok((slot, value)) = rx.try_recv() {
if let Ok(guard) = logs_active.read() {
if let Some(sink) = guard.get(&sub_id) {
let _ = sink.notify(Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value,
}));
}
}
}
}
});
}

fn logs_unsubscribe(
&self,
_meta: Option<Self::Metadata>,
subscription: SubscriptionId,
) -> Result<bool> {
let removed = if let Ok(mut guard) = self.logs_subscription_map.write() {
guard.remove(&subscription)
} else {
log::error!("Failed to acquire write lock on logs_subscription_map");
None
};
if removed.is_some() {
Ok(true)
} else {
Err(Error {
code: ErrorCode::InvalidParams,
message: "Invalid subscription.".into(),
data: None,
})
}
}
}
1 change: 1 addition & 0 deletions crates/core/src/runloops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ async fn start_ws_rpc_server_runloop(
signature_subscription_map: Arc::new(RwLock::new(HashMap::new())),
account_subscription_map: Arc::new(RwLock::new(HashMap::new())),
slot_subscription_map: Arc::new(RwLock::new(HashMap::new())),
logs_subscription_map: Arc::new(RwLock::new(HashMap::new())),
tokio_handle: tokio_handle.clone(),
}
.to_delegate(),
Expand Down
27 changes: 24 additions & 3 deletions crates/core/src/surfnet/locker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ use solana_address_lookup_table_interface::state::AddressLookupTable;
use solana_client::{
rpc_config::{
RpcAccountInfoConfig, RpcBlockConfig, RpcLargestAccountsConfig, RpcLargestAccountsFilter,
RpcSignaturesForAddressConfig, RpcTransactionConfig,
RpcSignaturesForAddressConfig, RpcTransactionConfig, RpcTransactionLogsFilter,
},
rpc_filter::RpcFilterType,
rpc_request::TokenAccountsFilter,
rpc_response::{
RpcAccountBalance, RpcConfirmedTransactionStatusWithSignature, RpcKeyedAccount,
RpcTokenAccountBalance,
RpcLogsResponse, RpcTokenAccountBalance,
},
};
use solana_clock::Slot;
Expand Down Expand Up @@ -779,6 +779,7 @@ impl SurfnetSvmLocker {
})
.collect::<Vec<_>>()
.clone();
let mut logs = vec![];

// if not skipping preflight, simulate the transaction
if !skip_preflight {
Expand All @@ -792,6 +793,7 @@ impl SurfnetSvmLocker {
res.err
)));
let meta = convert_transaction_metadata_from_canonical(&res.meta);
logs = meta.logs.clone();
let _ = status_tx.try_send(TransactionStatusEvent::SimulationFailure((
res.err.clone(),
meta,
Expand All @@ -800,7 +802,13 @@ impl SurfnetSvmLocker {
SignatureSubscriptionType::processed(),
&signature,
latest_absolute_slot,
Some(res.err.clone()),
);
svm_writer.notify_logs_subscribers(
&signature,
Some(res.err),
logs,
CommitmentLevel::Processed,
);
return Ok::<(), SurfpoolError>(());
}
Expand All @@ -811,6 +819,7 @@ impl SurfnetSvmLocker {
.send_transaction(transaction.clone(), false /* cu_analysis_enabled */)
{
Ok(res) => {
logs = res.logs.clone();
let accounts_after = pubkeys_from_message
.iter()
.map(|p| svm_writer.inner.get_account(p))
Expand Down Expand Up @@ -931,8 +940,9 @@ impl SurfnetSvmLocker {
SignatureSubscriptionType::processed(),
&signature,
latest_absolute_slot,
err,
err.clone(),
);
svm_writer.notify_logs_subscribers(&signature, err, logs, CommitmentLevel::Processed);
Ok(())
})?;

Expand Down Expand Up @@ -1963,6 +1973,17 @@ impl SurfnetSvmLocker {
self.with_svm_writer(|svm_writer| svm_writer.subscribe_for_slot_updates())
}

/// Subscribes for logs updates and returns a receiver of logs updates.
pub fn subscribe_for_logs_updates(
&self,
commitment_level: &CommitmentLevel,
filter: &RpcTransactionLogsFilter,
) -> Receiver<(Slot, RpcLogsResponse)> {
self.with_svm_writer(|svm_writer| {
svm_writer.subscribe_for_logs_updates(commitment_level, filter)
})
}

fn snapshot_get_account_result(
&self,
capture: &mut BTreeMap<Pubkey, Option<UiAccount>>,
Expand Down
7 changes: 7 additions & 0 deletions crates/core/src/surfnet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use jsonrpc_core::Result as RpcError;
use locker::SurfnetSvmLocker;
use solana_account::Account;
use solana_account_decoder::{UiAccount, UiAccountEncoding};
use solana_client::{rpc_config::RpcTransactionLogsFilter, rpc_response::RpcLogsResponse};
use solana_clock::Slot;
use solana_commitment_config::CommitmentLevel;
use solana_epoch_info::EpochInfo;
Expand Down Expand Up @@ -79,6 +80,12 @@ pub type SignatureSubscriptionData = (
pub type AccountSubscriptionData =
HashMap<Pubkey, Vec<(Option<UiAccountEncoding>, Sender<UiAccount>)>>;

pub type LogsSubscriptionData = (
CommitmentLevel,
RpcTransactionLogsFilter,
Sender<(Slot, RpcLogsResponse)>,
);

#[derive(Debug, Clone, PartialEq)]
pub enum SignatureSubscriptionType {
Received,
Expand Down
Loading