Skip to content

Commit 64aef98

Browse files
feat(gateway): deduplicate code between create and ops batchers (#514)
1 parent cf887c9 commit 64aef98

File tree

4 files changed

+334
-297
lines changed

4 files changed

+334
-297
lines changed
Lines changed: 57 additions & 150 deletions
Original file line number | Diff line number | Diff line change
@@ -1,16 +1,15 @@
1-
use std::{collections::VecDeque, sync::Arc, time::Duration};
1+
use std::{collections::VecDeque, sync::Arc};
22

33
use crate::{
44
RequestTracker,
5-
batch_policy::{
6-
BacklogUrgencyStats, BaseFeeCache, BatchPolicyEngine, DecisionReason, record_policy_metrics,
7-
},
5+
batch_policy::BaseFeeCache,
86
config::BatchPolicyConfig,
97
error::parse_contract_error,
108
metrics::{
119
METRICS_BATCH_FAILURE, METRICS_BATCH_LATENCY_MS, METRICS_BATCH_SIZE,
1210
METRICS_BATCH_SUBMITTED, METRICS_BATCH_SUCCESS,
1311
},
12+
policy_batcher::{PolicyBatchLoopRunner, TimedEnvelope},
1413
request_tracker::BacklogScope,
1514
};
1615
use alloy::{
@@ -34,11 +33,6 @@ pub struct CreateReqEnvelope {
3433
pub req: CreateAccountRequest,
3534
}
3635

37-
struct TimedCreateReqEnvelope {
38-
enqueued_at: Instant,
39-
envelope: CreateReqEnvelope,
40-
}
41-
4236
pub struct CreateBatcherRunner {
4337
rx: mpsc::Receiver<CreateReqEnvelope>,
4438
registry: Arc<WorldIdRegistryInstance<Arc<DynProvider>>>,
@@ -72,152 +66,15 @@ impl CreateBatcherRunner {
7266

7367
pub async fn run(mut self) {
7468
if self.batch_policy.enabled {
75-
self.run_policy().await;
69+
self.run_policy_loop().await;
7670
} else {
77-
self.run_legacy().await;
78-
}
79-
}
80-
81-
async fn run_legacy(&mut self) {
82-
let window = Duration::from_millis(self.batch_policy.reeval_ms);
83-
loop {
84-
let Some(first) = self.rx.recv().await else {
85-
tracing::info!("create batcher channel closed");
86-
return;
87-
};
88-
89-
let mut batch = vec![first];
90-
let deadline = Instant::now() + window;
91-
92-
loop {
93-
if batch.len() >= self.max_batch_size {
94-
break;
95-
}
96-
match tokio::time::timeout_at(deadline, self.rx.recv()).await {
97-
Ok(Some(req)) => batch.push(req),
98-
Ok(None) => {
99-
tracing::info!("create batcher channel closed while batching");
100-
break;
101-
}
102-
Err(_) => break, // Timeout expired
103-
}
104-
}
105-
106-
self.submit_batch(batch).await;
107-
}
108-
}
109-
110-
async fn run_policy(&mut self) {
111-
let mut policy_engine = BatchPolicyEngine::new(self.batch_policy.clone());
112-
let reeval_interval = Duration::from_millis(self.batch_policy.reeval_ms);
113-
114-
let mut queue: VecDeque<TimedCreateReqEnvelope> = VecDeque::new();
115-
let mut next_eval = Instant::now() + reeval_interval;
116-
let mut rx_open = true;
117-
118-
while rx_open || !queue.is_empty() {
119-
if queue.len() >= self.local_queue_limit {
120-
tracing::warn!(
121-
queue_len = queue.len(),
122-
local_queue_limit = self.local_queue_limit,
123-
"create policy queue reached local capacity, pausing intake for backpressure"
124-
);
125-
}
126-
127-
if queue.is_empty() {
128-
if !rx_open {
129-
break;
130-
}
131-
match self.rx.recv().await {
132-
Some(first) => {
133-
queue.push_back(TimedCreateReqEnvelope {
134-
enqueued_at: Instant::now(),
135-
envelope: first,
136-
});
137-
next_eval = Instant::now() + reeval_interval;
138-
}
139-
None => {
140-
tracing::info!("create batcher channel closed");
141-
rx_open = false;
142-
}
143-
}
144-
continue;
145-
}
146-
147-
tokio::select! {
148-
biased;
149-
_ = tokio::time::sleep_until(next_eval) => {
150-
let cost_score = policy_engine.update_cost_score(self.base_fee_cache.latest());
151-
152-
let fallback_age = queue
153-
.front()
154-
.map(|first| Instant::now().duration_since(first.enqueued_at).as_secs())
155-
.unwrap_or_default();
156-
157-
let stats = match self
158-
.tracker
159-
.queued_backlog_stats_for_scope(BacklogScope::Create)
160-
.await
161-
{
162-
Ok(stats) => stats,
163-
Err(err) => {
164-
tracing::warn!(error = %err, "failed to read queued backlog stats; using local fallback");
165-
BacklogUrgencyStats {
166-
queued_count: queue.len(),
167-
oldest_age_secs: fallback_age,
168-
}
169-
}
170-
};
171-
172-
let decision = policy_engine.evaluate(stats, self.max_batch_size, cost_score);
173-
record_policy_metrics("create", &decision);
174-
175-
if !decision.should_send {
176-
if matches!(decision.reason, DecisionReason::NoBacklog) && !queue.is_empty()
177-
{
178-
let dropped = queue.len();
179-
let inflight_removed =
180-
self.drop_local_queue_and_inflight(&mut queue).await;
181-
tracing::warn!(
182-
dropped,
183-
inflight_removed,
184-
"redis reports no queued backlog, dropping local create queue entries to resync state"
185-
);
186-
}
187-
next_eval = Instant::now() + reeval_interval;
188-
continue;
189-
}
190-
191-
let take_n = decision.target_batch_size.min(queue.len()).max(1);
192-
let batch: Vec<CreateReqEnvelope> = queue
193-
.drain(..take_n)
194-
.map(|timed| timed.envelope)
195-
.collect();
196-
self.submit_batch(batch).await;
197-
198-
next_eval = Instant::now() + reeval_interval;
199-
}
200-
maybe_req = self.rx.recv(), if rx_open && queue.len() < self.local_queue_limit => {
201-
match maybe_req {
202-
Some(req) => {
203-
queue.push_back(TimedCreateReqEnvelope {
204-
enqueued_at: Instant::now(),
205-
envelope: req,
206-
});
207-
}
208-
None => {
209-
tracing::info!("create batcher channel closed while policy batching");
210-
rx_open = false;
211-
}
212-
}
213-
}
214-
}
71+
self.run_legacy_loop().await;
21572
}
21673
}
21774

21875
async fn drop_local_queue_and_inflight(
21976
&self,
220-
queue: &mut VecDeque<TimedCreateReqEnvelope>,
77+
queue: &mut VecDeque<TimedEnvelope<CreateReqEnvelope>>,
22178
) -> usize {
22279
let mut inflight_addresses = Vec::new();
22380
for timed in queue.drain(..) {
@@ -231,7 +88,7 @@ impl CreateBatcherRunner {
23188
removed
23289
}
23390

234-
async fn submit_batch(&self, batch: Vec<CreateReqEnvelope>) {
91+
async fn submit_create_batch(&self, batch: Vec<CreateReqEnvelope>) {
23592
if batch.is_empty() {
23693
return;
23794
}
@@ -327,3 +184,53 @@ impl CreateBatcherRunner {
327184
}
328185
}
329186
}
187+
188+
impl PolicyBatchLoopRunner for CreateBatcherRunner {
189+
type Envelope = CreateReqEnvelope;
190+
191+
fn batch_type(&self) -> &'static str {
192+
"create"
193+
}
194+
195+
fn backlog_scope(&self) -> BacklogScope {
196+
BacklogScope::Create
197+
}
198+
199+
fn max_batch_size(&self) -> usize {
200+
self.max_batch_size
201+
}
202+
203+
fn local_queue_limit(&self) -> usize {
204+
self.local_queue_limit
205+
}
206+
207+
fn batch_policy(&self) -> &BatchPolicyConfig {
208+
&self.batch_policy
209+
}
210+
211+
fn base_fee_cache(&self) -> &BaseFeeCache {
212+
&self.base_fee_cache
213+
}
214+
215+
fn tracker(&self) -> &RequestTracker {
216+
&self.tracker
217+
}
218+
219+
fn rx(&mut self) -> &mut mpsc::Receiver<Self::Envelope> {
220+
&mut self.rx
221+
}
222+
223+
async fn submit_batch(&self, batch: Vec<Self::Envelope>) {
224+
self.submit_create_batch(batch).await;
225+
}
226+
227+
async fn handle_no_backlog(&self, queue: &mut VecDeque<TimedEnvelope<Self::Envelope>>) {
228+
let dropped = queue.len();
229+
let inflight_removed = self.drop_local_queue_and_inflight(queue).await;
230+
tracing::warn!(
231+
dropped,
232+
inflight_removed,
233+
"redis reports no queued backlog, dropping local create queue entries to resync state"
234+
);
235+
}
236+
}

services/gateway/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ mod metrics;
2121
pub mod nonce;
2222
mod ops_batcher;
2323
pub mod orphan_sweeper;
24+
mod policy_batcher;
2425
mod request;
2526
pub mod request_tracker;
2627
mod routes;

0 commit comments

Comments (0)