Skip to content

Commit 4149e90

Browse files
committed
Overhaul error handling in returnJobs
1 parent e922c13 commit 4149e90

File tree

1 file changed

+90
-26
lines changed

1 file changed

+90
-26
lines changed

src/localQueue.ts

+90-26
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import {
1010
import { MINUTE, SECOND } from "./cronConstants";
1111
import defer, { Deferred } from "./deferred";
1212
import { GetJobFunction, Job, TaskList, WorkerPool } from "./interfaces";
13-
import { coerceError } from "./lib";
13+
import { coerceError, sleep } from "./lib";
1414
import { batchGetJobs } from "./sql/getJobs";
1515
import { returnJobs } from "./sql/returnJobs";
1616

@@ -346,19 +346,23 @@ export class LocalQueue {
346346
"Cannot enter polling mode when job queue isn't empty",
347347
);
348348

349+
// There are no jobs, so there's no need for the ttlExpired timer any more.
349350
if (this.ttlExpiredTimer) {
350351
clearTimeout(this.ttlExpiredTimer);
351352
this.ttlExpiredTimer = null;
352353
}
353354

354355
this.setMode(POLLING);
355356

357+
// This won't necessarily fetch, it will respect refetchDelay
356358
this.fetch();
357359
}
358360

359-
private setModeWaiting() {
360-
// Can only enter WAITING mode from POLLING mode.
361-
assert.equal(this.mode, POLLING);
361+
private setModeWaiting(causedByErrorHandling = false) {
362+
if (!causedByErrorHandling) {
363+
// Can only enter WAITING mode from POLLING mode.
364+
assert.equal(this.mode, POLLING);
365+
}
362366
assert.ok(
363367
!this.fetchTimer,
364368
"Cannot enter waiting mode when a fetch is scheduled",
@@ -367,6 +371,11 @@ export class LocalQueue {
367371
!this.fetchInProgress,
368372
"Cannot enter waiting mode when fetch is in progress",
369373
);
374+
assert.equal(
375+
this.workerQueue.length,
376+
0,
377+
"Cannot enter waiting mode when the worker queue is not empty",
378+
);
370379
assert.notEqual(
371380
this.jobQueue.length,
372381
0,
@@ -379,12 +388,10 @@ export class LocalQueue {
379388

380389
this.setMode(WAITING);
381390

382-
this.ttlExpiredTimer = setTimeout(() => {
383-
this.setModeTtlExpired();
384-
}, this.ttl);
391+
this.ttlExpiredTimer = setTimeout(this.setModeTtlExpired, this.ttl);
385392
}
386393

387-
private setModeTtlExpired() {
394+
private setModeTtlExpired = () => {
388395
// Can only enter TTL_EXPIRED mode from WAITING mode.
389396
assert.equal(this.mode, WAITING);
390397
assert.ok(
@@ -410,42 +417,97 @@ export class LocalQueue {
410417

411418
// Return jobs to the pool
412419
this.returnJobs();
413-
}
420+
};
414421

415422
private returnJobs() {
416423
const l = this.jobQueue.length;
417424
if (l === 0) {
418425
return;
419426
}
420427
const jobsToReturn = this.jobQueue.splice(0, l);
428+
421429
this.compiledSharedOptions.events.emit("localQueue:returnJobs", {
422430
localQueue: this,
423431
jobs: jobsToReturn,
424432
});
433+
434+
let attempts = 1;
435+
let initialError: Error;
436+
const MAX_ATTEMPTS = 20;
437+
const onError = (e: unknown): void | Promise<void> => {
438+
if (attempts === 1) {
439+
initialError = coerceError(e);
440+
}
441+
442+
this.compiledSharedOptions.logger.error(
443+
`Failed to return jobs from local queue to database queue (attempt ${attempts}/${MAX_ATTEMPTS})`,
444+
{ error: e, attempts, maxAttempts: MAX_ATTEMPTS },
445+
);
446+
447+
// NOTE: the mode now may not be the mode that we were in when
448+
// returnJobs was called. An error happened... we need to deal with
449+
// this error gracefully.
450+
switch (this.mode) {
451+
case "RELEASED": {
452+
throw new Error(
453+
`Error occurred whilst returning jobs from local queue to database queue: ${initialError.message}`,
454+
);
455+
}
456+
457+
// NOTE: considered doing `this.receivedJobs(jobsToReturn)`; but
458+
// simply trying to release them again seems safer and more correct.
459+
default: {
460+
if (attempts < MAX_ATTEMPTS) {
461+
/** Minimum delay between attempts (milliseconds); can actually be half this due to jitter */
462+
const minDelay = 200;
463+
/** Maximum delay between attempts (milliseconds) - can actually be 1.5x this due to jitter */
464+
const maxDelay = 30_000; // Maximum delay in milliseconds
465+
/** Exponential backoff base: the delay scales by `multiplier ^ (attempts - 1)` */
466+
const multiplier = 1.5;
467+
/** Prevent the thundering herd problem by offsetting randomly */
468+
const jitter = Math.random();
469+
const delay =
470+
Math.min(
471+
minDelay * Math.pow(multiplier, attempts - 1),
472+
maxDelay,
473+
) *
474+
(0.5 + jitter);
475+
476+
// Be sure to increment attempts to avoid infinite loop!
477+
++attempts;
478+
return sleep(delay).then(() =>
479+
returnJobs(
480+
this.compiledSharedOptions,
481+
this.withPgClient,
482+
this.workerPool.id,
483+
jobsToReturn,
484+
).then(noop, onError),
485+
);
486+
} else {
487+
// TODO: is this the correct way to handle this? Are we allowed to
488+
// trigger shut down internally?
489+
this.release();
490+
// Now we're in release mode, throwing the error will be tracked
491+
// automatically by `this.background()`
492+
throw new Error(
493+
`Error occurred whilst returning jobs from local queue to database queue; aborting after ${attempts} attempts. Initial error: ${initialError.message}`,
494+
);
495+
}
496+
}
497+
}
498+
};
499+
500+
// NOTE: the `this.background` call covers all of the re-attempts via
501+
// `onError` above, since `onError` returns the next promise each time.
425502
this.background(
426503
returnJobs(
427504
this.compiledSharedOptions,
428505
this.withPgClient,
429506
this.workerPool.id,
430507
jobsToReturn,
431508
).then(
432-
() => {},
433-
(e) => {
434-
if (this.mode === "RELEASED") {
435-
throw new Error(
436-
`Error occurred whilst returning jobs from local queue to database queue: ${
437-
coerceError(e).message
438-
}`,
439-
);
440-
} else {
441-
// Return the jobs to the queue; MUST NOT HAPPEN IN RELEASED MODE.
442-
this.receivedJobs(jobsToReturn);
443-
this.compiledSharedOptions.logger.error(
444-
`Failed to return jobs from local queue to database queue`,
445-
{ error: e },
446-
);
447-
}
448-
},
509+
noop, // No action necessary on success
510+
onError,
449511
),
450512
);
451513
}
@@ -799,3 +861,5 @@ export class LocalQueue {
799861
}
800862
}
801863
}
864+
865+
/** Intentionally does nothing; passed as a promise success handler when no action is necessary. */
function noop() {}

0 commit comments

Comments
 (0)