From 1432f09d1bf3d1d1a8d97c68a1a2d9b864d309b7 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 17 Jan 2025 20:00:11 -0800 Subject: [PATCH] KAFKA-17402: DefaultStateUpdated should transite task atomically --- .../kafka/streams/processor/internals/DefaultStateUpdater.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 306f4691e2f9a..60bcc7474a8f9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -673,7 +673,6 @@ private void maybeCompleteRestoration(final StreamTask task, measureCheckpointLatency(() -> task.maybeCheckpoint(true)); changelogReader.unregister(changelogPartitions); addToRestoredTasks(task); - updatingTasks.remove(task.id()); log.info("Stateful active task " + task.id() + " completed restoration"); transitToUpdateStandbysIfOnlyStandbysLeft(); } @@ -689,6 +688,7 @@ private void addToRestoredTasks(final StreamTask task) { restoredActiveTasksLock.lock(); try { restoredActiveTasks.add(task); + updatingTasks.remove(task.id()); log.debug("Active task " + task.id() + " was added to the restored tasks"); restoredActiveTasksCondition.signalAll(); } finally {