Skip to content

KAFKA-18066: Fix mismatched StreamThread ID in log messages #19517

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class ActiveTaskCreator {
final String threadId,
final int threadIdx,
final UUID processId,
final Logger log,
final LogContext logContext,
final boolean stateUpdaterEnabled,
final boolean processingThreadsEnabled) {
this.topologyMetadata = topologyMetadata;
Expand All @@ -91,15 +91,12 @@ class ActiveTaskCreator {
this.threadId = threadId;
this.threadIdx = threadIdx;
this.processId = processId;
this.log = log;
this.log = logContext.logger(getClass());
this.stateUpdaterEnabled = stateUpdaterEnabled;
this.processingThreadsEnabled = processingThreadsEnabled;

createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);

final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
final LogContext logContext = new LogContext(threadIdPrefix);

streamsProducer = new StreamsProducer(
producer(),
processingMode(applicationConfig),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,20 @@ class StandbyTaskCreator {
final StateDirectory stateDirectory,
final ChangelogReader storeChangelogReader,
final String threadId,
final Logger log,
final LogContext logContext,
final boolean stateUpdaterEnabled) {
this.topologyMetadata = topologyMetadata;
this.applicationConfig = applicationConfig;
this.streamsMetrics = streamsMetrics;
this.stateDirectory = stateDirectory;
this.storeChangelogReader = storeChangelogReader;
this.log = log;
this.log = logContext.logger(getClass());
this.stateUpdaterEnabled = stateUpdaterEnabled;

createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);

dummyCache = new ThreadCache(
new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())),
logContext,
0,
streamsMetrics
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.kafka.streams.state.internals.ThreadCache;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
Expand Down Expand Up @@ -394,15 +395,15 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
final String logPrefix = String.format("stream-thread [%s] ", threadId);
final LogContext logContext = new LogContext(logPrefix);
final LogContext restorationLogContext = stateUpdaterEnabled ? new LogContext(String.format("state-updater [%s] ", restorationThreadId)) : logContext;
final Logger log = logContext.logger(StreamThread.class);
final Logger log = LoggerFactory.getLogger(StreamThread.class);

final ReferenceContainer referenceContainer = new ReferenceContainer();
referenceContainer.adminClient = adminClient;
referenceContainer.streamsMetadataState = streamsMetadataState;
referenceContainer.time = time;
referenceContainer.clientTags = config.getClientTags();

log.info("Creating restore consumer client");
log.info("Creating restore consumer client for thread {}", threadId);
final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(restoreConsumerClientId(restorationThreadId));
final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);

Expand Down Expand Up @@ -431,7 +432,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
threadId,
threadIdx,
processId,
log,
logContext,
stateUpdaterEnabled,
proceessingThreadsEnabled
);
Expand All @@ -442,10 +443,10 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
stateDirectory,
changelogReader,
threadId,
log,
logContext,
stateUpdaterEnabled);

final Tasks tasks = new Tasks(new LogContext(logPrefix));
final Tasks tasks = new Tasks(logContext);
final boolean processingThreadsEnabled =
InternalConfig.processingThreadsEnabled(config.originals());

Expand Down Expand Up @@ -480,7 +481,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
);
referenceContainer.taskManager = taskManager;

log.info("Creating consumer client");
log.info("Creating consumer client for thread {}", threadId);
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, consumerClientId(threadId), threadIdx);
consumerConfigs.put(StreamsConfig.InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
Expand All @@ -491,7 +492,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
}

final MainConsumerSetup mainConsumerSetup = setupMainConsumer(topologyMetadata, config, clientSupplier, processId, log, consumerConfigs);
final MainConsumerSetup mainConsumerSetup = setupMainConsumer(topologyMetadata, config, clientSupplier, processId, log, threadId, consumerConfigs);

taskManager.setMainConsumer(mainConsumerSetup.mainConsumer);
referenceContainer.mainConsumer = mainConsumerSetup.mainConsumer;
Expand Down Expand Up @@ -533,12 +534,13 @@ private static MainConsumerSetup setupMainConsumer(final TopologyMetadata topolo
final KafkaClientSupplier clientSupplier,
final UUID processId,
final Logger log,
final String threadId,
final Map<String, Object> consumerConfigs) {
if (config.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name)) {
if (topologyMetadata.hasNamedTopologies()) {
throw new IllegalStateException("Named topologies and the CONSUMER protocol cannot be used at the same time.");
}
log.info("Streams rebalance protocol enabled");
log.info("Streams rebalance protocol enabled for thread {}", threadId);

final Optional<StreamsRebalanceData> streamsRebalanceData = Optional.of(
initStreamsRebalanceData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ private void createTasks() {
"clientId-StreamThread-0",
0,
uuid,
new LogContext().logger(ActiveTaskCreator.class),
new LogContext(),
false,
false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3653,7 +3653,6 @@ private void setupInternalTopologyWithoutState(final StreamsConfig config) {
// TODO: change return type to `StandbyTask`
private Collection<Task> createStandbyTask(final StreamsConfig config) {
final LogContext logContext = new LogContext("test");
final Logger log = logContext.logger(StreamThreadTest.class);
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime);
final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
Expand All @@ -3663,7 +3662,7 @@ private Collection<Task> createStandbyTask(final StreamsConfig config) {
stateDirectory,
new MockChangelogReader(),
CLIENT_ID,
log,
logContext,
false);
return standbyTaskCreator.createTasks(singletonMap(new TaskId(1, 2), emptySet()));
}
Expand Down