From 3b677d289c0822383eae02704de0338d463d25fa Mon Sep 17 00:00:00 2001 From: maxwolf8852 Date: Wed, 30 Apr 2025 13:38:25 +0300 Subject: [PATCH 1/5] Kafka 4.0 protocol fields updated --- conn.go | 6 ++--- conn_test.go | 11 ++++---- consumergroup.go | 12 ++++----- consumergroup_test.go | 12 ++++----- createtopics.go | 62 ++++++++++++++++++++++++++++--------------- createtopics_test.go | 8 +++--- deletetopics.go | 34 ++++++++++++++---------- deletetopics_test.go | 4 +-- joingroup.go | 6 ++--- reader_test.go | 2 +- 10 files changed, 91 insertions(+), 66 deletions(-) diff --git a/conn.go b/conn.go index 2b51afbd5..eafde7563 100644 --- a/conn.go +++ b/conn.go @@ -306,7 +306,7 @@ func (c *Conn) Brokers() ([]Broker, error) { // DeleteTopics deletes the specified topics. func (c *Conn) DeleteTopics(topics ...string) error { - _, err := c.deleteTopics(deleteTopicsRequestV0{ + _, err := c.deleteTopics(deleteTopicsRequestV1{ Topics: topics, }) return err @@ -368,12 +368,12 @@ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error // joinGroup attempts to join a consumer group // // See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup -func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error) { +func (c *Conn) joinGroup(request joinGroupRequestV2) (joinGroupResponseV1, error) { var response joinGroupResponseV1 err := c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(joinGroup, v1, id, request) + return c.writeRequest(joinGroup, v2, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { diff --git a/conn_test.go b/conn_test.go index bdce327e0..bd94a752b 100644 --- a/conn_test.go +++ b/conn_test.go @@ -13,8 +13,9 @@ import ( "testing" "time" - ktesting "github.com/segmentio/kafka-go/testing" "golang.org/x/net/nettest" + + ktesting "github.com/segmentio/kafka-go/testing" ) type timeout struct{} @@ -682,7 +683,7 @@ func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32, join := func() (joinGroup joinGroupResponseV1) { var err error for attempt := 0; attempt < 10; attempt++ { - joinGroup, err = conn.joinGroup(joinGroupRequestV1{ + joinGroup, err = conn.joinGroup(joinGroupRequestV2{ GroupID: groupID, SessionTimeout: int32(time.Minute / time.Millisecond), RebalanceTimeout: int32(time.Second / time.Millisecond), @@ -770,7 +771,7 @@ func testConnFindCoordinator(t *testing.T, conn *Conn) { } func testConnJoinGroupInvalidGroupID(t *testing.T, conn *Conn) { - _, err := conn.joinGroup(joinGroupRequestV1{}) + _, err := conn.joinGroup(joinGroupRequestV2{}) if !errors.Is(err, InvalidGroupId) && !errors.Is(err, NotCoordinatorForGroup) { t.Fatalf("expected %v or %v; got %v", InvalidGroupId, NotCoordinatorForGroup, err) } @@ -780,7 +781,7 @@ func testConnJoinGroupInvalidSessionTimeout(t *testing.T, conn *Conn) { groupID := makeGroupID() waitForCoordinator(t, conn, groupID) - _, err := conn.joinGroup(joinGroupRequestV1{ + _, err := conn.joinGroup(joinGroupRequestV2{ GroupID: groupID, }) if !errors.Is(err, InvalidSessionTimeout) && !errors.Is(err, NotCoordinatorForGroup) { @@ -792,7 +793,7 @@ func testConnJoinGroupInvalidRefreshTimeout(t *testing.T, conn *Conn) { groupID := makeGroupID() waitForCoordinator(t, conn, groupID) - _, err := conn.joinGroup(joinGroupRequestV1{ + _, err := conn.joinGroup(joinGroupRequestV2{ GroupID: groupID, SessionTimeout: int32(3 * time.Second / time.Millisecond), }) diff --git a/consumergroup.go b/consumergroup.go index f4bb382cb..5fa5bd42b 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -555,7 +555,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) { type coordinator interface { io.Closer findCoordinator(findCoordinatorRequestV0) (findCoordinatorResponseV0, error) - joinGroup(joinGroupRequestV1) (joinGroupResponseV1, error) + joinGroup(joinGroupRequestV2) (joinGroupResponseV1, error) syncGroup(syncGroupRequestV0) (syncGroupResponseV0, error) leaveGroup(leaveGroupRequestV0) (leaveGroupResponseV0, error) heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error) @@ -588,7 +588,7 @@ func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (find return t.conn.findCoordinator(req) } -func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) { +func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV2) (joinGroupResponseV1, error) { // in the case of join group, the consumer group coordinator may wait up // to rebalance timeout in order to wait for all members to join. if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.rebalanceTimeout)); err != nil { @@ -932,7 +932,7 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) { // * InvalidSessionTimeout: // * GroupAuthorizationFailed: func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) { - request, err := cg.makeJoinGroupRequestV1(memberID) + request, err := cg.makeJoinGroupRequestV2(memberID) if err != nil { return "", 0, nil, err } @@ -978,8 +978,8 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i // makeJoinGroupRequestV1 handles the logic of constructing a joinGroup // request. -func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupRequestV1, error) { - request := joinGroupRequestV1{ +func (cg *ConsumerGroup) makeJoinGroupRequestV2(memberID string) (joinGroupRequestV2, error) { + request := joinGroupRequestV2{ GroupID: cg.config.ID, MemberID: memberID, SessionTimeout: int32(cg.config.SessionTimeout / time.Millisecond), @@ -990,7 +990,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque for _, balancer := range cg.config.GroupBalancers { userData, err := balancer.UserData() if err != nil { - return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err) + return joinGroupRequestV2{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err) } request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{ ProtocolName: balancer.ProtocolName(), diff --git a/consumergroup_test.go b/consumergroup_test.go index 0d3e290a9..1c7a065f9 100644 --- a/consumergroup_test.go +++ b/consumergroup_test.go @@ -15,7 +15,7 @@ var _ coordinator = mockCoordinator{} type mockCoordinator struct { closeFunc func() error findCoordinatorFunc func(findCoordinatorRequestV0) (findCoordinatorResponseV0, error) - joinGroupFunc func(joinGroupRequestV1) (joinGroupResponseV1, error) + joinGroupFunc func(joinGroupRequestV2) (joinGroupResponseV1, error) syncGroupFunc func(syncGroupRequestV0) (syncGroupResponseV0, error) leaveGroupFunc func(leaveGroupRequestV0) (leaveGroupResponseV0, error) heartbeatFunc func(heartbeatRequestV0) (heartbeatResponseV0, error) @@ -38,7 +38,7 @@ func (c mockCoordinator) findCoordinator(req findCoordinatorRequestV0) (findCoor return c.findCoordinatorFunc(req) } -func (c mockCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) { +func (c mockCoordinator) joinGroup(req joinGroupRequestV2) (joinGroupResponseV1, error) { if c.joinGroupFunc == nil { return joinGroupResponseV1{}, errors.New("no joinGroup behavior specified") } @@ -419,7 +419,7 @@ func TestConsumerGroupErrors(t *testing.T) { }, }, nil } - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) { + mc.joinGroupFunc = func(joinGroupRequestV2) (joinGroupResponseV1, error) { return joinGroupResponseV1{}, errors.New("join group failed") } // NOTE : no stub for leaving the group b/c the member never joined. @@ -449,7 +449,7 @@ func TestConsumerGroupErrors(t *testing.T) { }, }, nil } - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) { + mc.joinGroupFunc = func(joinGroupRequestV2) (joinGroupResponseV1, error) { return joinGroupResponseV1{ ErrorCode: int16(InvalidTopic), }, nil @@ -472,7 +472,7 @@ func TestConsumerGroupErrors(t *testing.T) { { scenario: "fails to join group (leader, unsupported protocol)", prepare: func(mc *mockCoordinator) { - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) { + mc.joinGroupFunc = func(joinGroupRequestV2) (joinGroupResponseV1, error) { return joinGroupResponseV1{ GenerationID: 12345, GroupProtocol: "foo", @@ -498,7 +498,7 @@ func TestConsumerGroupErrors(t *testing.T) { { scenario: "fails to sync group (general error)", prepare: func(mc *mockCoordinator) { - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) { + mc.joinGroupFunc = func(joinGroupRequestV2) (joinGroupResponseV1, error) { return joinGroupResponseV1{ GenerationID: 12345, GroupProtocol: "range", diff --git a/createtopics.go b/createtopics.go index 8ad9ebf44..1bf61472f 100644 --- a/createtopics.go +++ b/createtopics.go @@ -262,7 +262,7 @@ func (t createTopicsRequestV0Topic) writeTo(wb *writeBuffer) { } // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics -type createTopicsRequestV0 struct { +type createTopicsRequestV2 struct { // Topics contains n array of single topic creation requests. Can not // have multiple entries for the same topic. Topics []createTopicsRequestV0Topic @@ -270,77 +270,95 @@ type createTopicsRequestV0 struct { // Timeout ms to wait for a topic to be completely created on the // controller node. Values <= 0 will trigger topic creation and return immediately Timeout int32 + + // If true, check that the topics can be created as specified, but don't create anything. + // Internal use only for Kafka 4.0 support. + ValidateOnly bool } -func (t createTopicsRequestV0) size() int32 { +func (t createTopicsRequestV2) size() int32 { return sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) + - sizeofInt32(t.Timeout) + sizeofInt32(t.Timeout) + 1 } -func (t createTopicsRequestV0) writeTo(wb *writeBuffer) { +func (t createTopicsRequestV2) writeTo(wb *writeBuffer) { wb.writeArray(len(t.Topics), func(i int) { t.Topics[i].writeTo(wb) }) wb.writeInt32(t.Timeout) + wb.writeBool(t.ValidateOnly) } -type createTopicsResponseV0TopicError struct { +type createTopicsResponseV1TopicError struct { // Topic name Topic string // ErrorCode holds response error code ErrorCode int16 + + // ErrorMessage holds responce error message string + ErrorMessage string } -func (t createTopicsResponseV0TopicError) size() int32 { +func (t createTopicsResponseV1TopicError) size() int32 { return sizeofString(t.Topic) + - sizeofInt16(t.ErrorCode) + sizeofInt16(t.ErrorCode) + + sizeofString(t.ErrorMessage) } -func (t createTopicsResponseV0TopicError) writeTo(wb *writeBuffer) { +func (t createTopicsResponseV1TopicError) writeTo(wb *writeBuffer) { wb.writeString(t.Topic) wb.writeInt16(t.ErrorCode) + wb.writeString(t.ErrorMessage) } -func (t *createTopicsResponseV0TopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *createTopicsResponseV1TopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readString(r, size, &t.Topic); err != nil { return } if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil { return } + if remain, err = readString(r, remain, &t.ErrorMessage); err != nil { + return + } return } // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics -type createTopicsResponseV0 struct { - TopicErrors []createTopicsResponseV0TopicError +type createTopicsResponseV1 struct { + ThrottleTime int32 + TopicErrors []createTopicsResponseV1TopicError } -func (t createTopicsResponseV0) size() int32 { - return sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() }) +func (t createTopicsResponseV1) size() int32 { + return sizeofInt32(t.ThrottleTime) + sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() }) } -func (t createTopicsResponseV0) writeTo(wb *writeBuffer) { +func (t createTopicsResponseV1) writeTo(wb *writeBuffer) { + wb.writeInt32(t.ThrottleTime) wb.writeArray(len(t.TopicErrors), func(i int) { t.TopicErrors[i].writeTo(wb) }) } -func (t *createTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *createTopicsResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { - var topic createTopicsResponseV0TopicError - if fnRemain, fnErr = (&topic).readFrom(r, size); err != nil { + var topic createTopicsResponseV1TopicError + if fnRemain, fnErr = (&topic).readFrom(r, size); fnErr != nil { return } t.TopicErrors = append(t.TopicErrors, topic) return } - if remain, err = readArrayWith(r, size, fn); err != nil { + if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil { + return + } + if remain, err = readArrayWith(r, remain, fn); err != nil { return } return } -func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponseV0, error) { - var response createTopicsResponseV0 +func (c *Conn) createTopics(request createTopicsRequestV2) (createTopicsResponseV1, error) { + var response createTopicsResponseV1 err := c.writeOperation( func(deadline time.Time, id int32) error { @@ -349,7 +367,7 @@ func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponse deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) request.Timeout = milliseconds(deadlineToTimeout(deadline, now)) } - return c.writeRequest(createTopics, v0, id, request) + return c.writeRequest(createTopics, v2, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -383,7 +401,7 @@ func (c *Conn) CreateTopics(topics ...TopicConfig) error { t.toCreateTopicsRequestV0Topic()) } - _, err := c.createTopics(createTopicsRequestV0{ + _, err := c.createTopics(createTopicsRequestV2{ Topics: requestV0Topics, }) return err diff --git a/createtopics_test.go b/createtopics_test.go index 38819c382..78bba5bc1 100644 --- a/createtopics_test.go +++ b/createtopics_test.go @@ -160,9 +160,9 @@ func TestClientCreateTopics(t *testing.T) { } } -func TestCreateTopicsResponseV0(t *testing.T) { - item := createTopicsResponseV0{ - TopicErrors: []createTopicsResponseV0TopicError{ +func TestCreateTopicsResponseV1(t *testing.T) { + item := createTopicsResponseV1{ + TopicErrors: []createTopicsResponseV1TopicError{ { Topic: "topic", ErrorCode: 2, @@ -174,7 +174,7 @@ func TestCreateTopicsResponseV0(t *testing.T) { w := &writeBuffer{w: b} item.writeTo(w) - var found createTopicsResponseV0 + var found createTopicsResponseV1 remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) if err != nil { t.Error(err) diff --git a/deletetopics.go b/deletetopics.go index d758d9fd6..e3c3be198 100644 --- a/deletetopics.go +++ b/deletetopics.go @@ -67,7 +67,7 @@ func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*D } // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics -type deleteTopicsRequestV0 struct { +type deleteTopicsRequestV1 struct { // Topics holds the topic names Topics []string @@ -77,41 +77,47 @@ type deleteTopicsRequestV0 struct { Timeout int32 } -func (t deleteTopicsRequestV0) size() int32 { +func (t deleteTopicsRequestV1) size() int32 { return sizeofStringArray(t.Topics) + sizeofInt32(t.Timeout) } -func (t deleteTopicsRequestV0) writeTo(wb *writeBuffer) { +func (t deleteTopicsRequestV1) writeTo(wb *writeBuffer) { wb.writeStringArray(t.Topics) wb.writeInt32(t.Timeout) } -type deleteTopicsResponseV0 struct { +type deleteTopicsResponseV1 struct { + ThrottleTime int32 // TopicErrorCodes holds per topic error codes TopicErrorCodes []deleteTopicsResponseV0TopicErrorCode } -func (t deleteTopicsResponseV0) size() int32 { - return sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() }) +func (t deleteTopicsResponseV1) size() int32 { + return sizeofInt32(t.ThrottleTime) + + sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() }) } -func (t *deleteTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *deleteTopicsResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) { var item deleteTopicsResponseV0TopicErrorCode - if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil { + if fnRemain, fnErr = (&item).readFrom(withReader, withSize); fnErr != nil { return } t.TopicErrorCodes = append(t.TopicErrorCodes, item) return } - if remain, err = readArrayWith(r, size, fn); err != nil { + if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil { + return + } + if remain, err = readArrayWith(r, remain, fn); err != nil { return } return } -func (t deleteTopicsResponseV0) writeTo(wb *writeBuffer) { +func (t deleteTopicsResponseV1) writeTo(wb *writeBuffer) { + wb.writeInt32(t.ThrottleTime) wb.writeArray(len(t.TopicErrorCodes), func(i int) { t.TopicErrorCodes[i].writeTo(wb) }) } @@ -146,8 +152,8 @@ func (t deleteTopicsResponseV0TopicErrorCode) writeTo(wb *writeBuffer) { // deleteTopics deletes the specified topics. // // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics -func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponseV0, error) { - var response deleteTopicsResponseV0 +func (c *Conn) deleteTopics(request deleteTopicsRequestV1) (deleteTopicsResponseV1, error) { + var response deleteTopicsResponseV1 err := c.writeOperation( func(deadline time.Time, id int32) error { if request.Timeout == 0 { @@ -155,7 +161,7 @@ func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponse deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) request.Timeout = milliseconds(deadlineToTimeout(deadline, now)) } - return c.writeRequest(deleteTopics, v0, id, request) + return c.writeRequest(deleteTopics, v1, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -164,7 +170,7 @@ func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponse }, ) if err != nil { - return deleteTopicsResponseV0{}, err + return deleteTopicsResponseV1{}, err } for _, c := range response.TopicErrorCodes { if c.ErrorCode != 0 { diff --git a/deletetopics_test.go b/deletetopics_test.go index 3caffe840..98c6cd00d 100644 --- a/deletetopics_test.go +++ b/deletetopics_test.go @@ -29,7 +29,7 @@ func TestClientDeleteTopics(t *testing.T) { } func TestDeleteTopicsResponseV1(t *testing.T) { - item := deleteTopicsResponseV0{ + item := deleteTopicsResponseV1{ TopicErrorCodes: []deleteTopicsResponseV0TopicErrorCode{ { Topic: "a", @@ -42,7 +42,7 @@ func TestDeleteTopicsResponseV1(t *testing.T) { w := &writeBuffer{w: b} item.writeTo(w) - var found deleteTopicsResponseV0 + var found deleteTopicsResponseV1 remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) if err != nil { t.Fatal(err) diff --git a/joingroup.go b/joingroup.go index 30823a69a..d76a27491 100644 --- a/joingroup.go +++ b/joingroup.go @@ -241,7 +241,7 @@ func (t joinGroupRequestGroupProtocolV1) writeTo(wb *writeBuffer) { wb.writeBytes(t.ProtocolMetadata) } -type joinGroupRequestV1 struct { +type joinGroupRequestV2 struct { // GroupID holds the unique group identifier GroupID string @@ -264,7 +264,7 @@ type joinGroupRequestV1 struct { GroupProtocols []joinGroupRequestGroupProtocolV1 } -func (t joinGroupRequestV1) size() int32 { +func (t joinGroupRequestV2) size() int32 { return sizeofString(t.GroupID) + sizeofInt32(t.SessionTimeout) + sizeofInt32(t.RebalanceTimeout) + @@ -273,7 +273,7 @@ func (t joinGroupRequestV1) size() int32 { sizeofArray(len(t.GroupProtocols), func(i int) int32 { return t.GroupProtocols[i].size() }) } -func (t joinGroupRequestV1) writeTo(wb *writeBuffer) { +func (t joinGroupRequestV2) writeTo(wb *writeBuffer) { wb.writeString(t.GroupID) wb.writeInt32(t.SessionTimeout) wb.writeInt32(t.RebalanceTimeout) diff --git a/reader_test.go b/reader_test.go index 64d45190f..0dc593d8c 100644 --- a/reader_test.go +++ b/reader_test.go @@ -301,7 +301,7 @@ func createTopic(t *testing.T, topic string, partitions int) { conn.SetDeadline(time.Now().Add(10 * time.Second)) - _, err = conn.createTopics(createTopicsRequestV0{ + _, err = conn.createTopics(createTopicsRequestV2{ Topics: []createTopicsRequestV0Topic{ { Topic: topic, From e2e2dfa7f16b8a593d626373fae6ff848897af94 Mon Sep 17 00:00:00 2001 From: maxwolf8852 Date: Wed, 30 Apr 2025 15:25:47 +0300 Subject: [PATCH 2/5] dynamic versions support --- conn.go | 19 +++++---- conn_test.go | 10 ++--- consumergroup.go | 16 +++---- consumergroup_test.go | 29 ++++++------- createtopics.go | 98 +++++++++++++++++++++++++++++-------------- createtopics_test.go | 6 +-- deletetopics.go | 52 +++++++++++++++-------- deletetopics_test.go | 4 +- joingroup.go | 35 ++++++++++++---- joingroup_test.go | 6 ++- reader_test.go | 2 +- 11 files changed, 177 insertions(+), 100 deletions(-) diff --git a/conn.go b/conn.go index eafde7563..777c3fb07 100644 --- a/conn.go +++ b/conn.go @@ -306,7 +306,7 @@ func (c *Conn) Brokers() ([]Broker, error) { // DeleteTopics deletes the specified topics. func (c *Conn) DeleteTopics(topics ...string) error { - _, err := c.deleteTopics(deleteTopicsRequestV1{ + _, err := c.deleteTopics(deleteTopicsRequestV0{ Topics: topics, }) return err @@ -368,12 +368,17 @@ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error // joinGroup attempts to join a consumer group // // See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup -func (c *Conn) joinGroup(request joinGroupRequestV2) (joinGroupResponseV1, error) { - var response joinGroupResponseV1 +func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponse, error) { + version, err := c.negotiateVersion(joinGroup, v1, v2) + if err != nil { + return joinGroupResponse{}, err + } - err := c.writeOperation( + response := joinGroupResponse{v: version} + + err = c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(joinGroup, v2, id, request) + return c.writeRequest(joinGroup, version, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -382,10 +387,10 @@ func (c *Conn) joinGroup(request joinGroupRequestV2) (joinGroupResponseV1, error }, ) if err != nil { - return joinGroupResponseV1{}, err + return joinGroupResponse{}, err } if response.ErrorCode != 0 { - return joinGroupResponseV1{}, Error(response.ErrorCode) + return joinGroupResponse{}, Error(response.ErrorCode) } return response, nil diff --git a/conn_test.go b/conn_test.go index bd94a752b..0ec09c114 100644 --- a/conn_test.go +++ b/conn_test.go @@ -680,10 +680,10 @@ func waitForCoordinator(t *testing.T, conn *Conn, groupID string) { func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32, memberID string, stop func()) { waitForCoordinator(t, conn, groupID) - join := func() (joinGroup joinGroupResponseV1) { + join := func() (joinGroup joinGroupResponse) { var err error for attempt := 0; attempt < 10; attempt++ { - joinGroup, err = conn.joinGroup(joinGroupRequestV2{ + joinGroup, err = conn.joinGroup(joinGroupRequestV1{ GroupID: groupID, SessionTimeout: int32(time.Minute / time.Millisecond), RebalanceTimeout: int32(time.Second / time.Millisecond), @@ -771,7 +771,7 @@ func testConnFindCoordinator(t *testing.T, conn *Conn) { } func testConnJoinGroupInvalidGroupID(t *testing.T, conn *Conn) { - _, err := conn.joinGroup(joinGroupRequestV2{}) + _, err := conn.joinGroup(joinGroupRequestV1{}) if !errors.Is(err, InvalidGroupId) && !errors.Is(err, NotCoordinatorForGroup) { t.Fatalf("expected %v or %v; got %v", InvalidGroupId, NotCoordinatorForGroup, err) } @@ -781,7 +781,7 @@ func testConnJoinGroupInvalidSessionTimeout(t *testing.T, conn *Conn) { groupID := makeGroupID() waitForCoordinator(t, conn, groupID) - _, err := conn.joinGroup(joinGroupRequestV2{ + _, err := conn.joinGroup(joinGroupRequestV1{ GroupID: groupID, }) if !errors.Is(err, InvalidSessionTimeout) && !errors.Is(err, NotCoordinatorForGroup) { @@ -793,7 +793,7 @@ func testConnJoinGroupInvalidRefreshTimeout(t *testing.T, conn *Conn) { groupID := makeGroupID() waitForCoordinator(t, conn, groupID) - _, err := conn.joinGroup(joinGroupRequestV2{ + _, err := conn.joinGroup(joinGroupRequestV1{ GroupID: groupID, SessionTimeout: int32(3 * time.Second / time.Millisecond), }) diff --git a/consumergroup.go b/consumergroup.go index 5fa5bd42b..41cf03531 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -555,7 +555,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) { type coordinator interface { io.Closer findCoordinator(findCoordinatorRequestV0) (findCoordinatorResponseV0, error) - joinGroup(joinGroupRequestV2) (joinGroupResponseV1, error) + joinGroup(joinGroupRequestV1) (joinGroupResponse, error) syncGroup(syncGroupRequestV0) (syncGroupResponseV0, error) leaveGroup(leaveGroupRequestV0) (leaveGroupResponseV0, error) heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error) @@ -588,11 +588,11 @@ func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (find return t.conn.findCoordinator(req) } -func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV2) (joinGroupResponseV1, error) { +func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponse, error) { // in the case of join group, the consumer group coordinator may wait up // to rebalance timeout in order to wait for all members to join. if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.rebalanceTimeout)); err != nil { - return joinGroupResponseV1{}, err + return joinGroupResponse{}, err } return t.conn.joinGroup(req) } @@ -932,7 +932,7 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) { // * InvalidSessionTimeout: // * GroupAuthorizationFailed: func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) { - request, err := cg.makeJoinGroupRequestV2(memberID) + request, err := cg.makeJoinGroupRequestV1(memberID) if err != nil { return "", 0, nil, err } @@ -978,8 +978,8 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i // makeJoinGroupRequestV1 handles the logic of constructing a joinGroup // request. -func (cg *ConsumerGroup) makeJoinGroupRequestV2(memberID string) (joinGroupRequestV2, error) { - request := joinGroupRequestV2{ +func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupRequestV1, error) { + request := joinGroupRequestV1{ GroupID: cg.config.ID, MemberID: memberID, SessionTimeout: int32(cg.config.SessionTimeout / time.Millisecond), @@ -990,7 +990,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV2(memberID string) (joinGroupReque for _, balancer := range cg.config.GroupBalancers { userData, err := balancer.UserData() if err != nil { - return joinGroupRequestV2{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err) + return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err) } request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{ ProtocolName: balancer.ProtocolName(), @@ -1007,7 +1007,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV2(memberID string) (joinGroupReque // assignTopicPartitions uses the selected GroupBalancer to assign members to // their various partitions. -func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponseV1) (GroupMemberAssignments, error) { +func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponse) (GroupMemberAssignments, error) { cg.withLogger(func(l Logger) { l.Printf("selected as leader for group, %s\n", cg.config.ID) }) diff --git a/consumergroup_test.go b/consumergroup_test.go index 1c7a065f9..bbfecb42f 100644 --- a/consumergroup_test.go +++ b/consumergroup_test.go @@ -15,7 +15,7 @@ var _ coordinator = mockCoordinator{} type mockCoordinator struct { closeFunc func() error findCoordinatorFunc func(findCoordinatorRequestV0) (findCoordinatorResponseV0, error) - joinGroupFunc func(joinGroupRequestV2) (joinGroupResponseV1, error) + joinGroupFunc func(joinGroupRequestV1) (joinGroupResponse, error) syncGroupFunc func(syncGroupRequestV0) (syncGroupResponseV0, error) leaveGroupFunc func(leaveGroupRequestV0) (leaveGroupResponseV0, error) heartbeatFunc func(heartbeatRequestV0) (heartbeatResponseV0, error) @@ -38,9 +38,9 @@ func (c mockCoordinator) findCoordinator(req findCoordinatorRequestV0) (findCoor return c.findCoordinatorFunc(req) } -func (c mockCoordinator) joinGroup(req joinGroupRequestV2) (joinGroupResponseV1, error) { +func (c mockCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponse, error) { if c.joinGroupFunc == nil { - return joinGroupResponseV1{}, errors.New("no joinGroup behavior specified") + return joinGroupResponse{}, errors.New("no joinGroup behavior specified") } return c.joinGroupFunc(req) } @@ -140,8 +140,9 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, } - newJoinGroupResponseV1 := func(topicsByMemberID map[string][]string) joinGroupResponseV1 { - resp := joinGroupResponseV1{ + newJoinGroupResponseV1 := func(topicsByMemberID map[string][]string) joinGroupResponse { + resp := joinGroupResponse{ + v: v1, GroupProtocol: RoundRobinGroupBalancer{}.ProtocolName(), } @@ -158,7 +159,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) { } testCases := map[string]struct { - Members joinGroupResponseV1 + Members joinGroupResponse Assignments GroupMemberAssignments }{ "nil": { @@ -419,8 +420,8 @@ func TestConsumerGroupErrors(t *testing.T) { }, }, nil } - mc.joinGroupFunc = func(joinGroupRequestV2) (joinGroupResponseV1, error) { - return joinGroupResponseV1{}, errors.New("join group failed") + mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponse, error) { + return joinGroupResponse{}, errors.New("join group failed") } // NOTE : no stub for leaving the group b/c the member never joined. }, @@ -449,8 +450,8 @@ func TestConsumerGroupErrors(t *testing.T) { }, }, nil } - mc.joinGroupFunc = func(joinGroupRequestV2) (joinGroupResponseV1, error) { - return joinGroupResponseV1{ + mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponse, error) { + return joinGroupResponse{ ErrorCode: int16(InvalidTopic), }, nil } @@ -472,8 +473,8 @@ func TestConsumerGroupErrors(t *testing.T) { { scenario: "fails to join group (leader, unsupported protocol)", prepare: func(mc *mockCoordinator) { - mc.joinGroupFunc = func(joinGroupRequestV2) (joinGroupResponseV1, error) { - return joinGroupResponseV1{ + mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponse, error) { + return joinGroupResponse{ GenerationID: 12345, GroupProtocol: "foo", LeaderID: "abc", @@ -498,8 +499,8 @@ func TestConsumerGroupErrors(t *testing.T) { { scenario: "fails to sync group (general error)", prepare: func(mc *mockCoordinator) { - mc.joinGroupFunc = func(joinGroupRequestV2) (joinGroupResponseV1, error) { - return joinGroupResponseV1{ + mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponse, error) { + return joinGroupResponse{ GenerationID: 12345, GroupProtocol: "range", LeaderID: "abc", diff --git a/createtopics.go b/createtopics.go index 1bf61472f..708a314d7 100644 --- a/createtopics.go +++ b/createtopics.go @@ -262,7 +262,9 @@ func (t createTopicsRequestV0Topic) writeTo(wb *writeBuffer) { } // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics -type createTopicsRequestV2 struct { +type createTopicsRequest struct { + v apiVersion + // Topics contains n array of single topic creation requests. Can not // have multiple entries for the same topic. Topics []createTopicsRequestV0Topic @@ -276,18 +278,26 @@ type createTopicsRequestV2 struct { ValidateOnly bool } -func (t createTopicsRequestV2) size() int32 { - return sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) + - sizeofInt32(t.Timeout) + 1 +func (t createTopicsRequest) size() int32 { + sz := sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) + + sizeofInt32(t.Timeout) + if t.v >= v1 { + sz += 1 + } + return sz } -func (t createTopicsRequestV2) writeTo(wb *writeBuffer) { +func (t createTopicsRequest) writeTo(wb *writeBuffer) { wb.writeArray(len(t.Topics), func(i int) { t.Topics[i].writeTo(wb) }) wb.writeInt32(t.Timeout) - wb.writeBool(t.ValidateOnly) + if t.v >= v1 { + wb.writeBool(t.ValidateOnly) + } } -type createTopicsResponseV1TopicError struct { +type createTopicsResponseTopicError struct { + v apiVersion + // Topic name Topic string @@ -298,57 +308,75 @@ type createTopicsResponseV1TopicError struct { ErrorMessage string } -func (t createTopicsResponseV1TopicError) size() int32 { - return sizeofString(t.Topic) + - sizeofInt16(t.ErrorCode) + - sizeofString(t.ErrorMessage) +func (t createTopicsResponseTopicError) size() int32 { + sz := sizeofString(t.Topic) + + sizeofInt16(t.ErrorCode) + if t.v >= v1 { + sz += sizeofString(t.ErrorMessage) + } + return sz } -func (t createTopicsResponseV1TopicError) writeTo(wb *writeBuffer) { +func (t createTopicsResponseTopicError) writeTo(wb *writeBuffer) { wb.writeString(t.Topic) wb.writeInt16(t.ErrorCode) - wb.writeString(t.ErrorMessage) + if t.v >= v1 { + wb.writeString(t.ErrorMessage) + } } -func (t *createTopicsResponseV1TopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *createTopicsResponseTopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readString(r, size, &t.Topic); err != nil { return } if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil { return } - if remain, err = readString(r, remain, &t.ErrorMessage); err != nil { - return + if t.v >= v1 { + if remain, err = readString(r, remain, &t.ErrorMessage); err != nil { + return + } } return } // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics -type createTopicsResponseV1 struct { - ThrottleTime int32 - TopicErrors []createTopicsResponseV1TopicError +type createTopicsResponse struct { + v apiVersion + + ThrottleTime int32 // v1+ + TopicErrors []createTopicsResponseTopicError } -func (t createTopicsResponseV1) size() int32 { - return sizeofInt32(t.ThrottleTime) + sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() }) +func (t createTopicsResponse) size() int32 { + sz := sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() }) + if t.v >= v1 { + sz += sizeofInt32(t.ThrottleTime) + } + return sz } -func (t createTopicsResponseV1) writeTo(wb *writeBuffer) { - wb.writeInt32(t.ThrottleTime) +func (t createTopicsResponse) writeTo(wb *writeBuffer) { + if t.v >= v1 { + wb.writeInt32(t.ThrottleTime) + } wb.writeArray(len(t.TopicErrors), func(i int) { t.TopicErrors[i].writeTo(wb) }) } -func (t *createTopicsResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *createTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) { fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { - var topic createTopicsResponseV1TopicError + topic := createTopicsResponseTopicError{v: t.v} if fnRemain, fnErr = (&topic).readFrom(r, size); fnErr != nil { return } t.TopicErrors = append(t.TopicErrors, topic) return } - if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil { - return + remain = size + if t.v >= v1 { + if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil { + return + } } if remain, err = readArrayWith(r, remain, fn); err != nil { return @@ -357,17 +385,23 @@ func (t *createTopicsResponseV1) readFrom(r *bufio.Reader, size int) (remain int return } -func (c *Conn) createTopics(request createTopicsRequestV2) (createTopicsResponseV1, error) { - var response createTopicsResponseV1 +func (c *Conn) createTopics(request createTopicsRequest) (createTopicsResponse, error) { + version, err := c.negotiateVersion(createTopics, v0, v2) + if err != nil { + return createTopicsResponse{}, err + } + + request.v = version + response := createTopicsResponse{v: version} - err := c.writeOperation( + err = c.writeOperation( func(deadline time.Time, id int32) error { if request.Timeout == 0 { now := time.Now() deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) request.Timeout = milliseconds(deadlineToTimeout(deadline, now)) } - return c.writeRequest(createTopics, v2, id, request) + return c.writeRequest(createTopics, version, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -401,7 +435,7 @@ func (c *Conn) CreateTopics(topics ...TopicConfig) error { t.toCreateTopicsRequestV0Topic()) } - _, err := c.createTopics(createTopicsRequestV2{ + _, err := c.createTopics(createTopicsRequest{ Topics: requestV0Topics, }) return err diff --git a/createtopics_test.go b/createtopics_test.go index 78bba5bc1..b3d080247 100644 --- a/createtopics_test.go +++ b/createtopics_test.go @@ -161,8 +161,8 @@ func TestClientCreateTopics(t *testing.T) { } func TestCreateTopicsResponseV1(t *testing.T) { - item := createTopicsResponseV1{ - TopicErrors: []createTopicsResponseV1TopicError{ + item := createTopicsResponse{ + TopicErrors: []createTopicsResponseTopicError{ { Topic: "topic", ErrorCode: 2, @@ -174,7 +174,7 @@ func TestCreateTopicsResponseV1(t *testing.T) { w := &writeBuffer{w: b} item.writeTo(w) - var found createTopicsResponseV1 + var found createTopicsResponse remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) if err != nil { t.Error(err) diff --git a/deletetopics.go b/deletetopics.go index e3c3be198..a3674b4d7 100644 --- a/deletetopics.go +++ b/deletetopics.go @@ -67,7 +67,7 @@ func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*D } // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics -type deleteTopicsRequestV1 struct { +type deleteTopicsRequestV0 struct { // Topics holds the topic names Topics []string @@ -77,28 +77,33 @@ type deleteTopicsRequestV1 struct { Timeout int32 } -func (t deleteTopicsRequestV1) size() int32 { +func (t deleteTopicsRequestV0) size() int32 { return sizeofStringArray(t.Topics) + sizeofInt32(t.Timeout) } -func (t deleteTopicsRequestV1) writeTo(wb *writeBuffer) { +func (t deleteTopicsRequestV0) writeTo(wb *writeBuffer) { wb.writeStringArray(t.Topics) wb.writeInt32(t.Timeout) } -type deleteTopicsResponseV1 struct { +type deleteTopicsResponse struct { + v apiVersion + ThrottleTime int32 // TopicErrorCodes holds per topic error codes TopicErrorCodes []deleteTopicsResponseV0TopicErrorCode } -func (t deleteTopicsResponseV1) size() int32 { - return sizeofInt32(t.ThrottleTime) + - sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() }) +func (t deleteTopicsResponse) size() int32 { + sz := sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() }) + if t.v >= v1 { + sz += sizeofInt32(t.ThrottleTime) + } + return sz } -func (t *deleteTopicsResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *deleteTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) { fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) { var item deleteTopicsResponseV0TopicErrorCode if fnRemain, fnErr = (&item).readFrom(withReader, withSize); fnErr != nil { @@ -107,8 +112,11 @@ func (t *deleteTopicsResponseV1) readFrom(r *bufio.Reader, size int) (remain int t.TopicErrorCodes = append(t.TopicErrorCodes, item) return } - if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil { - return + remain = size + if t.v >= v1 { + if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil { + return + } } if remain, err = readArrayWith(r, remain, fn); err != nil { return @@ -116,8 +124,10 @@ func (t *deleteTopicsResponseV1) readFrom(r *bufio.Reader, size int) (remain int return } -func (t deleteTopicsResponseV1) writeTo(wb *writeBuffer) { - wb.writeInt32(t.ThrottleTime) +func (t deleteTopicsResponse) writeTo(wb *writeBuffer) { + if t.v >= v1 { + wb.writeInt32(t.ThrottleTime) + } wb.writeArray(len(t.TopicErrorCodes), func(i int) { t.TopicErrorCodes[i].writeTo(wb) }) } @@ -152,16 +162,24 @@ func (t deleteTopicsResponseV0TopicErrorCode) writeTo(wb *writeBuffer) { // deleteTopics deletes the specified topics. // // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics -func (c *Conn) deleteTopics(request deleteTopicsRequestV1) (deleteTopicsResponseV1, error) { - var response deleteTopicsResponseV1 - err := c.writeOperation( +func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponse, error) { + version, err := c.negotiateVersion(deleteTopics, v0, v1) + if err != nil { + return deleteTopicsResponse{}, err + } + + response := deleteTopicsResponse{ + v: version, + } + + err = c.writeOperation( func(deadline time.Time, id int32) error { if request.Timeout == 0 { now := time.Now() deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) request.Timeout = milliseconds(deadlineToTimeout(deadline, now)) } - return c.writeRequest(deleteTopics, v1, id, request) + return c.writeRequest(deleteTopics, version, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -170,7 +188,7 @@ func (c *Conn) deleteTopics(request deleteTopicsRequestV1) (deleteTopicsResponse }, ) if err != nil { - return deleteTopicsResponseV1{}, err + return deleteTopicsResponse{}, err } for _, c := range response.TopicErrorCodes { if c.ErrorCode != 0 { diff --git a/deletetopics_test.go b/deletetopics_test.go index 98c6cd00d..4dc681831 100644 --- a/deletetopics_test.go +++ b/deletetopics_test.go @@ -29,7 +29,7 @@ func TestClientDeleteTopics(t *testing.T) { } func TestDeleteTopicsResponseV1(t *testing.T) { - item := deleteTopicsResponseV1{ + item := deleteTopicsResponse{ TopicErrorCodes: []deleteTopicsResponseV0TopicErrorCode{ { Topic: "a", @@ -42,7 +42,7 @@ func TestDeleteTopicsResponseV1(t *testing.T) { w := &writeBuffer{w: b} item.writeTo(w) - var found deleteTopicsResponseV1 + var found deleteTopicsResponse remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) if err != nil { t.Fatal(err) diff --git a/joingroup.go b/joingroup.go index d76a27491..c5a09eb52 100644 --- a/joingroup.go +++ b/joingroup.go @@ -241,7 +241,7 @@ func (t joinGroupRequestGroupProtocolV1) writeTo(wb *writeBuffer) { wb.writeBytes(t.ProtocolMetadata) } -type joinGroupRequestV2 struct { +type joinGroupRequestV1 struct { // GroupID holds the unique group identifier GroupID string @@ -264,7 +264,7 @@ type joinGroupRequestV2 struct { GroupProtocols []joinGroupRequestGroupProtocolV1 } -func (t joinGroupRequestV2) size() int32 { +func (t joinGroupRequestV1) size() int32 { return sizeofString(t.GroupID) + sizeofInt32(t.SessionTimeout) + sizeofInt32(t.RebalanceTimeout) + @@ -273,7 +273,7 @@ func (t joinGroupRequestV2) size() int32 { sizeofArray(len(t.GroupProtocols), func(i int) int32 { return t.GroupProtocols[i].size() }) } -func (t joinGroupRequestV2) writeTo(wb *writeBuffer) { +func (t joinGroupRequestV1) writeTo(wb *writeBuffer) { wb.writeString(t.GroupID) wb.writeInt32(t.SessionTimeout) wb.writeInt32(t.RebalanceTimeout) @@ -308,7 +308,11 @@ func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain return } -type joinGroupResponseV1 struct { +type joinGroupResponse struct { + v apiVersion + + ThrottleTime int32 + // ErrorCode holds response error code ErrorCode int16 @@ -326,16 +330,23 @@ type joinGroupResponseV1 struct { Members []joinGroupResponseMemberV1 } -func (t joinGroupResponseV1) size() int32 { - return sizeofInt16(t.ErrorCode) + +func (t joinGroupResponse) size() int32 { + sz := sizeofInt16(t.ErrorCode) + sizeofInt32(t.GenerationID) + sizeofString(t.GroupProtocol) + sizeofString(t.LeaderID) + sizeofString(t.MemberID) + sizeofArray(len(t.MemberID), func(i int) int32 { return t.Members[i].size() }) + if t.v >= v2 { + sz += sizeofInt32(t.ThrottleTime) + } + return sz } -func (t joinGroupResponseV1) writeTo(wb *writeBuffer) { +func (t joinGroupResponse) writeTo(wb *writeBuffer) { + if t.v >= v2 { + wb.writeInt32(t.ThrottleTime) + } wb.writeInt16(t.ErrorCode) wb.writeInt32(t.GenerationID) wb.writeString(t.GroupProtocol) @@ -344,8 +355,14 @@ func (t joinGroupResponseV1) writeTo(wb *writeBuffer) { wb.writeArray(len(t.Members), func(i int) { t.Members[i].writeTo(wb) }) } -func (t *joinGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { - if remain, err = readInt16(r, size, &t.ErrorCode); err != nil { +func (t *joinGroupResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) { + remain = size + if t.v >= v2 { + if remain, err = readInt32(r, remain, &t.ThrottleTime); err != nil { + return + } + } + if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil { return } if remain, err = readInt32(r, remain, &t.GenerationID); err != nil { diff --git a/joingroup_test.go b/joingroup_test.go index 926f5b4a6..f98f97f7a 100644 --- a/joingroup_test.go +++ b/joingroup_test.go @@ -218,7 +218,9 @@ func TestMemberMetadata(t *testing.T) { } func TestJoinGroupResponseV1(t *testing.T) { - item := joinGroupResponseV1{ + const version = v1 + item := joinGroupResponse{ + v: version, ErrorCode: 2, GenerationID: 3, GroupProtocol: "a", @@ -236,7 +238,7 @@ func TestJoinGroupResponseV1(t *testing.T) { w := &writeBuffer{w: b} item.writeTo(w) - var found joinGroupResponseV1 + found := joinGroupResponse{v: version} remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) if err != nil { t.Error(err) diff --git a/reader_test.go b/reader_test.go index 0dc593d8c..83d240cb0 100644 --- a/reader_test.go +++ b/reader_test.go @@ -301,7 +301,7 @@ func createTopic(t *testing.T, topic string, partitions int) { conn.SetDeadline(time.Now().Add(10 * time.Second)) - _, err = conn.createTopics(createTopicsRequestV2{ + _, err = conn.createTopics(createTopicsRequest{ Topics: []createTopicsRequestV0Topic{ { Topic: topic, From 5a30e00e7cb5c9a545ea93db3474b01678b6162b Mon Sep 17 00:00:00 2001 From: maxwolf8852 Date: Thu, 1 May 2025 01:07:51 +0300 Subject: [PATCH 3/5] review fixes --- .circleci/config.yml | 47 ++++++++++++++++ alterpartitionreassignments_test.go | 3 + conn.go | 4 +- conn_test.go | 8 +-- consumergroup.go | 14 ++--- consumergroup_test.go | 86 +++++++++++++++-------------- createtopics.go | 12 ++-- createtopics_test.go | 53 ++++++++++-------- deletetopics.go | 10 ++-- joingroup.go | 20 +++---- joingroup_test.go | 64 ++++++++++----------- listgroups.go | 2 +- offsetfetch.go | 2 +- 13 files changed, 194 insertions(+), 131 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 96bcf0280..eb2ccbf8f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -278,6 +278,52 @@ jobs: entrypoint: *entrypoint steps: *steps + kafka-400: + working_directory: *working_directory + environment: + KAFKA_VERSION: "4.0.0" + + # Need to skip nettest to avoid these kinds of errors: + # --- FAIL: TestConn/nettest (17.56s) + # --- FAIL: TestConn/nettest/PingPong (7.40s) + # conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request + # conntest.go:118: mismatching value: got 77, want 78 + # conntest.go:118: mismatching value: got 78, want 79 + # ... + # + # TODO: Figure out why these are happening and fix them (they don't appear to be new). + KAFKA_SKIP_NETTEST: "1" + docker: + - image: circleci/golang + - image: bitnami/kafka:4.0.0 + ports: + - 9092:9092 + - 9093:9093 + environment: + KAFKA_CFG_NODE_ID: 1 + KAFKA_CFG_BROKER_ID: 1 + KAFKA_CFG_PROCESS_ROLES: broker,controller + KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost' + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAIN:PLAINTEXT,SASL:SASL_PLAINTEXT + KAFKA_CFG_LISTENERS: CONTROLLER://:9094,PLAIN://:9092,SASL://:9093 + KAFKA_CFG_ADVERTISED_LISTENERS: PLAIN://localhost:9092,SASL://localhost:9093 + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAIN + KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512' + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094 + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' + KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf" + KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true' + KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000' + KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer' + entrypoint: + - "/bin/bash" + - "-c" + - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/bitnami/kafka/config/kafka_jaas.conf; exec /entrypoint.sh /run.sh + steps: *steps + workflows: version: 2 run: @@ -295,3 +341,4 @@ workflows: - kafka-260 - kafka-270 - kafka-281 + - kafka-400 diff --git a/alterpartitionreassignments_test.go b/alterpartitionreassignments_test.go index 7bbce8fff..48974c7c5 100644 --- a/alterpartitionreassignments_test.go +++ b/alterpartitionreassignments_test.go @@ -3,6 +3,7 @@ package kafka import ( "context" "testing" + "time" ktesting "github.com/segmentio/kafka-go/testing" ) @@ -35,6 +36,7 @@ func TestClientAlterPartitionReassignments(t *testing.T) { BrokerIDs: []int{1}, }, }, + Timeout: 5 * time.Second, }, ) @@ -96,6 +98,7 @@ func TestClientAlterPartitionReassignmentsMultiTopics(t *testing.T) { BrokerIDs: []int{1}, }, }, + Timeout: 5 * time.Second, }, ) diff --git a/conn.go b/conn.go index 777c3fb07..9f9f25903 100644 --- a/conn.go +++ b/conn.go @@ -306,7 +306,7 @@ func (c *Conn) Brokers() ([]Broker, error) { // DeleteTopics deletes the specified topics. func (c *Conn) DeleteTopics(topics ...string) error { - _, err := c.deleteTopics(deleteTopicsRequestV0{ + _, err := c.deleteTopics(deleteTopicsRequest{ Topics: topics, }) return err @@ -368,7 +368,7 @@ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error // joinGroup attempts to join a consumer group // // See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup -func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponse, error) { +func (c *Conn) joinGroup(request joinGroupRequest) (joinGroupResponse, error) { version, err := c.negotiateVersion(joinGroup, v1, v2) if err != nil { return joinGroupResponse{}, err diff --git a/conn_test.go b/conn_test.go index 0ec09c114..ef5ce3071 100644 --- a/conn_test.go +++ b/conn_test.go @@ -683,7 +683,7 @@ func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32, join := func() (joinGroup joinGroupResponse) { var err error for attempt := 0; attempt < 10; attempt++ { - joinGroup, err = conn.joinGroup(joinGroupRequestV1{ + joinGroup, err = conn.joinGroup(joinGroupRequest{ GroupID: groupID, SessionTimeout: int32(time.Minute / time.Millisecond), RebalanceTimeout: int32(time.Second / time.Millisecond), @@ -771,7 +771,7 @@ func testConnFindCoordinator(t *testing.T, conn *Conn) { } func testConnJoinGroupInvalidGroupID(t *testing.T, conn *Conn) { - _, err := conn.joinGroup(joinGroupRequestV1{}) + _, err := conn.joinGroup(joinGroupRequest{}) if !errors.Is(err, InvalidGroupId) && !errors.Is(err, NotCoordinatorForGroup) { t.Fatalf("expected %v or %v; got %v", InvalidGroupId, NotCoordinatorForGroup, err) } @@ -781,7 +781,7 @@ func testConnJoinGroupInvalidSessionTimeout(t *testing.T, conn *Conn) { groupID := makeGroupID() waitForCoordinator(t, conn, groupID) - _, err := conn.joinGroup(joinGroupRequestV1{ + _, err := conn.joinGroup(joinGroupRequest{ GroupID: groupID, }) if !errors.Is(err, InvalidSessionTimeout) && !errors.Is(err, NotCoordinatorForGroup) { @@ -793,7 +793,7 @@ func testConnJoinGroupInvalidRefreshTimeout(t *testing.T, conn *Conn) { groupID := makeGroupID() waitForCoordinator(t, conn, groupID) - _, err := conn.joinGroup(joinGroupRequestV1{ + _, err := conn.joinGroup(joinGroupRequest{ GroupID: groupID, SessionTimeout: int32(3 * time.Second / time.Millisecond), }) diff --git a/consumergroup.go b/consumergroup.go index 41cf03531..b32f90162 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -555,7 +555,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) { type coordinator interface { io.Closer findCoordinator(findCoordinatorRequestV0) (findCoordinatorResponseV0, error) - joinGroup(joinGroupRequestV1) (joinGroupResponse, error) + joinGroup(joinGroupRequest) (joinGroupResponse, error) syncGroup(syncGroupRequestV0) (syncGroupResponseV0, error) leaveGroup(leaveGroupRequestV0) (leaveGroupResponseV0, error) heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error) @@ -588,7 +588,7 @@ func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (find return t.conn.findCoordinator(req) } -func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponse, error) { +func (t *timeoutCoordinator) joinGroup(req joinGroupRequest) (joinGroupResponse, error) { // in the case of join group, the consumer group coordinator may wait up // to rebalance timeout in order to wait for all members to join. if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.rebalanceTimeout)); err != nil { @@ -932,7 +932,7 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) { // * InvalidSessionTimeout: // * GroupAuthorizationFailed: func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) { - request, err := cg.makeJoinGroupRequestV1(memberID) + request, err := cg.makeJoinGroupRequest(memberID) if err != nil { return "", 0, nil, err } @@ -978,8 +978,8 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i // makeJoinGroupRequestV1 handles the logic of constructing a joinGroup // request. -func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupRequestV1, error) { - request := joinGroupRequestV1{ +func (cg *ConsumerGroup) makeJoinGroupRequest(memberID string) (joinGroupRequest, error) { + request := joinGroupRequest{ GroupID: cg.config.ID, MemberID: memberID, SessionTimeout: int32(cg.config.SessionTimeout / time.Millisecond), @@ -990,7 +990,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque for _, balancer := range cg.config.GroupBalancers { userData, err := balancer.UserData() if err != nil { - return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err) + return joinGroupRequest{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err) } request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{ ProtocolName: balancer.ProtocolName(), @@ -1050,7 +1050,7 @@ func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroup } // makeMemberProtocolMetadata maps encoded member metadata ([]byte) into []GroupMember. -func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMemberV1) ([]GroupMember, error) { +func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember) ([]GroupMember, error) { members := make([]GroupMember, 0, len(in)) for _, item := range in { metadata := groupMetadata{} diff --git a/consumergroup_test.go b/consumergroup_test.go index bbfecb42f..dbbe4ec47 100644 --- a/consumergroup_test.go +++ b/consumergroup_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "reflect" + "strconv" "strings" "sync" "testing" @@ -15,7 +16,7 @@ var _ coordinator = mockCoordinator{} type mockCoordinator struct { closeFunc func() error findCoordinatorFunc func(findCoordinatorRequestV0) (findCoordinatorResponseV0, error) - joinGroupFunc func(joinGroupRequestV1) (joinGroupResponse, error) + joinGroupFunc func(joinGroupRequest) (joinGroupResponse, error) syncGroupFunc func(syncGroupRequestV0) (syncGroupResponseV0, error) leaveGroupFunc func(leaveGroupRequestV0) (leaveGroupResponseV0, error) heartbeatFunc func(heartbeatRequestV0) (heartbeatResponseV0, error) @@ -38,7 +39,7 @@ func (c mockCoordinator) findCoordinator(req findCoordinatorRequestV0) (findCoor return c.findCoordinatorFunc(req) } -func (c mockCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponse, error) { +func (c mockCoordinator) joinGroup(req joinGroupRequest) (joinGroupResponse, error) { if c.joinGroupFunc == nil { return joinGroupResponse{}, errors.New("no joinGroup behavior specified") } @@ -140,34 +141,36 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, } - newJoinGroupResponseV1 := func(topicsByMemberID map[string][]string) joinGroupResponse { - resp := joinGroupResponse{ - v: v1, - GroupProtocol: RoundRobinGroupBalancer{}.ProtocolName(), - } + newJoinGroupResponse := func(topicsByMemberID map[string][]string) func(v apiVersion) joinGroupResponse { + return func(v apiVersion) joinGroupResponse { + resp := joinGroupResponse{ + v: v, + GroupProtocol: RoundRobinGroupBalancer{}.ProtocolName(), + } - for memberID, topics := range topicsByMemberID { - resp.Members = append(resp.Members, joinGroupResponseMemberV1{ - MemberID: memberID, - MemberMetadata: groupMetadata{ - Topics: topics, - }.bytes(), - }) - } + for memberID, topics := range topicsByMemberID { + resp.Members = append(resp.Members, joinGroupResponseMember{ + MemberID: memberID, + MemberMetadata: groupMetadata{ + Topics: topics, + }.bytes(), + }) + } - return resp + return resp + } } testCases := map[string]struct { - Members joinGroupResponse + MembersFunc func(v apiVersion) joinGroupResponse Assignments GroupMemberAssignments }{ "nil": { - Members: newJoinGroupResponseV1(nil), + MembersFunc: newJoinGroupResponse(nil), Assignments: GroupMemberAssignments{}, }, "one member, one topic": { - Members: newJoinGroupResponseV1(map[string][]string{ + MembersFunc: newJoinGroupResponse(map[string][]string{ "member-1": {"topic-1"}, }), Assignments: GroupMemberAssignments{ @@ -177,7 +180,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, }, "one member, two topics": { - Members: newJoinGroupResponseV1(map[string][]string{ + MembersFunc: newJoinGroupResponse(map[string][]string{ "member-1": {"topic-1", "topic-2"}, }), Assignments: GroupMemberAssignments{ @@ -188,7 +191,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, }, "two members, one topic": { - Members: newJoinGroupResponseV1(map[string][]string{ + MembersFunc: newJoinGroupResponse(map[string][]string{ "member-1": {"topic-1"}, "member-2": {"topic-1"}, }), @@ -202,7 +205,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, }, "two members, two unshared topics": { - Members: newJoinGroupResponseV1(map[string][]string{ + MembersFunc: newJoinGroupResponse(map[string][]string{ "member-1": {"topic-1"}, "member-2": {"topic-2"}, }), @@ -217,21 +220,24 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, } + supportedVersions := []apiVersion{v1, v2} // joinGroup versions for label, tc := range testCases { - t.Run(label, func(t *testing.T) { - cg := ConsumerGroup{} - cg.config.GroupBalancers = []GroupBalancer{ - RangeGroupBalancer{}, - RoundRobinGroupBalancer{}, - } - assignments, err := cg.assignTopicPartitions(conn, tc.Members) - if err != nil { - t.Fatalf("bad err: %v", err) - } - if !reflect.DeepEqual(tc.Assignments, assignments) { - t.Errorf("expected %v; got %v", tc.Assignments, assignments) - } - }) + for _, v := range supportedVersions { + t.Run(label+"_v"+strconv.Itoa(int(v)), func(t *testing.T) { + cg := ConsumerGroup{} + cg.config.GroupBalancers = []GroupBalancer{ + RangeGroupBalancer{}, + RoundRobinGroupBalancer{}, + } + assignments, err := cg.assignTopicPartitions(conn, tc.MembersFunc(v)) + if err != nil { + t.Fatalf("bad err: %v", err) + } + if !reflect.DeepEqual(tc.Assignments, assignments) { + t.Errorf("expected %v; got %v", tc.Assignments, assignments) + } + }) + } } } @@ -420,7 +426,7 @@ func TestConsumerGroupErrors(t *testing.T) { }, }, nil } - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponse, error) { + mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) { return joinGroupResponse{}, errors.New("join group failed") } // NOTE : no stub for leaving the group b/c the member never joined. @@ -450,7 +456,7 @@ func TestConsumerGroupErrors(t *testing.T) { }, }, nil } - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponse, error) { + mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) { return joinGroupResponse{ ErrorCode: int16(InvalidTopic), }, nil @@ -473,7 +479,7 @@ func TestConsumerGroupErrors(t *testing.T) { { scenario: "fails to join group (leader, unsupported protocol)", prepare: func(mc *mockCoordinator) { - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponse, error) { + mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) { return joinGroupResponse{ GenerationID: 12345, GroupProtocol: "foo", @@ -499,7 +505,7 @@ func TestConsumerGroupErrors(t *testing.T) { { scenario: "fails to sync group (general error)", prepare: func(mc *mockCoordinator) { - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponse, error) { + mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) { return joinGroupResponse{ GenerationID: 12345, GroupProtocol: "range", diff --git a/createtopics.go b/createtopics.go index 708a314d7..3c02415bc 100644 --- a/createtopics.go +++ b/createtopics.go @@ -263,7 +263,7 @@ func (t createTopicsRequestV0Topic) writeTo(wb *writeBuffer) { // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics type createTopicsRequest struct { - v apiVersion + v apiVersion // v0, v1, v2 // Topics contains n array of single topic creation requests. Can not // have multiple entries for the same topic. @@ -344,20 +344,20 @@ func (t *createTopicsResponseTopicError) readFrom(r *bufio.Reader, size int) (re type createTopicsResponse struct { v apiVersion - ThrottleTime int32 // v1+ + ThrottleTime int32 // v2+ TopicErrors []createTopicsResponseTopicError } func (t createTopicsResponse) size() int32 { sz := sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() }) - if t.v >= v1 { + if t.v >= v2 { sz += sizeofInt32(t.ThrottleTime) } return sz } func (t createTopicsResponse) writeTo(wb *writeBuffer) { - if t.v >= v1 { + if t.v >= v2 { wb.writeInt32(t.ThrottleTime) } wb.writeArray(len(t.TopicErrors), func(i int) { t.TopicErrors[i].writeTo(wb) }) @@ -373,7 +373,7 @@ func (t *createTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, return } remain = size - if t.v >= v1 { + if t.v >= v2 { if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil { return } @@ -386,7 +386,7 @@ func (t *createTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, } func (c *Conn) createTopics(request createTopicsRequest) (createTopicsResponse, error) { - version, err := c.negotiateVersion(createTopics, v0, v2) + version, err := c.negotiateVersion(createTopics, v0, v1, v2) if err != nil { return createTopicsResponse{}, err } diff --git a/createtopics_test.go b/createtopics_test.go index b3d080247..119d17094 100644 --- a/createtopics_test.go +++ b/createtopics_test.go @@ -160,32 +160,37 @@ func TestClientCreateTopics(t *testing.T) { } } -func TestCreateTopicsResponseV1(t *testing.T) { - item := createTopicsResponse{ - TopicErrors: []createTopicsResponseTopicError{ - { - Topic: "topic", - ErrorCode: 2, +func TestCreateTopicsResponse(t *testing.T) { + supportedVersions := []apiVersion{v0, v1, v2} + for _, v := range supportedVersions { + item := createTopicsResponse{ + v: v, + TopicErrors: []createTopicsResponseTopicError{ + { + v: v, + Topic: "topic", + ErrorCode: 2, + }, }, - }, - } + } - b := bytes.NewBuffer(nil) - w := &writeBuffer{w: b} - item.writeTo(w) + b := bytes.NewBuffer(nil) + w := &writeBuffer{w: b} + item.writeTo(w) - var found createTopicsResponse - remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) - if err != nil { - t.Error(err) - t.FailNow() - } - if remain != 0 { - t.Errorf("expected 0 remain, got %v", remain) - t.FailNow() - } - if !reflect.DeepEqual(item, found) { - t.Error("expected item and found to be the same") - t.FailNow() + found := createTopicsResponse{v: v} + remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) + if err != nil { + t.Error(err) + t.FailNow() + } + if remain != 0 { + t.Errorf("expected 0 remain, got %v", remain) + t.FailNow() + } + if !reflect.DeepEqual(item, found) { + t.Error("expected item and found to be the same") + t.FailNow() + } } } diff --git a/deletetopics.go b/deletetopics.go index a3674b4d7..ff73d553b 100644 --- a/deletetopics.go +++ b/deletetopics.go @@ -67,7 +67,7 @@ func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*D } // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics -type deleteTopicsRequestV0 struct { +type deleteTopicsRequest struct { // Topics holds the topic names Topics []string @@ -77,18 +77,18 @@ type deleteTopicsRequestV0 struct { Timeout int32 } -func (t deleteTopicsRequestV0) size() int32 { +func (t deleteTopicsRequest) size() int32 { return sizeofStringArray(t.Topics) + sizeofInt32(t.Timeout) } -func (t deleteTopicsRequestV0) writeTo(wb *writeBuffer) { +func (t deleteTopicsRequest) writeTo(wb *writeBuffer) { wb.writeStringArray(t.Topics) wb.writeInt32(t.Timeout) } type deleteTopicsResponse struct { - v apiVersion + v apiVersion // v0, v1 ThrottleTime int32 // TopicErrorCodes holds per topic error codes @@ -162,7 +162,7 @@ func (t deleteTopicsResponseV0TopicErrorCode) writeTo(wb *writeBuffer) { // deleteTopics deletes the specified topics. // // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics -func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponse, error) { +func (c *Conn) deleteTopics(request deleteTopicsRequest) (deleteTopicsResponse, error) { version, err := c.negotiateVersion(deleteTopics, v0, v1) if err != nil { return deleteTopicsResponse{}, err diff --git a/joingroup.go b/joingroup.go index c5a09eb52..f3d90a937 100644 --- a/joingroup.go +++ b/joingroup.go @@ -241,7 +241,7 @@ func (t joinGroupRequestGroupProtocolV1) writeTo(wb *writeBuffer) { wb.writeBytes(t.ProtocolMetadata) } -type joinGroupRequestV1 struct { +type joinGroupRequest struct { // GroupID holds the unique group identifier GroupID string @@ -264,7 +264,7 @@ type joinGroupRequestV1 struct { GroupProtocols []joinGroupRequestGroupProtocolV1 } -func (t joinGroupRequestV1) size() int32 { +func (t joinGroupRequest) size() int32 { return sizeofString(t.GroupID) + sizeofInt32(t.SessionTimeout) + sizeofInt32(t.RebalanceTimeout) + @@ -273,7 +273,7 @@ func (t joinGroupRequestV1) size() int32 { sizeofArray(len(t.GroupProtocols), func(i int) int32 { return t.GroupProtocols[i].size() }) } -func (t joinGroupRequestV1) writeTo(wb *writeBuffer) { +func (t joinGroupRequest) writeTo(wb *writeBuffer) { wb.writeString(t.GroupID) wb.writeInt32(t.SessionTimeout) wb.writeInt32(t.RebalanceTimeout) @@ -282,23 +282,23 @@ func (t joinGroupRequestV1) writeTo(wb *writeBuffer) { wb.writeArray(len(t.GroupProtocols), func(i int) { t.GroupProtocols[i].writeTo(wb) }) } -type joinGroupResponseMemberV1 struct { +type joinGroupResponseMember struct { // MemberID assigned by the group coordinator MemberID string MemberMetadata []byte } -func (t joinGroupResponseMemberV1) size() int32 { +func (t joinGroupResponseMember) size() int32 { return sizeofString(t.MemberID) + sizeofBytes(t.MemberMetadata) } -func (t joinGroupResponseMemberV1) writeTo(wb *writeBuffer) { +func (t joinGroupResponseMember) writeTo(wb *writeBuffer) { wb.writeString(t.MemberID) wb.writeBytes(t.MemberMetadata) } -func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *joinGroupResponseMember) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readString(r, size, &t.MemberID); err != nil { return } @@ -309,7 +309,7 @@ func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain } type joinGroupResponse struct { - v apiVersion + v apiVersion // v1, v2 ThrottleTime int32 @@ -327,7 +327,7 @@ type joinGroupResponse struct { // MemberID assigned by the group coordinator MemberID string - Members []joinGroupResponseMemberV1 + Members []joinGroupResponseMember } func (t joinGroupResponse) size() int32 { @@ -379,7 +379,7 @@ func (t *joinGroupResponse) readFrom(r *bufio.Reader, size int) (remain int, err } fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { - var item joinGroupResponseMemberV1 + var item joinGroupResponseMember if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil { return } diff --git a/joingroup_test.go b/joingroup_test.go index f98f97f7a..73922d6a0 100644 --- a/joingroup_test.go +++ b/joingroup_test.go @@ -217,39 +217,41 @@ func TestMemberMetadata(t *testing.T) { } } -func TestJoinGroupResponseV1(t *testing.T) { - const version = v1 - item := joinGroupResponse{ - v: version, - ErrorCode: 2, - GenerationID: 3, - GroupProtocol: "a", - LeaderID: "b", - MemberID: "c", - Members: []joinGroupResponseMemberV1{ - { - MemberID: "d", - MemberMetadata: []byte("blah"), +func TestJoinGroupResponse(t *testing.T) { + supportedVersions := []apiVersion{v1, v2} + for _, v := range supportedVersions { + item := joinGroupResponse{ + v: v, + ErrorCode: 2, + GenerationID: 3, + GroupProtocol: "a", + LeaderID: "b", + MemberID: "c", + Members: []joinGroupResponseMember{ + { + MemberID: "d", + MemberMetadata: []byte("blah"), + }, }, - }, - } + } - b := bytes.NewBuffer(nil) - w := &writeBuffer{w: b} - item.writeTo(w) + b := bytes.NewBuffer(nil) + w := &writeBuffer{w: b} + item.writeTo(w) - found := joinGroupResponse{v: version} - remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) - if err != nil { - t.Error(err) - t.FailNow() - } - if remain != 0 { - t.Errorf("expected 0 remain, got %v", remain) - t.FailNow() - } - if !reflect.DeepEqual(item, found) { - t.Error("expected item and found to be the same") - t.FailNow() + found := joinGroupResponse{v: v} + remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) + if err != nil { + t.Error(err) + t.FailNow() + } + if remain != 0 { + t.Errorf("expected 0 remain, got %v", remain) + t.FailNow() + } + if !reflect.DeepEqual(item, found) { + t.Error("expected item and found to be the same") + t.FailNow() + } } } diff --git a/listgroups.go b/listgroups.go index 229de9352..5034b5440 100644 --- a/listgroups.go +++ b/listgroups.go @@ -125,7 +125,7 @@ func (t *listGroupsResponseV1) readFrom(r *bufio.Reader, size int) (remain int, fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) { var item listGroupsResponseGroupV1 - if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil { + if fnRemain, fnErr = (&item).readFrom(withReader, withSize); fnErr != nil { return } t.Groups = append(t.Groups, item) diff --git a/offsetfetch.go b/offsetfetch.go index b85bc5c83..ce80213f8 100644 --- a/offsetfetch.go +++ b/offsetfetch.go @@ -229,7 +229,7 @@ func (t *offsetFetchResponseV1Response) readFrom(r *bufio.Reader, size int) (rem fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { item := offsetFetchResponseV1PartitionResponse{} - if fnRemain, fnErr = (&item).readFrom(r, size); err != nil { + if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil { return } t.PartitionResponses = append(t.PartitionResponses, item) From 992667750002d81287ad990f7c12bd149573e033 Mon Sep 17 00:00:00 2001 From: maxwolf8852 Date: Thu, 1 May 2025 20:47:00 +0300 Subject: [PATCH 4/5] fix scram sasl deployment for kafka 4.0 --- .circleci/config.yml | 11 +++++++---- sasl/sasl_test.go | 13 +++++++++---- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index eb2ccbf8f..651b999b1 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -318,10 +318,13 @@ jobs: KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true' KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000' KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer' - entrypoint: - - "/bin/bash" - - "-c" - - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/bitnami/kafka/config/kafka_jaas.conf; exec /entrypoint.sh /run.sh + KAFKA_CFG_SUPER_USERS: User:adminscram256;User:adminscram512;User:adminplain + KAFKA_CLIENT_USERS: adminscram256,adminscram512,adminplain + KAFKA_CLIENT_PASSWORDS: admin-secret-256,admin-secret-512,admin-secret + KAFKA_CLIENT_SASL_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN + KAFKA_INTER_BROKER_USER: adminscram512 + KAFKA_INTER_BROKER_PASSWORD: admin-secret-512 + KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512 steps: *steps workflows: diff --git a/sasl/sasl_test.go b/sasl/sasl_test.go index a4101391a..57ff8b7cf 100644 --- a/sasl/sasl_test.go +++ b/sasl/sasl_test.go @@ -18,6 +18,11 @@ const ( ) func TestSASL(t *testing.T) { + scramUsers := map[scram.Algorithm]string{scram.SHA256: "adminscram", scram.SHA512: "adminscram"} + // kafka 4.0.0 test environment supports only different users for different scram algorithms. + if ktesting.KafkaIsAtLeast("4.0.0") { + scramUsers = map[scram.Algorithm]string{scram.SHA256: "adminscram256", scram.SHA512: "adminscram512"} + } tests := []struct { valid func() sasl.Mechanism invalid func() sasl.Mechanism @@ -39,22 +44,22 @@ func TestSASL(t *testing.T) { }, { valid: func() sasl.Mechanism { - mech, _ := scram.Mechanism(scram.SHA256, "adminscram", "admin-secret-256") + mech, _ := scram.Mechanism(scram.SHA256, scramUsers[scram.SHA256], "admin-secret-256") return mech }, invalid: func() sasl.Mechanism { - mech, _ := scram.Mechanism(scram.SHA256, "adminscram", "badpassword") + mech, _ := scram.Mechanism(scram.SHA256, scramUsers[scram.SHA256], "badpassword") return mech }, minKafka: "0.10.2.0", }, { valid: func() sasl.Mechanism { - mech, _ := scram.Mechanism(scram.SHA512, "adminscram", "admin-secret-512") + mech, _ := scram.Mechanism(scram.SHA512, scramUsers[scram.SHA512], "admin-secret-512") return mech }, invalid: func() sasl.Mechanism { - mech, _ := scram.Mechanism(scram.SHA512, "adminscram", "badpassword") + mech, _ := scram.Mechanism(scram.SHA512, scramUsers[scram.SHA512], "badpassword") return mech }, minKafka: "0.10.2.0", From 51771ddac445722b67ab2ea26a4b251e357523d1 Mon Sep 17 00:00:00 2001 From: maxwolf8852 Date: Wed, 7 May 2025 13:26:10 +0300 Subject: [PATCH 5/5] fix some kafka 4.0 ci errors --- .circleci/config.yml | 45 ++++++++++++++++++++++++++++++++------- alterclientquotas_test.go | 6 +++++- consumergroup_test.go | 6 +++--- describegroups_test.go | 2 +- electleaders_test.go | 2 ++ listgroups_test.go | 2 +- writer_test.go | 6 ++++-- 7 files changed, 53 insertions(+), 16 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 93969624c..c16afcf63 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -178,10 +178,10 @@ jobs: entrypoint: *entrypoint steps: *steps - kafka-400: + kafka-370-kraft: working_directory: *working_directory environment: - KAFKA_VERSION: "4.0.0" + KAFKA_VERSION: "3.7.0" # Need to skip nettest to avoid these kinds of errors: # --- FAIL: TestConn/nettest (17.56s) @@ -195,13 +195,17 @@ jobs: KAFKA_SKIP_NETTEST: "1" docker: - image: circleci/golang - - image: bitnami/kafka:4.0.0 + - image: bitnami/kafka:3.7.0 ports: - 9092:9092 - 9093:9093 - environment: + environment: &kraft-env + KAFKA_KRAFT_MODE: "true" KAFKA_CFG_NODE_ID: 1 KAFKA_CFG_BROKER_ID: 1 + KAFKA_CLUSTER_ID: 1 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_CFG_PROCESS_ROLES: broker,controller KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost' KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER @@ -212,12 +216,12 @@ jobs: KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512' KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094 ALLOW_PLAINTEXT_LISTENER: yes - KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' - KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf" - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true' - KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true' + KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true + KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: true + KAFKA_CFG_DELETE_TOPIC_ENABLE: true KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000' KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer' + KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf" KAFKA_CFG_SUPER_USERS: User:adminscram256;User:adminscram512;User:adminplain KAFKA_CLIENT_USERS: adminscram256,adminscram512,adminplain KAFKA_CLIENT_PASSWORDS: admin-secret-256,admin-secret-512,admin-secret @@ -225,8 +229,32 @@ jobs: KAFKA_INTER_BROKER_USER: adminscram512 KAFKA_INTER_BROKER_PASSWORD: admin-secret-512 KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512 + entrypoint: *entrypoint steps: *steps + kafka-400: + working_directory: *working_directory + environment: + KAFKA_VERSION: "4.0.0" + + # Need to skip nettest to avoid these kinds of errors: + # --- FAIL: TestConn/nettest (17.56s) + # --- FAIL: TestConn/nettest/PingPong (7.40s) + # conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request + # conntest.go:118: mismatching value: got 77, want 78 + # conntest.go:118: mismatching value: got 78, want 79 + # ... + # + # TODO: Figure out why these are happening and fix them (they don't appear to be new). + KAFKA_SKIP_NETTEST: "1" + docker: + - image: circleci/golang + - image: bitnami/kafka:4.0.0 + ports: + - 9092:9092 + - 9093:9093 + environment: *kraft-env + steps: *steps workflows: version: 2 run: @@ -236,4 +264,5 @@ workflows: - kafka-270 - kafka-281 - kafka-370 + - kafka-370-kraft - kafka-400 \ No newline at end of file diff --git a/alterclientquotas_test.go b/alterclientquotas_test.go index d61c745e3..3bb1023e2 100644 --- a/alterclientquotas_test.go +++ b/alterclientquotas_test.go @@ -3,9 +3,11 @@ package kafka import ( "context" "testing" + "time" - ktesting "github.com/segmentio/kafka-go/testing" "github.com/stretchr/testify/assert" + + ktesting "github.com/segmentio/kafka-go/testing" ) func TestClientAlterClientQuotas(t *testing.T) { @@ -65,6 +67,8 @@ func TestClientAlterClientQuotas(t *testing.T) { assert.Equal(t, expectedAlterResp, *alterResp) + time.Sleep(1 * time.Second) // wait for the quota to be applie (Kafka 4.0.0+) + describeResp, err := client.DescribeClientQuotas(context.Background(), &DescribeClientQuotasRequest{ Components: []DescribeClientQuotasRequestComponent{ { diff --git a/consumergroup_test.go b/consumergroup_test.go index dbbe4ec47..da41fc3df 100644 --- a/consumergroup_test.go +++ b/consumergroup_test.go @@ -250,12 +250,12 @@ func TestConsumerGroup(t *testing.T) { scenario: "Next returns generations", function: func(t *testing.T, ctx context.Context, cg *ConsumerGroup) { gen1, err := cg.Next(ctx) - if gen1 == nil { - t.Fatalf("expected generation 1 not to be nil") - } if err != nil { t.Fatalf("expected no error, but got %+v", err) } + if gen1 == nil { + t.Fatalf("expected generation 1 not to be nil") + } // returning from this function should cause the generation to // exit. gen1.Start(func(context.Context) {}) diff --git a/describegroups_test.go b/describegroups_test.go index ad5890988..5d907366a 100644 --- a/describegroups_test.go +++ b/describegroups_test.go @@ -32,7 +32,7 @@ func TestClientDescribeGroups(t *testing.T) { Topic: topic, }) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // longer timeout for kafka-4.0 defer cancel() err := w.WriteMessages( diff --git a/electleaders_test.go b/electleaders_test.go index 3dbaa4704..8933bb4c1 100644 --- a/electleaders_test.go +++ b/electleaders_test.go @@ -3,6 +3,7 @@ package kafka import ( "context" "testing" + "time" ktesting "github.com/segmentio/kafka-go/testing" ) @@ -26,6 +27,7 @@ func TestClientElectLeaders(t *testing.T) { &ElectLeadersRequest{ Topic: topic, Partitions: []int{0, 1}, + Timeout: 5 * time.Second, }, ) diff --git a/listgroups_test.go b/listgroups_test.go index 8c389d712..a148718be 100644 --- a/listgroups_test.go +++ b/listgroups_test.go @@ -55,7 +55,7 @@ func TestClientListGroups(t *testing.T) { Topic: topic, }) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // longer timeout for kafka-4.0 defer cancel() err := w.WriteMessages( diff --git a/writer_test.go b/writer_test.go index 6f894ecd3..def7d8a63 100644 --- a/writer_test.go +++ b/writer_test.go @@ -856,7 +856,8 @@ func testWriterAutoCreateTopic(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() err = w.WriteMessages(ctx, msg) - if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) { + if errors.Is(err, LeaderNotAvailable) || errors.Is(err, UnknownTopicOrPartition) || + errors.Is(err, context.DeadlineExceeded) { time.Sleep(time.Millisecond * 250) continue } @@ -924,7 +925,8 @@ func testWriterSasl(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() err = w.WriteMessages(ctx, msg) - if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) { + if errors.Is(err, LeaderNotAvailable) || errors.Is(err, UnknownTopicOrPartition) || + errors.Is(err, context.DeadlineExceeded) { time.Sleep(time.Millisecond * 250) continue }