diff --git a/cmd/topicctl/subcmd/tail.go b/cmd/topicctl/subcmd/tail.go index 555727ba..38b3a323 100644 --- a/cmd/topicctl/subcmd/tail.go +++ b/cmd/topicctl/subcmd/tail.go @@ -26,6 +26,7 @@ type tailCmdConfig struct { partitions []int raw bool headers bool + groupID string shared sharedOptions } @@ -57,6 +58,12 @@ func init() { true, "Output message headers", ) + tailCmd.Flags().StringVar( + &tailConfig.groupID, + "group-id", + "", + "Consumer group ID to tail with", + ) addSharedFlags(tailCmd, &tailConfig.shared) RootCmd.AddCommand(tailCmd) @@ -97,6 +104,7 @@ func tailRun(cmd *cobra.Command, args []string) error { "", tailConfig.raw, tailConfig.headers, + tailConfig.groupID, ) } diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 19b41067..2affb13b 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -657,9 +657,11 @@ func (c *CLIRunner) Tail( filterRegexp string, raw bool, headers bool, + groupID string, ) error { var err error - if len(partitions) == 0 { + + if groupID == "" && len(partitions) == 0 { topicInfo, err := c.adminClient.GetTopic(ctx, topic, false) if err != nil { return err @@ -672,6 +674,7 @@ func (c *CLIRunner) Tail( tailer := messages.NewTopicTailer( c.adminClient.GetConnector(), topic, + groupID, partitions, offset, 10e3, diff --git a/pkg/cli/repl.go b/pkg/cli/repl.go index d1f26826..2bcc1d2d 100644 --- a/pkg/cli/repl.go +++ b/pkg/cli/repl.go @@ -387,7 +387,7 @@ func (r *Repl) executor(in string) { if err := command.checkArgs( 2, 3, - map[string]struct{}{"filter": {}, "raw": {}}, + map[string]struct{}{"filter": {}, "raw": {}, "group-id": {}}, ); err != nil { log.Errorf("Error: %+v", err) return @@ -410,6 +410,7 @@ func (r *Repl) executor(in string) { filterRegexp, command.getBoolValue("raw"), command.getBoolValue("headers"), + command.flags["group-id"], ) if err != nil { log.Errorf("Error: %+v", err) diff --git a/pkg/messages/tail.go b/pkg/messages/tail.go index 039b8bad..d2f67342 100644 --- a/pkg/messages/tail.go +++ b/pkg/messages/tail.go @@ -26,6 +26,7 @@ import ( type TopicTailer struct { Connector *admin.Connector topic string + groupID string partitions []int offset int64 minBytes int @@ -36,6 +37,7 @@ type TopicTailer struct { func NewTopicTailer( Connector *admin.Connector, topic string, + groupID string, partitions []int, offset int64, minBytes int, @@ -44,6 +46,7 @@ func NewTopicTailer( return &TopicTailer{ Connector: Connector, topic: topic, + groupID: groupID, partitions: partitions, offset: offset, minBytes: minBytes, @@ -77,32 +80,45 @@ type TailPartitionStats struct { LastTime time.Time } -// GetMessages gets a stream of messages from the tailer. These are passed -// back through the argument channel. -func (t *TopicTailer) GetMessages( - ctx context.Context, - messagesChan chan TailMessage, -) { - readers := []*kafka.Reader{} +func (t *TopicTailer) buildReaders() []*kafka.Reader { + baseReaderCfg := kafka.ReaderConfig{ + Brokers: []string{t.Connector.Config.BrokerAddr}, + Dialer: t.Connector.Dialer, + Topic: t.topic, + GroupID: t.groupID, + MinBytes: t.minBytes, + MaxBytes: t.maxBytes, + ReadBackoffMin: 200 * time.Millisecond, + ReadBackoffMax: 3 * time.Second, + MaxAttempts: 5, + } + if t.groupID != "" { + c := baseReaderCfg + c.GroupID = t.groupID + return []*kafka.Reader{ + kafka.NewReader(c), + } + } + readers := []*kafka.Reader{} for _, partition := range t.partitions { - reader := kafka.NewReader( - kafka.ReaderConfig{ - Brokers: []string{t.Connector.Config.BrokerAddr}, - Dialer: t.Connector.Dialer, - Topic: t.topic, - Partition: partition, - MinBytes: t.minBytes, - MaxBytes: t.maxBytes, - ReadBackoffMin: 200 * time.Millisecond, - ReadBackoffMax: 3 * time.Second, - MaxAttempts: 5, - }, - ) + c := baseReaderCfg + c.Partition = partition + reader := kafka.NewReader(c) reader.SetOffset(t.offset) readers = append(readers, reader) } + return readers +} + +// GetMessages gets a stream of messages from the tailer. These are passed +// back through the argument channel. +func (t *TopicTailer) GetMessages( + ctx context.Context, + messagesChan chan TailMessage, +) { + readers := t.buildReaders() for _, reader := range readers { go func(r *kafka.Reader) { @@ -180,17 +196,18 @@ func (t *TopicTailer) LogMessages( PartitionStats: map[int]*TailPartitionStats{}, } - for _, partition := range t.partitions { - stats.PartitionStats[partition] = &TailPartitionStats{} - } - for { select { case <-ctx.Done(): return stats, ctx.Err() case tailMessage := <-messagesChan: partition := tailMessage.Partition - partitionStats := stats.PartitionStats[partition] + + partitionStats, ok := stats.PartitionStats[partition] + if !ok { + partitionStats = &TailPartitionStats{} + stats.PartitionStats[partition] = partitionStats + } if tailMessage.Err != nil { log.Warnf("Got error: %+v", tailMessage.Err) diff --git a/pkg/messages/tail_test.go b/pkg/messages/tail_test.go index 90f68d4b..22c93843 100644 --- a/pkg/messages/tail_test.go +++ b/pkg/messages/tail_test.go @@ -77,6 +77,7 @@ func TestTailerGetMessages(t *testing.T) { tailer := NewTopicTailer( connector, topicName, + "", []int{0, 1, 2, 3}, kafka.FirstOffset, 1,