From 1ae278caf09a555634f17d1808444f582aedc088 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sun, 19 Oct 2025 18:25:11 +0800 Subject: [PATCH] KAFKA-19804: Improve heartbeat request manager initial HB interval --- .../AbstractHeartbeatRequestManager.java | 5 +++-- .../internals/HeartbeatRequestState.java | 5 +++++ .../StreamsGroupHeartbeatRequestManager.java | 3 ++- .../ConsumerHeartbeatRequestManagerTest.java | 15 +++++++++++---- .../internals/HeartbeatRequestStateTest.java | 14 ++++++++++++++ .../ShareHeartbeatRequestManagerTest.java | 16 +++++++++++----- 6 files changed, 46 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java index cba2b65cbba7d..7827e097cc717 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java @@ -87,7 +87,7 @@ public abstract class AbstractHeartbeatRequestManager groupInstanceId, CloseOptions.Gro heartbeatState, heartbeatRequestState, backgroundEventHandler); + // Clear the initial heartbeat request + assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS); when(membershipManager.state()).thenReturn(MemberState.LEAVING); when(membershipManager.groupInstanceId()).thenReturn(groupInstanceId); when(membershipManager.leaveGroupOperation()).thenReturn(operation); if (groupInstanceId.isEmpty() && REMAIN_IN_GROUP == operation) { assertNoHeartbeat(heartbeatRequestManager); - verify(membershipManager, never()).onHeartbeatRequestGenerated(); + // The onHeartbeatRequestGenerated was triggered by the initial heartbeat request + verify(membershipManager, times(1)).onHeartbeatRequestGenerated(); } else { assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS); - verify(membershipManager).onHeartbeatRequestGenerated(); + // The onHeartbeatRequestGenerated was triggered by the initial heartbeat request and leaveGroupOperation + verify(membershipManager, times(2)).onHeartbeatRequestGenerated(); } - } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestStateTest.java index 025e99a9aecc1..43276da360123 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestStateTest.java @@ -47,6 +47,10 @@ public void testCanSendRequestAndTimeToNextHeartbeatMs() { JITTER ); + assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds())); // the initial heartbeat request attempt + heartbeatRequestState.onSendAttempt(time.milliseconds()); + heartbeatRequestState.onSuccessfulAttempt(time.milliseconds()); + assertFalse(heartbeatRequestState.canSendRequest(time.milliseconds())); assertEquals(HEARTBEAT_INTERVAL_MS, heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds())); time.sleep(HEARTBEAT_INTERVAL_MS - 1); @@ -70,6 +74,11 @@ public void testResetTimer() { RETRY_BACKOFF_MAX_MS, JITTER ); + + assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds())); // the initial heartbeat request attempt + heartbeatRequestState.onSendAttempt(time.milliseconds()); + heartbeatRequestState.onSuccessfulAttempt(time.milliseconds()); + time.sleep(HEARTBEAT_INTERVAL_MS + 100); assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds())); assertEquals(0, heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds())); @@ -90,6 +99,11 @@ public void testUpdateHeartbeatIntervalMs() { RETRY_BACKOFF_MAX_MS, JITTER ); + + assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds())); // the initial heartbeat request attempt + heartbeatRequestState.onSendAttempt(time.milliseconds()); + heartbeatRequestState.onSuccessfulAttempt(time.milliseconds()); + final long updatedHeartbeatIntervalMs = 2 * HEARTBEAT_INTERVAL_MS; time.sleep(HEARTBEAT_INTERVAL_MS + 100); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java index 8952271b250d5..f74f7d4a366ec 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java @@ -169,7 +169,7 @@ private void createHeartbeatStateAndRequestManager() { @Test public void testHeartbeatOnStartup() { NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(0, result.unsentRequests.size()); + assertEquals(1, result.unsentRequests.size()); // the initial heartbeat request attempt createHeartbeatRequestStateWithZeroHeartbeatInterval(); assertEquals(0, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); @@ -184,14 +184,16 @@ public void testHeartbeatOnStartup() { @Test public void testSuccessfulHeartbeatTiming() { NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(0, result.unsentRequests.size(), - "No heartbeat should be sent while interval has not expired"); + assertEquals(1, result.unsentRequests.size(), + "initial heartbeat request should be send on the first poll"); assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()), result.timeUntilNextPollMs); + NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0); + inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE)); assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires"); - NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0); + inflightReq = result.unsentRequests.get(0); assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()), "Heartbeat timer was not reset to the interval when the heartbeat request was sent."); @@ -262,8 +264,12 @@ public void testSkippingHeartbeat(final boolean shouldSkipHeartbeat) { @Test public void testTimerNotDue() { - time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0); // the initial heartbeat request attempt + inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE)); + + time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent + result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size()); assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, result.timeUntilNextPollMs);