Skip to content

Commit 9fafe0a

Browse files
authored
auto max in flight for delegated EOA (#32)
* add max in flight limit for delegated account * add manual reset scheduling
1 parent 3af720f commit 9fafe0a

File tree

14 files changed

+242
-76
lines changed

14 files changed

+242
-76
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/chain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,5 +260,5 @@ impl ThirdwebChainConfig<'_> {
260260
}
261261

262262
pub trait ChainService {
263-
fn get_chain(&self, chain_id: u64) -> Result<impl Chain, EngineError>;
263+
fn get_chain(&self, chain_id: u64) -> Result<impl Chain + Clone, EngineError>;
264264
}

executors/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,4 @@ uuid = { version = "1.17.0", features = ["v4"] }
2424
chrono = "0.4.41"
2525
tokio = { version = "1.45.0", features = ["full"] }
2626
futures = "0.3.31"
27+
moka = { version = "0.12.10", features = ["future"] }
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use std::ops::Deref;
2+
3+
use alloy::primitives::Address;
4+
use engine_core::{chain::Chain, error::EngineError};
5+
use engine_eip7702_core::delegated_account::DelegatedAccount;
6+
use moka::future::Cache;
7+
8+
#[derive(Hash, Eq, PartialEq)]
9+
pub struct AuthorizationCacheKey {
10+
eoa_address: Address,
11+
chain_id: u64,
12+
}
13+
14+
#[derive(Clone)]
15+
pub struct EoaAuthorizationCache {
16+
pub inner: moka::future::Cache<AuthorizationCacheKey, bool>,
17+
}
18+
19+
impl EoaAuthorizationCache {
20+
pub fn new(cache: Cache<AuthorizationCacheKey, bool>) -> Self {
21+
Self { inner: cache }
22+
}
23+
24+
pub async fn is_minimal_account<C: Chain>(
25+
&self,
26+
delegated_account: &DelegatedAccount<C>,
27+
) -> Result<bool, EngineError> {
28+
self.inner
29+
.try_get_with(
30+
AuthorizationCacheKey {
31+
eoa_address: delegated_account.eoa_address,
32+
chain_id: delegated_account.chain.chain_id(),
33+
},
34+
delegated_account.is_minimal_account(),
35+
)
36+
.await
37+
.map_err(|e| e.deref().clone())
38+
}
39+
}

executors/src/eoa/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod authorization_cache;
12
pub mod error_classifier;
23
pub mod events;
34
pub mod store;

executors/src/eoa/store/atomic.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,7 @@ impl AtomicEoaExecutorStore {
506506
let optimistic_key = self.optimistic_transaction_count_key_name();
507507
let cached_nonce_key = self.last_transaction_count_key_name();
508508
let recycled_key = self.recycled_nonces_zset_name();
509+
let manual_reset_key = self.manual_reset_key_name();
509510

510511
// Update health data only if it exists
511512
if let Some(ref health_json) = health_update {
@@ -521,6 +522,9 @@ impl AtomicEoaExecutorStore {
521522

522523
// Reset the recycled nonces
523524
pipeline.del(recycled_key);
525+
526+
// Delete the manual reset key
527+
pipeline.del(&manual_reset_key);
524528
})
525529
.await
526530
}

executors/src/eoa/store/mod.rs

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,19 @@ impl EoaExecutorStoreKeys {
274274
None => format!("eoa_executor:health:{}:{}", self.chain_id, self.eoa),
275275
}
276276
}
277+
278+
/// Manual reset key name.
279+
///
280+
/// This holds a timestamp if a manual reset is scheduled.
281+
pub fn manual_reset_key_name(&self) -> String {
282+
match &self.namespace {
283+
Some(ns) => format!(
284+
"{ns}:eoa_executor:pending_manual_reset:{}:{}",
285+
self.chain_id, self.eoa
286+
),
287+
None => format!("eoa_executor:pending_manual_reset:{}:{}", self.chain_id, self.eoa),
288+
}
289+
}
277290
}
278291

279292
impl EoaExecutorStore {
@@ -341,6 +354,7 @@ impl From<BorrowedTransactionData> for SubmittedTransactionDehydrated {
341354
transaction_hash: data.signed_transaction.hash().to_string(),
342355
transaction_id: data.transaction_id.clone(),
343356
queued_at: data.queued_at,
357+
submitted_at: EoaExecutorStore::now(),
344358
}
345359
}
346360
}
@@ -721,6 +735,15 @@ impl EoaExecutorStore {
721735
Ok(())
722736
}
723737

738+
/// Schedule a manual reset for the EOA
739+
pub async fn schedule_manual_reset(&self) -> Result<(), TransactionStoreError> {
740+
let manual_reset_key = self.manual_reset_key_name();
741+
let mut conn = self.redis.clone();
742+
conn.set::<_, _, ()>(&manual_reset_key, EoaExecutorStore::now())
743+
.await?;
744+
Ok(())
745+
}
746+
724747
/// Get count of submitted transactions awaiting confirmation
725748
pub async fn get_submitted_transactions_count(&self) -> Result<u64, TransactionStoreError> {
726749
let submitted_key = self.submitted_transactions_zset_name();
@@ -752,7 +775,7 @@ impl EoaExecutorStore {
752775
}
753776

754777
/// Get the current time in milliseconds
755-
///
778+
///
756779
/// Used as the canonical time representation for this store
757780
pub fn now() -> u64 {
758781
chrono::Utc::now().timestamp_millis().max(0) as u64
@@ -795,11 +818,13 @@ impl EoaExecutorStore {
795818
}
796819

797820
/// Get all submitted transactions (raw data)
798-
pub async fn get_all_submitted_transactions(&self) -> Result<Vec<SubmittedTransactionDehydrated>, TransactionStoreError> {
821+
pub async fn get_all_submitted_transactions(
822+
&self,
823+
) -> Result<Vec<SubmittedTransactionDehydrated>, TransactionStoreError> {
799824
let submitted_key = self.submitted_transactions_zset_name();
800825
let mut conn = self.redis.clone();
801826

802-
let submitted_data: Vec<SubmittedTransactionStringWithNonce> =
827+
let submitted_data: Vec<SubmittedTransactionStringWithNonce> =
803828
conn.zrange_withscores(&submitted_key, 0, -1).await?;
804829

805830
let submitted_txs: Vec<SubmittedTransactionDehydrated> =
@@ -809,7 +834,10 @@ impl EoaExecutorStore {
809834
}
810835

811836
/// Get attempts count for a specific transaction
812-
pub async fn get_transaction_attempts_count(&self, transaction_id: &str) -> Result<u64, TransactionStoreError> {
837+
pub async fn get_transaction_attempts_count(
838+
&self,
839+
transaction_id: &str,
840+
) -> Result<u64, TransactionStoreError> {
813841
let attempts_key = self.transaction_attempts_list_name(transaction_id);
814842
let mut conn = self.redis.clone();
815843

@@ -818,12 +846,15 @@ impl EoaExecutorStore {
818846
}
819847

820848
/// Get all transaction attempts for a specific transaction
821-
pub async fn get_transaction_attempts(&self, transaction_id: &str) -> Result<Vec<TransactionAttempt>, TransactionStoreError> {
849+
pub async fn get_transaction_attempts(
850+
&self,
851+
transaction_id: &str,
852+
) -> Result<Vec<TransactionAttempt>, TransactionStoreError> {
822853
let attempts_key = self.transaction_attempts_list_name(transaction_id);
823854
let mut conn = self.redis.clone();
824855

825856
let attempts_data: Vec<String> = conn.lrange(&attempts_key, 0, -1).await?;
826-
857+
827858
let mut attempts = Vec::new();
828859
for attempt_json in attempts_data {
829860
let attempt: TransactionAttempt = serde_json::from_str(&attempt_json)?;
@@ -832,6 +863,14 @@ impl EoaExecutorStore {
832863

833864
Ok(attempts)
834865
}
866+
867+
pub async fn is_manual_reset_scheduled(&self) -> Result<bool, TransactionStoreError> {
868+
let manual_reset_key = self.manual_reset_key_name();
869+
let mut conn = self.redis.clone();
870+
871+
let manual_reset: Option<u64> = conn.get(&manual_reset_key).await?;
872+
Ok(manual_reset.is_some())
873+
}
835874
}
836875

837876
// Additional error types

executors/src/eoa/store/submitted.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,13 @@ pub struct SubmittedNoopTransaction {
3838
pub transaction_hash: String,
3939
}
4040

41+
/// String representation is: {transaction_hash}:{transaction_id}:{queued_at}:{submitted_at}
4142
pub type SubmittedTransactionStringWithNonce = (String, u64);
4243

4344
impl SubmittedNoopTransaction {
4445
pub fn to_redis_string_with_nonce(&self) -> SubmittedTransactionStringWithNonce {
4546
(
46-
format!("{}:{}:0", self.transaction_hash, NO_OP_TRANSACTION_ID),
47+
format!("{}:{}:0:0", self.transaction_hash, NO_OP_TRANSACTION_ID),
4748
self.nonce,
4849
)
4950
}
@@ -91,6 +92,7 @@ pub struct SubmittedTransactionDehydrated {
9192
pub nonce: u64,
9293
pub transaction_hash: String,
9394
pub transaction_id: String,
95+
pub submitted_at: u64,
9496
pub queued_at: u64,
9597
}
9698

@@ -100,21 +102,42 @@ impl SubmittedTransactionDehydrated {
100102
.iter()
101103
.filter_map(|tx| {
102104
let parts: Vec<&str> = tx.0.split(':').collect();
105+
// this exists for backwards compatibility with old transactions
106+
// remove after sufficient time for old transactions to be cleaned up
103107
if parts.len() == 3 {
104108
if let Ok(queued_at) = parts[2].parse::<u64>() {
105109
Some(SubmittedTransactionDehydrated {
106110
transaction_hash: parts[0].to_string(),
107111
transaction_id: parts[1].to_string(),
112+
submitted_at: 0,
108113
nonce: tx.1,
109114
queued_at,
110115
})
111116
} else {
112117
tracing::error!("Invalid queued_at timestamp: {}", tx.0);
113118
None
114119
}
120+
} else if parts.len() == 4 {
121+
let transaction_hash = parts[0].to_string();
122+
let transaction_id = parts[1].to_string();
123+
let queued_at = parts[2].parse::<u64>();
124+
let submitted_at = parts[3].parse::<u64>();
125+
126+
if let (Ok(queued_at), Ok(submitted_at)) = (queued_at, submitted_at) {
127+
Some(SubmittedTransactionDehydrated {
128+
transaction_hash,
129+
transaction_id,
130+
submitted_at,
131+
nonce: tx.1,
132+
queued_at,
133+
})
134+
} else {
135+
tracing::error!("Invalid queued_at or submitted_at timestamps: {}", tx.0);
136+
None
137+
}
115138
} else {
116139
tracing::error!(
117-
"Invalid transaction format, expected 3 parts separated by ':': {}",
140+
"Invalid transaction format, expected 3 or 4 parts separated by ':': {}",
118141
tx.0
119142
);
120143
None
@@ -137,8 +160,8 @@ impl SubmittedTransactionDehydrated {
137160
pub fn to_redis_string_with_nonce(&self) -> SubmittedTransactionStringWithNonce {
138161
(
139162
format!(
140-
"{}:{}:{}",
141-
self.transaction_hash, self.transaction_id, self.queued_at
163+
"{}:{}:{}:{}",
164+
self.transaction_hash, self.transaction_id, self.queued_at, self.submitted_at
142165
),
143166
self.nonce,
144167
)

0 commit comments

Comments
 (0)