Skip to content

Commit 514cd7b

Browse files
committed
double check the context for cancellation
When the context is canceled, there's no guarantee that Consume() will return context.Canceled. As a result, the context needs to be double-checked before looping in order to shutdown cleanly.
1 parent 0d12056 commit 514cd7b

File tree

1 file changed

+10
-1
lines changed

1 file changed

+10
-1
lines changed

events/consumer_group.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package events
22

33
import (
44
"context"
5+
stderrors "errors"
56
"log"
67
"sync"
78

@@ -65,7 +66,15 @@ func (s *SaramaEventConsumer) Start() error {
6566
// recreated to get the new claims
6667
if err := cg.Consume(ctx, topics, handler); err != nil {
6768
log.Printf("Error from consumer: %v", err)
68-
if err == context.Canceled {
69+
if stderrors.Is(err, context.Canceled) {
70+
return ErrConsumerStopped
71+
}
72+
return err
73+
}
74+
// Double check the context isn't canceled before looping. This is necessary as
75+
// Consume() sometimes returns nil when the context is canceled.
76+
if err := ctx.Err(); err != nil {
77+
if stderrors.Is(err, context.Canceled) {
6978
return ErrConsumerStopped
7079
}
7180
return err

0 commit comments

Comments
 (0)