Skip to content

Commit

Permalink
KAFKA-18569: New consumer close may wait on unneeded FindCoordinator
Browse files Browse the repository at this point in the history
JIRA: KAFKA-18569
Please refer to the ticket for further details.
  • Loading branch information
frankvicky committed Jan 17, 2025
1 parent 8cc560e commit 3bb4130
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.ResumePartitionsEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
Expand Down Expand Up @@ -1339,6 +1340,7 @@ private void close(Duration timeout, boolean swallowException) {
() -> leaveGroupOnClose(closeTimer), firstException);
swallow(log, Level.ERROR, "Failed invoking asynchronous commit callbacks while closing consumer",
() -> awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false), firstException);
swallow(log, Level.ERROR, "Failed to stop finding coordinator", this::stopFindCoordinatorOnClose, firstException);
if (applicationEventHandler != null)
closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
closeTimer.update();
Expand Down Expand Up @@ -1421,6 +1423,14 @@ private void leaveGroupOnClose(final Timer timer) {
}
}

/**
 * Enqueues a {@link StopFindCoordinatorOnCloseEvent} on the application event handler as part of
 * closing the consumer.
 *
 * <p>Nothing is enqueued when the group metadata is empty, or when the application event handler
 * is {@code null}.
 */
private void stopFindCoordinatorOnClose() {
    // close() itself tolerates a null applicationEventHandler before shutting it down, so guard
    // here as well: otherwise a NullPointerException would be raised, logged at ERROR level and
    // possibly recorded as the first close failure.
    if (applicationEventHandler == null || groupMetadata.get().isEmpty())
        return;

    log.debug("Stop finding coordinator during consumer close");
    applicationEventHandler.add(new StopFindCoordinatorOnCloseEvent());
}

// Visible for testing
void commitSyncAllConsumed(final Timer timer) {
log.debug("Sending synchronous auto-commit on closing");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class CoordinatorRequestManager implements RequestManager {
private final RequestState coordinatorRequestState;
private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
private long totalDisconnectedMin = 0;
private boolean closing = false;
private Node coordinator;

public CoordinatorRequestManager(
Expand All @@ -80,6 +81,11 @@ public CoordinatorRequestManager(
);
}

/**
 * Marks this manager as closing by setting the {@code closing} flag.
 */
@Override
public void signalClose() {
    this.closing = true;
}

/**
* Poll for the FindCoordinator request.
* If we don't need to discover a coordinator, this method will return a PollResult with Long.MAX_VALUE backoff time and an empty list.
Expand All @@ -92,7 +98,7 @@ public CoordinatorRequestManager(
*/
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
if (this.coordinator != null)
if (closing || this.coordinator != null)
return EMPTY;

if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public enum Type {
TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE, TOPIC_RE2J_PATTERN_SUBSCRIPTION_CHANGE,
UPDATE_SUBSCRIPTION_METADATA, UNSUBSCRIBE,
CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE,
COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE, STOP_FIND_COORDINATOR_ON_CLOSE,
PAUSE_PARTITIONS, RESUME_PARTITIONS, CURRENT_LAG,
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ public void process(ApplicationEvent event) {
process((LeaveGroupOnCloseEvent) event);
return;

case STOP_FIND_COORDINATOR_ON_CLOSE:
process((StopFindCoordinatorOnCloseEvent) event);
return;

case CREATE_FETCH_REQUESTS:
process((CreateFetchRequestsEvent) event);
return;
Expand Down Expand Up @@ -452,6 +456,13 @@ private void process(final LeaveGroupOnCloseEvent event) {
future.whenComplete(complete(event.future()));
}

/**
 * Handles a {@link StopFindCoordinatorOnCloseEvent} by signalling close on the coordinator request
 * manager, if one is present. The event itself carries no data that is read here.
 */
private void process(@SuppressWarnings("unused") final StopFindCoordinatorOnCloseEvent event) {
    // Without a coordinator request manager there is nothing to signal.
    if (!requestManagers.coordinatorRequestManager.isPresent())
        return;

    log.debug("Signal CoordinatorRequestManager closing");
    requestManagers.coordinatorRequestManager.get().signalClose();
}

/**
* Process event that tells the share consume request manager to fetch more records.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;

/**
 * Application event of type {@code STOP_FIND_COORDINATOR_ON_CLOSE}. It carries no payload beyond
 * its type.
 */
public class StopFindCoordinatorOnCloseEvent extends ApplicationEvent {

    /**
     * Creates the event, tagging it with {@code Type.STOP_FIND_COORDINATOR_ON_CLOSE}.
     */
    public StopFindCoordinatorOnCloseEvent() {
        super(Type.STOP_FIND_COORDINATOR_ON_CLOSE);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,26 @@ public void testNetworkTimeout() {
assertEquals(1, res2.unsentRequests.size());
}

/**
 * Verifies that once {@code signalClose()} has been called, polling the manager yields no unsent
 * requests, both before and after the retry backoff has elapsed.
 */
@Test
public void testSignalOnClose() {
    final CoordinatorRequestManager manager = setupCoordinatorManager(GROUP_ID);

    // Start from a state in which a coordinator is known.
    expectFindCoordinatorRequest(manager, Errors.NONE);
    assertTrue(manager.coordinator().isPresent());

    // Mark the coordinator unknown; an immediate poll is expected to produce no request.
    manager.markCoordinatorUnknown("coordinator changed", time.milliseconds());
    assertEquals(Collections.emptyList(), manager.poll(time.milliseconds()).unsentRequests);

    manager.signalClose();

    // Just short of the retry backoff: still no request.
    time.sleep(RETRY_BACKOFF_MS - 1);
    assertEquals(Collections.emptyList(), manager.poll(time.milliseconds()).unsentRequests);

    // Past the retry backoff: the manager must stay silent because it is closing.
    time.sleep(RETRY_BACKOFF_MS);
    assertEquals(
        Collections.emptyList(),
        manager.poll(time.milliseconds()).unsentRequests,
        "Should not generate find coordinator request during close");
}

private void expectFindCoordinatorRequest(
CoordinatorRequestManager coordinatorManager,
Errors error
Expand Down

0 comments on commit 3bb4130

Please sign in to comment.