33 changes: 26 additions & 7 deletions magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs
@@ -1,3 +1,4 @@
use futures_util::future::join_all;
use log::*;
use magicblock_core::{
token_programs::{try_derive_eata_address_and_bump, MaybeIntoAta},
@@ -42,17 +43,18 @@ where
let mut accounts_to_clone = vec![];
let mut ata_join_set = JoinSet::new();

// Subscribe first so subsequent fetches are kept up-to-date
// Collect all pubkeys to subscribe to and spawn fetch tasks
let mut pubkeys_to_subscribe = vec![];

for (ata_pubkey, _, ata_info, ata_account_slot) in &atas {
if let Err(err) = this.subscribe_to_account(ata_pubkey).await {
warn!("Failed to subscribe to ATA {}: {}", ata_pubkey, err);
}
// Collect ATA pubkey for subscription
pubkeys_to_subscribe.push(*ata_pubkey);

if let Some((eata, _)) =
try_derive_eata_address_and_bump(&ata_info.owner, &ata_info.mint)
{
if let Err(err) = this.subscribe_to_account(&eata).await {
warn!("Failed to subscribe to derived eATA {}: {}", eata, err);
}
// Collect eATA pubkey for subscription
pubkeys_to_subscribe.push(eata);

let effective_slot = if let Some(min_slot) = min_context_slot {
min_slot.max(*ata_account_slot)
@@ -69,6 +71,23 @@
}
}

// Send all subscription requests in parallel and await their confirmations
let subscription_results = join_all(
pubkeys_to_subscribe
.iter()
.map(|pk| this.subscribe_to_account(pk)),
)
.await;

// Log any subscription errors
for (pubkey, result) in
pubkeys_to_subscribe.iter().zip(subscription_results.iter())
{
if let Err(err) = result {
error!("Failed to subscribe to account {}: {}", pubkey, err);
}
}

let ata_results = ata_join_set.join_all().await;

for result in ata_results {
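
For context, here is a minimal self-contained sketch of the parallel-subscription pattern this hunk adopts: collect the pubkeys, await all subscription futures together via `futures_util::future::join_all`, then log failures individually. The `subscribe_to_account` stub and the string pubkeys are hypothetical stand-ins, not the crate's real provider API.

```rust
// Sketch only: requires the `futures-util` crate and tokio with the
// "macros" and "rt" features; `subscribe_to_account` is a hypothetical stub.
use futures_util::future::join_all;

async fn subscribe_to_account(pubkey: &str) -> Result<(), String> {
    // Stand-in for the real subscription request; assume it resolves once
    // the subscription is confirmed.
    println!("subscribed to {pubkey}");
    Ok(())
}

#[tokio::main]
async fn main() {
    let pubkeys_to_subscribe = vec!["ata-1", "eata-1", "ata-2"];

    // Fire all subscription requests at once and await their confirmations
    // together instead of awaiting each one serially.
    let results = join_all(
        pubkeys_to_subscribe
            .iter()
            .map(|pk| subscribe_to_account(pk)),
    )
    .await;

    // Log failures without aborting the remaining subscriptions.
    for (pubkey, result) in pubkeys_to_subscribe.iter().zip(results.iter()) {
        if let Err(err) = result {
            eprintln!("Failed to subscribe to {pubkey}: {err}");
        }
    }
}
```
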
281 changes: 166 additions & 115 deletions magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs
@@ -154,131 +154,182 @@ where
mut subscription_updates: mpsc::Receiver<ForwardedSubscriptionUpdate>,
) {
tokio::spawn(async move {
while let Some(update) = subscription_updates.recv().await {
trace!("FetchCloner received subscription update for {} at slot {}",
update.pubkey, update.account.slot());
let pubkey = update.pubkey;

// Process each subscription update concurrently to avoid blocking on delegation
// record fetches. This allows multiple updates to be processed in parallel.
let this = Arc::clone(&self);
tokio::spawn(async move {
let (resolved_account, deleg_record) =
this.resolve_account_to_clone_from_forwarded_sub_with_unsubscribe(update)
.await;
if let Some(account) = resolved_account {
// Ensure that the subscription update isn't out of order, i.e. we don't already
// hold a newer version of the account in our bank
let out_of_order_slot = this
.accounts_bank
.get_account(&pubkey)
.and_then(|in_bank| {
if in_bank.remote_slot()
>= account.remote_slot()
{
Some(in_bank.remote_slot())
} else {
None
}
});
if let Some(in_bank_slot) = out_of_order_slot {
warn!(
"Ignoring out-of-order subscription update for {pubkey}: bank slot {in_bank_slot}, update slot {}",
account.remote_slot()
);
return;
}

if let Some(in_bank) =
this.accounts_bank.get_account(&pubkey)
{
if in_bank.undelegating() {
// We expect the account to still be delegated, but with the delegation
// program owner
debug!("Received update for undelegating account {pubkey} \
in_bank.delegated={}, \
in_bank.owner={}, \
in_bank.remote_slot={}, \
chain.delegated={}, \
chain.owner={}, \
chain.remote_slot={}",
in_bank.delegated(),
in_bank.owner(),
in_bank.remote_slot(),
account.delegated(),
account.owner(),
account.remote_slot()
);
let mut pending_tasks: JoinSet<()> = JoinSet::new();

loop {
// Try to receive a new update without blocking to drain tasks first if available
match subscription_updates.try_recv() {
Ok(update) => {
trace!("FetchCloner received subscription update for {} at slot {}",
update.pubkey, update.account.slot());
let pubkey = update.pubkey;

// Process each subscription update concurrently to avoid blocking on delegation
// record fetches. This allows multiple updates to be processed in parallel.
let this = Arc::clone(&self);
pending_tasks.spawn(async move {
Self::process_subscription_update(
&this, pubkey, update,
)
.await;
});

// This will only be true in the following case:
// 1. a commit was triggered for the account
// 2. a commit + undelegate was triggered for the account -> undelegating
// 3. we receive the update for (1.)
//
// Thus our state is more up to date and we don't need to update our
// bank.
if account_still_undelegating_on_chain(
&pubkey,
account.delegated(),
in_bank.remote_slot(),
deleg_record,
&this.validator_pubkey,
) {
return;
}
} else if in_bank.owner().eq(&dlp::id()) {
debug!(
"Received update for {pubkey} owned by deleg program not marked as undelegating"
);
// Drain any completed tasks before continuing
while let Some(result) = pending_tasks.try_join_next() {
if let Err(e) = result {
error!(
"Subscription update task panicked: {e:?}"
);
}
} else {
warn!(
"Received update for {pubkey} which is not in bank"
);
}
}
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
// No messages available - wait for either a new message or task completion
tokio::select! {
maybe_update = subscription_updates.recv() => {
let Some(update) = maybe_update else {
// Channel closed, drain remaining tasks and exit
while pending_tasks.join_next().await.is_some() {}
break;
};
Comment on lines +190 to +194
🛠️ Refactor suggestion | 🟠 Major

Inconsistent panic handling in channel-close drain loops.

The drain loops at lines 192 and 216 silently discard task results (including panics), while the drain loops at lines 178-184 and 206-208 properly log panicked tasks. This inconsistency could hide critical issues when the subscription channel closes.

🔎 Proposed fix to log panics during drain
                             let Some(update) = maybe_update else {
                                 // Channel closed, drain remaining tasks and exit
-                                while pending_tasks.join_next().await.is_some() {}
+                                while let Some(result) = pending_tasks.join_next().await {
+                                    if let Err(e) = result {
+                                        error!("Subscription update task panicked during drain: {e:?}");
+                                    }
+                                }
                                 break;
                             };
                     Err(
                         tokio::sync::mpsc::error::TryRecvError::Disconnected,
                     ) => {
                         // Channel closed, drain remaining tasks and exit
-                        while pending_tasks.join_next().await.is_some() {}
+                        while let Some(result) = pending_tasks.join_next().await {
+                            if let Err(e) = result {
+                                error!("Subscription update task panicked during drain: {e:?}");
+                            }
+                        }
                         break;
                     }

Also applies to: 215-218


// Determine if delegated to another validator
let delegated_to_other = deleg_record
.as_ref()
.and_then(|dr| this.get_delegated_to_other(dr));

// Once we clone an account that is delegated to us we no longer need
// to receive updates for it from chain
// The subscription will be turned back on once the committor service schedules
// a commit for it that includes undelegation
if account.delegated() {
if let Err(err) = this
.remote_account_provider
.unsubscribe(&pubkey)
.await
{
error!(
"Failed to unsubscribe from delegated account {pubkey}: {err}"
);
trace!("FetchCloner received subscription update for {} at slot {}",
update.pubkey, update.account.slot());
let pubkey = update.pubkey;

let this = Arc::clone(&self);
pending_tasks.spawn(async move {
Self::process_subscription_update(&this, pubkey, update).await;
});
}
Some(result) = pending_tasks.join_next(), if !pending_tasks.is_empty() => {
if let Err(e) = result {
error!("Subscription update task panicked: {e:?}");
}
}
}
}
Err(
tokio::sync::mpsc::error::TryRecvError::Disconnected,
) => {
// Channel closed, drain remaining tasks and exit
while pending_tasks.join_next().await.is_some() {}
break;
}
}
}
});
}

if account.executable() {
this.handle_executable_sub_update(pubkey, account)
.await;
} else if let Err(err) = this
.cloner
.clone_account(AccountCloneRequest {
pubkey,
account,
commit_frequency_ms: None,
delegated_to_other,
})
.await
{
error!(
"Failed to clone account {pubkey} into bank: {err}"
);
}
/// Process a single subscription update
async fn process_subscription_update(
this: &Arc<Self>,
pubkey: Pubkey,
update: ForwardedSubscriptionUpdate,
) {
let (resolved_account, deleg_record) = this
.resolve_account_to_clone_from_forwarded_sub_with_unsubscribe(
update,
)
.await;
if let Some(account) = resolved_account {
// Ensure that the subscription update isn't out of order, i.e. we don't already
// hold a newer version of the account in our bank
let out_of_order_slot =
this.accounts_bank.get_account(&pubkey).and_then(|in_bank| {
if in_bank.remote_slot() >= account.remote_slot() {
Some(in_bank.remote_slot())
} else {
None
}
});
if let Some(in_bank_slot) = out_of_order_slot {
warn!(
"Ignoring out-of-order subscription update for {pubkey}: bank slot {in_bank_slot}, update slot {}",
account.remote_slot()
);
return;
}
});

if let Some(in_bank) = this.accounts_bank.get_account(&pubkey) {
if in_bank.undelegating() {
// We expect the account to still be delegated, but with the delegation
// program owner
debug!(
"Received update for undelegating account {pubkey} \
in_bank.delegated={}, \
in_bank.owner={}, \
in_bank.remote_slot={}, \
chain.delegated={}, \
chain.owner={}, \
chain.remote_slot={}",
in_bank.delegated(),
in_bank.owner(),
in_bank.remote_slot(),
account.delegated(),
account.owner(),
account.remote_slot()
);

// This will only be true in the following case:
// 1. a commit was triggered for the account
// 2. a commit + undelegate was triggered for the account -> undelegating
// 3. we receive the update for (1.)
//
// Thus our state is more up to date and we don't need to update our
// bank.
if account_still_undelegating_on_chain(
&pubkey,
account.delegated(),
in_bank.remote_slot(),
deleg_record,
&this.validator_pubkey,
) {
return;
}
} else if in_bank.owner().eq(&dlp::id()) {
debug!(
"Received update for {pubkey} owned by deleg program not marked as undelegating"
);
}
} else {
warn!("Received update for {pubkey} which is not in bank");
}

// Determine if delegated to another validator
let delegated_to_other = deleg_record
.as_ref()
.and_then(|dr| this.get_delegated_to_other(dr));

// Once we clone an account that is delegated to us we no longer need
// to receive updates for it from chain
// The subscription will be turned back on once the committor service schedules
// a commit for it that includes undelegation
if account.delegated() {
if let Err(err) =
this.remote_account_provider.unsubscribe(&pubkey).await
{
error!(
"Failed to unsubscribe from delegated account {pubkey}: {err}"
);
}
}

if account.executable() {
this.handle_executable_sub_update(pubkey, account).await;
} else if let Err(err) = this
.cloner
.clone_account(AccountCloneRequest {
pubkey,
account,
commit_frequency_ms: None,
delegated_to_other,
})
.await
{
error!("Failed to clone account {pubkey} into bank: {err}");
}
}
}

async fn handle_executable_sub_update(
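
For context, here is a minimal self-contained sketch of the receive-and-drain loop the second hunk introduces: `try_recv` keeps pulling queued updates, a `JoinSet` processes each update concurrently, and a `select!` waits on either a new message or a finished task when the queue is empty. The `process` function and `u64` updates are hypothetical stand-ins, not the actual FetchCloner internals; the final drain also logs panics, folding in the reviewer's suggestion above.

```rust
// Sketch only: requires tokio with the "macros", "rt-multi-thread", "sync",
// and "time" features; `process` is a hypothetical stand-in for per-update work.
use tokio::{sync::mpsc, task::JoinSet};

async fn process(update: u64) {
    // Stand-in for per-update work (e.g. delegation record fetch + clone).
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    println!("processed update {update}");
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u64>(16);
    tokio::spawn(async move {
        for i in 0..5 {
            let _ = tx.send(i).await;
        }
        // Sender drops here, closing the channel.
    });

    let mut pending_tasks: JoinSet<()> = JoinSet::new();
    loop {
        match rx.try_recv() {
            Ok(update) => {
                // Spawn per-update work so slow updates don't block the loop.
                pending_tasks.spawn(process(update));
                // Opportunistically reap tasks that already finished.
                while let Some(result) = pending_tasks.try_join_next() {
                    if let Err(e) = result {
                        eprintln!("task panicked: {e:?}");
                    }
                }
            }
            Err(mpsc::error::TryRecvError::Empty) => {
                // Nothing queued: wait for a new update or a finished task.
                tokio::select! {
                    maybe_update = rx.recv() => {
                        let Some(update) = maybe_update else { break };
                        pending_tasks.spawn(process(update));
                    }
                    Some(result) = pending_tasks.join_next(), if !pending_tasks.is_empty() => {
                        if let Err(e) = result {
                            eprintln!("task panicked: {e:?}");
                        }
                    }
                }
            }
            Err(mpsc::error::TryRecvError::Disconnected) => break,
        }
    }
    // Drain whatever is still running before exiting, logging panics.
    while let Some(result) = pending_tasks.join_next().await {
        if let Err(e) = result {
            eprintln!("task panicked during drain: {e:?}");
        }
    }
}
```
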