diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index f5e12407be52e..ab40e54153499 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -62,6 +62,7 @@ import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent; import org.apache.kafka.clients.consumer.internals.events.ResumePartitionsEvent; import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent; +import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent; import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent; import org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent; @@ -1339,6 +1340,7 @@ private void close(Duration timeout, boolean swallowException) { () -> leaveGroupOnClose(closeTimer), firstException); swallow(log, Level.ERROR, "Failed invoking asynchronous commit callbacks while closing consumer", () -> awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false), firstException); + swallow(log, Level.ERROR, "Failed to stop finding coordinator", this::stopFindCoordinatorOnClose, firstException); if (applicationEventHandler != null) closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException); closeTimer.update(); @@ -1421,6 +1423,14 @@ private void leaveGroupOnClose(final Timer timer) { } } + private void stopFindCoordinatorOnClose() { + if (groupMetadata.get().isEmpty()) + return; + + log.debug("Stop finding coordinator during consumer close"); + applicationEventHandler.add(new StopFindCoordinatorOnCloseEvent()); + } + // Visible for testing void commitSyncAllConsumed(final Timer timer) { log.debug("Sending synchronous auto-commit on closing"); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java index 4664267a0e858..0f1650d0e674b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java @@ -59,6 +59,7 @@ public class CoordinatorRequestManager implements RequestManager { private final RequestState coordinatorRequestState; private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while private long totalDisconnectedMin = 0; + private boolean closing = false; private Node coordinator; public CoordinatorRequestManager( @@ -80,6 +81,11 @@ public CoordinatorRequestManager( ); } + @Override + public void signalClose() { + closing = true; + } + /** * Poll for the FindCoordinator request. * If we don't need to discover a coordinator, this method will return a PollResult with Long.MAX_VALUE backoff time and an empty list. @@ -92,7 +98,7 @@ public CoordinatorRequestManager( */ @Override public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { - if (this.coordinator != null) + if (closing || this.coordinator != null) return EMPTY; if (coordinatorRequestState.canSendRequest(currentTimeMs)) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index dfb775f8947c1..4e0b8e3d2d17f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -34,7 +34,7 @@ public enum Type { TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE, TOPIC_RE2J_PATTERN_SUBSCRIPTION_CHANGE, UPDATE_SUBSCRIPTION_METADATA, UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED, - COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE, + COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE, STOP_FIND_COORDINATOR_ON_CLOSE, PAUSE_PARTITIONS, RESUME_PARTITIONS, CURRENT_LAG, SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC, SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 9c119e28b7b10..69cc0072a39b1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -148,6 +148,10 @@ public void process(ApplicationEvent event) { process((LeaveGroupOnCloseEvent) event); return; + case STOP_FIND_COORDINATOR_ON_CLOSE: + process((StopFindCoordinatorOnCloseEvent) event); + return; + case CREATE_FETCH_REQUESTS: process((CreateFetchRequestsEvent) event); return; @@ -452,6 +456,13 @@ private void process(final LeaveGroupOnCloseEvent event) { future.whenComplete(complete(event.future())); } + private void process(@SuppressWarnings("unused") final StopFindCoordinatorOnCloseEvent event) { + requestManagers.coordinatorRequestManager.ifPresent(manager -> { + log.debug("Signal CoordinatorRequestManager closing"); + manager.signalClose(); + }); + } + /** * Process event that tells the share consume request manager to fetch more records. */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StopFindCoordinatorOnCloseEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StopFindCoordinatorOnCloseEvent.java new file mode 100644 index 0000000000000..8c927faa84d38 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StopFindCoordinatorOnCloseEvent.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +public class StopFindCoordinatorOnCloseEvent extends ApplicationEvent { + public StopFindCoordinatorOnCloseEvent() { + super(Type.STOP_FIND_COORDINATOR_ON_CLOSE); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java index 7e805dc3cd3b6..a3c839cc92d20 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java @@ -254,6 +254,26 @@ public void testNetworkTimeout() { assertEquals(1, res2.unsentRequests.size()); } + @Test + public void testSignalOnClose() { + CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID); + + expectFindCoordinatorRequest(coordinatorManager, Errors.NONE); + assertTrue(coordinatorManager.coordinator().isPresent()); + + coordinatorManager.markCoordinatorUnknown("coordinator changed", time.milliseconds()); + assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests); + + coordinatorManager.signalClose(); + + time.sleep(RETRY_BACKOFF_MS - 1); + assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests); + + time.sleep(RETRY_BACKOFF_MS); + assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests, + "Should not generate find coordinator request during close"); + } + private void expectFindCoordinatorRequest( CoordinatorRequestManager coordinatorManager, Errors error