Skip to content

Commit c9aa4c8

Browse files
authored
feat: add missing project.id to consumer logger (#576)
## 🧑‍💻 What the main consumer logger is passed to internal topiconsumer which are not logging the additional fields from the field func. The topicconsumer doesn't have access to the config in the public Consumer. As a solution, the logfieldfunc is passed as well to delegate logging of the addtional field ## ❓ Why Logs should be consistent so they can be correlated ## ✅ How Look at log messages before and after
1 parent 9df9e40 commit c9aa4c8

File tree

1 file changed

+18
-6
lines changed

1 file changed

+18
-6
lines changed

kafka/consumer.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) {
183183
namespacePrefix := cfg.namespacePrefix()
184184
consumer := &consumer{
185185
topicPrefix: namespacePrefix,
186+
logFieldFn: cfg.TopicLogFieldFunc,
186187
assignments: make(map[topicPartition]*pc),
187188
processor: cfg.Processor,
188189
logger: cfg.Logger.Named("partition"),
@@ -394,6 +395,7 @@ type consumer struct {
394395
processor apmqueue.Processor
395396
logger *zap.Logger
396397
delivery apmqueue.DeliveryType
398+
logFieldFn TopicLogFieldFunc
397399
// ctx contains the graceful cancellation context that is passed to the
398400
// partition consumers.
399401
ctx context.Context
@@ -412,11 +414,16 @@ func (c *consumer) assigned(_ context.Context, client *kgo.Client, assigned map[
412414
for topic, partitions := range assigned {
413415
for _, partition := range partitions {
414416
t := strings.TrimPrefix(topic, c.topicPrefix)
417+
logger := c.logger.With(
418+
zap.String("topic", t),
419+
zap.Int32("partition", partition),
420+
)
421+
if c.logFieldFn != nil {
422+
logger = logger.With(c.logFieldFn(t))
423+
}
424+
415425
pc := newPartitionConsumer(c.ctx, client, c.processor,
416-
c.delivery, t, c.logger.With(
417-
zap.String("topic", t),
418-
zap.Int32("partition", partition),
419-
),
426+
c.delivery, t, logger,
420427
)
421428
c.assignments[topicPartition{topic: topic, partition: partition}] = pc
422429
}
@@ -492,12 +499,17 @@ func (c *consumer) processFetch(fetches kgo.Fetches) {
492499
// NOTE(marclop) While possible, this is unlikely to happen given the
493500
// locking that's in place in the caller.
494501
if c.delivery == apmqueue.AtMostOnceDeliveryType {
495-
c.logger.Warn(
502+
topicName := strings.TrimPrefix(ftp.Topic, c.topicPrefix)
503+
logger := c.logger
504+
if c.logFieldFn != nil {
505+
logger = logger.With(c.logFieldFn(topicName))
506+
}
507+
logger.Warn(
496508
"data loss: failed to send records to process after commit",
497509
zap.Error(errors.New(
498510
"attempted to process records for revoked partition",
499511
)),
500-
zap.String("topic", strings.TrimPrefix(ftp.Topic, c.topicPrefix)),
512+
zap.String("topic", topicName),
501513
zap.Int32("partition", ftp.Partition),
502514
zap.Int64("offset", ftp.HighWatermark),
503515
zap.Int("records", len(ftp.Records)),

0 commit comments

Comments
 (0)