Skip to content

Commit 71d8d24

Browse files
authored
kafka: queuemanager add api to list all topics (#687)
Add API to Kafka queuemanager to list all topics from the broker. In case an error is returned for a specific topic, it will be appened to the error list instead.
1 parent 287661e commit 71d8d24

File tree

2 files changed

+59
-0
lines changed

2 files changed

+59
-0
lines changed

kafka/manager.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"errors"
2323
"fmt"
2424
"regexp"
25+
"slices"
2526
"strings"
2627

2728
"github.com/twmb/franz-go/pkg/kadm"
@@ -341,3 +342,25 @@ func (m *Manager) CreateACLs(ctx context.Context, acls *kadm.ACLBuilder) error {
341342
}
342343
return errors.Join(errs...)
343344
}
345+
346+
// ListTopics returns all topics that begin with prefix in lexicographical order from the Kafka broker.
347+
func (m *Manager) ListTopics(ctx context.Context, prefix string) ([]string, error) {
348+
details, err := m.adminClient.ListTopics(ctx)
349+
if err != nil {
350+
return nil, err
351+
}
352+
topics := make([]string, 0, len(details))
353+
var errs []error
354+
for _, t := range details {
355+
if !strings.HasPrefix(t.Topic, prefix) {
356+
continue
357+
}
358+
if t.Err != nil {
359+
errs = append(errs, fmt.Errorf("%s %w", t.Topic, t.Err))
360+
continue
361+
}
362+
topics = append(topics, t.Topic)
363+
}
364+
slices.Sort(topics)
365+
return topics, errors.Join(errs...)
366+
}

kafka/manager_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -705,6 +705,42 @@ func TestManagerCreateACLs(t *testing.T) {
705705
})
706706
}
707707

708+
func TestListTopics(t *testing.T) {
709+
cluster, commonConfig := newFakeCluster(t)
710+
m, err := NewManager(ManagerConfig{CommonConfig: commonConfig})
711+
require.NoError(t, err)
712+
t.Cleanup(func() { m.Close() })
713+
var metadataRequest *kmsg.MetadataRequest
714+
cluster.ControlKey(kmsg.Metadata.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
715+
metadataRequest = req.(*kmsg.MetadataRequest)
716+
cluster.KeepControl()
717+
return &kmsg.MetadataResponse{
718+
Version: metadataRequest.Version,
719+
Brokers: []kmsg.MetadataResponseBroker{},
720+
Topics: []kmsg.MetadataResponseTopic{{
721+
Topic: kmsg.StringPtr("name_space-topic1"),
722+
Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 1}, {Partition: 2}},
723+
}, {
724+
Topic: kmsg.StringPtr("name_space-topic2"),
725+
Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 3}},
726+
ErrorCode: kerr.UnknownTopicOrPartition.Code,
727+
}, {
728+
Topic: kmsg.StringPtr("name_space-topic3"),
729+
Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 4}},
730+
}, {
731+
Topic: kmsg.StringPtr("name_space-mytopic"),
732+
Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 1}},
733+
}, {
734+
Topic: kmsg.StringPtr("rnd-topic"),
735+
Partitions: []kmsg.MetadataResponseTopicPartition{{Partition: 4}},
736+
}},
737+
}, nil, true
738+
})
739+
topics, err := m.ListTopics(context.Background(), "name_space")
740+
assert.EqualError(t, err, "name_space-topic2 UNKNOWN_TOPIC_OR_PARTITION: This server does not host this topic-partition.")
741+
assert.Equal(t, []string{"name_space-mytopic", "name_space-topic1", "name_space-topic3"}, topics)
742+
}
743+
708744
func newFakeCluster(t testing.TB) (*kfake.Cluster, CommonConfig) {
709745
cluster, err := kfake.NewCluster(
710746
// Just one broker to simplify dealing with sharded requests.

0 commit comments

Comments
 (0)