Skip to content

Commit bf2d156

Browse files
authored
Add EOA metrics tracking and abstraction for degradation and stuck conditions (#40)
- Introduced new `EoaMetrics` struct to encapsulate configuration and provide a clean interface for tracking EOA transaction metrics. - Added metrics for EOA send degradation, confirmation degradation, and stuck conditions, including thresholds for each. - Updated various components to utilize the new `EoaMetrics` abstraction for recording transaction metrics, enhancing observability and performance monitoring. - Configured default thresholds in the server configuration for EOA metrics tracking. These changes improve the robustness of EOA transaction monitoring and facilitate better handling of problematic EOAs.
1 parent 1d4e650 commit bf2d156

File tree

10 files changed

+196
-6
lines changed

10 files changed

+196
-6
lines changed

executors/src/eoa/store/atomic.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ pub trait SafeRedisTransaction: Send + Sync {
6868
pub struct AtomicEoaExecutorStore {
6969
pub store: EoaExecutorStore,
7070
pub worker_id: String,
71+
pub eoa_metrics: crate::metrics::EoaMetrics,
7172
}
7273

7374
impl std::ops::Deref for AtomicEoaExecutorStore {
@@ -600,6 +601,7 @@ impl AtomicEoaExecutorStore {
600601
last_confirmed_nonce,
601602
keys: &self.keys,
602603
webhook_queue,
604+
eoa_metrics: &self.eoa_metrics,
603605
})
604606
.await
605607
}
@@ -616,6 +618,7 @@ impl AtomicEoaExecutorStore {
616618
results,
617619
keys: &self.keys,
618620
webhook_queue,
621+
eoa_metrics: &self.eoa_metrics,
619622
})
620623
.await
621624
}

executors/src/eoa/store/borrowed.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::eoa::{
1212
},
1313
worker::error::EoaExecutorWorkerError,
1414
};
15-
use crate::metrics::{record_transaction_queued_to_sent, current_timestamp_ms, calculate_duration_seconds};
15+
use crate::metrics::{current_timestamp_ms, calculate_duration_seconds, EoaMetrics};
1616
use crate::webhook::{WebhookJobHandler, queue_webhook_envelopes};
1717

1818
#[derive(Debug, Clone)]
@@ -40,6 +40,7 @@ pub struct ProcessBorrowedTransactions<'a> {
4040
pub results: Vec<SubmissionResult>,
4141
pub keys: &'a EoaExecutorStoreKeys,
4242
pub webhook_queue: Arc<Queue<WebhookJobHandler>>,
43+
pub eoa_metrics: &'a EoaMetrics,
4344
}
4445

4546
#[derive(Debug, Default)]
@@ -124,7 +125,12 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
124125
result.transaction.queued_at,
125126
sent_timestamp
126127
);
127-
record_transaction_queued_to_sent("eoa", self.keys.chain_id, queued_to_sent_duration);
128+
// Record metrics using the clean EoaMetrics abstraction
129+
self.eoa_metrics.record_transaction_sent(
130+
self.keys.eoa,
131+
self.keys.chain_id,
132+
queued_to_sent_duration
133+
);
128134

129135
// Add to submitted zset
130136
let (submitted_tx_redis_string, nonce) =

executors/src/eoa/store/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ impl EoaExecutorStore {
407407
pub async fn acquire_eoa_lock_aggressively(
408408
self,
409409
worker_id: &str,
410+
eoa_metrics: crate::metrics::EoaMetrics,
410411
) -> Result<AtomicEoaExecutorStore, TransactionStoreError> {
411412
let lock_key = self.eoa_lock_key_name();
412413
let mut conn = self.redis.clone();
@@ -417,6 +418,7 @@ impl EoaExecutorStore {
417418
return Ok(AtomicEoaExecutorStore {
418419
store: self,
419420
worker_id: worker_id.to_string(),
421+
eoa_metrics,
420422
});
421423
}
422424
let conflict_worker_id = conn.get::<_, Option<String>>(&lock_key).await?;
@@ -434,6 +436,7 @@ impl EoaExecutorStore {
434436
Ok(AtomicEoaExecutorStore {
435437
store: self,
436438
worker_id: worker_id.to_string(),
439+
eoa_metrics,
437440
})
438441
}
439442

executors/src/eoa/store/submitted.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::{
1616
TransactionStoreError, atomic::SafeRedisTransaction,
1717
},
1818
},
19-
metrics::{record_transaction_queued_to_confirmed, current_timestamp_ms, calculate_duration_seconds},
19+
metrics::{current_timestamp_ms, calculate_duration_seconds, EoaMetrics},
2020
webhook::{WebhookJobHandler, queue_webhook_envelopes},
2121
};
2222

@@ -198,6 +198,7 @@ pub struct CleanSubmittedTransactions<'a> {
198198
pub confirmed_transactions: &'a [ConfirmedTransaction],
199199
pub keys: &'a EoaExecutorStoreKeys,
200200
pub webhook_queue: Arc<twmq::Queue<WebhookJobHandler>>,
201+
pub eoa_metrics: &'a EoaMetrics,
201202
}
202203

203204
pub struct CleanAndGetRecycledNonces<'a> {
@@ -336,7 +337,12 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {
336337
tx.queued_at,
337338
confirmed_timestamp
338339
);
339-
record_transaction_queued_to_confirmed("eoa", self.keys.chain_id, queued_to_mined_duration);
340+
// Record metrics using the clean EoaMetrics abstraction
341+
self.eoa_metrics.record_transaction_confirmed(
342+
self.keys.eoa,
343+
self.keys.chain_id,
344+
queued_to_mined_duration
345+
);
340346
if !tx.user_request.webhook_options.is_empty() {
341347
let event = EoaExecutorEvent {
342348
transaction_id: tx.transaction_id.clone(),

executors/src/eoa/worker/confirm.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,25 @@ impl<C: Chain> EoaExecutorWorker<C> {
9797
);
9898
}
9999
}
100+
101+
// Check if EOA is stuck and record metric using the clean EoaMetrics abstraction
102+
let time_since_movement_seconds = time_since_movement as f64 / 1000.0;
103+
if self.store.eoa_metrics.is_stuck(time_since_movement) {
104+
tracing::warn!(
105+
time_since_movement = time_since_movement,
106+
stuck_threshold = self.store.eoa_metrics.stuck_threshold_seconds,
107+
eoa = ?self.eoa,
108+
chain_id = self.chain_id,
109+
"EOA is stuck - nonce hasn't moved for too long"
110+
);
111+
112+
// Record stuck EOA metric (low cardinality - only problematic EOAs)
113+
self.store.eoa_metrics.record_stuck_eoa(
114+
self.eoa,
115+
self.chain_id,
116+
time_since_movement_seconds
117+
);
118+
}
100119

101120
tracing::debug!("No nonce progress, still going ahead with confirm flow");
102121
// return Ok(CleanupReport::default());

executors/src/eoa/worker/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::eoa::store::{
2424
AtomicEoaExecutorStore, EoaExecutorStore, EoaExecutorStoreKeys, EoaHealth, SubmissionResult,
2525
};
2626
use crate::metrics::{
27-
calculate_duration_seconds, current_timestamp_ms, record_eoa_job_processing_time,
27+
EoaMetrics, calculate_duration_seconds, current_timestamp_ms, record_eoa_job_processing_time,
2828
};
2929
use crate::webhook::WebhookJobHandler;
3030

@@ -123,6 +123,9 @@ where
123123
pub eoa_signer: Arc<EoaSigner>,
124124
pub max_inflight: u64, // Note: Spec uses MAX_INFLIGHT_PER_EOA constant
125125
pub max_recycled_nonces: u64, // Note: Spec uses MAX_RECYCLED_THRESHOLD constant
126+
127+
// EOA metrics abstraction with encapsulated configuration
128+
pub eoa_metrics: EoaMetrics,
126129
}
127130

128131
impl<CS> DurableExecution for EoaExecutorJobHandler<CS>
@@ -159,7 +162,7 @@ where
159162
data.eoa_address,
160163
data.chain_id,
161164
)
162-
.acquire_eoa_lock_aggressively(&worker_id)
165+
.acquire_eoa_lock_aggressively(&worker_id, self.eoa_metrics.clone())
163166
.await
164167
.map_err(|e| Into::<EoaExecutorWorkerError>::into(e).handle())?;
165168

executors/src/metrics.rs

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ pub struct ExecutorMetrics {
77
pub transaction_queued_to_sent_duration: HistogramVec,
88
pub transaction_queued_to_confirmed_duration: HistogramVec,
99
pub eoa_job_processing_duration: HistogramVec,
10+
// EOA degradation and stuck metrics (low cardinality - only problematic EOAs)
11+
pub eoa_degraded_send_duration: HistogramVec,
12+
pub eoa_degraded_confirmation_duration: HistogramVec,
13+
pub eoa_stuck_duration: HistogramVec,
1014
}
1115

1216
impl ExecutorMetrics {
@@ -39,10 +43,41 @@ impl ExecutorMetrics {
3943
registry
4044
)?;
4145

46+
// EOA degradation and stuck metrics (low cardinality - only problematic EOAs)
47+
let eoa_degraded_send_duration = register_histogram_vec_with_registry!(
48+
HistogramOpts::new(
49+
"tw_engine_executor_eoa_degraded_send_duration_seconds",
50+
"Duration of EOA transactions that exceeded the send degradation threshold"
51+
).buckets(vec![5.0, 10.0, 20.0, 30.0, 60.0, 120.0, 300.0, 600.0]),
52+
&["eoa_address", "chain_id"],
53+
registry
54+
)?;
55+
56+
let eoa_degraded_confirmation_duration = register_histogram_vec_with_registry!(
57+
HistogramOpts::new(
58+
"tw_engine_executor_eoa_degraded_confirmation_duration_seconds",
59+
"Duration of EOA transactions that exceeded the confirmation degradation threshold"
60+
).buckets(vec![30.0, 45.0, 60.0, 120.0, 300.0, 600.0, 1200.0, 1800.0, 3600.0]),
61+
&["eoa_address", "chain_id"],
62+
registry
63+
)?;
64+
65+
let eoa_stuck_duration = register_histogram_vec_with_registry!(
66+
HistogramOpts::new(
67+
"tw_engine_executor_eoa_stuck_duration_seconds",
68+
"Duration since last nonce movement for EOAs that are considered stuck"
69+
).buckets(vec![200.0, 300.0, 600.0, 1200.0, 1800.0, 3600.0, 7200.0, 14400.0]),
70+
&["eoa_address", "chain_id"],
71+
registry
72+
)?;
73+
4274
Ok(ExecutorMetrics {
4375
transaction_queued_to_sent_duration,
4476
transaction_queued_to_confirmed_duration,
4577
eoa_job_processing_duration,
78+
eoa_degraded_send_duration,
79+
eoa_degraded_confirmation_duration,
80+
eoa_stuck_duration,
4681
})
4782
}
4883
}
@@ -116,6 +151,76 @@ pub fn record_eoa_job_processing_time(chain_id: u64, duration_seconds: f64) {
116151
.observe(duration_seconds);
117152
}
118153

154+
/// EOA Metrics abstraction that encapsulates configuration and provides clean interface
155+
#[derive(Debug, Clone)]
156+
pub struct EoaMetrics {
157+
pub send_degradation_threshold_seconds: u64,
158+
pub confirmation_degradation_threshold_seconds: u64,
159+
pub stuck_threshold_seconds: u64,
160+
}
161+
162+
impl EoaMetrics {
163+
/// Create new EoaMetrics with configuration
164+
pub fn new(
165+
send_degradation_threshold_seconds: u64,
166+
confirmation_degradation_threshold_seconds: u64,
167+
stuck_threshold_seconds: u64,
168+
) -> Self {
169+
Self {
170+
send_degradation_threshold_seconds,
171+
confirmation_degradation_threshold_seconds,
172+
stuck_threshold_seconds,
173+
}
174+
}
175+
176+
/// Record EOA transaction send metrics with automatic degradation detection
177+
pub fn record_transaction_sent(&self, eoa_address: alloy::primitives::Address, chain_id: u64, duration_seconds: f64) {
178+
// Always record the regular metric
179+
record_transaction_queued_to_sent("eoa", chain_id, duration_seconds);
180+
181+
// Only record degraded metric if threshold exceeded (low cardinality)
182+
if duration_seconds > self.send_degradation_threshold_seconds as f64 {
183+
let metrics = get_metrics();
184+
metrics.eoa_degraded_send_duration
185+
.with_label_values(&[&eoa_address.to_string(), &chain_id.to_string()])
186+
.observe(duration_seconds);
187+
}
188+
}
189+
190+
/// Record EOA transaction confirmation metrics with automatic degradation detection
191+
pub fn record_transaction_confirmed(&self, eoa_address: alloy::primitives::Address, chain_id: u64, duration_seconds: f64) {
192+
// Always record the regular metric
193+
record_transaction_queued_to_confirmed("eoa", chain_id, duration_seconds);
194+
195+
// Only record degraded metric if threshold exceeded (low cardinality)
196+
if duration_seconds > self.confirmation_degradation_threshold_seconds as f64 {
197+
let metrics = get_metrics();
198+
metrics.eoa_degraded_confirmation_duration
199+
.with_label_values(&[&eoa_address.to_string(), &chain_id.to_string()])
200+
.observe(duration_seconds);
201+
}
202+
}
203+
204+
/// Record stuck EOA metric when nonce hasn't moved for too long
205+
pub fn record_stuck_eoa(&self, eoa_address: alloy::primitives::Address, chain_id: u64, time_since_last_movement_seconds: f64) {
206+
// Only record if EOA is actually stuck (exceeds threshold)
207+
if time_since_last_movement_seconds > self.stuck_threshold_seconds as f64 {
208+
let metrics = get_metrics();
209+
metrics.eoa_stuck_duration
210+
.with_label_values(&[&eoa_address.to_string(), &chain_id.to_string()])
211+
.observe(time_since_last_movement_seconds);
212+
}
213+
}
214+
215+
/// Check if an EOA should be considered stuck based on time since last nonce movement
216+
pub fn is_stuck(&self, time_since_last_movement_ms: u64) -> bool {
217+
let time_since_last_movement_seconds = time_since_last_movement_ms as f64 / 1000.0;
218+
time_since_last_movement_seconds > self.stuck_threshold_seconds as f64
219+
}
220+
}
221+
222+
223+
119224
/// Helper to calculate duration in seconds from unix timestamps (milliseconds)
120225
pub fn calculate_duration_seconds(start_timestamp_ms: u64, end_timestamp_ms: u64) -> f64 {
121226
(end_timestamp_ms.saturating_sub(start_timestamp_ms)) as f64 / 1000.0
@@ -165,11 +270,24 @@ mod tests {
165270
record_transaction_queued_to_confirmed("test", 1, 10.0);
166271
record_eoa_job_processing_time(1, 2.0);
167272

273+
// Test new EOA metrics abstraction
274+
let eoa_metrics = EoaMetrics::new(10, 120, 600);
275+
let test_address = "0x1234567890123456789012345678901234567890".parse().unwrap();
276+
277+
eoa_metrics.record_transaction_sent(test_address, 1, 5.0); // Won't record degradation (below threshold)
278+
eoa_metrics.record_transaction_sent(test_address, 1, 15.0); // Will record degradation (above threshold)
279+
eoa_metrics.record_transaction_confirmed(test_address, 1, 60.0); // Won't record degradation (below threshold)
280+
eoa_metrics.record_transaction_confirmed(test_address, 1, 180.0); // Will record degradation (above threshold)
281+
eoa_metrics.record_stuck_eoa(test_address, 1, 900.0); // Will record stuck EOA
282+
168283
// Test that default metrics can be exported
169284
let metrics_output = export_default_metrics().expect("Should be able to export default metrics");
170285
assert!(metrics_output.contains("tw_engine_executor_transaction_queued_to_sent_duration_seconds"));
171286
assert!(metrics_output.contains("tw_engine_executor_transaction_queued_to_confirmed_duration_seconds"));
172287
assert!(metrics_output.contains("tw_engine_eoa_executor_job_processing_duration_seconds"));
288+
assert!(metrics_output.contains("tw_engine_executor_eoa_degraded_send_duration_seconds"));
289+
assert!(metrics_output.contains("tw_engine_executor_eoa_degraded_confirmation_duration_seconds"));
290+
assert!(metrics_output.contains("tw_engine_executor_eoa_stuck_duration_seconds"));
173291
}
174292

175293
#[test]

server/configuration/server_base.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,7 @@ queue:
2727
always_poll: false
2828
max_success: 1000
2929
max_failed: 10000
30+
monitoring:
31+
eoa_send_degradation_threshold_seconds: 30
32+
eoa_confirmation_degradation_threshold_seconds: 60
33+
eoa_stuck_threshold_seconds: 300

server/src/config.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,27 @@ pub struct QueueConfig {
2424
pub local_concurrency: usize,
2525
pub polling_interval_ms: u64,
2626
pub lease_duration_seconds: u64,
27+
28+
#[serde(default)]
29+
pub monitoring: MonitoringConfig,
30+
}
31+
32+
#[derive(Debug, Clone, Deserialize)]
33+
#[serde(default)]
34+
pub struct MonitoringConfig {
35+
pub eoa_send_degradation_threshold_seconds: u64,
36+
pub eoa_confirmation_degradation_threshold_seconds: u64,
37+
pub eoa_stuck_threshold_seconds: u64,
38+
}
39+
40+
impl Default for MonitoringConfig {
41+
fn default() -> Self {
42+
Self {
43+
eoa_send_degradation_threshold_seconds: 10, // 10 seconds
44+
eoa_confirmation_degradation_threshold_seconds: 120, // 2 minutes
45+
eoa_stuck_threshold_seconds: 600, // 10 minutes
46+
}
47+
}
2748
}
2849

2950
#[derive(Debug, Clone, Deserialize)]

server/src/queue/manager.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,12 @@ impl QueueManager {
207207
.arc();
208208

209209
// Create EOA executor queue
210+
let eoa_metrics = engine_executors::metrics::EoaMetrics::new(
211+
queue_config.monitoring.eoa_send_degradation_threshold_seconds,
212+
queue_config.monitoring.eoa_confirmation_degradation_threshold_seconds,
213+
queue_config.monitoring.eoa_stuck_threshold_seconds,
214+
);
215+
210216
let eoa_executor_handler = EoaExecutorJobHandler {
211217
chain_service: chain_service.clone(),
212218
eoa_signer: eoa_signer.clone(),
@@ -216,6 +222,7 @@ impl QueueManager {
216222
authorization_cache,
217223
max_inflight: 100,
218224
max_recycled_nonces: 50,
225+
eoa_metrics,
219226
};
220227

221228
let eoa_executor_queue = Queue::builder()

0 commit comments

Comments
 (0)