Skip to content

Commit cf6229d

Browse files
committed
Use TaskInfoFetcher
1 parent c07a177 commit cf6229d

File tree

7 files changed

+114
-103
lines changed

7 files changed

+114
-103
lines changed

magicblock-committor-service/src/intent_executor/task_info_fetcher.rs

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ use dlp::{
88
};
99
use log::{error, warn};
1010
use lru::LruCache;
11-
use magicblock_rpc_client::{MagicBlockRpcClientError, MagicblockRpcClient};
11+
use magicblock_rpc_client::{
12+
MagicBlockRpcClientError, MagicBlockRpcClientResult, MagicblockRpcClient,
13+
};
14+
use solana_account::Account;
1215
use solana_pubkey::Pubkey;
1316

1417
const NUM_FETCH_RETRIES: NonZeroUsize =
@@ -35,6 +38,13 @@ pub trait TaskInfoFetcher: Send + Sync + 'static {
3538

3639
/// Resets cache for some or all accounts
3740
fn reset(&self, reset_type: ResetType);
41+
42+
async fn get_base_account(
43+
&self,
44+
_pubkey: &Pubkey,
45+
) -> MagicBlockRpcClientResult<Option<Account>> {
46+
Ok(None) // AccountNotFound
47+
}
3848
}
3949

4050
pub enum ResetType<'a> {
@@ -262,6 +272,13 @@ impl TaskInfoFetcher for CacheTaskInfoFetcher {
262272
}
263273
}
264274
}
275+
276+
async fn get_base_account(
277+
&self,
278+
pubkey: &Pubkey,
279+
) -> MagicBlockRpcClientResult<Option<Account>> {
280+
self.rpc_client.get_account(pubkey).await
281+
}
265282
}
266283

267284
#[derive(thiserror::Error, Debug)]
@@ -275,3 +292,37 @@ pub enum TaskInfoFetcherError {
275292
}
276293

277294
pub type TaskInfoFetcherResult<T, E = TaskInfoFetcherError> = Result<T, E>;
295+
296+
#[cfg(any(test, feature = "dev-context-only-utils"))]
297+
pub struct NullTaskInfoFetcher;
298+
299+
#[cfg(any(test, feature = "dev-context-only-utils"))]
300+
#[async_trait]
301+
impl TaskInfoFetcher for NullTaskInfoFetcher {
302+
async fn fetch_next_commit_ids(
303+
&self,
304+
pubkeys: &[Pubkey],
305+
) -> TaskInfoFetcherResult<HashMap<Pubkey, u64>> {
306+
Ok(pubkeys.iter().map(|pubkey| (*pubkey, 0)).collect())
307+
}
308+
309+
async fn fetch_rent_reimbursements(
310+
&self,
311+
pubkeys: &[Pubkey],
312+
) -> TaskInfoFetcherResult<Vec<Pubkey>> {
313+
Ok(pubkeys.to_vec())
314+
}
315+
316+
fn peek_commit_id(&self, _pubkey: &Pubkey) -> Option<u64> {
317+
None
318+
}
319+
320+
fn reset(&self, _: ResetType) {}
321+
322+
async fn get_base_account(
323+
&self,
324+
_pubkey: &Pubkey,
325+
) -> MagicBlockRpcClientResult<Option<Account>> {
326+
Ok(None) // AccountNotFound
327+
}
328+
}

magicblock-committor-service/src/tasks/account_fetcher.rs

Lines changed: 0 additions & 61 deletions
This file was deleted.

magicblock-committor-service/src/tasks/mod.rs

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use account_fetcher::*;
1+
use std::sync::Arc;
2+
23
use dlp::{
34
args::{CommitDiffArgs, CommitStateArgs},
45
compute_diff,
@@ -23,7 +24,10 @@ use solana_pubkey::Pubkey;
2324
use solana_sdk::instruction::Instruction;
2425
use thiserror::Error;
2526

26-
use crate::tasks::visitor::Visitor;
27+
use crate::{
28+
intent_executor::task_info_fetcher::TaskInfoFetcher,
29+
tasks::visitor::Visitor,
30+
};
2731

2832
pub mod args_task;
2933
pub mod buffer_task;
@@ -33,8 +37,6 @@ pub(crate) mod task_visitors;
3337
pub mod utils;
3438
pub mod visitor;
3539

36-
pub mod account_fetcher;
37-
3840
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
3941
pub enum TaskType {
4042
Commit,
@@ -123,20 +125,25 @@ impl CommitTask {
123125
// or 4 u64 fields.
124126
const COMMIT_STATE_SIZE_THRESHOLD: usize = 256;
125127

126-
pub async fn new(
128+
pub async fn new<C: TaskInfoFetcher>(
127129
commit_id: u64,
128130
allow_undelegation: bool,
129131
committed_account: CommittedAccount,
130-
account_fetcher: AccountFetcher,
132+
task_info_fetcher: &Arc<C>,
131133
) -> Self {
132134
let base_account = if committed_account.account.data.len()
133135
> CommitTask::COMMIT_STATE_SIZE_THRESHOLD
134136
{
135-
match account_fetcher
136-
.fetch_account(&committed_account.pubkey)
137+
match task_info_fetcher
138+
.get_base_account(&committed_account.pubkey)
137139
.await
138140
{
139-
Ok(account) => Some(account),
141+
Ok(Some(account)) => Some(account),
142+
Ok(None) => {
143+
log::warn!("AccountNotFound for commit_diff, pubkey: {}, commit_id: {}, Falling back to commit_state.",
144+
committed_account.pubkey, commit_id);
145+
None
146+
}
140147
Err(e) => {
141148
log::warn!("Failed to fetch base account for commit diff, pubkey: {}, commit_id: {}, error: {}. Falling back to commit_state.",
142149
committed_account.pubkey, commit_id, e);
@@ -411,10 +418,13 @@ mod serialization_safety_test {
411418
};
412419
use solana_account::Account;
413420

414-
use crate::tasks::{
415-
args_task::{ArgsTask, ArgsTaskType},
416-
buffer_task::{BufferTask, BufferTaskType},
417-
*,
421+
use crate::{
422+
intent_executor::task_info_fetcher::NullTaskInfoFetcher,
423+
tasks::{
424+
args_task::{ArgsTask, ArgsTaskType},
425+
buffer_task::{BufferTask, BufferTaskType},
426+
*,
427+
},
418428
};
419429

420430
// Test all ArgsTask variants
@@ -437,7 +447,7 @@ mod serialization_safety_test {
437447
rent_epoch: 0,
438448
},
439449
},
440-
AccountFetcher::new(),
450+
&Arc::new(NullTaskInfoFetcher),
441451
)
442452
.await,
443453
)
@@ -501,7 +511,7 @@ mod serialization_safety_test {
501511
rent_epoch: 0,
502512
},
503513
},
504-
AccountFetcher::new(),
514+
&Arc::new(NullTaskInfoFetcher),
505515
)
506516
.await,
507517
));
@@ -529,7 +539,7 @@ mod serialization_safety_test {
529539
rent_epoch: 0,
530540
},
531541
},
532-
AccountFetcher::new(),
542+
&Arc::new(NullTaskInfoFetcher),
533543
)
534544
.await,
535545
));

magicblock-committor-service/src/tasks/task_builder.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use magicblock_program::magic_scheduled_base_intent::{
99
};
1010
use solana_pubkey::Pubkey;
1111

12-
use super::account_fetcher::AccountFetcher;
1312
use crate::{
1413
intent_executor::task_info_fetcher::{
1514
TaskInfoFetcher, TaskInfoFetcherError,
@@ -25,14 +24,14 @@ use crate::{
2524
pub trait TasksBuilder {
2625
// Creates tasks for commit stage
2726
async fn commit_tasks<C: TaskInfoFetcher, P: IntentPersister>(
28-
commit_id_fetcher: &Arc<C>,
27+
task_info_fetcher: &Arc<C>,
2928
base_intent: &ScheduledBaseIntent,
3029
persister: &Option<P>,
3130
) -> TaskBuilderResult<Vec<Box<dyn BaseTask>>>;
3231

3332
// Create tasks for finalize stage
3433
async fn finalize_tasks<C: TaskInfoFetcher>(
35-
info_fetcher: &Arc<C>,
34+
task_info_fetcher: &Arc<C>,
3635
base_intent: &ScheduledBaseIntent,
3736
) -> TaskBuilderResult<Vec<Box<dyn BaseTask>>>;
3837
}
@@ -45,7 +44,7 @@ pub struct TaskBuilderImpl;
4544
impl TasksBuilder for TaskBuilderImpl {
4645
/// Returns [`Task`]s for Commit stage
4746
async fn commit_tasks<C: TaskInfoFetcher, P: IntentPersister>(
48-
commit_id_fetcher: &Arc<C>,
47+
task_info_fetcher: &Arc<C>,
4948
base_intent: &ScheduledBaseIntent,
5049
persister: &Option<P>,
5150
) -> TaskBuilderResult<Vec<Box<dyn BaseTask>>> {
@@ -73,7 +72,7 @@ impl TasksBuilder for TaskBuilderImpl {
7372
.iter()
7473
.map(|account| account.pubkey)
7574
.collect::<Vec<_>>();
76-
let commit_ids = commit_id_fetcher
75+
let commit_ids = task_info_fetcher
7776
.fetch_next_commit_ids(&committed_pubkeys)
7877
.await
7978
.map_err(TaskBuilderError::CommitTasksBuildError)?;
@@ -95,7 +94,7 @@ impl TasksBuilder for TaskBuilderImpl {
9594
commit_id,
9695
allow_undelegation,
9796
account.clone(),
98-
AccountFetcher::new(),
97+
task_info_fetcher,
9998
).await);
10099

101100
Box::new(ArgsTask::new(task)) as Box<dyn BaseTask>
@@ -106,7 +105,7 @@ impl TasksBuilder for TaskBuilderImpl {
106105

107106
/// Returns [`Task`]s for Finalize stage
108107
async fn finalize_tasks<C: TaskInfoFetcher>(
109-
info_fetcher: &Arc<C>,
108+
task_info_fetcher: &Arc<C>,
110109
base_intent: &ScheduledBaseIntent,
111110
) -> TaskBuilderResult<Vec<Box<dyn BaseTask>>> {
112111
// Helper to create a finalize task
@@ -169,7 +168,7 @@ impl TasksBuilder for TaskBuilderImpl {
169168
.iter()
170169
.map(|account| account.pubkey)
171170
.collect::<Vec<_>>();
172-
let rent_reimbursements = info_fetcher
171+
let rent_reimbursements = task_info_fetcher
173172
.fetch_rent_reimbursements(&pubkeys)
174173
.await
175174
.map_err(TaskBuilderError::FinalizedTasksBuildError)?;

magicblock-committor-service/src/tasks/task_strategist.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,8 @@ pub type TaskStrategistResult<T, E = TaskStrategistError> = Result<T, E>;
250250

251251
#[cfg(test)]
252252
mod tests {
253+
use std::sync::Arc;
254+
253255
use futures_util::future::join_all;
254256
use magicblock_program::magic_scheduled_base_intent::{
255257
BaseAction, CommittedAccount, ProgramArgs,
@@ -259,11 +261,9 @@ mod tests {
259261

260262
use super::*;
261263
use crate::{
264+
intent_executor::task_info_fetcher::NullTaskInfoFetcher,
262265
persist::IntentPersisterImpl,
263-
tasks::{
264-
AccountFetcher, BaseActionTask, CommitTask, TaskStrategy,
265-
UndelegateTask,
266-
},
266+
tasks::{BaseActionTask, CommitTask, TaskStrategy, UndelegateTask},
267267
};
268268

269269
// Helper to create a simple commit task
@@ -285,7 +285,7 @@ mod tests {
285285
rent_epoch: 0,
286286
},
287287
},
288-
AccountFetcher::new(),
288+
&Arc::new(NullTaskInfoFetcher),
289289
)
290290
.await,
291291
))

0 commit comments

Comments
 (0)