@@ -55,12 +55,14 @@ const { STARTING, POLLING, WAITING, TTL_EXPIRED, RELEASED } = LocalQueueModes;
55
55
*
56
56
* When jobs are fetched:
57
57
*
58
- * - if no jobs were returned then it will wait `pollInterval` ms and then
59
- * fetch again.
60
58
* - if fewer than `Math.ceil(Math.min(localQueueRefetchDelay.threshold, localQueueSize))`
61
59
* jobs were returned then a refetch delay will be set (if configured).
62
- * - if jobs are returned from a POLLING mode fetch then the queue immediately
63
- * enters WAITING mode.
60
+ * - if jobs were returned then it will supply as many as possible to any
61
+ * waiting workers (`workerQueue`)
62
+ * - if all workers are busy and jobs still remain it will store them to
63
+ * `jobQueue` and immediately enter WAITING mode
64
+ * - otherwise (if no jobs remain: `jobQueue` is empty) we'll wait
65
+ * `pollInterval` ms and then fetch again.
64
66
*
65
67
* When a "new job" notification is received, once any required refetch delay
66
68
* has expired (or immediately if it has already expired) the timer will be
@@ -96,38 +98,136 @@ const { STARTING, POLLING, WAITING, TTL_EXPIRED, RELEASED } = LocalQueueModes;
96
98
*
97
99
* Triggered on shutdown.
98
100
*/
99
-
100
101
export class LocalQueue {
102
+ /**
103
+ * The configured time (in milliseconds) that a job may sit unclaimed in the
104
+ * local queue before being returned to the database.
105
+ */
101
106
readonly ttl : number ;
107
+
108
+ /**
109
+ * The time interval (in milliseconds) between fetch requests when in
110
+ * `POLLING` mode.
111
+ */
102
112
readonly pollInterval : number ;
113
+
114
+ /**
115
+ * The jobs that have been pulled from the database that are waiting for a
116
+ * worker to claim them. Once claimed, a job will be removed from this list.
117
+ * This should be empty in POLLING and TTL_EXPIRED modes.
118
+ */
119
+
103
120
readonly jobQueue : Job [ ] = [ ] ;
121
+ /**
122
+ * Workers waiting for jobs are represented by deferred promises in this
123
+ * list. When a job becomes available, first it attempts to satisfy one of
124
+ * these from the workerQueue, and only if this is empty does it then add the
125
+ * job to the `jobQueue`.
126
+ */
104
127
readonly workerQueue : Deferred < Job > [ ] = [ ] ;
128
+
129
+ /**
130
+ * Are we currently fetching jobs from the DB? Prevents double-fetches.
131
+ */
105
132
fetchInProgress = false ;
133
+
134
+ /**
135
+ * When we enter WAITING mode (i.e. there are jobs in `jobQueue`), we set up
136
+ * this timer. When the timer fires, we will release any remaining jobs in
137
+ * jobQueue back to the database (and enter TTL_EXPIRED mode). Note: all jobs
138
+ * are fetched at once, and no further jobs are fetched, so the TTL for all
139
+ * jobs will expire at the same time - we'll only return to POLLING mode once
140
+ * all jobs have been executed.
141
+ */
106
142
ttlExpiredTimer : NodeJS . Timeout | null = null ;
143
+
144
+ /**
145
+ * The timer associated with the next fetch poll (see also `pollInterval`).
146
+ */
107
147
fetchTimer : NodeJS . Timeout | null = null ;
108
- // Set true to fetch immediately after a fetch completes; typically only used
109
- // when the queue is pulsed during a fetch.
148
+
149
+ /**
150
+ * Should we fetch again once the current fetch is complete? This is
151
+ * generally used to indicate that we received a "new job" notification (the
152
+ * queue is "pulsed") whilst we were already fetching, so our fetch may not
153
+ * have included that job.
154
+ */
110
155
fetchAgain = false ;
156
+
157
+ /**
158
+ * The mode that the queue is in; must only be changed via `setMode`, which
159
+ * itself must only be called by the `setMode*()` methods.
160
+ */
111
161
public readonly mode : LocalQueueMode = STARTING ;
112
- /** The promise that resolves/rejects when the queue is disposed of */
162
+
163
+ /**
164
+ * The promise that resolves/rejects when the local queue has been released.
165
+ * Will not resolve until all locally queued jobs have been returned to the
166
+ * pool (or may reject if this process fails) and all active fetches and
167
+ * other background tasks are complete. This is important, otherwise we might
168
+ * release the pg.Pool that we're using before jobs are returned to the
169
+ * database, which would be something we couldn't recover from!
170
+ *
171
+ * If it rejects, may reject with a regular Error or an AggregateError
172
+ * representing multiple failures.
173
+ */
113
174
private _finPromise = defer ( ) ;
175
+
176
+ /**
177
+ * Errors that occurred causing the shutdown or during the shutdown of this
178
+ * local queue instance.
179
+ */
114
180
private errors : Error [ ] = [ ] ;
115
- /** A count of the number of "background" processes such as fetching or returning jobs */
181
+
182
+ /**
183
+ * A count of the number of "background" processes such as fetching or
184
+ * returning jobs such that we can avoid exiting until all background tasks
185
+ * have completed.
186
+ */
116
187
private backgroundCount = 0 ;
117
188
118
- /** If `localQueueRefetchDelay` is configured; set this true if the fetch resulted in a queue size lower than the threshold. */
189
+ /**
190
+ * If `localQueueRefetchDelay` is configured; set this true if the fetch
191
+ * resulted in a queue size lower than the threshold.
192
+ */
119
193
private refetchDelayActive = false ;
194
+
195
+ /**
196
+ * If true, when the refetch delay expires in POLLING mode (or when we next
197
+ * enter POLLING mode after it expires), immediately trigger a fetch. If
198
+ * false, just wait for the regular POLLING timeouts.
199
+ */
120
200
private refetchDelayFetchOnComplete = false ;
201
+
202
+ /** The timer tracking when the refetch delay has expired. */
121
203
private refetchDelayTimer : NodeJS . Timeout | null = null ;
204
+
205
+ /**
206
+ * The number of new jobs received during the fetch or the resulting refetch
207
+ * delay; see also `refetchDelayAbortThreshold`.
208
+ */
122
209
private refetchDelayCounter : number = 0 ;
210
+
211
+ /**
212
+ * A random number between 0 and either
213
+ * `preset.worker.localQueue.refetchDelay.maxAbortThreshold` or
214
+ * `5*preset.worker.localQueue.size`; when we've been informed of this many
215
+ * jobs via pulse(), we must abort the refetch delay and trigger an immediate
216
+ * fetch.
217
+ */
123
218
private refetchDelayAbortThreshold : number = Infinity ;
124
219
125
220
constructor (
126
221
private readonly compiledSharedOptions : CompiledSharedOptions < WorkerPoolOptions > ,
127
222
private readonly tasks : TaskList ,
128
223
private readonly withPgClient : EnhancedWithPgClient ,
129
224
public readonly workerPool : WorkerPool ,
225
+ /** How many jobs to fetch at once */
130
226
private readonly getJobBatchSize : number ,
227
+ /**
228
+ * If false, exit once the DB seems to have been exhausted of jobs, even if
229
+ * for just a moment. (I.e. `runOnce()`)
230
+ */
131
231
private readonly continuous : boolean ,
132
232
) {
133
233
this . ttl =
@@ -148,9 +248,13 @@ export class LocalQueue {
148
248
compiledSharedOptions . events . emit ( "localQueue:init" , {
149
249
localQueue : this ,
150
250
} ) ;
251
+ // Immediately enter polling mode.
151
252
this . setModePolling ( ) ;
152
253
}
153
254
255
+ /**
256
+ * Only call this from `setMode*()` helpers.
257
+ */
154
258
private setMode (
155
259
newMode : Exclude < LocalQueueMode , typeof LocalQueueModes . STARTING > ,
156
260
) {
@@ -164,9 +268,15 @@ export class LocalQueue {
164
268
} ) ;
165
269
}
166
270
271
+ /**
272
+ * Called when the LocalQueue is completely finished and released: no
273
+ * background tasks, no jobs in job queue. Resolves (or rejects)
274
+ * `_finPromise`.
275
+ */
167
276
private fin ( ) {
168
277
assert . equal ( this . mode , "RELEASED" ) ;
169
278
assert . equal ( this . backgroundCount , 0 ) ;
279
+ assert . equal ( this . jobQueue . length , 0 ) ;
170
280
if ( this . errors . length === 1 ) {
171
281
this . _finPromise . reject ( this . errors [ 0 ] ) ;
172
282
} else if ( this . errors . length > 1 ) {
@@ -191,15 +301,17 @@ export class LocalQueue {
191
301
this . fin ( ) ;
192
302
}
193
303
} else {
304
+ // If we're not shutting down, view this as a temporary error (but give
305
+ // Benjie a wrist slap anyway).
194
306
this . compiledSharedOptions . logger . error (
195
- `Backgrounding should never yield errors when the queue is not RELEASED` ,
307
+ `GraphileWorkerInternalError<cd483429-3372-42f0-bcf6-c78e045c760d>: Backgrounding should never yield errors when the queue is not RELEASED` ,
196
308
{ error : e } ,
197
309
) ;
198
310
}
199
311
} ;
200
312
201
313
/**
202
- * For promises that happen in the background, but that we want to ensure are
314
+ * Track promises that happen in the background, but that we want to ensure are
203
315
* handled before we release the queue (so that the database pool isn't
204
316
* released too early).
205
317
*
0 commit comments