Skip to content

Commit b291a1c

Browse files
authored
lcc: Sync when first fetch returns no records (#725)
If the first fetch returns no records, the consumer is considered synced immediately. This can only happen when the topic contains no records.

Signed-off-by: Marc Lopez Rubio <[email protected]>
1 parent 3ef26f3 commit b291a1c

File tree

2 files changed

+76
-16
lines changed

2 files changed

+76
-16
lines changed

kafka/log_compacted_consumer.go

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,13 @@ func (cfg *LogCompactedConfig) finalize() error {
7575
// record for each key, allowing for efficient storage and retrieval of the most
7676
// recent state of each key.
7777
type LogCompactedConsumer struct {
78-
client *kgo.Client
79-
logger *zap.Logger
80-
process func(context.Context, *kgo.FetchesRecordIter) error
81-
topic string
82-
ctx context.Context
83-
cancel context.CancelFunc
78+
client *kgo.Client
79+
logger *zap.Logger
80+
process func(context.Context, *kgo.FetchesRecordIter) error
81+
topic string
82+
ctx context.Context
83+
cancel context.CancelFunc
84+
fetchMaxWait time.Duration
8485

8586
mu sync.RWMutex
8687
started chan struct{}
@@ -117,13 +118,14 @@ func NewLogCompactedConsumer(cfg LogCompactedConfig,
117118
}
118119

119120
lcc := LogCompactedConsumer{
120-
topic: cfg.Topic,
121-
process: cfg.Processor,
122-
logger: cfg.Logger,
123-
client: client,
124-
started: make(chan struct{}),
125-
stopped: make(chan struct{}),
126-
synced: make(chan struct{}),
121+
topic: cfg.Topic,
122+
fetchMaxWait: cfg.FetchMaxWait,
123+
process: cfg.Processor,
124+
logger: cfg.Logger,
125+
client: client,
126+
started: make(chan struct{}),
127+
stopped: make(chan struct{}),
128+
synced: make(chan struct{}),
127129
}
128130
lcc.ctx, lcc.cancel = context.WithCancel(context.Background())
129131
return &lcc, nil
@@ -149,7 +151,14 @@ func (lcc *LogCompactedConsumer) Run(ctx context.Context) error {
149151
go func() {
150152
defer close(lcc.stopped)
151153
close(lcc.started)
152-
for {
154+
// It's possible that the topic contains no records, in which case the
155+
// consumer should be considered synced immediately. Since fetchMaxWait
156+
// only works if the topic has records, we need to pass a context with
157+
// a timeout to the first fetch.
158+
ctx, cancel := context.WithTimeout(lcc.ctx, lcc.fetchMaxWait)
159+
defer cancel()
160+
lcc.consume(ctx)
161+
for { // Business as usual: continue to fetch records until closed.
153162
select {
154163
case <-lcc.ctx.Done():
155164
return // Exit the goroutine if the context is done.
@@ -167,7 +176,7 @@ func (lcc *LogCompactedConsumer) Run(ctx context.Context) error {
167176
}
168177

169178
func (lcc *LogCompactedConsumer) consume(ctx context.Context) {
170-
fetches := lcc.client.PollRecords(lcc.ctx, -1) // This means all buffered.
179+
fetches := lcc.client.PollRecords(ctx, -1) // This means all buffered.
171180
if fetches.IsClientClosed() {
172181
lcc.logger.Info("kafka client closed, stopping fetch")
173182
return
@@ -185,6 +194,15 @@ func (lcc *LogCompactedConsumer) consume(ctx context.Context) {
185194
)
186195
})
187196
if fetches.Empty() {
197+
// No records were returned across all partitions; consider the consumer
198+
// synced because there is nothing to process to reach the current HWM.
199+
select {
200+
case <-lcc.synced:
201+
// already synced
202+
default:
203+
lcc.syncDelta.Store(0)
204+
close(lcc.synced)
205+
}
188206
return
189207
}
190208
lcc.mu.RLock()
@@ -260,6 +278,8 @@ func (lcc *LogCompactedConsumer) Close() error {
260278

261279
// Healthy checks if the consumer is healthy by ensuring that it has had a full
262280
// sync and that the underlying Kafka client can ping a broker in the cluster.
281+
// If the first fetch returns no records, the consumer is considered synced
282+
// immediately. This may only happen when the topic has no records.
263283
//
264284
// This function can be used as a readiness probe.
265285
func (lcc *LogCompactedConsumer) Healthy(ctx context.Context) error {

kafka/log_compacted_consumer_test.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ func TestLogCompactedConsumerSyncBehavior(t *testing.T) {
268268
ClientID: "sync-test-consumer",
269269
},
270270
Topic: topicName,
271-
FetchMaxWait: 10 * time.Millisecond,
271+
FetchMaxWait: time.Second,
272272
Processor: func(_ context.Context, iter *kgo.FetchesRecordIter) error {
273273
for !iter.Done() {
274274
_ = iter.Next() // We just count records, don't need content
@@ -414,3 +414,43 @@ func TestLogCompactedConsumerStartStop(t *testing.T) {
414414

415415
assert.Error(t, consumer.Run(ctx), "Run should error after Close")
416416
}
417+
418+
func TestLogCompactedConsumerHealthyOnEmptyFirstFetch(t *testing.T) {
419+
topicName := "empty-first-fetch-topic"
420+
cluster := newLogCompactedFakeCluster(t, topicName, 1)
421+
var processedRecords int32
422+
cfg := LogCompactedConfig{
423+
CommonConfig: CommonConfig{
424+
Brokers: cluster.ListenAddrs(),
425+
Logger: zapTest(t),
426+
ClientID: "empty-first-fetch-consumer",
427+
},
428+
Topic: topicName,
429+
FetchMaxWait: 100 * time.Millisecond,
430+
MinFetchSize: 0,
431+
Processor: func(_ context.Context, iter *kgo.FetchesRecordIter) error {
432+
for !iter.Done() {
433+
_ = iter.Next()
434+
atomic.AddInt32(&processedRecords, 1)
435+
}
436+
return nil
437+
},
438+
}
439+
440+
consumer, err := NewLogCompactedConsumer(cfg)
441+
require.NoError(t, err)
442+
t.Cleanup(func() { consumer.Close() })
443+
444+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
445+
defer cancel()
446+
require.NoError(t, consumer.Run(ctx))
447+
448+
// With an empty topic, the first fetch should return no records and the
449+
// consumer should be considered synced (healthy) without producing anything.
450+
require.Eventually(t, func() bool {
451+
return consumer.Healthy(ctx) == nil
452+
}, time.Second, 50*time.Millisecond)
453+
454+
// Sanity check: no records were processed.
455+
assert.Equal(t, int32(0), atomic.LoadInt32(&processedRecords))
456+
}

0 commit comments

Comments
 (0)