Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/go/rpk/pkg/cli/group/seek.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,9 @@ func seek(
var err error
switch to {
case "start":
listed, err = adm.ListStartOffsets(context.Background(), topics...)
listed, err = kafka.ListStartOffsetsWithRetries(context.Background(), adm, topics...)
case "end":
listed, err = adm.ListEndOffsets(context.Background(), topics...)
listed, err = kafka.ListEndOffsetsWithRetries(context.Background(), adm, topics...)
default:
var milli int64
milli, err = strconv.ParseInt(to, 10, 64)
Expand All @@ -247,7 +247,7 @@ func seek(
default:
out.Die("--to timestamp %q is not a second, nor a millisecond, nor a nanosecond", to)
}
listed, err = adm.ListOffsetsAfterMilli(context.Background(), milli, topics...)
listed, err = kafka.ListOffsetsAfterMilliWithRetries(context.Background(), adm, milli, topics...)
}
if err == nil { // ListOffsets can return ShardErrors, but we want to be entirely successful
err = listed.Error()
Expand Down
14 changes: 4 additions & 10 deletions src/go/rpk/pkg/cli/topic/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,21 +158,15 @@ type offsetRanges struct {
func getOffsetsForTimeRange(ctx context.Context, adm *kadm.Client, tr timeRange, topics []string) (offsetRanges, error) {
var offsets offsetRanges

lstart, err := adm.ListOffsetsAfterMilli(ctx, tr.Start.UnixMilli(), topics...)
lstart, err := kafka.ListOffsetsAfterMilliWithRetries(ctx, adm, tr.Start.UnixMilli(), topics...)
if err != nil {
return offsets, err
}
if lstart.Error() != nil {
return offsets, fmt.Errorf("unable to list start offsets: %v", lstart.Error())
return offsets, fmt.Errorf("unable to list start offsets: %v", err)
}
offsets.startOffsets = lstart.Offsets()

lend, err := adm.ListOffsetsAfterMilli(ctx, tr.End.UnixMilli(), topics...)
lend, err := kafka.ListOffsetsAfterMilliWithRetries(ctx, adm, tr.End.UnixMilli(), topics...)
if err != nil {
return offsets, err
}
if lend.Error() != nil {
return offsets, fmt.Errorf("unable to list end offsets: %v", lend.Error())
return offsets, fmt.Errorf("unable to list end offsets: %v", err)
}
offsets.endOffsets = lend.Offsets()

Expand Down
29 changes: 7 additions & 22 deletions src/go/rpk/pkg/cli/topic/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,11 @@ func (c *consumer) parseOffset(
return nil
}

lstart, err := adm.ListStartOffsets(context.Background(), topics...)
lstart, err := kafka.ListStartOffsetsWithRetries(context.Background(), adm, topics...)
if err != nil {
return fmt.Errorf("unable to list start offsets: %v", err)
}
lend, err := adm.ListEndOffsets(context.Background(), topics...)
lend, err := kafka.ListEndOffsetsWithRetries(context.Background(), adm, topics...)
if err != nil {
return fmt.Errorf("unable to list end offsets: %v", err)
}
Expand Down Expand Up @@ -594,10 +594,7 @@ func (c *consumer) parseTimeOffset(
// If there are no offsets after the requested milli, we get
// the default offset -1, which below in NewOffset().At(-1)
// actually coincidentally maps to AtEnd(). So it all works.
lstart, err := adm.ListOffsetsAfterMilli(context.Background(), startAt.UnixMilli(), topics...)
if err == nil {
err = lstart.Error()
}
lstart, err := kafka.ListOffsetsAfterMilliWithRetries(context.Background(), adm, startAt.UnixMilli(), topics...)
if err != nil {
return fmt.Errorf("unable to list offsets after milli %d: %v", startAt.UnixMilli(), err)
}
Expand All @@ -608,10 +605,7 @@ func (c *consumer) parseTimeOffset(
return nil
}
} else {
lstart, err := adm.ListStartOffsets(context.Background(), topics...)
if err == nil {
err = lstart.Error()
}
lstart, err := kafka.ListStartOffsetsWithRetries(context.Background(), adm, topics...)
if err != nil {
return fmt.Errorf("unable to list start offsets: %v", err)
}
Expand All @@ -632,18 +626,12 @@ func (c *consumer) parseTimeOffset(
}
var lend kadm.ListedOffsets
if end {
lend, err = adm.ListEndOffsets(context.Background(), topics...)
if err == nil {
err = lend.Error()
}
lend, err = kafka.ListEndOffsetsWithRetries(context.Background(), adm, topics...)
if err != nil {
return fmt.Errorf("unable to list end offsets: %v", err)
}
} else {
lend, err = adm.ListOffsetsAfterMilli(context.Background(), endAt.UnixMilli(), topics...)
if err == nil {
err = lend.Error()
}
lend, err = kafka.ListOffsetsAfterMilliWithRetries(context.Background(), adm, endAt.UnixMilli(), topics...)
if err != nil {
return fmt.Errorf("unable to list offsets after milli %d: %v", endAt.UnixMilli(), err)
}
Expand All @@ -654,10 +642,7 @@ func (c *consumer) parseTimeOffset(
// we want to list offsets to see where to start from and filter empty
// partitions.
if c.partEnds == nil {
lstart, err := adm.ListStartOffsets(context.Background(), topics...)
if err == nil {
err = lstart.Error()
}
lstart, err := kafka.ListStartOffsetsWithRetries(context.Background(), adm, topics...)
if err != nil {
return fmt.Errorf("unable to list start offsets: %v", err)
}
Expand Down
44 changes: 43 additions & 1 deletion src/go/rpk/pkg/cli/topic/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"slices"
"sort"
"strconv"
"time"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/kafka"
Expand All @@ -30,6 +31,47 @@ import (
"github.com/twmb/types"
)

const (
// describeRetryTimeout bounds how long describeTopicMetadata keeps
// refetching topic metadata.
//
// A broker can briefly return a retriable error (e.g.
// LEADER_NOT_AVAILABLE) for a partition whose leader it has not yet
// learned -- right after topic creation or while leadership moves between
// nodes. Refetch metadata for a bounded time so the partition listing is
// complete rather than intermittently missing partitions.
describeRetryTimeout = 11 * time.Second
// describeRetryBackoff is the pause between consecutive metadata
// refetches in describeTopicMetadata.
describeRetryBackoff = 250 * time.Millisecond
)

// describeTopicMetadata issues req and returns the metadata response.
//
// When retryPartitions is true (i.e. the partition section is being printed),
// the request is reissued every describeRetryBackoff while any partition in
// the response carries a retriable error, so a transient leadership gap does
// not yield an incomplete partition list. Retrying stops once
// describeRetryTimeout has elapsed; the most recent response is then returned
// as-is, still carrying its per-partition error codes. Request-level errors
// are never retried and are returned immediately.
//
// The backoff honors ctx: if ctx is done while waiting, the function returns
// ctx.Err() right away rather than sleeping through the backoff first.
func describeTopicMetadata(
	ctx context.Context, cl *kgo.Client, req *kmsg.MetadataRequest, retryPartitions bool,
) (*kmsg.MetadataResponse, error) {
	deadline := time.Now().Add(describeRetryTimeout)
	for {
		resp, err := req.RequestWith(ctx, cl)
		if err != nil || !retryPartitions || time.Now().After(deadline) || !hasRetriablePartitionErr(resp) {
			return resp, err
		}
		// Wait before refetching, but stay responsive to cancellation
		// (e.g. the user interrupting the command).
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(describeRetryBackoff):
		}
	}
}

// hasRetriablePartitionErr walks every partition of every topic in resp and
// returns true as soon as it finds one whose error code maps to a retriable
// Kafka error. It returns false when no partition carries such an error.
func hasRetriablePartitionErr(resp *kmsg.MetadataResponse) bool {
	for t := range resp.Topics {
		partitions := resp.Topics[t].Partitions
		for p := range partitions {
			code := partitions[p].ErrorCode
			if perr := kerr.ErrorForCode(code); perr != nil && kerr.IsRetriable(perr) {
				return true
			}
		}
	}
	return false
}

func newDescribeCommand(fs afero.Fs, p *config.Params) *cobra.Command {
var (
all bool
Expand Down Expand Up @@ -105,7 +147,7 @@ For example,
reqTopic.Topic = new(topic)
req.Topics = append(req.Topics, reqTopic)
}
resp, err := req.RequestWith(cmd.Context(), cl)
resp, err := describeTopicMetadata(cmd.Context(), cl, req, partitions)
out.MaybeDie(err, "unable to request topic metadata: %v", err)

var topicDescriptions []describedTopic
Expand Down
5 changes: 4 additions & 1 deletion src/go/rpk/pkg/kafka/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "kafka",
srcs = ["client_franz.go"],
srcs = [
"client_franz.go",
"offsets.go",
],
importpath = "github.com/redpanda-data/redpanda/src/go/rpk/pkg/kafka",
visibility = ["//visibility:public"],
deps = [
Expand Down
103 changes: 103 additions & 0 deletions src/go/rpk/pkg/kafka/offsets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2026 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package kafka

import (
"context"
"errors"
"time"

"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
)

const (
// offsetListRetryTimeout bounds how long an offset listing retries. It
// matches the client RetryTimeout so offset listing does not retry any
// longer than the client would for an ordinary request.
offsetListRetryTimeout = 11 * time.Second
// offsetListRetryBackoff is how long retryListOffsets waits between
// consecutive listing attempts.
offsetListRetryBackoff = 250 * time.Millisecond
)

// ListStartOffsetsWithRetries lists the start offsets of the given topics via
// (*kadm.Client).ListStartOffsets, running the call through retryListOffsets
// so that transient failures are retried. The retry rules are documented on
// retryListOffsets.
func ListStartOffsetsWithRetries(
	ctx context.Context, adm *kadm.Client, topics ...string,
) (kadm.ListedOffsets, error) {
	listStart := func(ctx context.Context) (kadm.ListedOffsets, error) {
		return adm.ListStartOffsets(ctx, topics...)
	}
	return retryListOffsets(ctx, listStart)
}

// ListEndOffsetsWithRetries lists the end offsets of the given topics via
// (*kadm.Client).ListEndOffsets, running the call through retryListOffsets so
// that transient failures are retried. The retry rules are documented on
// retryListOffsets.
func ListEndOffsetsWithRetries(
	ctx context.Context, adm *kadm.Client, topics ...string,
) (kadm.ListedOffsets, error) {
	listEnd := func(ctx context.Context) (kadm.ListedOffsets, error) {
		return adm.ListEndOffsets(ctx, topics...)
	}
	return retryListOffsets(ctx, listEnd)
}

// ListOffsetsAfterMilliWithRetries lists offsets for the given topics and
// millisecond timestamp via (*kadm.Client).ListOffsetsAfterMilli, running the
// call through retryListOffsets so that transient failures are retried. The
// retry rules are documented on retryListOffsets.
func ListOffsetsAfterMilliWithRetries(
	ctx context.Context, adm *kadm.Client, milli int64, topics ...string,
) (kadm.ListedOffsets, error) {
	listAfter := func(ctx context.Context) (kadm.ListedOffsets, error) {
		return adm.ListOffsetsAfterMilli(ctx, milli, topics...)
	}
	return retryListOffsets(ctx, listAfter)
}

// retryListOffsets invokes list repeatedly until it succeeds, fails with a
// non-retriable error, or offsetListRetryTimeout has elapsed since the first
// attempt.
//
// A broker can briefly answer with a retriable error such as
// LEADER_NOT_AVAILABLE for a partition whose leader it has not yet learned --
// for example right after topic creation or while leadership moves between
// brokers. A single-shot listing would report that as an outright failure, so
// the call is retried instead. Both failure shapes are covered: an error
// returned by list itself, and a per-partition error surfaced through the
// returned offsets' Error method.
//
// Between attempts the function waits offsetListRetryBackoff. If ctx is done
// during that wait, the offsets and error from the latest attempt are
// returned.
func retryListOffsets(
	ctx context.Context,
	list func(context.Context) (kadm.ListedOffsets, error),
) (kadm.ListedOffsets, error) {
	giveUpAt := time.Now().Add(offsetListRetryTimeout)
	for {
		offsets, err := list(ctx)
		if err == nil {
			// The call itself succeeded; fold any per-partition
			// failure into err so both kinds share one path.
			err = offsets.Error()
		}

		switch {
		case err == nil:
			return offsets, nil
		case time.Now().After(giveUpAt):
			return offsets, err
		case !offsetListRetriable(err):
			return offsets, err
		}

		select {
		case <-ctx.Done():
			return offsets, err
		case <-time.After(offsetListRetryBackoff):
		}
	}
}

// offsetListRetriable decides whether an offset-listing failure is worth
// retrying. For a *kadm.ShardErrors it returns true only when there is at
// least one shard error and every one of them is a retriable Kafka error. For
// any other error it returns whatever kerr.IsRetriable reports.
func offsetListRetriable(err error) bool {
	var shardErrs *kadm.ShardErrors
	if !errors.As(err, &shardErrs) {
		return kerr.IsRetriable(err)
	}
	if len(shardErrs.Errs) == 0 {
		return false
	}
	for i := range shardErrs.Errs {
		if !kerr.IsRetriable(shardErrs.Errs[i].Err) {
			return false
		}
	}
	return true
}
32 changes: 31 additions & 1 deletion src/v/kafka/server/handlers/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "config/configuration.h"
#include "config/node_config.h"
#include "container/chunked_vector.h"
#include "kafka/protocol/errors.h"
#include "kafka/protocol/schemata/metadata_response.h"
#include "kafka/protocol/types.h"
#include "kafka/server/errors.h"
Expand Down Expand Up @@ -157,7 +158,31 @@ metadata_response::topic make_topic_response_from_topic_metadata(
auto lt = get_leader_term(tp_ns, p_as.id, md_cache, replicas);
if (lt && !is_node_isolated && p.error_code == error_code::none) {
p.leader_id = lt->leader.value_or(no_leader);
p.leader_epoch = leader_epoch_from_term(lt->term);

if (lt->term.has_value()) {
p.leader_epoch = leader_epoch_from_term(lt->term);
} else {
// We don't have term information for the partition, so submit a
// stale guess of term 0. We deliberately avoid the invalid
// epoch (-1): the Java client treats -1 as a signal to drop its
// cached leader epochs, which interferes with truncation
// detection (KIP-320). Term 0 is parsed as an ordinary (stale)
// epoch instead.
p.leader_epoch = leader_epoch_from_term(model::term_id(0));

// Pair the stale guess with an error so clients refetch
// metadata.
//
// Franz go skips processing the partition altogether if there
// is an error, regardless of the term, opting to retry later.
// https://github.com/twmb/franz-go/blob/8268a5d078c01d29ca0daa1748fac264e0fc2f11/pkg/kgo/metadata.go#L1011
//
// The Java client still parses the stale guess from above, but
// also treats this error as a signal to request another update
// immediately.
// https://github.com/apache/kafka/blob/5db02ead60fbc937b3c51a51ecd6e93936dddf88/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L306-L310
p.error_code = error_code::leader_not_available;
}
}
if (is_node_isolated && p.error_code == error_code::none) {
auto replicas_for_sfuffle = replicas;
Expand All @@ -171,6 +196,11 @@ metadata_response::topic make_topic_response_from_topic_metadata(
break;
}
}

// An isolated node only has a stale guess at leadership, so apply
// the same term 0 + error as the missing-term case above.
p.leader_epoch = leader_epoch_from_term(model::term_id(0));
p.error_code = error_code::leader_not_available;
}
p.replica_nodes = std::move(replicas);
p.isr_nodes = p.replica_nodes;
Expand Down
16 changes: 8 additions & 8 deletions tests/go/kgo-verifier/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ require (
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.11.1
github.com/twmb/franz-go v1.20.6-0.20251204171952-b7b6b8e44d30
github.com/twmb/franz-go/pkg/kadm v1.17.1
github.com/twmb/franz-go/pkg/kmsg v1.12.0
github.com/twmb/franz-go v1.21.2
github.com/twmb/franz-go/pkg/kadm v1.17.2
github.com/twmb/franz-go/pkg/kmsg v1.13.1
github.com/vectorizedio/redpanda/src/go/rpk v0.0.0-20211217123319-86af7226d9f0
golang.org/x/sync v0.18.0
golang.org/x/time v0.9.0
Expand All @@ -23,11 +23,11 @@ require (
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/icza/dyno v0.0.0-20200205103839-49cb13720835 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.18.1 // indirect
github.com/klauspost/compress v1.18.6 // indirect
github.com/magiconair/properties v1.8.1 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/pelletier/go-toml v1.2.0 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/pierrec/lz4/v4 v4.1.26 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/afero v1.6.0 // indirect
github.com/spf13/cast v1.3.0 // indirect
Expand All @@ -37,9 +37,9 @@ require (
github.com/spf13/viper v1.7.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/twmb/tlscfg v1.2.0 // indirect
golang.org/x/crypto v0.45.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/text v0.31.0 // indirect
golang.org/x/crypto v0.51.0 // indirect
golang.org/x/sys v0.44.0 // indirect
golang.org/x/text v0.37.0 // indirect
gopkg.in/ini.v1 v1.51.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Loading
Loading