-
Notifications
You must be signed in to change notification settings - Fork 14.8k
KAFKA-19831: Improved error handling in DefaultStateUpdater. #20767
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from 1 commit
d549788
80d297f
cf9cf54
f98c083
e6de6cd
bc1737c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,136 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.kafka.streams.integration; | ||
|
|
||
| import org.apache.kafka.clients.consumer.ConsumerConfig; | ||
| import org.apache.kafka.common.serialization.Serdes; | ||
| import org.apache.kafka.common.utils.MockTime; | ||
| import org.apache.kafka.streams.KafkaStreams; | ||
| import org.apache.kafka.streams.StreamsConfig; | ||
| import org.apache.kafka.streams.TopologyWrapper; | ||
| import org.apache.kafka.streams.errors.ProcessorStateException; | ||
| import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; | ||
| import org.apache.kafka.streams.processor.StateStore; | ||
| import org.apache.kafka.streams.processor.StateStoreContext; | ||
| import org.apache.kafka.streams.state.KeyValueStore; | ||
| import org.apache.kafka.streams.state.StoreBuilder; | ||
| import org.apache.kafka.streams.state.internals.AbstractStoreBuilder; | ||
| import org.apache.kafka.test.MockApiProcessorSupplier; | ||
| import org.apache.kafka.test.MockKeyValueStore; | ||
| import org.apache.kafka.test.TestUtils; | ||
| import org.junit.jupiter.api.AfterEach; | ||
| import org.junit.jupiter.api.BeforeEach; | ||
| import org.junit.jupiter.api.Test; | ||
| import org.junit.jupiter.api.TestInfo; | ||
|
|
||
| import java.io.IOException; | ||
| import java.time.Duration; | ||
| import java.util.Properties; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
|
|
||
| import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; | ||
| import static org.junit.jupiter.api.Assertions.assertTrue; | ||
|
|
||
| public class StateUpdaterFailureIntegrationTest { | ||
|
|
||
| private static final int NUM_BROKERS = 1; | ||
| protected static final String INPUT_TOPIC_NAME = "input-topic"; | ||
| private static final int NUM_PARTITIONS = 6; | ||
|
|
||
| private final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS); | ||
|
|
||
| private Properties streamsConfiguration; | ||
| private final MockTime mockTime = cluster.time; | ||
| private KafkaStreams streams; | ||
|
|
||
| @BeforeEach | ||
| public void before(final TestInfo testInfo) throws InterruptedException, IOException { | ||
| cluster.start(); | ||
| cluster.createTopic(INPUT_TOPIC_NAME, NUM_PARTITIONS, 1); | ||
| streamsConfiguration = new Properties(); | ||
| final String safeTestName = safeUniqueTestName(testInfo); | ||
| streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); | ||
| streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); | ||
| streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); | ||
| streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); | ||
| streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); | ||
| streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); | ||
| streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class); | ||
| streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); | ||
| streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); | ||
|
|
||
| } | ||
|
|
||
| @AfterEach | ||
| public void after() { | ||
| cluster.stop(); | ||
| if (streams != null) { | ||
| streams.close(Duration.ofSeconds(30)); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void correctlyHandleFlushErrorsDuringRebalance() throws InterruptedException { | ||
|
||
| final AtomicInteger numberOfStoreInits = new AtomicInteger(); | ||
| final AtomicReference<KafkaStreams.State> currentState = new AtomicReference<>(); | ||
|
|
||
| final StoreBuilder<KeyValueStore<Object, Object>> storeBuilder = new AbstractStoreBuilder<>("testStateStore", Serdes.Integer(), Serdes.ByteArray(), new MockTime()) { | ||
|
|
||
| @Override | ||
| public KeyValueStore<Object, Object> build() { | ||
| return new MockKeyValueStore(name, false) { | ||
|
|
||
| @Override | ||
| public void init(final StateStoreContext stateStoreContext, final StateStore root) { | ||
| super.init(stateStoreContext, root); | ||
| numberOfStoreInits.incrementAndGet(); | ||
| } | ||
|
|
||
| @Override | ||
| public void flush() { | ||
| if (numberOfStoreInits.get() == NUM_PARTITIONS * 1.5) { | ||
mjsax marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| try { | ||
| TestUtils.waitForCondition(() -> currentState.get() == KafkaStreams.State.PENDING_SHUTDOWN, "Streams never reached PENDING_SHUTDOWN state"); | ||
|
||
| } catch (final InterruptedException e) { | ||
mjsax marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| throw new RuntimeException(e); | ||
| } | ||
| throw new ProcessorStateException("flush"); | ||
| } | ||
| } | ||
| }; | ||
| } | ||
| }; | ||
|
|
||
| final TopologyWrapper topology = new TopologyWrapper(); | ||
| topology.addSource("ingest", INPUT_TOPIC_NAME); | ||
| topology.addProcessor("my-processor", new MockApiProcessorSupplier<>(), "ingest"); | ||
| topology.addStateStore(storeBuilder, "my-processor"); | ||
|
|
||
| streams = new KafkaStreams(topology, streamsConfiguration); | ||
| streams.setStateListener((newState, oldState) -> currentState.set(newState)); | ||
| streams.start(); | ||
|
|
||
| TestUtils.waitForCondition(() -> currentState.get() == KafkaStreams.State.RUNNING, "Streams never reached RUNNING state"); | ||
|
|
||
| streams.removeStreamThread(); | ||
|
|
||
| TestUtils.waitForCondition(() -> numberOfStoreInits.get() == NUM_PARTITIONS * 1.5, "Streams never reinitialized the store enough times"); | ||
mjsax marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| assertTrue(streams.close(Duration.ofSeconds(60))); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -208,11 +208,7 @@ private void performActionsOnTasks() { | |
| addTask(taskAndAction.task()); | ||
| break; | ||
| case REMOVE: | ||
| if (taskAndAction.futureForRemove() == null) { | ||
| removeTask(taskAndAction.taskId()); | ||
| } else { | ||
| removeTask(taskAndAction.taskId(), taskAndAction.futureForRemove()); | ||
| } | ||
| removeTask(taskAndAction.taskId(), taskAndAction.futureForRemove()); | ||
mjsax marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| break; | ||
| default: | ||
| throw new IllegalStateException("Unknown action type " + action); | ||
|
|
@@ -349,23 +345,26 @@ private void handleTaskCorruptedException(final TaskCorruptedException taskCorru | |
| // TODO: we can let the exception encode the actual corrupted changelog partitions and only | ||
| // mark those instead of marking all changelogs | ||
| private void removeCheckpointForCorruptedTask(final Task task) { | ||
| task.markChangelogAsCorrupted(task.changelogPartitions()); | ||
| try { | ||
| task.markChangelogAsCorrupted(task.changelogPartitions()); | ||
|
|
||
| // we need to enforce a checkpoint that removes the corrupted partitions | ||
| measureCheckpointLatency(() -> task.maybeCheckpoint(true)); | ||
| // we need to enforce a checkpoint that removes the corrupted partitions | ||
| measureCheckpointLatency(() -> task.maybeCheckpoint(true)); | ||
| } catch (final StreamsException e) { | ||
|
||
| log.warn("Checkpoint failed for corrupted task {}", task.id(), e); | ||
| } | ||
| } | ||
|
|
||
| private void handleStreamsException(final StreamsException streamsException) { | ||
| log.info("Encountered streams exception: ", streamsException); | ||
| if (streamsException.taskId().isPresent()) { | ||
| handleStreamsExceptionWithTask(streamsException); | ||
| handleStreamsExceptionWithTask(streamsException, streamsException.taskId().get()); | ||
Nikita-Shupletsov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } else { | ||
| handleStreamsExceptionWithoutTask(streamsException); | ||
| } | ||
| } | ||
|
|
||
| private void handleStreamsExceptionWithTask(final StreamsException streamsException) { | ||
| final TaskId failedTaskId = streamsException.taskId().get(); | ||
| private void handleStreamsExceptionWithTask(final StreamsException streamsException, final TaskId failedTaskId) { | ||
mjsax marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (updatingTasks.containsKey(failedTaskId)) { | ||
| addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks( | ||
| new ExceptionAndTask(streamsException, updatingTasks.get(failedTaskId)) | ||
|
|
@@ -518,7 +517,7 @@ private void removeTask(final TaskId taskId, final CompletableFuture<RemovedTask | |
| + " own this task.", taskId); | ||
| } | ||
| } catch (final StreamsException streamsException) { | ||
| handleStreamsException(streamsException); | ||
| handleStreamsExceptionWithTask(streamsException, taskId); | ||
| future.completeExceptionally(streamsException); | ||
| } catch (final RuntimeException runtimeException) { | ||
| handleRuntimeException(runtimeException); | ||
|
|
@@ -607,44 +606,22 @@ private boolean removeFailedTask(final TaskId taskId, final CompletableFuture<Re | |
| } | ||
| } | ||
|
|
||
| private void removeTask(final TaskId taskId) { | ||
| final Task task; | ||
| if (updatingTasks.containsKey(taskId)) { | ||
| task = updatingTasks.get(taskId); | ||
| private void pauseTask(final Task task) { | ||
Nikita-Shupletsov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| final TaskId taskId = task.id(); | ||
| // do not need to unregister changelog partitions for paused tasks | ||
| try { | ||
| measureCheckpointLatency(() -> task.maybeCheckpoint(true)); | ||
| final Collection<TopicPartition> changelogPartitions = task.changelogPartitions(); | ||
| changelogReader.unregister(changelogPartitions); | ||
| removedTasks.add(task); | ||
| pausedTasks.put(taskId, task); | ||
| updatingTasks.remove(taskId); | ||
| if (task.isActive()) { | ||
| transitToUpdateStandbysIfOnlyStandbysLeft(); | ||
| } | ||
| log.info((task.isActive() ? "Active" : "Standby") | ||
| + " task " + task.id() + " was removed from the updating tasks and added to the removed tasks."); | ||
| } else if (pausedTasks.containsKey(taskId)) { | ||
| task = pausedTasks.get(taskId); | ||
| final Collection<TopicPartition> changelogPartitions = task.changelogPartitions(); | ||
| changelogReader.unregister(changelogPartitions); | ||
| removedTasks.add(task); | ||
| pausedTasks.remove(taskId); | ||
| log.info((task.isActive() ? "Active" : "Standby") | ||
| + " task " + task.id() + " was removed from the paused tasks and added to the removed tasks."); | ||
| } else { | ||
| log.info("Task " + taskId + " was not removed since it is not updating or paused."); | ||
| } | ||
| } | ||
| + " task " + task.id() + " was paused from the updating tasks and added to the paused tasks."); | ||
|
|
||
| private void pauseTask(final Task task) { | ||
| final TaskId taskId = task.id(); | ||
| // do not need to unregister changelog partitions for paused tasks | ||
| measureCheckpointLatency(() -> task.maybeCheckpoint(true)); | ||
| pausedTasks.put(taskId, task); | ||
| updatingTasks.remove(taskId); | ||
| if (task.isActive()) { | ||
| transitToUpdateStandbysIfOnlyStandbysLeft(); | ||
| } catch (final StreamsException streamsException) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems we are adding a similar
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the idea was to handle one task at a time instead of failing all of them together. so if we have 3 tasks, one fails but the other 2 succeed, we will have only one failed task instead of all 3.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. Hard to read on the PR, as there is no loop here... But I am not even sure why we would want to fail a task if we cannot checkpoint for this case? We only \cc @lucasbru |
||
| handleStreamsExceptionWithTask(streamsException, taskId); | ||
| } | ||
| log.info((task.isActive() ? "Active" : "Standby") | ||
| + " task " + task.id() + " was paused from the updating tasks and added to the paused tasks."); | ||
| } | ||
|
|
||
| private void resumeTask(final Task task) { | ||
|
|
@@ -671,11 +648,15 @@ private void maybeCompleteRestoration(final StreamTask task, | |
| final Set<TopicPartition> restoredChangelogs) { | ||
| final Collection<TopicPartition> changelogPartitions = task.changelogPartitions(); | ||
| if (restoredChangelogs.containsAll(changelogPartitions)) { | ||
| measureCheckpointLatency(() -> task.maybeCheckpoint(true)); | ||
| changelogReader.unregister(changelogPartitions); | ||
| addToRestoredTasks(task); | ||
| log.info("Stateful active task " + task.id() + " completed restoration"); | ||
| transitToUpdateStandbysIfOnlyStandbysLeft(); | ||
| try { | ||
| measureCheckpointLatency(() -> task.maybeCheckpoint(true)); | ||
| changelogReader.unregister(changelogPartitions); | ||
| addToRestoredTasks(task); | ||
| log.info("Stateful active task " + task.id() + " completed restoration"); | ||
| transitToUpdateStandbysIfOnlyStandbysLeft(); | ||
| } catch (final StreamsException streamsException) { | ||
| handleStreamsExceptionWithTask(streamsException, task.id()); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -707,8 +688,12 @@ private void maybeCheckpointTasks(final long now) { | |
|
|
||
| measureCheckpointLatency(() -> { | ||
| for (final Task task : updatingTasks.values()) { | ||
| // do not enforce checkpointing during restoration if its position has not advanced much | ||
| task.maybeCheckpoint(false); | ||
| try { | ||
| // do not enforce checkpointing during restoration if its position has not advanced much | ||
| task.maybeCheckpoint(false); | ||
| } catch (final StreamsException streamsException) { | ||
| handleStreamsExceptionWithTask(streamsException, task.id()); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -55,11 +55,6 @@ public static TaskAndAction createRemoveTask(final TaskId taskId, | |
| return new TaskAndAction(null, taskId, Action.REMOVE, future); | ||
| } | ||
|
|
||
| public static TaskAndAction createRemoveTask(final TaskId taskId) { | ||
|
||
| Objects.requireNonNull(taskId, "Task ID of task to remove is null!"); | ||
| return new TaskAndAction(null, taskId, Action.REMOVE, null); | ||
| } | ||
|
|
||
| public Task task() { | ||
| if (action != Action.ADD) { | ||
| throw new IllegalStateException("Action type " + action + " cannot have a task!"); | ||
|
|
@@ -84,4 +79,4 @@ public CompletableFuture<StateUpdater.RemovedTaskResult> futureForRemove() { | |
| public Action action() { | ||
| return action; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -65,6 +65,7 @@ | |
| import java.util.TreeSet; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.Stream; | ||
|
|
@@ -772,7 +773,7 @@ private StateUpdater.RemovedTaskResult waitForFuture(final TaskId taskId, | |
| final CompletableFuture<StateUpdater.RemovedTaskResult> future) { | ||
| final StateUpdater.RemovedTaskResult removedTaskResult; | ||
| try { | ||
| removedTaskResult = future.get(); | ||
| removedTaskResult = future.get(1, TimeUnit.MINUTES); | ||
mjsax marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (removedTaskResult == null) { | ||
| throw new IllegalStateException("Task " + taskId + " was not found in the state updater. " | ||
| + BUG_ERROR_MESSAGE); | ||
|
|
@@ -787,6 +788,10 @@ private StateUpdater.RemovedTaskResult waitForFuture(final TaskId taskId, | |
| Thread.currentThread().interrupt(); | ||
| log.error(INTERRUPTED_ERROR_MESSAGE, shouldNotHappen); | ||
| throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, shouldNotHappen); | ||
| } catch (final java.util.concurrent.TimeoutException timeoutException) { | ||
mjsax marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| log.warn("The state updater wasn't able to remove task {} in time. The state updater thread may be dead. " | ||
| + BUG_ERROR_MESSAGE, taskId, timeoutException); | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1567,6 +1572,12 @@ void shutdown(final boolean clean) { | |
|
|
||
| private void shutdownStateUpdater() { | ||
| if (stateUpdater != null) { | ||
| // If there are failed tasks handling them first | ||
| for (final StateUpdater.ExceptionAndTask exceptionAndTask : stateUpdater.drainExceptionsAndFailedTasks()) { | ||
| final Task failedTask = exceptionAndTask.task(); | ||
| closeTaskDirty(failedTask, false); | ||
| } | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Below, there is
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also just found (inside TaskManager.java): This does also "smell"...
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was thinking about that, but then came across this change: #17018 and decided to leave it as is.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seem the other PR wants to make use of the timeout. But if we pass in MAX_VALUE this does not make much sense? As it effectively blocks forever... If we really want to block forever when joining the thread (ie, This is all very confusing to me. \cc @lucasbru can you chime in? |
||
| final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new LinkedHashMap<>(); | ||
| for (final Task task : stateUpdater.tasks()) { | ||
| final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(task.id()); | ||
|
|
@@ -1583,10 +1594,16 @@ private void shutdownStateUpdater() { | |
| for (final Task task : tasksToCloseDirty) { | ||
| closeTaskDirty(task, false); | ||
| } | ||
| // Handling all failures that occurred during the remove process | ||
| for (final StateUpdater.ExceptionAndTask exceptionAndTask : stateUpdater.drainExceptionsAndFailedTasks()) { | ||
mjsax marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| final Task failedTask = exceptionAndTask.task(); | ||
| closeTaskDirty(failedTask, false); | ||
| } | ||
|
|
||
| // If there is anything left unhandled due to timeouts, handling now | ||
| for (final Task task : stateUpdater.tasks()) { | ||
| closeTaskDirty(task, false); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.