Skip to content

Conversation

jack2012aa
Copy link

@jack2012aa jack2012aa commented Aug 20, 2025

Description

KafkaProducerTest#testTopicExpiryInMetadata,
KafkaProducerTest#testTopicRefreshInMetadata, and
KafkaProducerTest#testTopicNotExistingInMetadata used busy waiting and
triggered IllegalStateException in MockClient#pool.

Changes

IllegalStateException

When sender calls MockClient#poll and metadata needs an update, the
client tries to get an update from list or use the original metadata as
an update. However, the original test bypasses the client and updates
the metadata directly. The client doesn't have the last update
information, so it throws the exception.

The exception is reasonable, but we should allow developers to choose
whether to update the metadata automatically. I add a flag
shouldUpdateWithCurrentMetadata so developers can control the
behavior.

Missing cluster

To let MockClient catch the updating request, tests have to use their
clients to update metadata. However, the initial metadata lacks cluster
information, so clients won't try updating. Adding a warmup update fixes
the bug.

Busy-waiting and additional thread

The original test use a thread to mock behavior of a broker which
responds to requests. The attempt is modified to use
MockClient#prepareMetadataUpdate to prepare some updates before
requesting, and use the MockClient#advanceTimeDuringPoll flag to move
the mock time to trigger timeout. Additional assertions are used to
check whether the client consume those updates.

@github-actions github-actions bot added triage PRs from the community producer tests Test fixes (including flaky tests) clients small Small PRs labels Aug 20, 2025
Copy link
Collaborator

@Yunyung Yunyung left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. As I understand, the one goal is to "mock these metadata updates" to avoid or remove any usage of Thread.sleep / Utils.sleep / Time.SYSTEM, as well as any additional threads if possible. But, looks like the code still sleep in every place without actually mocking the metadata updates.

@@ -1036,8 +1048,9 @@ public void testTopicNotExistingInMetadata() throws InterruptedException {
@Test
public void testTopicExpiryInMetadata() throws InterruptedException {
Map<String, Object> configs = new HashMap<>();
final String maxBlockMs = "300000";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you add an extra zero? Was that intentional?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry this is a mistake. I'll fix it.

@@ -969,20 +970,28 @@ public void testTopicRefreshInMetadata() throws InterruptedException {
final Time time = new MockTime();
final ProducerMetadata metadata = new ProducerMetadata(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, metadataIdleMs,
new LogContext(), new ClusterResourceListeners(), time);
final String warmupTopic = "warmup-topic";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a warmupTopic?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first metadata update is used to update the cluster information. Adding another topic can better indicating that the update is just for warmup. It is also used in line 1083 to indicate that this update is to clean the expired topic.

@jack2012aa
Copy link
Author

Thanks for the PR. As I understand, the one goal is to "mock these metadata updates" to avoid or remove any usage of Thread.sleep / Utils.sleep / Time.SYSTEM, as well as any additional threads if possible. But, looks like the code still sleep in every place without actually mocking the metadata updates.

Hello @Yunyung, thank you for the review.

The major reason I keep the threads is to control the mock time. Although MockClient has the advanceTimeDuringPoll flag, its comment in MockClient#poll says that it works when no response is received (though it does not work like that).

A thread-free approach will be combining MockClient#prepareMetadataUpdate and the advance time flag. However, a new issue is that we can't distinguish whether the producer requests an update; the timeout may be triggered automatically by MockClient#poll. We can add another check on how many updates do the mock client consume. If you think this is a better method, I am willing to write a new version.

As for sleep, I only use CountDownLatch for the new await method. Since it is not a busy waiting, I seem it as acceptable. It can also be removed if we use the thread-free approach.

@github-actions github-actions bot removed the small Small PRs label Aug 20, 2025
@jack2012aa
Copy link
Author

Hi @Yunyung I have pushed a thread-freed version. Please take a look when you are not busy.

@github-actions github-actions bot removed the triage PRs from the community label Aug 21, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ci-approved clients producer tests Test fixes (including flaky tests)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants