
KAFKA-19042: [6/N] Move PlaintextConsumerFetchTest to client-integration-tests module #19520


Open · wants to merge 6 commits into base: trunk
Conversation

@m1a2st (Collaborator) commented Apr 20, 2025

Rewrite PlaintextConsumerFetchTest in Java using the new test infrastructure and
move it to the client-integration-tests module.

@github-actions bot added labels: triage (PRs from the community), core (Kafka Broker), tests (Test fixes, including flaky tests), clients on Apr 20, 2025
);
}

assertEquals(expected, actual);
Member:

Could you please consider comparing the fields one by one to produce a more accurate error message?

            for (var i = 0; i < consumerRecords.size(); i++) {
                var producerRecord = producerRecords.get(i);
                var consumerRecord = consumerRecords.get(i);
                assertEquals(producerRecord.topic(), consumerRecord.topic());
                assertEquals(producerRecord.partition(), consumerRecord.partition());
            }
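As a self-contained illustration of the point (not the PR's actual code; `Rec` is a hypothetical stand-in for producer/consumer records), an element-wise loop pinpoints the first index and field that differ instead of dumping two whole lists on failure:

```java
import java.util.List;

public class FieldByFieldCompare {
    // Hypothetical stand-in for the topic/partition fields of
    // ProducerRecord and ConsumerRecord.
    record Rec(String topic, int partition) {}

    // Compares records element by element so a failure names the exact
    // index and field that mismatch.
    static void assertRecordsMatch(List<Rec> produced, List<Rec> consumed) {
        for (int i = 0; i < consumed.size(); i++) {
            Rec p = produced.get(i);
            Rec c = consumed.get(i);
            if (!p.topic().equals(c.topic()))
                throw new AssertionError("topic mismatch at index " + i
                        + ": " + p.topic() + " vs " + c.topic());
            if (p.partition() != c.partition())
                throw new AssertionError("partition mismatch at index " + i
                        + ": " + p.partition() + " vs " + c.partition());
        }
    }

    public static void main(String[] args) {
        assertRecordsMatch(
            List.of(new Rec("t", 0), new Rec("t", 1)),
            List.of(new Rec("t", 0), new Rec("t", 1)));
        System.out.println("all records match"); // prints "all records match"
    }
}
```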

m1a2st (Collaborator Author):

Sure, addressed it :)

@github-actions github-actions bot removed the triage PRs from the community label Apr 21, 2025
@FrankYang0529 (Member) left a comment:

Thanks for the PR. Left some minor comments.

Comment on lines 124 to 129
try (Producer<byte[], byte[]> producer = cluster.producer()) {
for (var i = 0; i < numRecords; i++) {
sendRecord(producer, tp, startingTimestamp, i, -1);
}
producer.flush();
}
Member:

Suggested change:
-    try (Producer<byte[], byte[]> producer = cluster.producer()) {
-        for (var i = 0; i < numRecords; i++) {
-            sendRecord(producer, tp, startingTimestamp, i, -1);
-        }
-        producer.flush();
-    }
+    sendRecords(cluster, tp, numRecords, startingTimestamp, -1);

TopicPartition tp,
int numRecords
) {
sendRecords(cluster, tp, numRecords, System.currentTimeMillis(), -1);
Member:

Suggested change:
-    sendRecords(cluster, tp, numRecords, System.currentTimeMillis(), -1);
+    sendRecords(cluster, tp, numRecords, System.currentTimeMillis());

assertEquals("value " + keyAndValueIndex, new String(record.value()));
// this is true only because K and V are byte arrays
assertEquals(("key " + keyAndValueIndex).length(), record.serializedKeySize());
assertEquals(("value " + keyAndValueIndex).length(), record.serializedValueSize());
Member:

Could we define two static variables for "key " and "value "? They are reused many times in this class.
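A minimal sketch of that suggestion, assuming hypothetical constant names `KEY_PREFIX` and `VALUE_PREFIX` (the PR may choose different identifiers):

```java
public class KeyValuePrefixes {
    // Hypothetical names; extracted so "key " and "value " are defined once
    // instead of being repeated throughout the test class.
    static final String KEY_PREFIX = "key ";
    static final String VALUE_PREFIX = "value ";

    static String expectedKey(int i) { return KEY_PREFIX + i; }
    static String expectedValue(int i) { return VALUE_PREFIX + i; }

    public static void main(String[] args) {
        int keyAndValueIndex = 7;
        // The assertions from the review thread, rewritten against the
        // constants. Serialized sizes equal the string lengths only because
        // K and V are byte arrays.
        byte[] value = expectedValue(keyAndValueIndex).getBytes();
        assert new String(value).equals(VALUE_PREFIX + keyAndValueIndex);
        assert expectedKey(keyAndValueIndex).length()
                == expectedKey(keyAndValueIndex).getBytes().length;
        System.out.println(expectedKey(keyAndValueIndex)); // prints "key 7"
    }
}
```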

private void testFetchOutOfRangeOffsetResetConfigEarliest(GroupProtocol groupProtocol) throws InterruptedException {
Map<String, Object> config = Map.of(
GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT),
FETCH_MAX_WAIT_MS_CONFIG, 0
Member:

Could we add a comment for this variable?

// ensure no in-flight fetch request so that the offset can be reset immediately

Map<String, Object> config = Map.of(
GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT),
AUTO_OFFSET_RESET_CONFIG, "latest",
FETCH_MAX_WAIT_MS_CONFIG, 0
Member:

ditto

Comment on lines 468 to 469
var produced = producedByPartition.getOrDefault(partition, Collections.emptyList());
var consumed = consumedByPartition.getOrDefault(partition, Collections.emptyList());
Member:

Suggested change:
-    var produced = producedByPartition.getOrDefault(partition, Collections.emptyList());
-    var consumed = consumedByPartition.getOrDefault(partition, Collections.emptyList());
+    var produced = producedByPartition.getOrDefault(partition, List.of());
+    var consumed = consumedByPartition.getOrDefault(partition, List.of());

cluster.createTopic(topicName, partitionCount, (short) BROKER_COUNT);
}

List<TopicPartition> partitions = new ArrayList<>();
Member:

Could we use HashSet here, so we don't need to use Set.copyOf?
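A small sketch of the idea, with plain strings standing in for `TopicPartition` (a hypothetical helper, not the PR's code): collecting into a `HashSet` up front makes a later `Set.copyOf` over an intermediate list unnecessary.

```java
import java.util.HashSet;
import java.util.Set;

public class PartitionSetSketch {
    // Collects partition names directly into a HashSet instead of filling an
    // ArrayList first and wrapping it with Set.copyOf afterwards.
    static Set<String> buildPartitions(String topic, int partitionCount) {
        Set<String> partitions = new HashSet<>();
        for (int p = 0; p < partitionCount; p++) {
            partitions.add(topic + "-" + p);
        }
        return partitions;
    }

    public static void main(String[] args) {
        Set<String> partitions = buildPartitions("test-topic", 3);
        System.out.println(partitions.size()); // prints 3
    }
}
```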

Comment on lines 394 to 395
var records = consumer.poll(Duration.ofMillis(20000));
assertEquals(1, records.count());
Member:

How about reusing consumeRecords?

Suggested change:
-    var records = consumer.poll(Duration.ofMillis(20000));
-    assertEquals(1, records.count());
+    var records = consumeRecords(consumer, 1);
+    assertEquals(1, records.size());

Comment on lines 322 to 323
var records = consumer.poll(Duration.ofMillis(20000));
assertEquals(1, records.count());
Member:

How about reusing consumeRecords?

Suggested change:
-    var records = consumer.poll(Duration.ofMillis(20000));
-    assertEquals(1, records.count());
+    var records = consumeRecords(consumer, 1);
+    assertEquals(1, records.size());

@m1a2st (Collaborator Author) commented Apr 22, 2025

Thanks for the comments, @FrankYang0529; addressed all of them.
