-
Notifications
You must be signed in to change notification settings - Fork 14.8k
KAFKA-19829: Implement group-level initial rebalance delay #20755
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from 9 commits
fc586ac
33f9667
e57d4a1
164143f
e2410bc
3c163b7
b0814e2
f402c05
6ea4ee5
6867fca
c94fed0
90a29c5
82bf5f8
b581df2
7569d3c
c3824bf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1985,30 +1985,54 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream | |
|
|
||
| // Actually bump the group epoch | ||
| int groupEpoch = group.groupEpoch(); | ||
| boolean isInitialRebalance = (bumpGroupEpoch && groupEpoch == 0); | ||
RaidenE1 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
RaidenE1 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if (bumpGroupEpoch) { | ||
| groupEpoch += 1; | ||
| if (groupEpoch == 0) { | ||
| groupEpoch += 2; | ||
RaidenE1 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } else { | ||
| groupEpoch += 1; | ||
| } | ||
| records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch, currentAssignmentConfigs)); | ||
| log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {} with metadata hash {} and validated topic epoch {}.", groupId, memberId, groupEpoch, metadataHash, validatedTopologyEpoch); | ||
| metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME); | ||
| group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch); | ||
| } | ||
|
|
||
| // Schedule initial rebalance delay for new streams groups to coalesce joins. | ||
| int initialDelayMs = streamsGroupInitialRebalanceDelayMs(groupId); | ||
RaidenE1 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if (isInitialRebalance && initialDelayMs > 0) { | ||
|
||
| timer.scheduleIfAbsent( | ||
| streamsInitialRebalanceKey(groupId), | ||
| initialDelayMs, | ||
| TimeUnit.MILLISECONDS, | ||
| false, | ||
| () -> fireStreamsInitialRebalance(groupId) | ||
| ); | ||
| } | ||
|
||
|
|
||
| // 4. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member | ||
| // replaces an existing static member. | ||
| // The delta between the existing and the new target assignment is persisted to the partition. | ||
| int targetAssignmentEpoch; | ||
| TasksTuple targetAssignment; | ||
| if (groupEpoch > group.assignmentEpoch()) { | ||
| targetAssignment = updateStreamsTargetAssignment( | ||
| group, | ||
| groupEpoch, | ||
| updatedMember, | ||
| updatedConfiguredTopology, | ||
| metadataImage, | ||
| records, | ||
| currentAssignmentConfigs | ||
| ); | ||
| targetAssignmentEpoch = groupEpoch; | ||
| boolean initialDelayActive = timer.isScheduled(streamsInitialRebalanceKey(groupId)); | ||
RaidenE1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (initialDelayActive && group.assignmentEpoch() == 0) { | ||
| // During initial rebalance delay, return empty assignment to first joining members. | ||
| targetAssignmentEpoch = groupEpoch; | ||
| targetAssignment = TasksTuple.EMPTY; | ||
| } else { | ||
| targetAssignment = updateStreamsTargetAssignment( | ||
| group, | ||
| groupEpoch, | ||
| updatedMember, | ||
| updatedConfiguredTopology, | ||
| metadataImage, | ||
| records, | ||
| currentAssignmentConfigs | ||
| ); | ||
| targetAssignmentEpoch = groupEpoch; | ||
| } | ||
| } else { | ||
| targetAssignmentEpoch = group.assignmentEpoch(); | ||
| targetAssignment = group.targetAssignment(updatedMember.memberId()); | ||
|
|
@@ -3693,7 +3717,7 @@ private StreamsGroupMember maybeReconcile( | |
| List<StreamsGroupHeartbeatRequestData.TaskIds> ownedWarmupTasks, | ||
| List<CoordinatorRecord> records | ||
| ) { | ||
| if (member.isReconciledTo(targetAssignmentEpoch)) { | ||
| if (member.isReconciledTo(targetAssignmentEpoch) && member.assignedTasks().equals(targetAssignment)) { | ||
RaidenE1 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return member; | ||
| } | ||
|
|
||
|
|
@@ -4003,6 +4027,69 @@ private TasksTuple updateStreamsTargetAssignment( | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Fires the initial rebalance for a streams group when the delay timer expires. | ||
| * Computes and persists target assignment for all members if conditions are met. | ||
| * | ||
| * @param groupId The group id. | ||
| * @return A CoordinatorResult with records to persist the target assignment, or EMPTY_RESULT. | ||
| */ | ||
| private CoordinatorResult<Void, CoordinatorRecord> fireStreamsInitialRebalance( | ||
RaidenE1 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| String groupId | ||
| ) { | ||
| try { | ||
| StreamsGroup group = streamsGroup(groupId); | ||
|
|
||
| if (group.groupEpoch() <= group.assignmentEpoch()) { | ||
RaidenE1 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return EMPTY_RESULT; | ||
| } | ||
|
|
||
| if (!group.configuredTopology().isPresent()) { | ||
| return EMPTY_RESULT; | ||
RaidenE1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| TaskAssignor assignor = streamsGroupAssignor(group.groupId()); | ||
RaidenE1 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| try { | ||
RaidenE1 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder assignmentResultBuilder = | ||
| new org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder( | ||
| group.groupId(), | ||
| group.groupEpoch(), | ||
| assignor, | ||
| group.lastAssignmentConfigs() | ||
| ) | ||
| .withMembers(group.members()) | ||
| .withTopology(group.configuredTopology().get()) | ||
| .withStaticMembers(group.staticMembers()) | ||
| .withMetadataImage(metadataImage) | ||
| .withTargetAssignment(group.targetAssignment()); | ||
|
|
||
| long startTimeMs = time.milliseconds(); | ||
| org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = | ||
| assignmentResultBuilder.build(); | ||
| long assignorTimeMs = time.milliseconds() - startTimeMs; | ||
|
|
||
| if (log.isDebugEnabled()) { | ||
| log.debug("[GroupId {}] Initial rebalance: Computed target assignment for epoch {} with '{}' assignor in {}ms: {}.", | ||
| group.groupId(), group.groupEpoch(), assignor, assignorTimeMs, assignmentResult.targetAssignment()); | ||
| } else { | ||
| log.info("[GroupId {}] Initial rebalance: Computed target assignment for epoch {} with '{}' assignor in {}ms.", | ||
| group.groupId(), group.groupEpoch(), assignor, assignorTimeMs); | ||
| } | ||
|
|
||
| return new CoordinatorResult<>(assignmentResult.records(), null); | ||
| } catch (TaskAssignorException ex) { | ||
| String msg = String.format("Failed to compute target assignment for initial rebalance at epoch %d: %s", | ||
| group.groupEpoch(), ex.getMessage()); | ||
| log.error("[GroupId {}] {}.", group.groupId(), msg); | ||
| throw new UnknownServerException(msg, ex); | ||
| } | ||
| } catch (GroupIdNotFoundException ex) { | ||
| log.warn("[GroupId {}] Group not found during initial rebalance.", groupId); | ||
| return EMPTY_RESULT; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Handles leave request from a consumer group member. | ||
| * @param groupId The group id from the request. | ||
|
|
@@ -8570,6 +8657,10 @@ private boolean maybeDeleteEmptyStreamsGroup(String groupId, List<CoordinatorRec | |
| // Add tombstones for the previous streams group. The tombstones won't actually be | ||
| // replayed because its coordinator result has a non-null appendFuture. | ||
| createGroupTombstoneRecords(group, records); | ||
| // Cancel any pending initial rebalance timer. | ||
| if (timer.isScheduled(streamsInitialRebalanceKey(groupId))) { | ||
RaidenE1 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| timer.cancel(streamsInitialRebalanceKey(groupId)); | ||
| } | ||
| removeGroup(groupId); | ||
| return true; | ||
| } | ||
|
|
@@ -8659,6 +8750,15 @@ private int streamsGroupHeartbeatIntervalMs(String groupId) { | |
| .orElse(config.streamsGroupHeartbeatIntervalMs()); | ||
| } | ||
|
|
||
| /** | ||
| * Get the initial rebalance delay of the provided streams group. | ||
| */ | ||
| private int streamsGroupInitialRebalanceDelayMs(String groupId) { | ||
| Optional<GroupConfig> groupConfig = groupConfigManager.groupConfig(groupId); | ||
| return groupConfig.map(GroupConfig::streamsInitialRebalanceDelayMs) | ||
| .orElse(config.streamsGroupInitialRebalanceDelayMs()); | ||
| } | ||
|
|
||
| /** | ||
| * Get the assignor of the provided streams group. | ||
| */ | ||
|
|
@@ -8716,6 +8816,20 @@ static String classicGroupSyncKey(String groupId) { | |
| return "sync-" + groupId; | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| * Generate a streams group initial rebalance key for the timer. | ||
| * | ||
| * Package private for testing. | ||
| * | ||
| * @param groupId The group id. | ||
| * | ||
| * @return the initial rebalance key. | ||
| */ | ||
| static String streamsInitialRebalanceKey(String groupId) { | ||
| return "initial-rebalance-timeout-" + groupId; | ||
| } | ||
RaidenE1 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| /** | ||
| * Generate a consumer group join key for the timer. | ||
| * | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mjsax Are we good with a default of 3 seconds for delaying the initial rebalance?