diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 306f4691e2f9..60bcc7474a8f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -673,7 +673,6 @@ private void maybeCompleteRestoration(final StreamTask task, measureCheckpointLatency(() -> task.maybeCheckpoint(true)); changelogReader.unregister(changelogPartitions); addToRestoredTasks(task); - updatingTasks.remove(task.id()); log.info("Stateful active task " + task.id() + " completed restoration"); transitToUpdateStandbysIfOnlyStandbysLeft(); } @@ -689,6 +688,7 @@ private void addToRestoredTasks(final StreamTask task) { restoredActiveTasksLock.lock(); try { restoredActiveTasks.add(task); + updatingTasks.remove(task.id()); log.debug("Active task " + task.id() + " was added to the restored tasks"); restoredActiveTasksCondition.signalAll(); } finally {