Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ class ActiveTaskCreator {
private final Logger log;
private final Sensor createTaskSensor;
private final StreamsProducer streamsProducer;
private final boolean stateUpdaterEnabled;
private final boolean processingThreadsEnabled;
private boolean isClosed = false;

Expand All @@ -78,7 +77,6 @@ class ActiveTaskCreator {
final int threadIdx,
final UUID processId,
final LogContext logContext,
final boolean stateUpdaterEnabled,
final boolean processingThreadsEnabled) {
this.topologyMetadata = topologyMetadata;
this.applicationConfig = applicationConfig;
Expand All @@ -92,7 +90,6 @@ class ActiveTaskCreator {
this.threadIdx = threadIdx;
this.processId = processId;
this.log = logContext.logger(getClass());
this.stateUpdaterEnabled = stateUpdaterEnabled;
this.processingThreadsEnabled = processingThreadsEnabled;

createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
Expand Down Expand Up @@ -157,7 +154,7 @@ public Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
storeChangelogReader,
topology.storeToChangelogTopic(),
partitions,
stateUpdaterEnabled);
true);

final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
taskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,20 @@ class StandbyTaskCreator {
private final ThreadCache dummyCache;
private final Logger log;
private final Sensor createTaskSensor;
private final boolean stateUpdaterEnabled;

StandbyTaskCreator(final TopologyMetadata topologyMetadata,
final StreamsConfig applicationConfig,
final StreamsMetricsImpl streamsMetrics,
final StateDirectory stateDirectory,
final ChangelogReader storeChangelogReader,
final String threadId,
final LogContext logContext,
final boolean stateUpdaterEnabled) {
final LogContext logContext) {
this.topologyMetadata = topologyMetadata;
this.applicationConfig = applicationConfig;
this.streamsMetrics = streamsMetrics;
this.stateDirectory = stateDirectory;
this.storeChangelogReader = storeChangelogReader;
this.log = logContext.logger(getClass());
this.stateUpdaterEnabled = stateUpdaterEnabled;

createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);

Expand Down Expand Up @@ -90,7 +87,7 @@ Collection<Task> createTasks(final Map<TaskId, Set<TopicPartition>> tasksToBeCre
storeChangelogReader,
topology.storeToChangelogTopic(),
partitions,
stateUpdaterEnabled);
true);

final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
taskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we pass this into ProcessorStateManager only -- let's move the variable, and hardcode true for now.

final boolean stateUpdaterEnabled = true;

// discover all non-empty task directories in StateDirectory
for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.StandbyUpdateListener;
Expand Down Expand Up @@ -236,7 +235,7 @@ public StoreChangelogReader(final Time time,
this.stateRestoreListener = stateRestoreListener;
this.standbyUpdateListener = standbyUpdateListener;

this.stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we only use this in one place, and it should be possible to remove the variable (and it's usage) completely, in one go

this.stateUpdaterEnabled = true;

this.groupId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
final Runnable shutdownErrorHook,
final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) {

final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is more complex. Might be better to split into a different PR, as it required more cleanup. Doing a partial cleanup for a single class, is not a "clean cut" and make reviewing more complex.

final boolean stateUpdaterEnabled = true;

final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx;
final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, STATE_UPDATER_ID_SUBSTRING);
Expand Down Expand Up @@ -440,7 +440,6 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
threadIdx,
processId,
logContext,
stateUpdaterEnabled,
proceessingThreadsEnabled
);
final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
Expand All @@ -450,8 +449,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
stateDirectory,
changelogReader,
threadId,
logContext,
stateUpdaterEnabled);
logContext);

final Tasks tasks = new Tasks(logContext);
final boolean processingThreadsEnabled =
Expand Down Expand Up @@ -858,7 +856,7 @@ public StreamThread(final Time time,

this.numIterations = 1;
this.eosEnabled = eosEnabled(config);
this.stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals());
this.stateUpdaterEnabled = true;
this.processingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals());
this.logSummaryIntervalMs = config.getLong(StreamsConfig.LOG_SUMMARY_INTERVAL_MS_CONFIG);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,6 @@ private void createTasks() {
0,
uuid,
new LogContext(),
false,
false);

assertThat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.kafka.test.StreamsTestUtils;

import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -416,6 +417,7 @@ public void shouldPollWithRightTimeoutWithStateUpdater(final Task.TaskType type)
shouldPollWithRightTimeout(true, type);
}

@Disabled
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not disable this test, but rather clean it up directly. Or exclude StoreChangelogReader from this PR.

@ParameterizedTest
@EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"})
public void shouldPollWithRightTimeoutWithoutStateUpdater(final Task.TaskType type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ public class StreamThreadTest {

static Stream<Arguments> data() {
return Stream.of(
Arguments.of(false, false),
Arguments.of(true, false),
Arguments.of(true, true)
);
Expand Down Expand Up @@ -4317,8 +4316,7 @@ private Collection<Task> createStandbyTask(final StreamsConfig config) {
stateDirectory,
new MockChangelogReader(),
CLIENT_ID,
logContext,
false);
logContext);
return standbyTaskCreator.createTasks(singletonMap(new TaskId(1, 2), emptySet()));
}

Expand Down