Skip to content

Commit 2e6300c

Browse files
fix(gateway): deprecate legacy batch policy (#515)
1 parent 64aef98 commit 2e6300c

File tree

6 files changed

+23
-74
lines changed

6 files changed

+23
-74
lines changed

services/gateway/src/batch_policy.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,6 @@ mod tests {
344344

345345
fn cfg() -> BatchPolicyConfig {
346346
BatchPolicyConfig {
347-
enabled: true,
348347
reeval_ms: 2_000,
349348
max_wait_secs: 30,
350349
cost_ema_alpha: 0.2,

services/gateway/src/config.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,9 @@ impl Default for OrphanSweeperConfig {
6060
}
6161
}
6262

63-
/// Adaptive batching policy configuration.
63+
/// Policy-driven batching configuration.
6464
#[derive(Clone, Debug, clap::Args)]
6565
pub struct BatchPolicyConfig {
66-
/// Enable adaptive policy-driven batch dispatch.
67-
#[arg(long, env = "BATCH_POLICY_ENABLED", default_value = "false")]
68-
pub enabled: bool,
69-
7066
/// Re-evaluation cadence for policy decisions, in milliseconds.
7167
#[arg(long, env = "BATCH_REEVAL_MS", default_value = "1000")]
7268
pub reeval_ms: u64,
@@ -91,7 +87,6 @@ pub struct BatchPolicyConfig {
9187
impl Default for BatchPolicyConfig {
9288
fn default() -> Self {
9389
Self {
94-
enabled: false,
9590
reeval_ms: 1_000,
9691
max_wait_secs: 30,
9792
cost_ema_alpha: 0.2,
@@ -229,12 +224,9 @@ impl GatewayConfig {
229224
));
230225
}
231226

232-
if self.batch_policy.enabled
233-
&& self.sweeper().stale_queued_threshold_secs <= self.batch_policy.max_wait_secs
234-
{
227+
if self.sweeper().stale_queued_threshold_secs <= self.batch_policy.max_wait_secs {
235228
return Err(GatewayError::Config(
236-
"STALE_QUEUED_THRESHOLD_SECS must be greater than BATCH_MAX_WAIT_SECS when adaptive batching is enabled"
237-
.to_string(),
229+
"STALE_QUEUED_THRESHOLD_SECS must be greater than BATCH_MAX_WAIT_SECS".to_string(),
238230
));
239231
}
240232

@@ -376,4 +368,16 @@ mod tests {
376368
assert!(err.contains("BATCH_COST_HIGH_RATIO"));
377369
assert!(err.contains("finite"));
378370
}
371+
372+
#[test]
373+
fn test_stale_queued_threshold_must_exceed_max_wait_secs() {
374+
let mut config = parse_valid_config();
375+
config.batch_policy.max_wait_secs = 30;
376+
config.stale_queued_threshold_secs = 30;
377+
378+
let result = config.validate();
379+
assert!(result.is_err());
380+
let err = result.unwrap_err().to_string();
381+
assert!(err.contains("STALE_QUEUED_THRESHOLD_SECS"));
382+
}
379383
}

services/gateway/src/create_batcher.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,7 @@ impl CreateBatcherRunner {
6565
}
6666

6767
pub async fn run(mut self) {
68-
if self.batch_policy.enabled {
69-
self.run_policy_loop().await;
70-
} else {
71-
self.run_legacy_loop().await;
72-
}
68+
self.run_policy_loop().await;
7369
}
7470

7571
async fn drop_local_queue_and_inflight(

services/gateway/src/ops_batcher.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,7 @@ impl OpsBatcherRunner {
8282
}
8383

8484
pub async fn run(mut self) {
85-
if self.batch_policy.enabled {
86-
self.run_policy_loop().await;
87-
} else {
88-
self.run_legacy_loop().await;
89-
}
85+
self.run_policy_loop().await;
9086
}
9187

9288
async fn submit_ops_batch(&self, batch: Vec<OpsEnvelope>) {

services/gateway/src/policy_batcher.rs

Lines changed: 1 addition & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ enum PolicyLoopEvent<T> {
2525
///
2626
/// Implementors provide batcher-specific primitives (queue source, submit path,
2727
/// backlog scope, and no-backlog reconciliation), while this trait provides the
28-
/// common legacy/policy event loops.
28+
/// common policy event loop.
2929
pub(crate) trait PolicyBatchLoopRunner {
3030
type Envelope: Send + 'static;
3131

@@ -54,50 +54,6 @@ pub(crate) trait PolicyBatchLoopRunner {
5454
/// create clears queue and removes in-flight authenticators, while ops only clears queue.
5555
async fn handle_no_backlog(&self, queue: &mut VecDeque<TimedEnvelope<Self::Envelope>>);
5656

57-
/// Legacy batching loop: wait for first request, then fill until deadline or max batch size.
58-
async fn run_legacy_loop(&mut self) {
59-
let window = Duration::from_millis(self.batch_policy().reeval_ms);
60-
61-
loop {
62-
let first = {
63-
let rx = self.rx();
64-
rx.recv().await
65-
};
66-
let Some(first) = first else {
67-
tracing::info!("{} batcher channel closed", self.batch_type());
68-
return;
69-
};
70-
71-
let mut batch = vec![first];
72-
let deadline = Instant::now() + window;
73-
74-
loop {
75-
if batch.len() >= self.max_batch_size() {
76-
break;
77-
}
78-
79-
let next_req = {
80-
let rx = self.rx();
81-
tokio::time::timeout_at(deadline, rx.recv()).await
82-
};
83-
84-
match next_req {
85-
Ok(Some(req)) => batch.push(req),
86-
Ok(None) => {
87-
tracing::info!(
88-
"{} batcher channel closed while batching",
89-
self.batch_type()
90-
);
91-
break;
92-
}
93-
Err(_) => break, // Timeout expired
94-
}
95-
}
96-
97-
self.submit_batch(batch).await;
98-
}
99-
}
100-
10157
/// Policy-driven batching loop with periodic re-evaluation and bounded local queueing.
10258
async fn run_policy_loop(&mut self) {
10359
let mut policy_engine = BatchPolicyEngine::new(self.batch_policy().clone());

services/gateway/src/routes.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,11 @@ pub(crate) async fn build_app(
7171
let tracker = RequestTracker::new(redis_url, rate_limit).await;
7272
let base_fee_cache = BaseFeeCache::default();
7373

74-
if batch_policy_config.enabled {
75-
spawn_base_fee_sampler(
76-
registry.provider().clone(),
77-
Duration::from_millis(batch_policy_config.reeval_ms),
78-
base_fee_cache.clone(),
79-
);
80-
}
74+
spawn_base_fee_sampler(
75+
registry.provider().clone(),
76+
Duration::from_millis(batch_policy_config.reeval_ms),
77+
base_fee_cache.clone(),
78+
);
8179

8280
let (tx, rx) = mpsc::channel(CREATE_BATCHER_CHANNEL_CAPACITY);
8381
let batcher = CreateBatcherHandle { tx };

0 commit comments

Comments
 (0)