Skip to content

Commit 77abf7f

Browse files
wip
1 parent 4310cde commit 77abf7f

File tree

10 files changed

+268
-119
lines changed

10 files changed

+268
-119
lines changed

pkg/dataobj/consumer/config.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,33 @@ import (
44
"flag"
55
"time"
66

7+
"github.com/grafana/dskit/ring"
78
"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
89
"github.com/grafana/loki/v3/pkg/dataobj/uploader"
10+
"github.com/grafana/loki/v3/pkg/kafka/partitionring"
11+
util_log "github.com/grafana/loki/v3/pkg/util/log"
912
)
1013

1114
type Config struct {
12-
logsobj.BuilderConfig
13-
UploaderConfig uploader.Config `yaml:"uploader"`
14-
IdleFlushTimeout time.Duration `yaml:"idle_flush_timeout"`
15+
BuilderConfig logsobj.BuilderConfig `yaml:"builder,omitempty"`
16+
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
17+
PartitionRingConfig partitionring.Config `yaml:"partition_ring"`
18+
UploaderConfig uploader.Config `yaml:"uploader"`
19+
IdleFlushTimeout time.Duration `yaml:"idle_flush_timeout"`
1520
}
1621

1722
func (cfg *Config) Validate() error {
23+
if err := cfg.BuilderConfig.Validate(); err != nil {
24+
return err
25+
}
26+
if err := cfg.LifecyclerConfig.Validate(); err != nil {
27+
return err
28+
}
29+
// The PartitionRingConfig does not have a validate method.
1830
if err := cfg.UploaderConfig.Validate(); err != nil {
1931
return err
2032
}
21-
return cfg.BuilderConfig.Validate()
33+
return nil
2234
}
2335

2436
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -27,6 +39,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
2739

2840
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
2941
cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f)
42+
cfg.LifecyclerConfig.RegisterFlagsWithPrefix(prefix, f, util_log.Logger)
43+
cfg.PartitionRingConfig.RegisterFlagsWithPrefix(prefix, f)
3044
cfg.UploaderConfig.RegisterFlagsWithPrefix(prefix, f)
3145

3246
f.DurationVar(&cfg.IdleFlushTimeout, prefix+"idle-flush-timeout", 60*60*time.Second, "The maximum amount of time to wait in seconds before flushing an object that is no longer receiving new writes")

pkg/dataobj/consumer/consumer.go

Lines changed: 16 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -2,79 +2,48 @@ package consumer
22

33
import (
44
"context"
5-
"errors"
65
"sync"
7-
"time"
86

97
"github.com/go-kit/log"
10-
"github.com/go-kit/log/level"
11-
"github.com/grafana/dskit/backoff"
12-
"github.com/twmb/franz-go/pkg/kgo"
8+
"github.com/grafana/loki/v3/pkg/kafka/partition"
139
)
1410

15-
// kafkaConsumer allows mocking of certain [kgo.Client] methods in tests.
16-
type kafkaConsumer interface {
17-
PollFetches(context.Context) kgo.Fetches
18-
}
19-
2011
// processor allows mocking of [partitionProcessor] in tests.
2112
type processor interface {
22-
Append(records []*kgo.Record) bool
13+
Append(records []partition.Record) bool
2314
}
2415

2516
// consumer polls records from the Kafka topic and passes each record to
2617
// its intended processor.
2718
type consumer struct {
28-
client kafkaConsumer
2919
logger log.Logger
3020
processor processor
3121
mtx sync.RWMutex
3222
}
3323

3424
// newConsumer returns a new consumer.
35-
func newConsumer(client kafkaConsumer, processor processor, logger log.Logger) *consumer {
25+
func newConsumer(processor processor, logger log.Logger) *consumer {
3626
return &consumer{
37-
client: client,
3827
logger: logger,
3928
processor: processor,
4029
}
4130
}
4231

43-
// run starts the poll loop. It is stopped when either the context is canceled
44-
// or the kafka client is closed.
45-
func (c *consumer) Run(ctx context.Context) error {
46-
b := backoff.New(ctx, backoff.Config{
47-
MinBackoff: 100 * time.Millisecond,
48-
MaxBackoff: time.Second,
49-
MaxRetries: 0,
50-
})
51-
for b.Ongoing() {
52-
select {
53-
case <-ctx.Done():
54-
return nil
55-
default:
56-
if err := c.pollFetches(ctx); err != nil {
57-
if errors.Is(err, kgo.ErrClientClosed) {
58-
return nil
59-
}
60-
level.Error(c.logger).Log("msg", "failed to poll fetches", "err", err.Error())
61-
b.Wait()
62-
}
32+
func (c *consumer) Start(ctx context.Context, recordsChan <-chan []partition.Record) func() {
33+
wg := sync.WaitGroup{}
34+
wg.Add(1)
35+
go func() {
36+
defer wg.Done()
37+
for records := range recordsChan {
38+
c.processor.Append(records)
6339
}
64-
}
65-
return nil
40+
}()
41+
return wg.Wait
6642
}
6743

68-
func (c *consumer) pollFetches(ctx context.Context) error {
69-
fetches := c.client.PollFetches(ctx)
70-
// If the client is closed, or the context was canceled, return the error
71-
// as no fetches were polled. We use this instead of [kgo.IsClientClosed]
72-
// so we can also check if the context was canceled.
73-
if err := fetches.Err0(); err != nil {
74-
if errors.Is(err, kgo.ErrClientClosed) || errors.Is(err, context.Canceled) {
75-
return err
76-
}
44+
// newConsumerFactory returns a consumer factory.
45+
func newConsumerFactory(partitionProcessorFactory *partitionProcessorFactory) partition.ConsumerFactory {
46+
return func(committer partition.Committer, logger log.Logger) (partition.Consumer, error) {
47+
return newConsumer(partitionProcessorFactory.New(committer, logger), logger), nil
7748
}
78-
c.processor.Append(fetches.Records())
79-
return nil
8049
}

pkg/dataobj/consumer/partition_processor.go

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@ import (
55
"errors"
66
"fmt"
77
"io"
8-
"strconv"
98
"sync"
109
"time"
11-
"unicode/utf8"
1210

1311
"github.com/coder/quartz"
1412
"github.com/go-kit/log"
@@ -24,6 +22,7 @@ import (
2422
"github.com/grafana/loki/v3/pkg/dataobj/metastore/multitenancy"
2523
"github.com/grafana/loki/v3/pkg/dataobj/uploader"
2624
"github.com/grafana/loki/v3/pkg/kafka"
25+
"github.com/grafana/loki/v3/pkg/kafka/partition"
2726
"github.com/grafana/loki/v3/pkg/logproto"
2827
"github.com/grafana/loki/v3/pkg/scratch"
2928
)
@@ -39,7 +38,7 @@ type builder interface {
3938

4039
// committer allows mocking of certain [kgo.Client] methods in tests.
4140
type committer interface {
42-
CommitRecords(ctx context.Context, records ...*kgo.Record) error
41+
Commit(ctx context.Context, offset int64) error
4342
}
4443

4544
type producer interface {
@@ -52,10 +51,10 @@ type partitionProcessor struct {
5251
topic string
5352
partition int32
5453
// Processing pipeline
55-
records chan *kgo.Record
54+
records chan partition.Record
5655
// lastRecord contains the last record appended to the builder. It is used
5756
// to commit the correct offset after a flush.
58-
lastRecord *kgo.Record
57+
lastRecord *partition.Record
5958
builder builder
6059
decoder *kafka.Decoder
6160
uploader *uploader.Uploader
@@ -94,14 +93,12 @@ type partitionProcessor struct {
9493

9594
func newPartitionProcessor(
9695
ctx context.Context,
97-
client *kgo.Client,
96+
committer committer,
9897
builderCfg logsobj.BuilderConfig,
9998
uploaderCfg uploader.Config,
10099
metastoreCfg metastore.Config,
101100
bucket objstore.Bucket,
102101
scratchStore scratch.Store,
103-
topic string,
104-
partition int32,
105102
logger log.Logger,
106103
reg prometheus.Registerer,
107104
idleFlushTimeout time.Duration,
@@ -112,10 +109,10 @@ func newPartitionProcessor(
112109
if err != nil {
113110
panic(err)
114111
}
115-
reg = prometheus.WrapRegistererWith(prometheus.Labels{
116-
"topic": topic,
117-
"partition": strconv.Itoa(int(partition)),
118-
}, reg)
112+
// reg = prometheus.WrapRegistererWith(prometheus.Labels{
113+
// "topic": topic,
114+
// "partition": strconv.Itoa(int(partition)),
115+
// }, reg)
119116

120117
metrics := newPartitionOffsetMetrics()
121118
if err := metrics.register(reg); err != nil {
@@ -128,11 +125,9 @@ func newPartitionProcessor(
128125
}
129126

130127
return &partitionProcessor{
131-
committer: client,
132-
logger: log.With(logger, "partition", partition),
133-
topic: topic,
134-
partition: partition,
135-
records: make(chan *kgo.Record, 1000),
128+
committer: committer,
129+
logger: logger,
130+
records: make(chan partition.Record, 1000),
136131
ctx: ctx,
137132
cancel: cancel,
138133
decoder: decoder,
@@ -188,7 +183,7 @@ func (p *partitionProcessor) stop() {
188183

189184
// Drops records from the channel if the processor is stopped.
190185
// Returns false if the processor is stopped, true otherwise.
191-
func (p *partitionProcessor) Append(records []*kgo.Record) bool {
186+
func (p *partitionProcessor) Append(records []partition.Record) bool {
192187
for _, record := range records {
193188
select {
194189
// must check per-record in order to not block on a full channel
@@ -241,7 +236,7 @@ func (p *partitionProcessor) emitObjectWrittenEvent(objectPath string) error {
241236
return results.FirstErr()
242237
}
243238

244-
func (p *partitionProcessor) processRecord(record *kgo.Record) {
239+
func (p *partitionProcessor) processRecord(record partition.Record) {
245240
// Update offset metric at the end of processing
246241
defer p.metrics.updateOffset(record.Offset)
247242

@@ -254,14 +249,8 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) {
254249
return
255250
}
256251

257-
tenant := string(record.Key)
258-
if !utf8.ValidString(tenant) {
259-
// This shouldn't happen, but we catch it here.
260-
level.Error(p.logger).Log("msg", "record key is not valid UTF-8")
261-
return
262-
}
263-
264-
stream, err := p.decoder.DecodeWithoutLabels(record.Value)
252+
tenant := record.TenantID
253+
stream, err := p.decoder.DecodeWithoutLabels(record.Content)
265254
if err != nil {
266255
level.Error(p.logger).Log("msg", "failed to decode record", "err", err)
267256
return
@@ -287,7 +276,7 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) {
287276
}
288277
}
289278

290-
p.lastRecord = record
279+
p.lastRecord = &record
291280
p.lastModified = p.clock.Now()
292281
}
293282

@@ -349,7 +338,7 @@ func (p *partitionProcessor) commit() error {
349338
backoff.Reset()
350339
for backoff.Ongoing() {
351340
p.metrics.incCommitsTotal()
352-
err := p.committer.CommitRecords(p.ctx, p.lastRecord)
341+
err := p.committer.Commit(p.ctx, p.lastRecord.Offset)
353342
if err == nil {
354343
return nil
355344
}

pkg/dataobj/consumer/partition_processor_factory.go

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/twmb/franz-go/pkg/kgo"
1010

1111
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
12+
"github.com/grafana/loki/v3/pkg/kafka/partition"
1213
"github.com/grafana/loki/v3/pkg/scratch"
1314
)
1415

@@ -46,31 +47,16 @@ func newPartitionProcessorFactory(
4647
}
4748
}
4849

49-
// New creates a new processor for the per-tenant topic partition.
50-
//
51-
// New requires the caller to provide the [kgo.Client] as an argument. This
52-
// is due to a circular dependency that occurs when creating a [kgo.Client]
53-
// where the partition event handlers, such as [kgo.OnPartitionsAssigned] and
54-
// [kgo.OnPartitionsRevoked] must be registered when the client is created.
55-
// However, the lifecycler cannot be created without the factory, and the
56-
// factory cannot be created with a [kgo.Client]. This is why New requires a
57-
// [kgo.Client] as an argument.
58-
func (f *partitionProcessorFactory) New(
59-
ctx context.Context,
60-
client *kgo.Client,
61-
topic string,
62-
partition int32,
63-
) processor {
50+
// New returns a new processor for the partition.
51+
func (f *partitionProcessorFactory) New(committer partition.Committer, logger log.Logger) *partitionProcessor {
6452
return newPartitionProcessor(
65-
ctx,
66-
client,
53+
context.TODO(),
54+
committer,
6755
f.cfg.BuilderConfig,
6856
f.cfg.UploaderConfig,
6957
f.metastoreCfg,
7058
f.bucket,
7159
f.scratchStore,
72-
topic,
73-
partition,
7460
f.logger,
7561
f.reg,
7662
f.cfg.IdleFlushTimeout,

0 commit comments

Comments
 (0)