1
1
package queue
2
2
3
+ // Package queue implements a message queue that dispatches jobs to a pluggable
4
+ // worker, with job retries, a resizable worker count, and graceful shutdown.
5
+
3
6
import (
4
7
"context"
5
8
"errors"
@@ -13,45 +16,52 @@ import (
13
16
"github.com/jpillora/backoff"
14
17
)
15
18
16
- // ErrQueueShutdown the queue is released and closed.
19
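+ // ErrQueueShutdown is returned by Queue and QueueTask when a job is
20
+ // submitted after Shutdown has been called: the enqueue path checks the
21
+ // stop flag first and rejects the job.
22
+ // Callers can test for it with errors.Is.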
17
23
var ErrQueueShutdown = errors .New ("queue has been closed and released" )
18
24
19
25
type (
20
- // A Queue is a message queue.
26
+ // Queue is a message queue that pulls jobs from a core.Worker and runs them on
27
+ // a bounded set of goroutines, with retry handling and graceful shutdown.
21
28
Queue struct {
22
- sync.Mutex
23
- metric * metric
24
- logger Logger
25
- workerCount int64
26
- routineGroup * routineGroup
27
- quit chan struct {}
28
- ready chan struct {}
29
- notify chan struct {}
30
- worker core.Worker
31
- stopOnce sync.Once
32
- stopFlag int32
33
- afterFn func ()
34
- retryInterval time.Duration
29
+ sync.Mutex // Guards workerCount; locked in Start, UpdateWorkerCount, and schedule
30
+ metric * metric // Counters for busy workers and submitted/success/failure tasks
31
+ logger Logger // Logger for queue events, e.g. the retry notice in handle
32
+ workerCount int64 // Configured number of workers; read and written under the mutex
33
+ routineGroup * routineGroup // Runs the queue's goroutines so Wait can block until they finish
34
+ quit chan struct {} // Unbuffered; a receive on it tells the dispatch loop and running jobs to stop
35
+ ready chan struct {} // Buffered (size 1) signal that a worker slot is available
36
+ notify chan struct {} // Buffered (size 1) signal that a new job was enqueued; sent without blocking
37
+ worker core.Worker // Backend that supplies tasks (Request) and executes messages (Run)
38
+ stopOnce sync.Once // Ensures the shutdown teardown is only performed once
39
+ stopFlag int32 // Flipped 0 -> 1 atomically when Shutdown starts; checked before enqueueing
40
+ afterFn func () // Optional callback supplied through the options; may be nil
41
+ retryInterval time.Duration // Period of the ticker created by the dispatch loop in start
35
42
}
36
43
)
37
44
38
- // ErrMissingWorker missing define worker
45
+ // ErrMissingWorker is returned when a queue is constructed without a
46
+ // worker implementation; NewQueue checks for a nil worker before
47
+ // returning the queue.
39
48
var ErrMissingWorker = errors .New ("missing worker module" )
40
49
41
- // NewQueue returns a Queue.
50
+ // NewQueue creates and returns a new Queue instance with the provided options.
51
+ // Returns an error if no worker is specified.
42
52
func NewQueue (opts ... Option ) (* Queue , error ) {
43
53
o := NewOptions (opts ... )
44
54
q := & Queue {
45
- routineGroup : newRoutineGroup (),
46
- quit : make (chan struct {}),
47
- ready : make (chan struct {}, 1 ),
48
- notify : make (chan struct {}, 1 ),
49
- workerCount : o .workerCount ,
50
- logger : o .logger ,
51
- worker : o .worker ,
52
- metric : & metric {},
53
- afterFn : o .afterFn ,
54
- retryInterval : o .retryInterval ,
55
+ routineGroup : newRoutineGroup (), // Manages all goroutines spawned by the queue
56
+ quit : make (chan struct {}), // Signals shutdown to all goroutines
57
+ ready : make (chan struct {}, 1 ), // Signals when a worker is ready to process a job
58
+ notify : make (chan struct {}, 1 ), // Notifies workers of new jobs
59
+ workerCount : o .workerCount , // Number of worker goroutines
60
+ logger : o .logger , // Logger for queue events
61
+ worker : o .worker , // Worker implementation
62
+ metric : & metric {}, // Metrics collector
63
+ afterFn : o .afterFn , // Optional post-job callback
64
+ retryInterval : o .retryInterval , // Interval for retrying job requests
55
65
}
56
66
57
67
if q .worker == nil {
@@ -61,7 +71,8 @@ func NewQueue(opts ...Option) (*Queue, error) {
61
71
return q , nil
62
72
}
63
73
64
- // Start to enable all worker
74
+ // Start launches all worker goroutines and begins processing jobs.
75
+ // If workerCount is zero, Start is a no-op.
65
76
func (q * Queue ) Start () {
66
77
q .Lock ()
67
78
count := q .workerCount
@@ -74,7 +85,9 @@ func (q *Queue) Start() {
74
85
})
75
86
}
76
87
77
- // Shutdown stops all queues.
88
+ // Shutdown initiates a graceful shutdown of the queue.
89
+ // It signals all goroutines to stop, shuts down the worker, and closes the quit channel.
90
+ // Shutdown is idempotent and safe to call multiple times.
78
91
func (q * Queue ) Shutdown () {
79
92
if ! atomic .CompareAndSwapInt32 (& q .stopFlag , 0 , 1 ) {
80
93
return
@@ -92,55 +105,59 @@ func (q *Queue) Shutdown() {
92
105
})
93
106
}
94
107
95
- // Release for graceful shutdown.
108
+ // Release calls Shutdown and then Wait, returning only after the queue's goroutines have finished.
96
109
func (q * Queue ) Release () {
97
110
q .Shutdown ()
98
111
q .Wait ()
99
112
}
100
113
101
- // BusyWorkers returns the numbers of workers in the running process .
114
+ // BusyWorkers returns the number of workers currently running a task, as tracked by the queue's metrics.
102
115
func (q * Queue ) BusyWorkers () int64 {
103
116
return q .metric .BusyWorkers ()
104
117
}
105
118
106
- // BusyWorkers returns the numbers of success tasks.
119
+ // SuccessTasks returns the number of tasks that finished without an error or a panic.
107
120
func (q * Queue ) SuccessTasks () uint64 {
108
121
return q .metric .SuccessTasks ()
109
122
}
110
123
111
- // BusyWorkers returns the numbers of failure tasks.
124
+ // FailureTasks returns the number of tasks recorded as failed in the queue's metrics.
112
125
func (q * Queue ) FailureTasks () uint64 {
113
126
return q .metric .FailureTasks ()
114
127
}
115
128
116
- // BusyWorkers returns the numbers of submitted tasks .
129
+ // SubmittedTasks returns the number of tasks accepted through Queue or QueueTask.
117
130
func (q * Queue ) SubmittedTasks () uint64 {
118
131
return q .metric .SubmittedTasks ()
119
132
}
120
133
121
- // CompletedTasks returns the numbers of completed tasks.
134
+ // CompletedTasks returns the completed-task count reported by the queue's metrics (presumably successes plus failures; confirm in metric).
122
135
func (q * Queue ) CompletedTasks () uint64 {
123
136
return q .metric .CompletedTasks ()
124
137
}
125
138
126
- // Wait all process
139
+ // Wait blocks until every goroutine started through the queue's routine group has returned.
127
140
func (q * Queue ) Wait () {
128
141
q .routineGroup .Wait ()
129
142
}
130
143
131
- // Queue to queue single job with binary
144
+ // Queue wraps message in a job.Message built with opts and enqueues it.
145
+ // It returns ErrQueueShutdown if the queue has already been shut down.
132
146
func (q * Queue ) Queue (message core.QueuedMessage , opts ... job.AllowOption ) error {
133
147
data := job .NewMessage (message , opts ... )
134
148
135
149
return q .queue (& data )
136
150
}
137
151
138
- // QueueTask to queue single task
152
+ // QueueTask wraps the task function in a job.Message built with opts and enqueues it.
153
+ // It returns ErrQueueShutdown if the queue has already been shut down.
139
154
func (q * Queue ) QueueTask (task job.TaskFunc , opts ... job.AllowOption ) error {
140
155
data := job .NewTask (task , opts ... )
141
156
return q .queue (& data )
142
157
}
143
158
159
+ // queue is an internal helper to enqueue a job.Message into the worker.
160
+ // It increments the submitted task metric and notifies workers if possible.
144
161
func (q * Queue ) queue (m * job.Message ) error {
145
162
if atomic .LoadInt32 (& q .stopFlag ) == 1 {
146
163
return ErrQueueShutdown
@@ -151,9 +168,8 @@ func (q *Queue) queue(m *job.Message) error {
151
168
}
152
169
153
170
q .metric .IncSubmittedTask ()
154
- // notify worker
155
- // if the channel is full, it means that the worker is busy
156
- // and we don't want to block the main thread
171
+ // Notify a worker that a new job is available.
172
+ // If the notify channel is full, the worker is busy and we avoid blocking.
157
173
select {
158
174
case q .notify <- struct {}{}:
159
175
default :
@@ -162,10 +178,11 @@ func (q *Queue) queue(m *job.Message) error {
162
178
return nil
163
179
}
164
180
181
+ // work executes a single task, handling panics and updating metrics accordingly.
182
+ // After execution, it schedules the next worker if needed.
165
183
func (q * Queue ) work (task core.TaskMessage ) {
166
184
var err error
167
- // to handle panic cases from inside the worker
168
- // in such case, we start a new goroutine
185
+ // Defer block to handle panics, update metrics, and run afterFn callback.
169
186
defer func () {
170
187
q .metric .DecBusyWorker ()
171
188
e := recover ()
@@ -174,7 +191,7 @@ func (q *Queue) work(task core.TaskMessage) {
174
191
}
175
192
q .schedule ()
176
193
177
- // increase success or failure number
194
+ // Update success or failure metrics based on execution result.
178
195
if err == nil && e == nil {
179
196
q .metric .IncSuccessTask ()
180
197
} else {
@@ -190,6 +207,8 @@ func (q *Queue) work(task core.TaskMessage) {
190
207
}
191
208
}
192
209
210
+ // run dispatches the task to the appropriate handler based on its type.
211
+ // Returns an error if the task type is invalid.
193
212
func (q * Queue ) run (task core.TaskMessage ) error {
194
213
switch t := task .(type ) {
195
214
case * job.Message :
@@ -199,8 +218,11 @@ func (q *Queue) run(task core.TaskMessage) error {
199
218
}
200
219
}
201
220
221
+ // handle executes a job.Message, supporting retries, timeouts, and panic recovery.
222
+ // Returns an error if the job fails or times out.
202
223
func (q * Queue ) handle (m * job.Message ) error {
203
- // create channel with buffer size 1 to avoid goroutine leak
224
+ // done and panicChan carry the job's result or its recovered panic; both are buffered
225
+ // (size 1) so the job goroutine can send and exit even after handle has returned.
204
226
done := make (chan error , 1 )
205
227
panicChan := make (chan any , 1 )
206
228
startTime := time .Now ()
@@ -209,18 +231,18 @@ func (q *Queue) handle(m *job.Message) error {
209
231
cancel ()
210
232
}()
211
233
212
- // run the job
234
+ // Run the job in a separate goroutine to support timeout and panic recovery.
213
235
go func () {
214
- // handle panic issue
236
+ // Defer block to catch panics and send to panicChan
215
237
defer func () {
216
238
if p := recover (); p != nil {
217
239
panicChan <- p
218
240
}
219
241
}()
220
242
221
- // run custom process function
222
243
var err error
223
244
245
+ // Set up backoff for retry logic
224
246
b := & backoff.Backoff {
225
247
Min : m .RetryMin ,
226
248
Max : m .RetryMax ,
@@ -230,26 +252,28 @@ func (q *Queue) handle(m *job.Message) error {
230
252
delay := m .RetryDelay
231
253
loop:
232
254
for {
255
+ // If a custom Task function is provided, use it; otherwise, use the worker's Run method.
233
256
if m .Task != nil {
234
257
err = m .Task (ctx )
235
258
} else {
236
259
err = q .worker .Run (ctx , m )
237
260
}
238
261
239
- // check error and retry count
262
+ // If no error or no retries left, exit loop.
240
263
if err == nil || m .RetryCount == 0 {
241
264
break
242
265
}
243
266
m .RetryCount --
244
267
268
+ // If no fixed retry delay, use backoff.
245
269
if m .RetryDelay == 0 {
246
270
delay = b .Duration ()
247
271
}
248
272
249
273
select {
250
- case <- time .After (delay ): // retry delay
274
+ case <- time .After (delay ): // Wait before retrying
251
275
q .logger .Infof ("retry remaining times: %d, delay time: %s" , m .RetryCount , delay )
252
- case <- ctx .Done (): // timeout reached
276
+ case <- ctx .Done (): // Timeout reached
253
277
err = ctx .Err ()
254
278
break loop
255
279
}
@@ -261,36 +285,36 @@ func (q *Queue) handle(m *job.Message) error {
261
285
select {
262
286
case p := <- panicChan :
263
287
panic (p )
264
- case <- ctx .Done (): // timeout reached
288
+ case <- ctx .Done (): // Timeout reached
265
289
return ctx .Err ()
266
- case <- q .quit : // shutdown service
267
- // cancel job
290
+ case <- q .quit : // Queue is shutting down
291
+ // Cancel job and wait for remaining time or job completion
268
292
cancel ()
269
-
270
293
leftTime := m .Timeout - time .Since (startTime )
271
- // wait job
272
294
select {
273
295
case <- time .After (leftTime ):
274
296
return context .DeadlineExceeded
275
- case err := <- done : // job finish
297
+ case err := <- done : // Job finished
276
298
return err
277
299
case p := <- panicChan :
278
300
panic (p )
279
301
}
280
- case err := <- done : // job finish
302
+ case err := <- done : // Job finished
281
303
return err
282
304
}
283
305
}
284
306
285
- // UpdateWorkerCount to update worker number dynamically.
307
+ // UpdateWorkerCount sets the worker count to num while holding the queue lock,
308
+ // then calls schedule so the new count is taken into account right away.
286
309
func (q * Queue ) UpdateWorkerCount (num int64 ) {
287
310
q .Lock ()
288
311
q .workerCount = num
289
312
q .Unlock ()
290
313
q .schedule ()
291
314
}
292
315
293
- // schedule to check worker number
316
+ // schedule checks if more workers can be started based on the current busy count.
317
+ // If so, it signals readiness to start a new worker.
294
318
func (q * Queue ) schedule () {
295
319
q .Lock ()
296
320
defer q .Unlock ()
@@ -304,24 +328,30 @@ func (q *Queue) schedule() {
304
328
}
305
329
}
306
330
307
- // start to start all worker
331
+ // start runs the queue's dispatch loop.
332
+ //
333
+ // Each iteration calls schedule and then waits on ready for a free worker slot;
334
+ // the loop returns as soon as quit is signalled.
335
+ // Once a slot is free, a background goroutine requests the next task from the worker.
336
+ // A ticker with period retryInterval is created for the loop (presumably to re-poll
337
+ // the worker while no task is available; confirm against the elided lines).
338
+ // A received task marks one worker busy and is processed by work in its own goroutine.
308
339
func (q * Queue ) start () {
309
340
tasks := make (chan core.TaskMessage , 1 )
310
341
ticker := time .NewTicker (q .retryInterval )
311
342
defer ticker .Stop ()
312
343
313
344
for {
314
- // check worker number
345
+ // Ensure the number of busy workers does not exceed the configured worker count.
315
346
q .schedule ()
316
347
317
348
select {
318
- // wait worker ready
319
- case <- q .ready :
320
- case <- q .quit :
349
+ case <- q .ready : // Wait for a worker slot to become available
350
+ case <- q .quit : // Shutdown signal received
321
351
return
322
352
}
323
353
324
- // request task from queue in background
354
+ // Request a task from the worker in a background goroutine.
325
355
q .routineGroup .Run (func () {
326
356
for {
327
357
t , err := q .worker .Request ()
@@ -359,7 +389,7 @@ func (q *Queue) start() {
359
389
return
360
390
}
361
391
362
- // start new task
392
+ // Start processing the new task in a separate goroutine.
363
393
q .metric .IncBusyWorker ()
364
394
q .routineGroup .Run (func () {
365
395
q .work (task )
0 commit comments