Skip to content

Commit 51771dd

Browse files
committed
fix some kafka 4.0 ci errors
1 parent 85067e2 commit 51771dd

File tree

7 files changed

+53
-16
lines changed

7 files changed

+53
-16
lines changed

.circleci/config.yml

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -178,10 +178,10 @@ jobs:
178178
entrypoint: *entrypoint
179179
steps: *steps
180180

181-
kafka-400:
181+
kafka-370-kraft:
182182
working_directory: *working_directory
183183
environment:
184-
KAFKA_VERSION: "4.0.0"
184+
KAFKA_VERSION: "3.7.0"
185185

186186
# Need to skip nettest to avoid these kinds of errors:
187187
# --- FAIL: TestConn/nettest (17.56s)
@@ -195,13 +195,17 @@ jobs:
195195
KAFKA_SKIP_NETTEST: "1"
196196
docker:
197197
- image: circleci/golang
198-
- image: bitnami/kafka:4.0.0
198+
- image: bitnami/kafka:3.7.0
199199
ports:
200200
- 9092:9092
201201
- 9093:9093
202-
environment:
202+
environment: &kraft-env
203+
KAFKA_KRAFT_MODE: "true"
203204
KAFKA_CFG_NODE_ID: 1
204205
KAFKA_CFG_BROKER_ID: 1
206+
KAFKA_CLUSTER_ID: 1
207+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
208+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
205209
KAFKA_CFG_PROCESS_ROLES: broker,controller
206210
KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost'
207211
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
@@ -212,21 +216,45 @@ jobs:
212216
KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
213217
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094
214218
ALLOW_PLAINTEXT_LISTENER: yes
215-
KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
216-
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
217-
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
218-
KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
219+
KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true
220+
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: true
221+
KAFKA_CFG_DELETE_TOPIC_ENABLE: true
219222
KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000'
220223
KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer'
224+
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
221225
KAFKA_CFG_SUPER_USERS: User:adminscram256;User:adminscram512;User:adminplain
222226
KAFKA_CLIENT_USERS: adminscram256,adminscram512,adminplain
223227
KAFKA_CLIENT_PASSWORDS: admin-secret-256,admin-secret-512,admin-secret
224228
KAFKA_CLIENT_SASL_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN
225229
KAFKA_INTER_BROKER_USER: adminscram512
226230
KAFKA_INTER_BROKER_PASSWORD: admin-secret-512
227231
KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512
232+
entrypoint: *entrypoint
228233
steps: *steps
229234

235+
kafka-400:
236+
working_directory: *working_directory
237+
environment:
238+
KAFKA_VERSION: "4.0.0"
239+
240+
# Need to skip nettest to avoid these kinds of errors:
241+
# --- FAIL: TestConn/nettest (17.56s)
242+
# --- FAIL: TestConn/nettest/PingPong (7.40s)
243+
# conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
244+
# conntest.go:118: mismatching value: got 77, want 78
245+
# conntest.go:118: mismatching value: got 78, want 79
246+
# ...
247+
#
248+
# TODO: Figure out why these are happening and fix them (they don't appear to be new).
249+
KAFKA_SKIP_NETTEST: "1"
250+
docker:
251+
- image: circleci/golang
252+
- image: bitnami/kafka:4.0.0
253+
ports:
254+
- 9092:9092
255+
- 9093:9093
256+
environment: *kraft-env
257+
steps: *steps
230258
workflows:
231259
version: 2
232260
run:
@@ -236,4 +264,5 @@ workflows:
236264
- kafka-270
237265
- kafka-281
238266
- kafka-370
267+
- kafka-370-kraft
239268
- kafka-400

alterclientquotas_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package kafka
33
import (
44
"context"
55
"testing"
6+
"time"
67

7-
ktesting "github.com/segmentio/kafka-go/testing"
88
"github.com/stretchr/testify/assert"
9+
10+
ktesting "github.com/segmentio/kafka-go/testing"
911
)
1012

1113
func TestClientAlterClientQuotas(t *testing.T) {
@@ -65,6 +67,8 @@ func TestClientAlterClientQuotas(t *testing.T) {
6567

6668
assert.Equal(t, expectedAlterResp, *alterResp)
6769

70+
time.Sleep(1 * time.Second) // wait for the quota to be applie (Kafka 4.0.0+)
71+
6872
describeResp, err := client.DescribeClientQuotas(context.Background(), &DescribeClientQuotasRequest{
6973
Components: []DescribeClientQuotasRequestComponent{
7074
{

consumergroup_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,12 +250,12 @@ func TestConsumerGroup(t *testing.T) {
250250
scenario: "Next returns generations",
251251
function: func(t *testing.T, ctx context.Context, cg *ConsumerGroup) {
252252
gen1, err := cg.Next(ctx)
253-
if gen1 == nil {
254-
t.Fatalf("expected generation 1 not to be nil")
255-
}
256253
if err != nil {
257254
t.Fatalf("expected no error, but got %+v", err)
258255
}
256+
if gen1 == nil {
257+
t.Fatalf("expected generation 1 not to be nil")
258+
}
259259
// returning from this function should cause the generation to
260260
// exit.
261261
gen1.Start(func(context.Context) {})

describegroups_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func TestClientDescribeGroups(t *testing.T) {
3232
Topic: topic,
3333
})
3434

35-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
35+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // longer timeout for kafka-4.0
3636
defer cancel()
3737

3838
err := w.WriteMessages(

electleaders_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kafka
33
import (
44
"context"
55
"testing"
6+
"time"
67

78
ktesting "github.com/segmentio/kafka-go/testing"
89
)
@@ -26,6 +27,7 @@ func TestClientElectLeaders(t *testing.T) {
2627
&ElectLeadersRequest{
2728
Topic: topic,
2829
Partitions: []int{0, 1},
30+
Timeout: 5 * time.Second,
2931
},
3032
)
3133

listgroups_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func TestClientListGroups(t *testing.T) {
5555
Topic: topic,
5656
})
5757

58-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
58+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // longer timeout for kafka-4.0
5959
defer cancel()
6060

6161
err := w.WriteMessages(

writer_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -856,7 +856,8 @@ func testWriterAutoCreateTopic(t *testing.T) {
856856
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
857857
defer cancel()
858858
err = w.WriteMessages(ctx, msg)
859-
if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
859+
if errors.Is(err, LeaderNotAvailable) || errors.Is(err, UnknownTopicOrPartition) ||
860+
errors.Is(err, context.DeadlineExceeded) {
860861
time.Sleep(time.Millisecond * 250)
861862
continue
862863
}
@@ -924,7 +925,8 @@ func testWriterSasl(t *testing.T) {
924925
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
925926
defer cancel()
926927
err = w.WriteMessages(ctx, msg)
927-
if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
928+
if errors.Is(err, LeaderNotAvailable) || errors.Is(err, UnknownTopicOrPartition) ||
929+
errors.Is(err, context.DeadlineExceeded) {
928930
time.Sleep(time.Millisecond * 250)
929931
continue
930932
}

0 commit comments

Comments
 (0)