Skip to content

Commit 3a62ee3

Browse files
authored
v3: prevent envs with excess queues from degrading dequeue performance (#1982)
* v3: WIP maximum queues per env implementation * Add a queue distribution test * Add edge case tests * Set the marqs maximumQueuePerEnvCount
1 parent bb606a1 commit 3a62ee3

File tree

4 files changed

+444
-4
lines changed

4 files changed

+444
-4
lines changed

apps/webapp/app/env.server.ts

+1
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ const EnvironmentSchema = z.object({
384384
.int()
385385
.default(60 * 1000 * 15),
386386
MARQS_SHARED_QUEUE_LIMIT: z.coerce.number().int().default(1000),
387+
MARQS_MAXIMUM_QUEUE_PER_ENV_COUNT: z.coerce.number().int().default(50),
387388
MARQS_DEV_QUEUE_LIMIT: z.coerce.number().int().default(1000),
388389
MARQS_MAXIMUM_NACK_COUNT: z.coerce.number().int().default(64),
389390
MARQS_CONCURRENCY_LIMIT_BIAS: z.coerce.number().default(0.75),

apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts

+17-4
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ export type FairDequeuingStrategyOptions = {
4343
biases?: FairDequeuingStrategyBiases;
4444
reuseSnapshotCount?: number;
4545
maximumEnvCount?: number;
46+
/**
47+
* Maximum number of queues to process per environment
48+
* If not provided, all queues in an environment will be processed
49+
*/
50+
maximumQueuePerEnvCount?: number;
4651
};
4752

4853
type FairQueueConcurrency = {
@@ -216,8 +221,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
216221
return result;
217222
}
218223

219-
// Helper method to maintain DRY principle
220-
// Update return type
221224
#orderQueuesByEnvs(envs: string[], snapshot: FairQueueSnapshot): Array<EnvQueues> {
222225
const queuesByEnv = snapshot.queues.reduce((acc, queue) => {
223226
if (!acc[queue.env]) {
@@ -231,11 +234,17 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
231234
if (queuesByEnv[envId]) {
232235
// Get ordered queues for this env
233236
const orderedQueues = this.#weightedRandomQueueOrder(queuesByEnv[envId]);
237+
238+
// Apply queue limit if maximumQueuePerEnvCount is set
239+
const limitedQueues = this.options.maximumQueuePerEnvCount
240+
? orderedQueues.slice(0, this.options.maximumQueuePerEnvCount)
241+
: orderedQueues;
242+
234243
// Only add the env if it has queues
235-
if (orderedQueues.length > 0) {
244+
if (limitedQueues.length > 0) {
236245
acc.push({
237246
envId,
238-
queues: orderedQueues.map((queue) => queue.id),
247+
queues: limitedQueues.map((queue) => queue.id),
239248
});
240249
}
241250
}
@@ -512,6 +521,10 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
512521

513522
span.setAttribute("queue_count", result.length);
514523

524+
if (result.length === this.options.parentQueueLimit) {
525+
span.setAttribute("parent_queue_limit_reached", true);
526+
}
527+
515528
return result;
516529
});
517530
}

apps/webapp/app/v3/marqs/index.server.ts

+1
Original file line numberDiff line numberDiff line change
@@ -2189,6 +2189,7 @@ function getMarQSClient() {
21892189
},
21902190
reuseSnapshotCount: env.MARQS_REUSE_SNAPSHOT_COUNT,
21912191
maximumEnvCount: env.MARQS_MAXIMUM_ENV_COUNT,
2192+
maximumQueuePerEnvCount: env.MARQS_MAXIMUM_QUEUE_PER_ENV_COUNT,
21922193
}),
21932194
envQueuePriorityStrategy: new FairDequeuingStrategy({
21942195
tracer: tracer,

0 commit comments

Comments
 (0)