diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs index a4368fa15..42ca37d6d 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs @@ -1,3 +1,4 @@ +use futures_util::future::join_all; use log::*; use magicblock_core::{ token_programs::{try_derive_eata_address_and_bump, MaybeIntoAta}, @@ -42,17 +43,18 @@ where let mut accounts_to_clone = vec![]; let mut ata_join_set = JoinSet::new(); - // Subscribe first so subsequent fetches are kept up-to-date + // Collect all pubkeys to subscribe to and spawn fetch tasks + let mut pubkeys_to_subscribe = vec![]; + for (ata_pubkey, _, ata_info, ata_account_slot) in &atas { - if let Err(err) = this.subscribe_to_account(ata_pubkey).await { - warn!("Failed to subscribe to ATA {}: {}", ata_pubkey, err); - } + // Collect ATA pubkey for subscription + pubkeys_to_subscribe.push(*ata_pubkey); + if let Some((eata, _)) = try_derive_eata_address_and_bump(&ata_info.owner, &ata_info.mint) { - if let Err(err) = this.subscribe_to_account(&eata).await { - warn!("Failed to subscribe to derived eATA {}: {}", eata, err); - } + // Collect eATA pubkey for subscription + pubkeys_to_subscribe.push(eata); let effective_slot = if let Some(min_slot) = min_context_slot { min_slot.max(*ata_account_slot) @@ -69,6 +71,23 @@ where } } + // Send all subscription requests in parallel and await their confirmations + let subscription_results = join_all( + pubkeys_to_subscribe + .iter() + .map(|pk| this.subscribe_to_account(pk)), + ) + .await; + + // Log any subscription errors + for (pubkey, result) in + pubkeys_to_subscribe.iter().zip(subscription_results.iter()) + { + if let Err(err) = result { + error!("Failed to subscribe to account {}: {}", pubkey, err); + } + } + let ata_results = ata_join_set.join_all().await; for result in ata_results { diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs index 006e6110f..5b58263d2 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs @@ -154,131 +154,182 @@ where mut subscription_updates: mpsc::Receiver, ) { tokio::spawn(async move { - while let Some(update) = subscription_updates.recv().await { - trace!("FetchCloner received subscription update for {} at slot {}", - update.pubkey, update.account.slot()); - let pubkey = update.pubkey; - - // Process each subscription update concurrently to avoid blocking on delegation - // record fetches. This allows multiple updates to be processed in parallel. - let this = Arc::clone(&self); - tokio::spawn(async move { - let (resolved_account, deleg_record) = - this.resolve_account_to_clone_from_forwarded_sub_with_unsubscribe(update) - .await; - if let Some(account) = resolved_account { - // Ensure that the subscription update isn't out of order, i.e. we don't already - // hold a newer version of the account in our bank - let out_of_order_slot = this - .accounts_bank - .get_account(&pubkey) - .and_then(|in_bank| { - if in_bank.remote_slot() - >= account.remote_slot() - { - Some(in_bank.remote_slot()) - } else { - None - } - }); - if let Some(in_bank_slot) = out_of_order_slot { - warn!( - "Ignoring out-of-order subscription update for {pubkey}: bank slot {in_bank_slot}, update slot {}", - account.remote_slot() - ); - return; - } - - if let Some(in_bank) = - this.accounts_bank.get_account(&pubkey) - { - if in_bank.undelegating() { - // We expect the account to still be delegated, but with the delegation - // program owner - debug!("Received update for undelegating account {pubkey} \ - in_bank.delegated={}, \ - in_bank.owner={}, \ - in_bank.remote_slot={}, \ - chain.delegated={}, \ - chain.owner={}, \ - chain.remote_slot={}", - in_bank.delegated(), - in_bank.owner(), - in_bank.remote_slot(), - account.delegated(), - account.owner(), - account.remote_slot() - ); + let mut pending_tasks: JoinSet<()> = JoinSet::new(); + + loop { + // Try to receive a new update without blocking to drain tasks first if available + match subscription_updates.try_recv() { + Ok(update) => { + trace!("FetchCloner received subscription update for {} at slot {}", + update.pubkey, update.account.slot()); + let pubkey = update.pubkey; + + // Process each subscription update concurrently to avoid blocking on delegation + // record fetches. This allows multiple updates to be processed in parallel. + let this = Arc::clone(&self); + pending_tasks.spawn(async move { + Self::process_subscription_update( + &this, pubkey, update, + ) + .await; + }); - // This will only be true in the following case: - // 1. a commit was triggered for the account - // 2. a commit + undelegate was triggered for the account -> undelegating - // 3. we receive the update for (1.) - // - // Thus our state is more up to date and we don't need to update our - // bank. - if account_still_undelegating_on_chain( - &pubkey, - account.delegated(), - in_bank.remote_slot(), - deleg_record, - &this.validator_pubkey, - ) { - return; - } - } else if in_bank.owner().eq(&dlp::id()) { - debug!( - "Received update for {pubkey} owned by deleg program not marked as undelegating" - ); + // Drain any completed tasks before continuing + while let Some(result) = pending_tasks.try_join_next() { + if let Err(e) = result { + error!( + "Subscription update task panicked: {e:?}" + ); } - } else { - warn!( - "Received update for {pubkey} which is not in bank" - ); } + } + Err(tokio::sync::mpsc::error::TryRecvError::Empty) => { + // No messages available - wait for either a new message or task completion + tokio::select! { + maybe_update = subscription_updates.recv() => { + let Some(update) = maybe_update else { + // Channel closed, drain remaining tasks and exit + while pending_tasks.join_next().await.is_some() {} + break; + }; - // Determine if delegated to another validator - let delegated_to_other = deleg_record - .as_ref() - .and_then(|dr| this.get_delegated_to_other(dr)); - - // Once we clone an account that is delegated to us we no longer need - // to receive updates for it from chain - // The subscription will be turned back on once the committor service schedules - // a commit for it that includes undelegation - if account.delegated() { - if let Err(err) = this - .remote_account_provider - .unsubscribe(&pubkey) - .await - { - error!( - "Failed to unsubscribe from delegated account {pubkey}: {err}" - ); + trace!("FetchCloner received subscription update for {} at slot {}", + update.pubkey, update.account.slot()); + let pubkey = update.pubkey; + + let this = Arc::clone(&self); + pending_tasks.spawn(async move { + Self::process_subscription_update(&this, pubkey, update).await; + }); + } + Some(result) = pending_tasks.join_next(), if !pending_tasks.is_empty() => { + if let Err(e) = result { + error!("Subscription update task panicked: {e:?}"); + } } } + } + Err( + tokio::sync::mpsc::error::TryRecvError::Disconnected, + ) => { + // Channel closed, drain remaining tasks and exit + while pending_tasks.join_next().await.is_some() {} + break; + } + } + } + }); + } - if account.executable() { - this.handle_executable_sub_update(pubkey, account) - .await; - } else if let Err(err) = this - .cloner - .clone_account(AccountCloneRequest { - pubkey, - account, - commit_frequency_ms: None, - delegated_to_other, - }) - .await - { - error!( - "Failed to clone account {pubkey} into bank: {err}" - ); - } + /// Process a single subscription update + async fn process_subscription_update( + this: &Arc, + pubkey: Pubkey, + update: ForwardedSubscriptionUpdate, + ) { + let (resolved_account, deleg_record) = this + .resolve_account_to_clone_from_forwarded_sub_with_unsubscribe( + update, + ) + .await; + if let Some(account) = resolved_account { + // Ensure that the subscription update isn't out of order, i.e. we don't already + // hold a newer version of the account in our bank + let out_of_order_slot = + this.accounts_bank.get_account(&pubkey).and_then(|in_bank| { + if in_bank.remote_slot() >= account.remote_slot() { + Some(in_bank.remote_slot()) + } else { + None } }); + if let Some(in_bank_slot) = out_of_order_slot { + warn!( + "Ignoring out-of-order subscription update for {pubkey}: bank slot {in_bank_slot}, update slot {}", + account.remote_slot() + ); + return; } - }); + + if let Some(in_bank) = this.accounts_bank.get_account(&pubkey) { + if in_bank.undelegating() { + // We expect the account to still be delegated, but with the delegation + // program owner + debug!( + "Received update for undelegating account {pubkey} \ + in_bank.delegated={}, \ + in_bank.owner={}, \ + in_bank.remote_slot={}, \ + chain.delegated={}, \ + chain.owner={}, \ + chain.remote_slot={}", + in_bank.delegated(), + in_bank.owner(), + in_bank.remote_slot(), + account.delegated(), + account.owner(), + account.remote_slot() + ); + + // This will only be true in the following case: + // 1. a commit was triggered for the account + // 2. a commit + undelegate was triggered for the account -> undelegating + // 3. we receive the update for (1.) + // + // Thus our state is more up to date and we don't need to update our + // bank. + if account_still_undelegating_on_chain( + &pubkey, + account.delegated(), + in_bank.remote_slot(), + deleg_record, + &this.validator_pubkey, + ) { + return; + } + } else if in_bank.owner().eq(&dlp::id()) { + debug!( + "Received update for {pubkey} owned by deleg program not marked as undelegating" + ); + } + } else { + warn!("Received update for {pubkey} which is not in bank"); + } + + // Determine if delegated to another validator + let delegated_to_other = deleg_record + .as_ref() + .and_then(|dr| this.get_delegated_to_other(dr)); + + // Once we clone an account that is delegated to us we no longer need + // to receive updates for it from chain + // The subscription will be turned back on once the committor service schedules + // a commit for it that includes undelegation + if account.delegated() { + if let Err(err) = + this.remote_account_provider.unsubscribe(&pubkey).await + { + error!( + "Failed to unsubscribe from delegated account {pubkey}: {err}" + ); + } + } + + if account.executable() { + this.handle_executable_sub_update(pubkey, account).await; + } else if let Err(err) = this + .cloner + .clone_account(AccountCloneRequest { + pubkey, + account, + commit_frequency_ms: None, + delegated_to_other, + }) + .await + { + error!("Failed to clone account {pubkey} into bank: {err}"); + } + } } async fn handle_executable_sub_update( diff --git a/magicblock-chainlink/src/remote_account_provider/mod.rs b/magicblock-chainlink/src/remote_account_provider/mod.rs index 7822e0066..f172eb4c9 100644 --- a/magicblock-chainlink/src/remote_account_provider/mod.rs +++ b/magicblock-chainlink/src/remote_account_provider/mod.rs @@ -15,7 +15,7 @@ use config::RemoteAccountProviderConfig; pub(crate) use errors::{ RemoteAccountProviderError, RemoteAccountProviderResult, }; -use futures_util::future::try_join_all; +use futures_util::future::{join_all, try_join_all}; use log::*; pub use lru_cache::AccountsLruCache; pub(crate) use remote_account::RemoteAccount; @@ -806,10 +806,42 @@ impl RemoteAccountProvider { .join(", "); trace!("Subscribing to accounts: {pubkeys}"); } - for (pubkey, _) in subscribe_and_fetch.iter() { - // Register the subscription for the pubkey (handles LRU cache and eviction first) - self.subscribe(pubkey).await?; + + // Send all subscription requests in parallel (non-fail-fast) + // We use join_all instead of try_join_all to ensure ALL subscribe attempts complete, + // even if some fail. This prevents resource leaks in fetching_accounts and ensures + // all oneshot receivers get a response (either success or error). + let subscription_results = join_all( + subscribe_and_fetch + .iter() + .map(|(pubkey, _)| self.subscribe(pubkey)), + ) + .await; + + // Collect errors and log each individual failure + let mut errors = Vec::new(); + for (result, (pubkey, _)) in + subscription_results.iter().zip(subscribe_and_fetch.iter()) + { + if let Err(err) = result { + error!("Failed to subscribe to account {}: {}", pubkey, err); + errors.push(format!("{}: {}", pubkey, err)); + } + } + + // Fail if ANY subscription failed + if !errors.is_empty() { + return Err( + RemoteAccountProviderError::AccountSubscriptionsTaskFailed( + format!( + "{} subscription(s) failed: [{}]", + errors.len(), + errors.join(", ") + ), + ), + ); } + Ok(()) } diff --git a/test-integration/Cargo.lock b/test-integration/Cargo.lock index 106d1dbb9..e60292cf8 100644 --- a/test-integration/Cargo.lock +++ b/test-integration/Cargo.lock @@ -2231,10 +2231,10 @@ dependencies = [ [[package]] name = "guinea" -version = "0.5.2" +version = "0.5.3" dependencies = [ "bincode", - "magicblock-magic-program-api 0.5.2", + "magicblock-magic-program-api 0.5.3", "serde", "solana-program", ] @@ -3260,7 +3260,7 @@ dependencies = [ [[package]] name = "magicblock-account-cloner" -version = "0.5.2" +version = "0.5.3" dependencies = [ "async-trait", "bincode", @@ -3271,7 +3271,7 @@ dependencies = [ "magicblock-config", "magicblock-core", "magicblock-ledger", - "magicblock-magic-program-api 0.5.2", + "magicblock-magic-program-api 0.5.3", "magicblock-program", "magicblock-rpc-client", "rand 0.9.2", @@ -3292,7 +3292,7 @@ dependencies = [ [[package]] name = "magicblock-accounts" -version = "0.5.2" +version = "0.5.3" dependencies = [ "async-trait", "log", @@ -3314,7 +3314,7 @@ dependencies = [ [[package]] name = "magicblock-accounts-db" -version = "0.5.2" +version = "0.5.3" dependencies = [ "lmdb-rkv", "log", @@ -3330,7 +3330,7 @@ dependencies = [ [[package]] name = "magicblock-aperture" -version = "0.5.2" +version = "0.5.3" dependencies = [ "arc-swap", "base64 0.21.7", @@ -3375,7 +3375,7 @@ dependencies = [ [[package]] name = "magicblock-api" -version = "0.5.2" +version = "0.5.3" dependencies = [ "anyhow", "borsh 1.6.0", @@ -3391,7 +3391,7 @@ dependencies = [ "magicblock-config", "magicblock-core", "magicblock-ledger", - "magicblock-magic-program-api 0.5.2", + "magicblock-magic-program-api 0.5.3", "magicblock-metrics", "magicblock-processor", "magicblock-program", @@ -3430,7 +3430,7 @@ dependencies = [ [[package]] name = "magicblock-chainlink" -version = "0.5.2" +version = "0.5.3" dependencies = [ "arc-swap", "async-trait", @@ -3443,7 +3443,7 @@ dependencies = [ "magicblock-config", "magicblock-core", "magicblock-delegation-program 1.1.3 (git+https://github.com/magicblock-labs/delegation-program.git?rev=1874b4f5f5f55cb9ab54b64de2cc0d41107d1435)", - "magicblock-magic-program-api 0.5.2", + "magicblock-magic-program-api 0.5.3", "magicblock-metrics", "parking_lot", "scc", @@ -3483,7 +3483,7 @@ dependencies = [ [[package]] name = "magicblock-committor-program" -version = "0.5.2" +version = "0.5.3" dependencies = [ "borsh 1.6.0", "paste", @@ -3495,7 +3495,7 @@ dependencies = [ [[package]] name = "magicblock-committor-service" -version = "0.5.2" +version = "0.5.3" dependencies = [ "async-trait", "base64 0.21.7", @@ -3538,7 +3538,7 @@ dependencies = [ [[package]] name = "magicblock-config" -version = "0.5.2" +version = "0.5.3" dependencies = [ "clap", "derive_more", @@ -3556,10 +3556,10 @@ dependencies = [ [[package]] name = "magicblock-core" -version = "0.5.2" +version = "0.5.3" dependencies = [ "flume", - "magicblock-magic-program-api 0.5.2", + "magicblock-magic-program-api 0.5.3", "solana-account", "solana-account-decoder", "solana-hash", @@ -3606,12 +3606,12 @@ dependencies = [ "solana-security-txt", "static_assertions", "strum 0.27.2", - "thiserror 1.0.69", + "thiserror 2.0.17", ] [[package]] name = "magicblock-ledger" -version = "0.5.2" +version = "0.5.3" dependencies = [ "arc-swap", "bincode", @@ -3661,7 +3661,7 @@ dependencies = [ [[package]] name = "magicblock-magic-program-api" -version = "0.5.2" +version = "0.5.3" dependencies = [ "bincode", "serde", @@ -3670,7 +3670,7 @@ dependencies = [ [[package]] name = "magicblock-metrics" -version = "0.5.2" +version = "0.5.3" dependencies = [ "http-body-util", "hyper 1.8.1", @@ -3684,7 +3684,7 @@ dependencies = [ [[package]] name = "magicblock-processor" -version = "0.5.2" +version = "0.5.3" dependencies = [ "bincode", "log", @@ -3719,12 +3719,12 @@ dependencies = [ [[package]] name = "magicblock-program" -version = "0.5.2" +version = "0.5.3" dependencies = [ "bincode", "lazy_static", "magicblock-core", - "magicblock-magic-program-api 0.5.2", + "magicblock-magic-program-api 0.5.3", "num-derive", "num-traits", "parking_lot", @@ -3751,7 +3751,7 @@ dependencies = [ [[package]] name = "magicblock-rpc-client" -version = "0.5.2" +version = "0.5.3" dependencies = [ "log", "solana-account", @@ -3772,7 +3772,7 @@ dependencies = [ [[package]] name = "magicblock-table-mania" -version = "0.5.2" +version = "0.5.3" dependencies = [ "ed25519-dalek", "log", @@ -3798,7 +3798,7 @@ dependencies = [ [[package]] name = "magicblock-task-scheduler" -version = "0.5.2" +version = "0.5.3" dependencies = [ "bincode", "chrono", @@ -3825,7 +3825,7 @@ dependencies = [ [[package]] name = "magicblock-validator-admin" -version = "0.5.2" +version = "0.5.3" dependencies = [ "log", "magicblock-delegation-program 1.1.3 (git+https://github.com/magicblock-labs/delegation-program.git?rev=1874b4f5f5f55cb9ab54b64de2cc0d41107d1435)", @@ -3842,7 +3842,7 @@ dependencies = [ [[package]] name = "magicblock-version" -version = "0.5.2" +version = "0.5.3" dependencies = [ "git-version", "rustc_version", @@ -4751,7 +4751,7 @@ dependencies = [ "bincode", "borsh 1.6.0", "ephemeral-rollups-sdk", - "magicblock-magic-program-api 0.5.2", + "magicblock-magic-program-api 0.5.3", "serde", "solana-program", ] @@ -4775,7 +4775,7 @@ dependencies = [ "borsh 1.6.0", "ephemeral-rollups-sdk", "magicblock-delegation-program 1.1.3 (git+https://github.com/magicblock-labs/delegation-program.git?rev=1874b4f5f5f55cb9ab54b64de2cc0d41107d1435)", - "magicblock-magic-program-api 0.5.2", + "magicblock-magic-program-api 0.5.3", "rkyv 0.7.45", "solana-program", "static_assertions", @@ -5810,7 +5810,7 @@ dependencies = [ "integration-test-tools", "log", "magicblock-core", - "magicblock-magic-program-api 0.5.2", + "magicblock-magic-program-api 0.5.3", "program-schedulecommit", "rand 0.8.5", "schedulecommit-client", @@ -5827,7 +5827,7 @@ version = "0.0.0" dependencies = [ "integration-test-tools", "magicblock-core", - "magicblock-magic-program-api 0.5.2", + "magicblock-magic-program-api 0.5.3", "program-schedulecommit", "program-schedulecommit-security", "schedulecommit-client", @@ -8546,7 +8546,7 @@ dependencies = [ [[package]] name = "solana-storage-proto" -version = "0.5.2" +version = "0.5.3" dependencies = [ "bincode", "bs58", @@ -10047,7 +10047,7 @@ dependencies = [ [[package]] name = "test-kit" -version = "0.5.2" +version = "0.5.3" dependencies = [ "env_logger 0.11.8", "guinea",