Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 42 additions & 65 deletions magicblock-accounts/src/scheduled_commits_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ use magicblock_chainlink::{
Chainlink,
};
use magicblock_committor_service::{
intent_execution_manager::{
BroadcastedIntentExecutionResult, ExecutionOutputWrapper,
},
intent_execution_manager::BroadcastedIntentExecutionResult,
intent_executor::ExecutionOutput,
types::{ScheduledBaseIntentWrapper, TriggerType},
BaseIntentCommittor, CommittorService,
Expand Down Expand Up @@ -241,79 +239,56 @@ impl ScheduledCommitsProcessorImpl {
continue;
};

match execution_result {
Ok(value) => {
Self::process_intent_result(
intent_id,
&internal_transaction_scheduler,
value,
intent_meta,
)
.await;
}
Err((_, _, err)) => {
match err.as_ref() {
&magicblock_committor_service::intent_executor::error::IntentExecutorError::EmptyIntentError => {
warn!("Empty intent was scheduled!");
Self::process_empty_intent(
intent_id,
&internal_transaction_scheduler,
intent_meta
).await;
}
_ => {
error!(
"Failed to commit in slot: {}, blockhash: {}. {:?}",
intent_meta.slot, intent_meta.blockhash, err
);
}
}
}
}
Self::process_intent_result(
intent_id,
&internal_transaction_scheduler,
execution_result,
intent_meta,
)
.await;
}
}

async fn process_intent_result(
intent_id: u64,
internal_transaction_scheduler: &TransactionSchedulerHandle,
execution_outcome: ExecutionOutputWrapper,
result: BroadcastedIntentExecutionResult,
mut intent_meta: ScheduledBaseIntentMeta,
) {
let chain_signatures = match execution_outcome.output {
ExecutionOutput::SingleStage(signature) => vec![signature],
ExecutionOutput::TwoStage {
commit_signature,
finalize_signature,
} => vec![commit_signature, finalize_signature],
};
let intent_sent_transaction =
std::mem::take(&mut intent_meta.intent_sent_transaction);
let sent_commit =
Self::build_sent_commit(intent_id, chain_signatures, intent_meta);
register_scheduled_commit_sent(sent_commit);
match internal_transaction_scheduler
.execute(intent_sent_transaction)
.await
{
Ok(signature) => debug!(
"Signaled sent commit with internal signature: {:?}",
signature
),
Err(err) => {
error!("Failed to signal sent commit via transaction: {}", err);
let error_message = result
.as_ref()
.err()
.map(|(_, _, err)| format!("{:?}", err));
let chain_signatures = match result {
Ok(execution_outcome) => match execution_outcome.output {
ExecutionOutput::SingleStage(signature) => vec![signature],
ExecutionOutput::TwoStage {
commit_signature,
finalize_signature,
} => vec![commit_signature, finalize_signature],
},
Err((_, _, err)) => {
error!(
"Failed to commit in slot: {}, blockhash: {}. {:?}",
intent_meta.slot, intent_meta.blockhash, err
);
err.signatures()
.map(|(commit, finalize)| {
finalize
.map(|finalize| vec![commit, finalize])
.unwrap_or(vec![commit])
})
.unwrap_or(vec![])
}
}
}

async fn process_empty_intent(
intent_id: u64,
internal_transaction_scheduler: &TransactionSchedulerHandle,
mut intent_meta: ScheduledBaseIntentMeta,
) {
};
let intent_sent_transaction =
std::mem::take(&mut intent_meta.intent_sent_transaction);
let sent_commit =
Self::build_sent_commit(intent_id, vec![], intent_meta);
let sent_commit = Self::build_sent_commit(
intent_id,
chain_signatures,
intent_meta,
error_message,
);
register_scheduled_commit_sent(sent_commit);
match internal_transaction_scheduler
.execute(intent_sent_transaction)
Expand All @@ -333,6 +308,7 @@ impl ScheduledCommitsProcessorImpl {
intent_id: u64,
chain_signatures: Vec<Signature>,
intent_meta: ScheduledBaseIntentMeta,
error_message: Option<String>,
) -> SentCommit {
SentCommit {
message_id: intent_id,
Expand All @@ -343,6 +319,7 @@ impl ScheduledCommitsProcessorImpl {
included_pubkeys: intent_meta.included_pubkeys,
excluded_pubkeys: intent_meta.excluded_pubkeys,
requested_undelegation: intent_meta.requested_undelegation,
error_message,
}
}
}
Expand Down
93 changes: 70 additions & 23 deletions magicblock-committor-service/src/intent_executor/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub enum IntentExecutorError {
EmptyIntentError,
#[error("User supplied actions are ill-formed: {0}. {:?}", .1)]
ActionsError(#[source] TransactionError, Option<Signature>),
#[error("Invalid undelegation: {0}. {:?}", .1)]
UndelegationError(#[source] TransactionError, Option<Signature>),
#[error("Accounts committed with an invalid Commit id: {0}. {:?}", .1)]
CommitIDError(#[source] TransactionError, Option<Signature>),
#[error("Max instruction trace length exceeded: {0}. {:?}", .1)]
Expand Down Expand Up @@ -94,11 +96,42 @@ impl IntentExecutorError {
err,
signature,
) => IntentExecutorError::CommitIDError(err, signature),
TransactionStrategyExecutionError::UndelegationError(
err,
signature,
) => IntentExecutorError::UndelegationError(err, signature),
TransactionStrategyExecutionError::InternalError(err) => {
converter(err)
}
}
}

pub fn signatures(&self) -> Option<(Signature, Option<Signature>)> {
match self {
IntentExecutorError::CpiLimitError(_, signature)
| IntentExecutorError::ActionsError(_, signature)
| IntentExecutorError::CommitIDError(_, signature)
| IntentExecutorError::UndelegationError(_, signature)
| IntentExecutorError::FailedToCommitError { signature, err: _ } => {
signature.map(|el| (el, None))
}
IntentExecutorError::FailedCommitPreparationError(err)
| IntentExecutorError::FailedFinalizePreparationError(err) => {
err.signature().map(|el| (el, None))
}
IntentExecutorError::TaskBuilderError(err) => {
err.signature().map(|el| (el, None))
}
IntentExecutorError::FailedToFinalizeError {
err: _,
commit_signature,
finalize_signature,
} => commit_signature.map(|el| (el, *finalize_signature)),
IntentExecutorError::EmptyIntentError
| IntentExecutorError::FailedToFitError
| IntentExecutorError::SignerError(_) => None,
}
}
}

impl metrics::LabelValue for IntentExecutorError {
Expand All @@ -117,6 +150,8 @@ impl metrics::LabelValue for IntentExecutorError {
pub enum TransactionStrategyExecutionError {
#[error("User supplied actions are ill-formed: {0}. {:?}", .1)]
ActionsError(#[source] TransactionError, Option<Signature>),
#[error("Invalid undelegation: {0}. {:?}", .1)]
UndelegationError(#[source] TransactionError, Option<Signature>),
#[error("Accounts committed with an invalid Commit id: {0}. {:?}", .1)]
CommitIDError(#[source] TransactionError, Option<Signature>),
#[error("Max instruction trace length exceeded: {0}. {:?}", .1)]
Expand Down Expand Up @@ -146,14 +181,6 @@ impl TransactionStrategyExecutionError {
dlp::error::DlpError::NonceOutOfOrder as u32;

match err {
// Filter CommitIdError by custom error code
transaction_err @ TransactionError::InstructionError(
_,
InstructionError::Custom(NONCE_OUT_OF_ORDER),
) => Ok(TransactionStrategyExecutionError::CommitIDError(
transaction_err,
signature,
)),
// Some tx may use too much CPIs and we can handle it in certain cases
transaction_err @ TransactionError::InstructionError(
_,
Expand All @@ -162,25 +189,45 @@ impl TransactionStrategyExecutionError {
transaction_err,
signature,
)),
// Filter ActionError, we can attempt recovery by stripping away actions
transaction_err @ TransactionError::InstructionError(index, _) => {
// Map per-task InstructionError into CommitID / Actions / Undelegation errors when possible
TransactionError::InstructionError(index, instruction_err) => {
let tx_err_helper = |instruction_err| -> TransactionError {
TransactionError::InstructionError(index, instruction_err)
};
let Some(action_index) = index.checked_sub(OFFSET) else {
return Err(transaction_err);
return Err(tx_err_helper(instruction_err));
};

let Some(task_type) = tasks
.get(action_index as usize)
.map(|task| task.task_type())
else {
return Err(tx_err_helper(instruction_err));
};

// If index corresponds to an Action -> ActionsError; otherwise -> InternalError.
if matches!(
tasks
.get(action_index as usize)
.map(|task| task.task_type()),
Some(TaskType::Action)
) {
Ok(TransactionStrategyExecutionError::ActionsError(
transaction_err,
match (task_type, instruction_err) {
(
TaskType::Commit,
InstructionError::Custom(NONCE_OUT_OF_ORDER),
) => Ok(TransactionStrategyExecutionError::CommitIDError(
tx_err_helper(InstructionError::Custom(
NONCE_OUT_OF_ORDER,
)),
signature,
))
} else {
Err(transaction_err)
)),
(TaskType::Action, instruction_err) => {
Ok(TransactionStrategyExecutionError::ActionsError(
tx_err_helper(instruction_err),
signature,
))
}
(TaskType::Undelegate, instruction_err) => Ok(
TransactionStrategyExecutionError::UndelegationError(
tx_err_helper(instruction_err),
signature,
),
),
(_, instruction_err) => Err(tx_err_helper(instruction_err)),
}
}
// This means transaction failed to other reasons that we don't handle - propagate
Expand Down
34 changes: 33 additions & 1 deletion magicblock-committor-service/src/intent_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,35 @@ where
(commit_strategy, finalize_strategy, to_cleanup)
}

/// Handles actions error, stripping away actions
/// Returns [`TransactionStrategy`] to be cleaned up
fn handle_undelegation_error(
&self,
strategy: &mut TransactionStrategy,
) -> TransactionStrategy {
let position = strategy
.optimized_tasks
.iter()
.position(|el| el.task_type() == TaskType::Undelegate);

if let Some(position) = position {
// Remove everything after undelegation including post undelegation actions
let removed_task =
strategy.optimized_tasks.drain(position..).collect();
let old_alts =
strategy.dummy_revaluate_alts(&self.authority.pubkey());
TransactionStrategy {
optimized_tasks: removed_task,
lookup_tables_keys: old_alts,
}
} else {
TransactionStrategy {
optimized_tasks: vec![],
lookup_tables_keys: vec![],
}
}
}

/// Shared helper for sending transactions
async fn send_prepared_message(
&self,
Expand Down Expand Up @@ -520,7 +549,8 @@ where
}
Err(IntentExecutorError::CommitIDError(_, _))
| Err(IntentExecutorError::ActionsError(_, _))
| Err(IntentExecutorError::CpiLimitError(_, _)) => None,
| Err(IntentExecutorError::CpiLimitError(_, _))
| Err(IntentExecutorError::UndelegationError(_, _)) => None,
Err(IntentExecutorError::EmptyIntentError)
| Err(IntentExecutorError::FailedToFitError)
| Err(IntentExecutorError::TaskBuilderError(_))
Expand Down Expand Up @@ -758,6 +788,8 @@ where
let result = self.execute_inner(base_intent, &persister).await;
if let Some(pubkeys) = pubkeys {
// Reset TaskInfoFetcher, as cache could become invalid
// NOTE: if undelegation was removed - we still reset
// We assume its safe since all consecutive commits will fail
if result.is_err() || is_undelegate {
self.task_info_fetcher.reset(ResetType::Specific(&pubkeys));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ where
.await?;
Ok(ControlFlow::Continue(to_cleanup))
}
TransactionStrategyExecutionError::UndelegationError(_, _) => {
// Here we patch strategy for it to be retried in next iteration
// & we also record data that has to be cleaned up after patch
let to_cleanup =
self.handle_undelegation_error(transaction_strategy);
Ok(ControlFlow::Continue(to_cleanup))
}
TransactionStrategyExecutionError::CpiLimitError(_, _) => {
// Can't be handled in scope of single stage execution
// We signal flow break
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use lru::LruCache;
use magicblock_metrics::metrics;
use magicblock_rpc_client::{MagicBlockRpcClientError, MagicblockRpcClient};
use solana_pubkey::Pubkey;
use solana_sdk::signature::Signature;

const NUM_FETCH_RETRIES: NonZeroUsize =
unsafe { NonZeroUsize::new_unchecked(5) };
Expand Down Expand Up @@ -276,4 +277,14 @@ pub enum TaskInfoFetcherError {
MagicBlockRpcClientError(#[from] MagicBlockRpcClientError),
}

impl TaskInfoFetcherError {
pub fn signature(&self) -> Option<Signature> {
match self {
Self::MetadataNotFoundError(_) => None,
Self::InvalidAccountDataError(_) => None,
Self::MagicBlockRpcClientError(err) => err.signature(),
}
}
}

pub type TaskInfoFetcherResult<T, E = TaskInfoFetcherError> = Result<T, E>;
Loading
Loading