Skip to content

Commit 30e3f8f

Browse files
authored
Merge pull request #30781 from vbotbuildovich/ai-backport-pr-30538-v26.1.x-1781249226
[v26.1.x] kafka: make metadata return term 0 when missing info
2 parents 9c058a1 + 64ee43a commit 30e3f8f

10 files changed

Lines changed: 231 additions & 68 deletions

File tree

src/go/rpk/pkg/cli/group/seek.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,9 +231,9 @@ func seek(
231231
var err error
232232
switch to {
233233
case "start":
234-
listed, err = adm.ListStartOffsets(context.Background(), topics...)
234+
listed, err = kafka.ListStartOffsetsWithRetries(context.Background(), adm, topics...)
235235
case "end":
236-
listed, err = adm.ListEndOffsets(context.Background(), topics...)
236+
listed, err = kafka.ListEndOffsetsWithRetries(context.Background(), adm, topics...)
237237
default:
238238
var milli int64
239239
milli, err = strconv.ParseInt(to, 10, 64)
@@ -247,7 +247,7 @@ func seek(
247247
default:
248248
out.Die("--to timestamp %q is not a second, nor a millisecond, nor a nanosecond", to)
249249
}
250-
listed, err = adm.ListOffsetsAfterMilli(context.Background(), milli, topics...)
250+
listed, err = kafka.ListOffsetsAfterMilliWithRetries(context.Background(), adm, milli, topics...)
251251
}
252252
if err == nil { // ListOffsets can return ShardErrors, but we want to be entirely successful
253253
err = listed.Error()

src/go/rpk/pkg/cli/topic/analyze.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -158,21 +158,15 @@ type offsetRanges struct {
158158
func getOffsetsForTimeRange(ctx context.Context, adm *kadm.Client, tr timeRange, topics []string) (offsetRanges, error) {
159159
var offsets offsetRanges
160160

161-
lstart, err := adm.ListOffsetsAfterMilli(ctx, tr.Start.UnixMilli(), topics...)
161+
lstart, err := kafka.ListOffsetsAfterMilliWithRetries(ctx, adm, tr.Start.UnixMilli(), topics...)
162162
if err != nil {
163-
return offsets, err
164-
}
165-
if lstart.Error() != nil {
166-
return offsets, fmt.Errorf("unable to list start offsets: %v", lstart.Error())
163+
return offsets, fmt.Errorf("unable to list start offsets: %v", err)
167164
}
168165
offsets.startOffsets = lstart.Offsets()
169166

170-
lend, err := adm.ListOffsetsAfterMilli(ctx, tr.End.UnixMilli(), topics...)
167+
lend, err := kafka.ListOffsetsAfterMilliWithRetries(ctx, adm, tr.End.UnixMilli(), topics...)
171168
if err != nil {
172-
return offsets, err
173-
}
174-
if lend.Error() != nil {
175-
return offsets, fmt.Errorf("unable to list end offsets: %v", lend.Error())
169+
return offsets, fmt.Errorf("unable to list end offsets: %v", err)
176170
}
177171
offsets.endOffsets = lend.Offsets()
178172

src/go/rpk/pkg/cli/topic/consume.go

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -363,11 +363,11 @@ func (c *consumer) parseOffset(
363363
return nil
364364
}
365365

366-
lstart, err := adm.ListStartOffsets(context.Background(), topics...)
366+
lstart, err := kafka.ListStartOffsetsWithRetries(context.Background(), adm, topics...)
367367
if err != nil {
368368
return fmt.Errorf("unable to list start offsets: %v", err)
369369
}
370-
lend, err := adm.ListEndOffsets(context.Background(), topics...)
370+
lend, err := kafka.ListEndOffsetsWithRetries(context.Background(), adm, topics...)
371371
if err != nil {
372372
return fmt.Errorf("unable to list end offsets: %v", err)
373373
}
@@ -594,10 +594,7 @@ func (c *consumer) parseTimeOffset(
594594
// If there are no offsets after the requested milli, we get
595595
// the default offset -1, which below in NewOffset().At(-1)
596596
// actually coincidentally maps to AtEnd(). So it all works.
597-
lstart, err := adm.ListOffsetsAfterMilli(context.Background(), startAt.UnixMilli(), topics...)
598-
if err == nil {
599-
err = lstart.Error()
600-
}
597+
lstart, err := kafka.ListOffsetsAfterMilliWithRetries(context.Background(), adm, startAt.UnixMilli(), topics...)
601598
if err != nil {
602599
return fmt.Errorf("unable to list offsets after milli %d: %v", startAt.UnixMilli(), err)
603600
}
@@ -608,10 +605,7 @@ func (c *consumer) parseTimeOffset(
608605
return nil
609606
}
610607
} else {
611-
lstart, err := adm.ListStartOffsets(context.Background(), topics...)
612-
if err == nil {
613-
err = lstart.Error()
614-
}
608+
lstart, err := kafka.ListStartOffsetsWithRetries(context.Background(), adm, topics...)
615609
if err != nil {
616610
return fmt.Errorf("unable to list start offsets: %v", err)
617611
}
@@ -632,18 +626,12 @@ func (c *consumer) parseTimeOffset(
632626
}
633627
var lend kadm.ListedOffsets
634628
if end {
635-
lend, err = adm.ListEndOffsets(context.Background(), topics...)
636-
if err == nil {
637-
err = lend.Error()
638-
}
629+
lend, err = kafka.ListEndOffsetsWithRetries(context.Background(), adm, topics...)
639630
if err != nil {
640631
return fmt.Errorf("unable to list end offsets: %v", err)
641632
}
642633
} else {
643-
lend, err = adm.ListOffsetsAfterMilli(context.Background(), endAt.UnixMilli(), topics...)
644-
if err == nil {
645-
err = lend.Error()
646-
}
634+
lend, err = kafka.ListOffsetsAfterMilliWithRetries(context.Background(), adm, endAt.UnixMilli(), topics...)
647635
if err != nil {
648636
return fmt.Errorf("unable to list offsets after milli %d: %v", endAt.UnixMilli(), err)
649637
}
@@ -654,10 +642,7 @@ func (c *consumer) parseTimeOffset(
654642
// we want to list offsets to see where to start from and filter empty
655643
// partitions.
656644
if c.partEnds == nil {
657-
lstart, err := adm.ListStartOffsets(context.Background(), topics...)
658-
if err == nil {
659-
err = lstart.Error()
660-
}
645+
lstart, err := kafka.ListStartOffsetsWithRetries(context.Background(), adm, topics...)
661646
if err != nil {
662647
return fmt.Errorf("unable to list start offsets: %v", err)
663648
}

src/go/rpk/pkg/cli/topic/describe.go

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"slices"
1919
"sort"
2020
"strconv"
21+
"time"
2122

2223
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
2324
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/kafka"
@@ -30,6 +31,47 @@ import (
3031
"github.com/twmb/types"
3132
)
3233

34+
const (
35+
// A broker can briefly return a retriable error (e.g.
36+
// LEADER_NOT_AVAILABLE) for a partition whose leader it has not yet
37+
// learned -- right after topic creation or while leadership moves between
38+
// nodes. Refetch metadata for a bounded time so the partition listing is
39+
// complete rather than intermittently missing partitions.
//
// describeRetryTimeout is the total time budget for refetching, measured
// from the start of describeTopicMetadata.
40+
describeRetryTimeout = 11 * time.Second
41+
// describeRetryBackoff is the pause between two consecutive metadata
// requests while retrying.
describeRetryBackoff = 250 * time.Millisecond
42+
)
43+
44+
// describeTopicMetadata requests topic metadata. When retryPartitions is set
45+
// (i.e. the partition section is being printed), it refetches while any
46+
// partition reports a retriable error, so a transient leadership gap does not
47+
// yield an incomplete partition list. Non-retriable errors are returned as-is.
48+
func describeTopicMetadata(
49+
ctx context.Context, cl *kgo.Client, req *kmsg.MetadataRequest, retryPartitions bool,
50+
) (*kmsg.MetadataResponse, error) {
51+
deadline := time.Now().Add(describeRetryTimeout)
52+
for {
53+
resp, err := req.RequestWith(ctx, cl)
54+
if err != nil || !retryPartitions || time.Now().After(deadline) || !hasRetriablePartitionErr(resp) {
55+
return resp, err
56+
}
57+
time.Sleep(describeRetryBackoff)
58+
}
59+
}
60+
61+
// hasRetriablePartitionErr reports whether any partition in the metadata
62+
// response carries a retriable error code.
63+
func hasRetriablePartitionErr(resp *kmsg.MetadataResponse) bool {
64+
for i := range resp.Topics {
65+
for j := range resp.Topics[i].Partitions {
66+
err := kerr.ErrorForCode(resp.Topics[i].Partitions[j].ErrorCode)
67+
if err != nil && kerr.IsRetriable(err) {
68+
return true
69+
}
70+
}
71+
}
72+
return false
73+
}
74+
3375
func newDescribeCommand(fs afero.Fs, p *config.Params) *cobra.Command {
3476
var (
3577
all bool
@@ -105,7 +147,7 @@ For example,
105147
reqTopic.Topic = new(topic)
106148
req.Topics = append(req.Topics, reqTopic)
107149
}
108-
resp, err := req.RequestWith(cmd.Context(), cl)
150+
resp, err := describeTopicMetadata(cmd.Context(), cl, req, partitions)
109151
out.MaybeDie(err, "unable to request topic metadata: %v", err)
110152

111153
var topicDescriptions []describedTopic

src/go/rpk/pkg/kafka/BUILD

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "kafka",
5-
srcs = ["client_franz.go"],
5+
srcs = [
6+
"client_franz.go",
7+
"offsets.go",
8+
],
69
importpath = "github.com/redpanda-data/redpanda/src/go/rpk/pkg/kafka",
710
visibility = ["//visibility:public"],
811
deps = [

src/go/rpk/pkg/kafka/offsets.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Copyright 2026 Redpanda Data, Inc.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.md
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0
9+
10+
package kafka
11+
12+
import (
13+
"context"
14+
"errors"
15+
"time"
16+
17+
"github.com/twmb/franz-go/pkg/kadm"
18+
"github.com/twmb/franz-go/pkg/kerr"
19+
)
20+
21+
const (
22+
// offsetListRetryTimeout bounds how long an offset listing retries. It
23+
// matches the client RetryTimeout so offset listing does not retry any
24+
// longer than the client would for an ordinary request.
25+
offsetListRetryTimeout = 11 * time.Second
26+
// offsetListRetryBackoff is the pause between two consecutive listing
// attempts while retrying.
offsetListRetryBackoff = 250 * time.Millisecond
27+
)
28+
29+
// ListStartOffsetsWithRetries wraps (*kadm.Client).ListStartOffsets, retrying
30+
// transient errors; see retryListOffsets for the retry semantics.
31+
func ListStartOffsetsWithRetries(
32+
ctx context.Context, adm *kadm.Client, topics ...string,
33+
) (kadm.ListedOffsets, error) {
34+
return retryListOffsets(ctx, func(ctx context.Context) (kadm.ListedOffsets, error) {
35+
return adm.ListStartOffsets(ctx, topics...)
36+
})
37+
}
38+
39+
// ListEndOffsetsWithRetries wraps (*kadm.Client).ListEndOffsets, retrying
40+
// transient errors; see retryListOffsets for the retry semantics.
41+
func ListEndOffsetsWithRetries(
42+
ctx context.Context, adm *kadm.Client, topics ...string,
43+
) (kadm.ListedOffsets, error) {
44+
return retryListOffsets(ctx, func(ctx context.Context) (kadm.ListedOffsets, error) {
45+
return adm.ListEndOffsets(ctx, topics...)
46+
})
47+
}
48+
49+
// ListOffsetsAfterMilliWithRetries wraps (*kadm.Client).ListOffsetsAfterMilli,
50+
// retrying transient errors; see retryListOffsets for the retry semantics.
51+
func ListOffsetsAfterMilliWithRetries(
52+
ctx context.Context, adm *kadm.Client, milli int64, topics ...string,
53+
) (kadm.ListedOffsets, error) {
54+
return retryListOffsets(ctx, func(ctx context.Context) (kadm.ListedOffsets, error) {
55+
return adm.ListOffsetsAfterMilli(ctx, milli, topics...)
56+
})
57+
}
58+
59+
// retryListOffsets runs a kadm offset-listing call, retrying for a bounded time
60+
// while it fails with only retriable errors. A broker can briefly return a
61+
// retriable error such as LEADER_NOT_AVAILABLE for a partition whose leader it
62+
// has not yet learned -- for example right after topic creation or while
63+
// leadership moves between brokers -- which a one-shot listing would otherwise
64+
// surface as an outright failure. The error may be a top-level shard error or a
65+
// per-partition error in the returned offsets; both are retried.
66+
func retryListOffsets(
67+
ctx context.Context,
68+
list func(context.Context) (kadm.ListedOffsets, error),
69+
) (kadm.ListedOffsets, error) {
70+
deadline := time.Now().Add(offsetListRetryTimeout)
71+
for {
72+
l, err := list(ctx)
73+
if err == nil {
74+
err = l.Error()
75+
}
76+
if err == nil || time.Now().After(deadline) || !offsetListRetriable(err) {
77+
return l, err
78+
}
79+
select {
80+
case <-ctx.Done():
81+
return l, err
82+
case <-time.After(offsetListRetryBackoff):
83+
}
84+
}
85+
}
86+
87+
// offsetListRetriable reports whether an offset-listing failure is composed
88+
// entirely of retriable Kafka errors.
89+
func offsetListRetriable(err error) bool {
90+
var se *kadm.ShardErrors
91+
if errors.As(err, &se) {
92+
if len(se.Errs) == 0 {
93+
return false
94+
}
95+
for _, e := range se.Errs {
96+
if !kerr.IsRetriable(e.Err) {
97+
return false
98+
}
99+
}
100+
return true
101+
}
102+
return kerr.IsRetriable(err)
103+
}

src/v/kafka/server/handlers/metadata.cc

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "config/configuration.h"
1717
#include "config/node_config.h"
1818
#include "container/chunked_vector.h"
19+
#include "kafka/protocol/errors.h"
1920
#include "kafka/protocol/schemata/metadata_response.h"
2021
#include "kafka/protocol/types.h"
2122
#include "kafka/server/errors.h"
@@ -157,7 +158,31 @@ metadata_response::topic make_topic_response_from_topic_metadata(
157158
auto lt = get_leader_term(tp_ns, p_as.id, md_cache, replicas);
158159
if (lt && !is_node_isolated && p.error_code == error_code::none) {
159160
p.leader_id = lt->leader.value_or(no_leader);
160-
p.leader_epoch = leader_epoch_from_term(lt->term);
161+
162+
if (lt->term.has_value()) {
163+
p.leader_epoch = leader_epoch_from_term(lt->term);
164+
} else {
165+
// We don't have term information for the partition, so submit a
166+
// stale guess of term 0. We deliberately avoid the invalid
167+
// epoch (-1): the Java client treats -1 as a signal to drop its
168+
// cached leader epochs, which interferes with truncation
169+
// detection (KIP-320). Term 0 is parsed as an ordinary (stale)
170+
// epoch instead.
171+
p.leader_epoch = leader_epoch_from_term(model::term_id(0));
172+
173+
// Pair the stale guess with an error so clients refetch
174+
// metadata.
175+
//
176+
// franz-go skips processing the partition altogether if there
177+
// is an error, regardless of the term, opting to retry later.
178+
// https://github.com/twmb/franz-go/blob/8268a5d078c01d29ca0daa1748fac264e0fc2f11/pkg/kgo/metadata.go#L1011
179+
//
180+
// The Java client still parses the stale guess from above, but
181+
// also treats this error as a signal to request another update
182+
// immediately.
183+
// https://github.com/apache/kafka/blob/5db02ead60fbc937b3c51a51ecd6e93936dddf88/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L306-L310
184+
p.error_code = error_code::leader_not_available;
185+
}
161186
}
162187
if (is_node_isolated && p.error_code == error_code::none) {
163188
auto replicas_for_sfuffle = replicas;
@@ -171,6 +196,11 @@ metadata_response::topic make_topic_response_from_topic_metadata(
171196
break;
172197
}
173198
}
199+
200+
// An isolated node only has a stale guess at leadership, so apply
201+
// the same term 0 + error as the missing-term case above.
202+
p.leader_epoch = leader_epoch_from_term(model::term_id(0));
203+
p.error_code = error_code::leader_not_available;
174204
}
175205
p.replica_nodes = std::move(replicas);
176206
p.isr_nodes = p.replica_nodes;

tests/go/kgo-verifier/go.mod

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ require (
99
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
1010
github.com/sirupsen/logrus v1.9.3
1111
github.com/stretchr/testify v1.11.1
12-
github.com/twmb/franz-go v1.20.6-0.20251204171952-b7b6b8e44d30
13-
github.com/twmb/franz-go/pkg/kadm v1.17.1
14-
github.com/twmb/franz-go/pkg/kmsg v1.12.0
12+
github.com/twmb/franz-go v1.21.2
13+
github.com/twmb/franz-go/pkg/kadm v1.17.2
14+
github.com/twmb/franz-go/pkg/kmsg v1.13.1
1515
github.com/vectorizedio/redpanda/src/go/rpk v0.0.0-20211217123319-86af7226d9f0
1616
golang.org/x/sync v0.18.0
1717
golang.org/x/time v0.9.0
@@ -23,11 +23,11 @@ require (
2323
github.com/hashicorp/hcl v1.0.0 // indirect
2424
github.com/icza/dyno v0.0.0-20200205103839-49cb13720835 // indirect
2525
github.com/inconshreveable/mousetrap v1.0.0 // indirect
26-
github.com/klauspost/compress v1.18.1 // indirect
26+
github.com/klauspost/compress v1.18.6 // indirect
2727
github.com/magiconair/properties v1.8.1 // indirect
2828
github.com/mitchellh/mapstructure v1.4.1 // indirect
2929
github.com/pelletier/go-toml v1.2.0 // indirect
30-
github.com/pierrec/lz4/v4 v4.1.22 // indirect
30+
github.com/pierrec/lz4/v4 v4.1.26 // indirect
3131
github.com/pmezard/go-difflib v1.0.0 // indirect
3232
github.com/spf13/afero v1.6.0 // indirect
3333
github.com/spf13/cast v1.3.0 // indirect
@@ -37,9 +37,9 @@ require (
3737
github.com/spf13/viper v1.7.0 // indirect
3838
github.com/subosito/gotenv v1.2.0 // indirect
3939
github.com/twmb/tlscfg v1.2.0 // indirect
40-
golang.org/x/crypto v0.45.0 // indirect
41-
golang.org/x/sys v0.38.0 // indirect
42-
golang.org/x/text v0.31.0 // indirect
40+
golang.org/x/crypto v0.51.0 // indirect
41+
golang.org/x/sys v0.44.0 // indirect
42+
golang.org/x/text v0.37.0 // indirect
4343
gopkg.in/ini.v1 v1.51.0 // indirect
4444
gopkg.in/yaml.v2 v2.4.0 // indirect
4545
gopkg.in/yaml.v3 v3.0.1 // indirect

0 commit comments

Comments
 (0)