Skip to content

Commit 7b9ab17

Browse files
authored
re2: add ability to selectively disable release concurrency queue consumers via env var (#1881)
* run engine: add ability to selectively disable release concurrency queue consumers via env var Also added some additional logging * actually pass the disableConsumers option through to the release concurrency system * Fixed CLI e2e failing tests
1 parent 25c1aba commit 7b9ab17

File tree

6 files changed

+76
-7
lines changed

6 files changed

+76
-7
lines changed

apps/webapp/app/env.server.ts

+1
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,7 @@ const EnvironmentSchema = z.object({
567567
RUN_ENGINE_RATE_LIMIT_LIMITER_LOGS_ENABLED: z.string().default("0"),
568568

569569
RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED: z.string().default("0"),
570+
RUN_ENGINE_RELEASE_CONCURRENCY_DISABLE_CONSUMERS: z.string().default("0"),
570571
RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO: z.coerce.number().default(1),
571572
RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES: z.coerce.number().int().default(3),
572573
RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT: z.coerce.number().int().default(1),

apps/webapp/app/v3/runEngine.server.ts

+1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ function createRunEngine() {
7777
},
7878
releaseConcurrency: {
7979
disabled: env.RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED === "0",
80+
disableConsumers: env.RUN_ENGINE_RELEASE_CONCURRENCY_DISABLE_CONSUMERS === "1",
8081
maxTokensRatio: env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO,
8182
maxRetries: env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES,
8283
consumersCount: env.RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT,

internal-packages/run-engine/src/engine/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ export class RunEngine {
201201
options.releaseConcurrency.disabled
202202
? undefined
203203
: {
204+
disableConsumers: options.releaseConcurrency?.disableConsumers,
204205
redis: {
205206
...options.queue.redis, // Use base queue redis options
206207
...options.releaseConcurrency?.redis, // Allow overrides

internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts

+63-3
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ export type ReleaseConcurrencyQueueOptions<T> = {
2828
pollInterval?: number;
2929
batchSize?: number;
3030
retry?: ReleaseConcurrencyQueueRetryOptions;
31+
disableConsumers?: boolean;
3132
};
3233

3334
const QueueItemMetadata = z.object({
@@ -74,7 +75,10 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
7475
};
7576

7677
this.#registerCommands();
77-
this.#startConsumers();
78+
79+
if (!options.disableConsumers) {
80+
this.#startConsumers();
81+
}
7882
}
7983

8084
public async quit() {
@@ -93,6 +97,12 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
9397
const maxTokens = await this.#callMaxTokens(releaseQueueDescriptor);
9498

9599
if (maxTokens === 0) {
100+
this.logger.debug("No tokens available, skipping release", {
101+
releaseQueueDescriptor,
102+
releaserId,
103+
maxTokens,
104+
});
105+
96106
return;
97107
}
98108

@@ -109,6 +119,14 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
109119
String(Date.now())
110120
);
111121

122+
this.logger.debug("Consumed token in attemptToRelease", {
123+
releaseQueueDescriptor,
124+
releaserId,
125+
maxTokens,
126+
result,
127+
releaseQueue,
128+
});
129+
112130
if (!!result) {
113131
await this.#callExecutor(releaseQueueDescriptor, releaserId, {
114132
retryCount: 0,
@@ -119,6 +137,7 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
119137
releaseQueueDescriptor,
120138
releaserId,
121139
maxTokens,
140+
releaseQueue,
122141
});
123142
}
124143
}
@@ -130,13 +149,19 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
130149
*/
131150
public async consumeToken(releaseQueueDescriptor: T, releaserId: string) {
132151
const maxTokens = await this.#callMaxTokens(releaseQueueDescriptor);
152+
const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor);
133153

134154
if (maxTokens === 0) {
155+
this.logger.debug("No tokens available, skipping consume", {
156+
releaseQueueDescriptor,
157+
releaserId,
158+
maxTokens,
159+
releaseQueue,
160+
});
161+
135162
return;
136163
}
137164

138-
const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor);
139-
140165
await this.redis.consumeToken(
141166
this.masterQueuesKey,
142167
this.#bucketKey(releaseQueue),
@@ -147,6 +172,13 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
147172
String(maxTokens),
148173
String(Date.now())
149174
);
175+
176+
this.logger.debug("Consumed token in consumeToken", {
177+
releaseQueueDescriptor,
178+
releaserId,
179+
maxTokens,
180+
releaseQueue,
181+
});
150182
}
151183

152184
/**
@@ -157,6 +189,11 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
157189
public async returnToken(releaseQueueDescriptor: T, releaserId: string) {
158190
const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor);
159191

192+
this.logger.debug("Returning token in returnToken", {
193+
releaseQueueDescriptor,
194+
releaserId,
195+
});
196+
160197
await this.redis.returnTokenOnly(
161198
this.masterQueuesKey,
162199
this.#bucketKey(releaseQueue),
@@ -165,6 +202,12 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
165202
releaseQueue,
166203
releaserId
167204
);
205+
206+
this.logger.debug("Returned token in returnToken", {
207+
releaseQueueDescriptor,
208+
releaserId,
209+
releaseQueue,
210+
});
168211
}
169212

170213
/**
@@ -177,10 +220,20 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
177220
const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor);
178221

179222
if (amount < 0) {
223+
this.logger.debug("Cannot refill with negative tokens", {
224+
releaseQueueDescriptor,
225+
amount,
226+
});
227+
180228
throw new Error("Cannot refill with negative tokens");
181229
}
182230

183231
if (amount === 0) {
232+
this.logger.debug("Cannot refill with 0 tokens", {
233+
releaseQueueDescriptor,
234+
amount,
235+
});
236+
184237
return [];
185238
}
186239

@@ -192,6 +245,13 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
192245
String(amount),
193246
String(maxTokens)
194247
);
248+
249+
this.logger.debug("Refilled tokens in refillTokens", {
250+
releaseQueueDescriptor,
251+
releaseQueue,
252+
amount,
253+
maxTokens,
254+
});
195255
}
196256

197257
/**

internal-packages/run-engine/src/engine/types.ts

+1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ export type RunEngineOptions = {
5151
maxDelay?: number; // Defaults to 60000
5252
factor?: number; // Defaults to 2
5353
};
54+
disableConsumers?: boolean;
5455
};
5556
};
5657

packages/cli-v3/e2e/utils.ts

+9-4
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,15 @@ export async function executeTestCaseRun({
314314
version: "1.0.0",
315315
contentHash,
316316
},
317+
machine: {
318+
name: "small-1x",
319+
cpu: 1,
320+
memory: 256,
321+
centsPerMs: 0.0000001,
322+
},
323+
}).initialize();
324+
325+
const result = await taskRunProcess.execute({
317326
payload: {
318327
traceContext: {
319328
traceparent: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
@@ -374,10 +383,6 @@ export async function executeTestCaseRun({
374383
messageId: "run_1234",
375384
});
376385

377-
await taskRunProcess.initialize();
378-
379-
const result = await taskRunProcess.execute();
380-
381386
await taskRunProcess.cleanup(true);
382387

383388
return {

0 commit comments

Comments
 (0)