-
Notifications
You must be signed in to change notification settings - Fork 14.8k
KAFKA-19829: Implement group-level initial rebalance delay #20755
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces a new group-level configuration for streams groups to delay the initial rebalance. This helps prevent the first joining member from being assigned all active tasks and then slowly revoking them when additional members join, which was causing performance issues particularly with slow or overloaded members.
Key changes:
- Added
STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIGconfiguration with a default value of 3000ms at both the group coordinator and group config levels - Implemented accessor methods to retrieve the initial rebalance delay setting
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| GroupCoordinatorConfig.java | Added broker-level configuration constant, field, initialization, and accessor for streams group initial rebalance delay |
| GroupConfig.java | Added group-level configuration constant, field, initialization, and accessor for streams initial rebalance delay |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
lucasbru
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good to me as a first step, but I am a bit uneasy introducing a config that is not used at all, not even in tests. Maybe we should just go ahead and extend this PR to a working subset of the feature.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
Show resolved
Hide resolved
| public static final String STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC = "The maximum allowed value for the group-level configuration of " + GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG; | ||
|
|
||
| public static final String STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG = "group.streams.initial.rebalance.delay.ms"; | ||
| public static final int STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT = 3000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mjsax Are we good with a default of 3 seconds for delaying the initial rebalance?
|
@lucasbru Hi Lucas, I'm not going to merge it now, I'll add more things to it. Just want you to take a quick look to make sure I'm on the right direction! |
| * | ||
| * @return An empty result. | ||
| */ | ||
| private CoordinatorResult<Void, CoordinatorRecord> fireStreamsInitialRebalance(String groupId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is not doing anything, I think we can inline this.
|
|
||
| // Actually bump the group epoch | ||
| int groupEpoch = group.groupEpoch(); | ||
| boolean isInitialRebalance = group.isEmpty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Group being empty does not mean initial rebalance right? It could be that the group became empty again?
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
lucasbru
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this makes sense to me (except for the weirdness around initial epoch that I commented on), but I am wondering now if we could indeed compute a new target assignment when the timer triggers. This could be especially useful when we have offloaded assignments (in the future) and we'd otherwise even not get an assignment on the next heartbeat.
Maybe @squah-confluent could have a quick look how this would interact with the offloaded assignment code that he is going to implement in AK.
I think inside fireStreamsInitialRebalance, we could, if group.epoch() > group.assignmentEpoch(), just create the targetAssignmentbuilder for the current group ID. You can get all the information about the group from the groups map instance variable.
I can think of one corner case: If configuredTopology is not defined, we should just skip computing the target assignment. It will be computed on the next heartbeat.
|
Let's make sure we do not forget to update the docs, for the new config. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| boolean isInitialRebalance = (group.isEmpty() && groupEpoch == 0); | ||
| if (bumpGroupEpoch) { | ||
| groupEpoch += 1; | ||
| if (isInitialRebalance) { | ||
| groupEpoch += 2; | ||
| } else { | ||
| groupEpoch += 1; | ||
| } |
Copilot
AI
Oct 29, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The isInitialRebalance check on line 1988 is evaluated before bumpGroupEpoch is processed (line 1989-1999), but the condition relies on group.isEmpty() which becomes false after the member is added to the group earlier in the method. This means isInitialRebalance will always be false for the first member join since the member is already added to the group before this check. The timer scheduling on line 2003 may never execute as intended.
| // Schedule initial rebalance delay for new streams groups to coalesce joins. | ||
| int initialDelayMs = streamsGroupInitialRebalanceDelayMs(groupId); | ||
| if (isInitialRebalance && initialDelayMs > 0) { |
Copilot
AI
Oct 29, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The isInitialRebalance check on line 1988 is evaluated before bumpGroupEpoch is processed (line 1989-1999), but the condition relies on group.isEmpty() which becomes false after the member is added to the group earlier in the method. This means isInitialRebalance will always be false for the first member join since the member is already added to the group before this check. The timer scheduling on line 2003 may never execute as intended.
| if (isInitialRebalance && initialDelayMs > 0) { | ||
| timer.scheduleIfAbsent( | ||
| streamsInitialRebalanceKey(groupId), | ||
| initialDelayMs, | ||
| TimeUnit.MILLISECONDS, | ||
| false, | ||
| () -> fireStreamsInitialRebalance(groupId) | ||
| ); | ||
| } |
Copilot
AI
Oct 29, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The timer is only scheduled when bumpGroupEpoch is true (due to placement after line 1999), but isInitialRebalance is checked independently. If the first member join doesn't trigger a group epoch bump (which could happen if no metadata changes occur), the initial rebalance timer may not be scheduled. Consider scheduling the timer before the bumpGroupEpoch condition or ensuring it's always scheduled for the first member.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
...nator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
lucasbru
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR! I made a pass on the production code.
...nator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
Outdated
Show resolved
Hide resolved
...coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
lucasbru
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly looks good to me, I have a another few comments.
@squah-confluent could you maybe have a look at the code around the timer?
| } | ||
|
|
||
| // Schedule initial rebalance delay for new streams groups to coalesce joins. | ||
| boolean isInitialRebalance = (initialGroupEpoch == 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we just use group.groupEpoch() here?
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
| // replayed because its coordinator result has a non-null appendFuture. | ||
| createGroupTombstoneRecords(group, records); | ||
| // Cancel initial rebalance timer. | ||
| timer.cancel(streamsInitialRebalanceKey(groupId)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we have to move this into createGroupTombstoneRecords, so that it's also done when delete group is called
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's an odd place to move the cancellation, purely because of the naming, but I can't see where else it'd go.
|
Thanks for the PR! The interaction with delayed assignments would be fine. The initial rebalance delay would be just another condition to take into account when deciding whether to start an assignment run. The timer usage looks fine. We're missing some cleanup as @lucasbru pointed out. |
During testing, an artifact of the new rebalance protocol showed up. In
some cases, the first joining member gets all active tasks assigned, and
is slow to revoke the tasks after more member has joined the group. This
affects in particular cases where the first member is slow (possibly
overloaded in the case of cloudlimits benchmarks) and there are a lot of
tasks to be assigned.
To help with this situation, we want to introduce a new group-specific
configuration to delay the initial rebalance.
Main changes:
GroupConfig&GroupCoordinatorConfigis_scheduled methodto timerbumpGroup&groupEpoch == 0by two
DescribeStreamsGroupTestsince it's flaky nowsetGroupStreamsInitialRebalanceDelayto EmbeddedKafkaCluster` sowe can manully set the config.