diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index 448853c63670b..b39f470b0bb90 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -62,7 +62,6 @@ class ActiveTaskCreator { private final Logger log; private final Sensor createTaskSensor; private final StreamsProducer streamsProducer; - private final boolean stateUpdaterEnabled; private final boolean processingThreadsEnabled; private boolean isClosed = false; @@ -78,7 +77,6 @@ class ActiveTaskCreator { final int threadIdx, final UUID processId, final LogContext logContext, - final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { this.topologyMetadata = topologyMetadata; this.applicationConfig = applicationConfig; @@ -92,7 +90,6 @@ class ActiveTaskCreator { this.threadIdx = threadIdx; this.processId = processId; this.log = logContext.logger(getClass()); - this.stateUpdaterEnabled = stateUpdaterEnabled; this.processingThreadsEnabled = processingThreadsEnabled; createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics); @@ -157,7 +154,7 @@ public Collection createTasks(final Consumer consumer, storeChangelogReader, topology.storeToChangelogTopic(), partitions, - stateUpdaterEnabled); + true); final InternalProcessorContext context = new ProcessorContextImpl( taskId, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java index eb8bcafea695a..b03f620382aac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java @@ -44,7 +44,6 @@ class StandbyTaskCreator { private final ThreadCache dummyCache; private final Logger log; private final Sensor createTaskSensor; - private final boolean stateUpdaterEnabled; StandbyTaskCreator(final TopologyMetadata topologyMetadata, final StreamsConfig applicationConfig, @@ -52,15 +51,13 @@ class StandbyTaskCreator { final StateDirectory stateDirectory, final ChangelogReader storeChangelogReader, final String threadId, - final LogContext logContext, - final boolean stateUpdaterEnabled) { + final LogContext logContext) { this.topologyMetadata = topologyMetadata; this.applicationConfig = applicationConfig; this.streamsMetrics = streamsMetrics; this.stateDirectory = stateDirectory; this.storeChangelogReader = storeChangelogReader; this.log = logContext.logger(getClass()); - this.stateUpdaterEnabled = stateUpdaterEnabled; createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics); @@ -90,7 +87,7 @@ Collection createTasks(final Map> tasksToBeCre storeChangelogReader, topology.storeToChangelogTopic(), partitions, - stateUpdaterEnabled); + true); final InternalProcessorContext context = new ProcessorContextImpl( taskId, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index a95d20ddae0a1..173565be704be 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -204,7 +204,7 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata, if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) { final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics); final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config); - final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals()); + final boolean stateUpdaterEnabled = true; // discover all non-empty task directories in StateDirectory for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index c91e9b38b9923..d842f3eb693b6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -33,7 +33,6 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.StreamsConfig.InternalConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.processor.StandbyUpdateListener; @@ -236,7 +235,7 @@ public StoreChangelogReader(final Time time, this.stateRestoreListener = stateRestoreListener; this.standbyUpdateListener = standbyUpdateListener; - this.stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals()); + this.stateUpdaterEnabled = true; this.groupId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index d893220621721..ca0525806fa41 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -393,7 +393,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final Runnable shutdownErrorHook, final BiConsumer streamsUncaughtExceptionHandler) { - final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals()); + final boolean stateUpdaterEnabled = true; final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx; final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, STATE_UPDATER_ID_SUBSTRING); @@ -440,7 +440,6 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, threadIdx, processId, logContext, - stateUpdaterEnabled, proceessingThreadsEnabled ); final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator( @@ -450,8 +449,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, stateDirectory, changelogReader, threadId, - logContext, - stateUpdaterEnabled); + logContext); final Tasks tasks = new Tasks(logContext); final boolean processingThreadsEnabled = @@ -858,7 +856,7 @@ public StreamThread(final Time time, this.numIterations = 1; this.eosEnabled = eosEnabled(config); - this.stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals()); + this.stateUpdaterEnabled = true; this.processingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals()); this.logSummaryIntervalMs = config.getLong(StreamsConfig.LOG_SUMMARY_INTERVAL_MS_CONFIG); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java index 362c32592ca8e..bc997b8a3e801 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java @@ -280,7 +280,6 @@ private void createTasks() { 0, uuid, new LogContext(), - false, false); assertThat( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 69655b4d6426a..053b82a4caf8d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -46,6 +46,7 @@ import org.apache.kafka.test.StreamsTestUtils; import org.apache.logging.log4j.Level; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -416,6 +417,7 @@ public void shouldPollWithRightTimeoutWithStateUpdater(final Task.TaskType type) shouldPollWithRightTimeout(true, type); } + @Disabled @ParameterizedTest @EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"}) public void shouldPollWithRightTimeoutWithoutStateUpdater(final Task.TaskType type) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 213ff8a5b10fb..81f0beea6951b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -228,7 +228,6 @@ public class StreamThreadTest { static Stream data() { return Stream.of( - Arguments.of(false, false), Arguments.of(true, false), Arguments.of(true, true) ); @@ -4317,8 +4316,7 @@ private Collection createStandbyTask(final StreamsConfig config) { stateDirectory, new MockChangelogReader(), CLIENT_ID, - logContext, - false); + logContext); return standbyTaskCreator.createTasks(singletonMap(new TaskId(1, 2), emptySet())); }