Skip to content

Commit 4310cde

Browse files
Delete old code for consuming multiple topics
1 parent 1ad71a0 commit 4310cde

File tree

8 files changed

+22
-642
lines changed

8 files changed

+22
-642
lines changed

pkg/dataobj/consumer/consumer.go

Lines changed: 14 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package consumer
33
import (
44
"context"
55
"errors"
6-
"fmt"
76
"sync"
87
"time"
98

@@ -18,47 +17,26 @@ type kafkaConsumer interface {
1817
PollFetches(context.Context) kgo.Fetches
1918
}
2019

20+
// processor allows mocking of [partitionProcessor] in tests.
21+
type processor interface {
22+
Append(records []*kgo.Record) bool
23+
}
24+
2125
// consumer polls records from the Kafka topic and passes each record to
2226
// its indended processor.
2327
type consumer struct {
24-
client kafkaConsumer
25-
logger log.Logger
26-
processors map[string]map[int32]processor
27-
mtx sync.RWMutex
28+
client kafkaConsumer
29+
logger log.Logger
30+
processor processor
31+
mtx sync.RWMutex
2832
}
2933

3034
// newConsumer returns a new consumer.
31-
func newConsumer(client kafkaConsumer, logger log.Logger) *consumer {
35+
func newConsumer(client kafkaConsumer, processor processor, logger log.Logger) *consumer {
3236
return &consumer{
33-
client: client,
34-
logger: logger,
35-
processors: make(map[string]map[int32]processor),
36-
}
37-
}
38-
39-
// OnRegister implements the [partitionProcessorListener] interface.
40-
func (c *consumer) OnRegister(topic string, partition int32, p processor) {
41-
c.mtx.Lock()
42-
defer c.mtx.Unlock()
43-
processorsByTopic, ok := c.processors[topic]
44-
if !ok {
45-
processorsByTopic = make(map[int32]processor)
46-
c.processors[topic] = processorsByTopic
47-
}
48-
processorsByTopic[partition] = p
49-
}
50-
51-
// OnDeregister implements the [partitionProcessorListener] interface.
52-
func (c *consumer) OnDeregister(topic string, partition int32) {
53-
c.mtx.Lock()
54-
defer c.mtx.Unlock()
55-
processorsByTopic, ok := c.processors[topic]
56-
if !ok {
57-
return
58-
}
59-
delete(processorsByTopic, partition)
60-
if len(processorsByTopic) == 0 {
61-
delete(c.processors, topic)
37+
client: client,
38+
logger: logger,
39+
processor: processor,
6240
}
6341
}
6442

@@ -96,56 +74,7 @@ func (c *consumer) pollFetches(ctx context.Context) error {
9674
if errors.Is(err, kgo.ErrClientClosed) || errors.Is(err, context.Canceled) {
9775
return err
9876
}
99-
// Some other error occurred. We will check it in
100-
// [processFetchTopicPartition] instead.
10177
}
102-
fetches.EachPartition(c.processFetchTopicPartition(ctx))
78+
c.processor.Append(fetches.Records())
10379
return nil
10480
}
105-
106-
func (c *consumer) processFetchTopicPartition(_ context.Context) func(kgo.FetchTopicPartition) {
107-
return func(fetch kgo.FetchTopicPartition) {
108-
if err := fetch.Err; err != nil {
109-
level.Error(c.logger).Log("msg", "failed to fetch records for topic partition", "topic", fetch.Topic, "partition", fetch.Partition, "err", err.Error())
110-
return
111-
}
112-
// If there are no records for this partition then skip it.
113-
if len(fetch.Records) == 0 {
114-
return
115-
}
116-
processor, err := c.processorForTopicPartition(fetch.Topic, fetch.Partition)
117-
if err != nil {
118-
// It should never happen that we fetch records for a newly
119-
// assigned partition before the lifecycler has registered a
120-
// processor for it. This is because [kgo.OnPartitionsAssigned]
121-
// guarantees to return before the client starts fetching records
122-
// for new partitions.
123-
//
124-
// However, it can happen the client has fetched records for a
125-
// partition that has just been reassigned to another consumer.
126-
// If this happens, we will attempt to process those records, but
127-
// may not have a processor for them as the processor would have
128-
// been deregistered via [kgo.OnPartitionsRevoked], and the
129-
// following log line will be emitted.
130-
level.Error(c.logger).Log("msg", "failed to get processor", "error", err.Error())
131-
return
132-
}
133-
_ = processor.Append(fetch.Records)
134-
}
135-
}
136-
137-
// processorForTopicPartition returns the processor for the topic and partition.
138-
// It returns an error if one does not exist.
139-
func (c *consumer) processorForTopicPartition(topic string, partition int32) (processor, error) {
140-
c.mtx.RLock()
141-
defer c.mtx.RUnlock()
142-
processorsByTopic, ok := c.processors[topic]
143-
if !ok {
144-
return nil, fmt.Errorf("unknown topic %s", topic)
145-
}
146-
p, ok := processorsByTopic[partition]
147-
if !ok {
148-
return nil, fmt.Errorf("unknown partition %d for topic %s", partition, topic)
149-
}
150-
return p, nil
151-
}
Lines changed: 1 addition & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,3 @@
11
package consumer
22

3-
import (
4-
"testing"
5-
6-
"github.com/go-kit/log"
7-
"github.com/stretchr/testify/require"
8-
)
9-
10-
func TestConsumer_OnRegister(t *testing.T) {
11-
c := newConsumer(&mockKafka{}, log.NewNopLogger())
12-
require.Empty(t, c.processors)
13-
// Register a processor for the topic.
14-
p1 := &partitionProcessor{}
15-
c.OnRegister("topic1", 1, p1)
16-
require.NotNil(t, c.processors["topic1"])
17-
require.Same(t, p1, c.processors["topic1"][1])
18-
// Register another processor for the same topic.
19-
p2 := &partitionProcessor{}
20-
c.OnRegister("topic1", 2, p2)
21-
require.Len(t, c.processors["topic1"], 2)
22-
require.Same(t, p1, c.processors["topic1"][1])
23-
require.Same(t, p2, c.processors["topic1"][2])
24-
// Register another processor for a different topic.
25-
p3 := &partitionProcessor{}
26-
c.OnRegister("topic2", 1, p3)
27-
require.NotNil(t, c.processors["topic1"])
28-
require.Len(t, c.processors["topic1"], 2)
29-
require.Same(t, p1, c.processors["topic1"][1])
30-
require.Same(t, p2, c.processors["topic1"][2])
31-
require.Len(t, c.processors["topic2"], 1)
32-
require.Same(t, p3, c.processors["topic2"][1])
33-
}
34-
35-
func TestConsumer_OnDeregister(t *testing.T) {
36-
c := newConsumer(&mockKafka{}, log.NewNopLogger())
37-
require.Empty(t, c.processors)
38-
// Register a bunch of processors for different topics.
39-
p1 := &partitionProcessor{}
40-
c.OnRegister("topic1", 1, p1)
41-
p2 := &partitionProcessor{}
42-
c.OnRegister("topic1", 2, p2)
43-
require.Len(t, c.processors["topic1"], 2)
44-
p3 := &partitionProcessor{}
45-
c.OnRegister("topic2", 1, p3)
46-
// Check all the processors are as expected.
47-
require.NotNil(t, c.processors["topic1"])
48-
require.Len(t, c.processors["topic1"], 2)
49-
require.Same(t, p1, c.processors["topic1"][1])
50-
require.Same(t, p2, c.processors["topic1"][2])
51-
require.Len(t, c.processors["topic2"], 1)
52-
require.Same(t, p3, c.processors["topic2"][1])
53-
// Now deregister one and check the remaining processors.
54-
c.OnDeregister("topic1", 1)
55-
require.Len(t, c.processors["topic1"], 1)
56-
require.Nil(t, c.processors["topic1"][1])
57-
require.Same(t, p2, c.processors["topic1"][2])
58-
// Now deregister the last one and check the topic is removed.
59-
c.OnDeregister("topic1", 2)
60-
require.Nil(t, c.processors["topic1"])
61-
// The other topic should be unmodified.
62-
require.Len(t, c.processors["topic2"], 1)
63-
require.Same(t, p3, c.processors["topic2"][1])
64-
}
3+
// TODO(grobinson): Write tests for the consumer.

pkg/dataobj/consumer/mock_test.go

Lines changed: 0 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -227,63 +227,6 @@ func (m *mockPartitionProcessorFactory) New(_ context.Context, _ *kgo.Client, _
227227
return &mockPartitionProcessor{}
228228
}
229229

230-
type mockPartitionProcessorListener struct {
231-
processors map[string]map[int32]processor
232-
}
233-
234-
func (m *mockPartitionProcessorListener) OnRegister(topic string, partition int32, p processor) {
235-
if m.processors == nil {
236-
m.processors = make(map[string]map[int32]processor)
237-
}
238-
processorsByTopic, ok := m.processors[topic]
239-
if !ok {
240-
processorsByTopic = make(map[int32]processor)
241-
m.processors[topic] = processorsByTopic
242-
}
243-
processorsByTopic[partition] = p
244-
}
245-
func (m *mockPartitionProcessorListener) OnDeregister(topic string, partition int32) {
246-
processorsByTopic, ok := m.processors[topic]
247-
if !ok {
248-
return
249-
}
250-
delete(processorsByTopic, partition)
251-
if len(processorsByTopic) == 0 {
252-
delete(m.processors, topic)
253-
}
254-
}
255-
256-
// mockPartitionProcessorLifecycler mocks a [partitionProcessorLifecycler].
257-
type mockPartitionProcessorLifecycler struct {
258-
processors map[string]map[int32]struct{}
259-
}
260-
261-
func (m *mockPartitionProcessorLifecycler) Register(_ context.Context, _ *kgo.Client, topic string, partition int32) {
262-
if m.processors == nil {
263-
m.processors = make(map[string]map[int32]struct{})
264-
}
265-
processorsByTopic, ok := m.processors[topic]
266-
if !ok {
267-
processorsByTopic = make(map[int32]struct{})
268-
m.processors[topic] = processorsByTopic
269-
}
270-
processorsByTopic[partition] = struct{}{}
271-
}
272-
func (m *mockPartitionProcessorLifecycler) Deregister(_ context.Context, topic string, partition int32) {
273-
if m.processors == nil {
274-
return
275-
}
276-
processorsByTopic, ok := m.processors[topic]
277-
if !ok {
278-
return
279-
}
280-
delete(processorsByTopic, partition)
281-
if len(processorsByTopic) == 0 {
282-
delete(m.processors, topic)
283-
}
284-
}
285-
func (m *mockPartitionProcessorLifecycler) Stop(_ context.Context) {}
286-
287230
type recordedTocEntry struct {
288231
DataObjectPath string
289232
MinTimestamp time.Time

pkg/dataobj/consumer/partition_lifecycler.go

Lines changed: 0 additions & 76 deletions
This file was deleted.

0 commit comments

Comments
 (0)