Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
private final BackgroundEventHandler backgroundEventHandler;

/**
* Timer for tracking the time since the last consumer poll. If the timer expires, the consumer will stop
* Timer for tracking the time since the last consumer poll. If the timer expires, the consumer will stop
* sending heartbeat until the next poll.
*/
private final Timer pollTimer;
Expand All @@ -114,7 +114,8 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
this.maxPollIntervalMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs,
int requestTimeout = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, requestTimeout, retryBackoffMs,
retryBackoffMaxMs, RETRY_BACKOFF_JITTER);
this.pollTimer = time.timer(maxPollIntervalMs);
this.metricsManager = metricsManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public void onFailedAttempt(final long currentTimeMs) {
@Override
public boolean canSendRequest(final long currentTimeMs) {
update(currentTimeMs);
// Allow the first heartbeat to be sent immediately, after the first heartbeat is sent,
// lastSentMs will be set by onSendAttempt(), and subsequent heartbeats will be controlled by the timer.
if (lastSentMs == -1) {
return super.canSendRequest(currentTimeMs);
}
return heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,11 @@ public StreamsGroupHeartbeatRequestManager(final LogContext logContext,
long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
this.heartbeatState = new HeartbeatState(streamsRebalanceData, membershipManager, maxPollIntervalMs);
int requestTimeout = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.heartbeatRequestState = new HeartbeatRequestState(
logContext,
time,
0,
requestTimeout,
retryBackoffMs,
retryBackoffMaxMs,
RETRY_BACKOFF_JITTER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -97,6 +98,7 @@ public class ConsumerHeartbeatRequestManagerTest {
private static final String DEFAULT_REMOTE_ASSIGNOR = "uniform";
private static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id";
private static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
private static final int DEFAULT_REQUEST_TIMEOUT_MS = 30000;
private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000;
private static final long DEFAULT_RETRY_BACKOFF_MS = 80;
private static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000;
Expand Down Expand Up @@ -218,7 +220,7 @@ public void testHeartBeatRequestStateToStringBase() {
@Test
public void testHeartbeatOnStartup() {
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, result.unsentRequests.size());
assertEquals(1, result.unsentRequests.size());

createHeartbeatRequestStateWithZeroHeartbeatInterval();
assertEquals(0, heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));
Expand All @@ -232,6 +234,7 @@ public void testHeartbeatOnStartup() {

@Test
public void testSuccessfulHeartbeatTiming() {
assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS); // the initial heartbeat request
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, result.unsentRequests.size(),
"No heartbeat should be sent while interval has not expired");
Expand Down Expand Up @@ -316,6 +319,7 @@ public void testSkippingHeartbeat(final boolean shouldSkipHeartbeat) {

@Test
public void testTimerNotDue() {
assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS); // the initial heartbeat request
time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());

Expand Down Expand Up @@ -780,18 +784,21 @@ public void testPollOnLeaving(Optional<String> groupInstanceId, CloseOptions.Gro
heartbeatState,
heartbeatRequestState,
backgroundEventHandler);
// Clear the initial heartbeat request
assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS);
when(membershipManager.state()).thenReturn(MemberState.LEAVING);
when(membershipManager.groupInstanceId()).thenReturn(groupInstanceId);
when(membershipManager.leaveGroupOperation()).thenReturn(operation);

if (groupInstanceId.isEmpty() && REMAIN_IN_GROUP == operation) {
assertNoHeartbeat(heartbeatRequestManager);
verify(membershipManager, never()).onHeartbeatRequestGenerated();
// The onHeartbeatRequestGenerated was triggered by the initial heartbeat request
verify(membershipManager, times(1)).onHeartbeatRequestGenerated();
} else {
assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS);
verify(membershipManager).onHeartbeatRequestGenerated();
// The onHeartbeatRequestGenerated was triggered by the initial heartbeat request and leaveGroupOperation
verify(membershipManager, times(2)).onHeartbeatRequestGenerated();
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public void testCanSendRequestAndTimeToNextHeartbeatMs() {
JITTER
);

assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds())); // the initial heartbeat request attempt
heartbeatRequestState.onSendAttempt(time.milliseconds());
heartbeatRequestState.onSuccessfulAttempt(time.milliseconds());

assertFalse(heartbeatRequestState.canSendRequest(time.milliseconds()));
assertEquals(HEARTBEAT_INTERVAL_MS, heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()));
time.sleep(HEARTBEAT_INTERVAL_MS - 1);
Expand All @@ -70,6 +74,11 @@ public void testResetTimer() {
RETRY_BACKOFF_MAX_MS,
JITTER
);

assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds())); // the initial heartbeat request attempt
heartbeatRequestState.onSendAttempt(time.milliseconds());
heartbeatRequestState.onSuccessfulAttempt(time.milliseconds());

time.sleep(HEARTBEAT_INTERVAL_MS + 100);
assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds()));
assertEquals(0, heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()));
Expand All @@ -90,6 +99,11 @@ public void testUpdateHeartbeatIntervalMs() {
RETRY_BACKOFF_MAX_MS,
JITTER
);

assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds())); // the initial heartbeat request attempt
heartbeatRequestState.onSendAttempt(time.milliseconds());
heartbeatRequestState.onSuccessfulAttempt(time.milliseconds());

final long updatedHeartbeatIntervalMs = 2 * HEARTBEAT_INTERVAL_MS;
time.sleep(HEARTBEAT_INTERVAL_MS + 100);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private void createHeartbeatStateAndRequestManager() {
@Test
public void testHeartbeatOnStartup() {
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, result.unsentRequests.size());
assertEquals(1, result.unsentRequests.size()); // the initial heartbeat request attempt

createHeartbeatRequestStateWithZeroHeartbeatInterval();
assertEquals(0, heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));
Expand All @@ -184,14 +184,16 @@ public void testHeartbeatOnStartup() {
@Test
public void testSuccessfulHeartbeatTiming() {
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, result.unsentRequests.size(),
"No heartbeat should be sent while interval has not expired");
assertEquals(1, result.unsentRequests.size(),
"initial heartbeat request should be send on the first poll");
assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()), result.timeUntilNextPollMs);
NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");
NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
inflightReq = result.unsentRequests.get(0);
assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS,
heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()),
"Heartbeat timer was not reset to the interval when the heartbeat request was sent.");
Expand Down Expand Up @@ -262,8 +264,12 @@ public void testSkippingHeartbeat(final boolean shouldSkipHeartbeat) {

@Test
public void testTimerNotDue() {
time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0); // the initial heartbeat request attempt
inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));

time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent
result = heartbeatRequestManager.poll(time.milliseconds());

assertEquals(0, result.unsentRequests.size());
assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, result.timeUntilNextPollMs);
Expand Down
Loading