diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 3915ff7c8df88..5bc47f02b911b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -468,6 +468,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config, this.applicationEventHandler = applicationEventHandlerFactory.build( logContext, time, + config.getInt(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG), applicationEventQueue, new CompletableEventReaper(logContext), applicationEventProcessorSupplier, @@ -660,6 +661,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config, ); this.applicationEventHandler = new ApplicationEventHandler(logContext, time, + config.getInt(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG), applicationEventQueue, new CompletableEventReaper(logContext), applicationEventProcessorSupplier, @@ -677,6 +679,7 @@ interface ApplicationEventHandlerFactory { ApplicationEventHandler build( final LogContext logContext, final Time time, + final int initializationTimeoutMs, final BlockingQueue applicationEventQueue, final CompletableEventReaper applicationEventReaper, final Supplier applicationEventProcessorSupplier, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index d2d178a88c38b..4851878af929e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; @@ -24,6 +25,10 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.IdempotentCloser; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.utils.KafkaThread; @@ -41,8 +46,13 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import javax.security.auth.spi.LoginModule; + import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS; import static org.apache.kafka.common.utils.Utils.closeQuietly; @@ -68,6 +78,8 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { private RequestManagers requestManagers; private volatile boolean running; private final IdempotentCloser closer = new IdempotentCloser(); + private final CountDownLatch initializationLatch = new CountDownLatch(1); + private final AtomicReference initializationError = new AtomicReference<>(); private volatile Duration closeTimeout = Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS); private volatile long cachedMaximumTimeToWait = MAX_POLL_TIMEOUT_MS; private long lastPollTimeMs = 0L; @@ -92,13 +104,57 @@ public ConsumerNetworkThread(LogContext logContext, this.asyncConsumerMetrics = asyncConsumerMetrics; } + /** + * Start the network thread and let it complete its initialization before proceeding. The + * {@link ClassicKafkaConsumer} constructor blocks during creation of its {@link NetworkClient}, providing + * precedent for waiting here. + * + * In certain cases (e.g. an invalid {@link LoginModule} in {@link SaslConfigs#SASL_JAAS_CONFIG}), an error + * could be thrown during {@link #initializeResources()}. This would result in the {@link #run()} method + * exiting, no longer able to process events, which means that the consumer effectively hangs. + * + * @param timeoutMs Length of time, in milliseconds, to wait for the thread to start and complete initialization + */ + public void start(int timeoutMs) { + // start() is invoked internally instead of by the caller to avoid SpotBugs errors about starting a thread + // in a constructor. + start(); + + try { + if (!initializationLatch.await(timeoutMs, TimeUnit.MILLISECONDS)) { + maybeSetInitializationError( + new TimeoutException("Consumer network thread resource initialization timed out after " + timeoutMs + " ms") + ); + } + } catch (InterruptedException e) { + maybeSetInitializationError( + new InterruptException("Consumer network thread resource initialization was interrupted", e) + ); + } + + KafkaException e = initializationError.get(); + + if (e != null) + throw e; + } + @Override public void run() { try { log.debug("Consumer network thread started"); // Wait until we're securely in the background network thread to initialize these objects... - initializeResources(); + try { + initializeResources(); + } catch (Throwable t) { + KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t); + maybeSetInitializationError(e); + + // This will still call cleanup() via the `finally` section below. + return; + } finally { + initializationLatch.countDown(); + } while (running) { try { @@ -108,13 +164,20 @@ public void run() { log.error("Unexpected error caught in consumer network thread", e); } } - } catch (final Throwable e) { - log.error("Failed to initialize resources for consumer network thread", e); + } catch (Throwable t) { + log.error("Unexpected failure in consumer network thread", t); } finally { cleanup(); } } + private void maybeSetInitializationError(KafkaException error) { + if (initializationError.compareAndSet(null, error)) + return; + + log.error("Consumer network thread resource initialization error ({}) will be suppressed as an error was already set", error.getMessage(), error); + } + void initializeResources() { applicationEventProcessor = applicationEventProcessorSupplier.get(); networkClientDelegate = networkClientDelegateSupplier.get(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index 4a7e19a6e5694..91f06fcb47c38 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -307,6 +307,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) { this.applicationEventHandler = applicationEventHandlerFactory.build( logContext, time, + config.getInt(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG), applicationEventQueue, new CompletableEventReaper(logContext), applicationEventProcessorSupplier, @@ -413,6 +414,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) { this.applicationEventHandler = new ApplicationEventHandler( logContext, time, + config.getInt(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG), applicationEventQueue, new CompletableEventReaper(logContext), applicationEventProcessorSupplier, @@ -478,6 +480,7 @@ interface ApplicationEventHandlerFactory { ApplicationEventHandler build( final LogContext logContext, final Time time, + final int initializationTimeoutMs, final BlockingQueue applicationEventQueue, final CompletableEventReaper applicationEventReaper, final Supplier applicationEventProcessorSupplier, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java index 6ab827b617c19..645b121483f88 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java @@ -51,6 +51,7 @@ public class ApplicationEventHandler implements Closeable { public ApplicationEventHandler(final LogContext logContext, final Time time, + final int initializationTimeoutMs, final BlockingQueue applicationEventQueue, final CompletableEventReaper applicationEventReaper, final Supplier applicationEventProcessorSupplier, @@ -69,7 +70,8 @@ public ApplicationEventHandler(final LogContext logContext, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics); - this.networkThread.start(); + + this.networkThread.start(initializationTimeoutMs); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 78ff15cee5f8e..9e8e0b88a017c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -39,6 +39,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.InterruptException; @@ -93,6 +94,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse; import org.apache.kafka.common.requests.RequestTestUtils; import org.apache.kafka.common.requests.SyncGroupResponse; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -131,6 +133,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.OptionalLong; @@ -152,6 +155,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; +import javax.security.auth.login.LoginException; import static java.util.Collections.singletonList; import static org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.DEFAULT_REASON; @@ -3796,6 +3800,58 @@ void testMonitorablePlugins(GroupProtocol groupProtocol) { } } + /** + * This test ensures that both {@link Consumer} implementations fail on creation when the underlying + * {@link NetworkClient} fails creation. + * + * The logic to check for this case is admittedly a bit awkward because the constructor can fail for all + * manner of reasons. So a failure case is created by specifying an invalid + * {@link javax.security.auth.spi.LoginModule} class name, which in turn causes the {@link NetworkClient} + * to fail. + * + * This test was created to validate the change for KAFKA-19394 for the {@link AsyncKafkaConsumer}. The fix + * should handle the case where failure during initialization of resources (in this test, the underlying + * {@link NetworkClient}) will not cause the creation of the {@link AsyncKafkaConsumer} to hang. + */ + @ParameterizedTest + @EnumSource(value = GroupProtocol.class) + public void testConstructorFailOnNetworkClientConstructorFailure(GroupProtocol groupProtocol) { + Map configs = Map.of( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(), + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(), + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999", + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name, + SaslConfigs.SASL_MECHANISM, "PLAIN", + SaslConfigs.SASL_JAAS_CONFIG, "org.example.InvalidLoginModule required ;", + ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT) + ); + + KafkaException e = assertThrows(KafkaException.class, () -> { + try (KafkaConsumer ignored = new KafkaConsumer<>(configs)) { + fail("Should not be able to create the consumer"); + } + }); + + assertEquals("Failed to construct kafka consumer", e.getMessage()); + + // The root cause is multiple exceptions deep. This code is more concise and should hopefully be trivial + // to update should the underlying implementation change. + Throwable cause = e.getCause(); + assertNotNull(cause); + assertInstanceOf(KafkaException.class, cause); + assertEquals("Failed to create new NetworkClient", cause.getMessage()); + + cause = cause.getCause(); + assertNotNull(cause); + assertInstanceOf(KafkaException.class, cause); + assertEquals(LoginException.class.getName() + ": No LoginModule found for org.example.InvalidLoginModule", cause.getMessage()); + + cause = cause.getCause(); + assertNotNull(cause); + assertInstanceOf(LoginException.class, cause); + assertEquals("No LoginModule found for org.example.InvalidLoginModule", cause.getMessage()); + } + private MetricName expectedMetricName(String clientId, String config, Class clazz) { Map expectedTags = new LinkedHashMap<>(); expectedTags.put("client-id", clientId); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java index 402697227ee80..534ba2e3bc139 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java @@ -22,23 +22,31 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.events.PollEvent; import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Supplier; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; public class ApplicationEventHandlerTest { private final Time time = new MockTime(); + private final int initializationTimeoutMs = 50; private final BlockingQueue applicationEventsQueue = new LinkedBlockingQueue<>(); private final ApplicationEventProcessor applicationEventProcessor = mock(ApplicationEventProcessor.class); private final NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class); @@ -53,6 +61,7 @@ public void testRecordApplicationEventQueueSize(String groupName) { ApplicationEventHandler applicationEventHandler = new ApplicationEventHandler( new LogContext(), time, + initializationTimeoutMs, applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, @@ -65,4 +74,52 @@ public void testRecordApplicationEventQueueSize(String groupName) { verify(asyncConsumerMetrics).recordApplicationEventQueueSize(1); } } + + @Test + public void testFailOnInitializeResources() { + RuntimeException rootFailure = new RuntimeException("root failure"); + KafkaException error = assertInitializeResourcesError( + KafkaException.class, + () -> { + throw rootFailure; + } + ); + assertEquals(rootFailure, error.getCause()); + } + + @Test + public void testDelayInInitializeResources() { + assertInitializeResourcesError( + TimeoutException.class, + () -> { + long delayMs = initializationTimeoutMs * 2; + org.apache.kafka.common.utils.Utils.sleep(delayMs); + return networkClientDelegate; + } + ); + } + + @Test + public void testInterruptInInitializeResources() { + Thread.currentThread().interrupt(); + assertInitializeResourcesError(InterruptException.class, () -> networkClientDelegate); + } + + private T assertInitializeResourcesError(Class exceptionClass, + Supplier networkClientDelegateSupplier) { + try (Metrics metrics = new Metrics(); + AsyncConsumerMetrics asyncConsumerMetrics = spy(new AsyncConsumerMetrics(metrics, "test-group"))) { + return assertThrows(exceptionClass, () -> new ApplicationEventHandler( + new LogContext(), + time, + initializationTimeoutMs, + applicationEventsQueue, + applicationEventReaper, + () -> applicationEventProcessor, + networkClientDelegateSupplier, + () -> requestManagers, + asyncConsumerMetrics + )); + } + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index bccd4ebeb8149..21747e7afedb2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -223,7 +223,7 @@ private AsyncKafkaConsumer newConsumerWithStreamRebalanceData( new StringDeserializer(), new StringDeserializer(), time, - (logContext, time, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> applicationEventHandler, + (logContext, time, initializationTimeoutMs, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> applicationEventHandler, logContext -> backgroundEventReaper, (logContext, consumerMetadata, subscriptionState, fetchConfig, deserializers, fetchMetricsManager, time) -> fetchCollector, (consumerConfig, subscriptionState, logContext, clusterResourceListeners) -> metadata, @@ -238,7 +238,7 @@ private AsyncKafkaConsumer newConsumer(ConsumerConfig config) { new StringDeserializer(), new StringDeserializer(), time, - (logContext, time, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> applicationEventHandler, + (logContext, time, initializationTimeoutMs, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> applicationEventHandler, logContext -> backgroundEventReaper, (logContext, consumerMetadata, subscriptionState, fetchConfig, deserializers, fetchMetricsManager, time) -> fetchCollector, (consumerConfig, subscriptionState, logContext, clusterResourceListeners) -> metadata, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java index edb244b06aa26..8eae3cd0dd84a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java @@ -141,7 +141,7 @@ private ShareConsumerImpl newConsumer(ConsumerConfig config) { new StringDeserializer(), new StringDeserializer(), time, - (a, b, c, d, e, f, g, h) -> applicationEventHandler, + (a, b, c, d, e, f, g, h, i) -> applicationEventHandler, a -> backgroundEventReaper, (a, b, c, d, e) -> fetchCollector, backgroundEventQueue