diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index 93953d815bee4..448853c63670b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -77,7 +77,7 @@ class ActiveTaskCreator { final String threadId, final int threadIdx, final UUID processId, - final Logger log, + final LogContext logContext, final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { this.topologyMetadata = topologyMetadata; @@ -91,15 +91,12 @@ class ActiveTaskCreator { this.threadId = threadId; this.threadIdx = threadIdx; this.processId = processId; - this.log = log; + this.log = logContext.logger(getClass()); this.stateUpdaterEnabled = stateUpdaterEnabled; this.processingThreadsEnabled = processingThreadsEnabled; createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics); - final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName()); - final LogContext logContext = new LogContext(threadIdPrefix); - streamsProducer = new StreamsProducer( producer(), processingMode(applicationConfig), diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java index 668bd832c86d8..2a750949005b9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java @@ -52,20 +52,20 @@ class StandbyTaskCreator { final StateDirectory stateDirectory, final ChangelogReader storeChangelogReader, final String threadId, - final Logger log, + final LogContext logContext, final boolean stateUpdaterEnabled) { this.topologyMetadata = topologyMetadata; this.applicationConfig = applicationConfig; this.streamsMetrics = streamsMetrics; this.stateDirectory = stateDirectory; this.storeChangelogReader = storeChangelogReader; - this.log = log; + this.log = logContext.logger(getClass()); this.stateUpdaterEnabled = stateUpdaterEnabled; createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics); dummyCache = new ThreadCache( - new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())), + logContext, 0, streamsMetrics ); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index a96d968ca27f0..2d182365983bc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -70,6 +70,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Arrays; @@ -394,7 +395,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final String logPrefix = String.format("stream-thread [%s] ", threadId); final LogContext logContext = new LogContext(logPrefix); final LogContext restorationLogContext = stateUpdaterEnabled ? new LogContext(String.format("state-updater [%s] ", restorationThreadId)) : logContext; - final Logger log = logContext.logger(StreamThread.class); + final Logger log = LoggerFactory.getLogger(StreamThread.class); final ReferenceContainer referenceContainer = new ReferenceContainer(); referenceContainer.adminClient = adminClient; @@ -402,7 +403,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, referenceContainer.time = time; referenceContainer.clientTags = config.getClientTags(); - log.info("Creating restore consumer client"); + log.info("Creating restore consumer client for thread {}", threadId); final Map restoreConsumerConfigs = config.getRestoreConsumerConfigs(restoreConsumerClientId(restorationThreadId)); final Consumer restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs); @@ -431,7 +432,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, threadId, threadIdx, processId, - log, + logContext, stateUpdaterEnabled, proceessingThreadsEnabled ); @@ -442,10 +443,10 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, stateDirectory, changelogReader, threadId, - log, + logContext, stateUpdaterEnabled); - final Tasks tasks = new Tasks(new LogContext(logPrefix)); + final Tasks tasks = new Tasks(logContext); final boolean processingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals()); @@ -480,7 +481,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, ); referenceContainer.taskManager = taskManager; - log.info("Creating consumer client"); + log.info("Creating consumer client for thread {}", threadId); final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); final Map consumerConfigs = config.getMainConsumerConfigs(applicationId, consumerClientId(threadId), threadIdx); consumerConfigs.put(StreamsConfig.InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer); @@ -491,7 +492,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); } - final MainConsumerSetup mainConsumerSetup = setupMainConsumer(topologyMetadata, config, clientSupplier, processId, log, consumerConfigs); + final MainConsumerSetup mainConsumerSetup = setupMainConsumer(topologyMetadata, config, clientSupplier, processId, log, threadId, consumerConfigs); taskManager.setMainConsumer(mainConsumerSetup.mainConsumer); referenceContainer.mainConsumer = mainConsumerSetup.mainConsumer; @@ -533,12 +534,13 @@ private static MainConsumerSetup setupMainConsumer(final TopologyMetadata topolo final KafkaClientSupplier clientSupplier, final UUID processId, final Logger log, + final String threadId, final Map consumerConfigs) { if (config.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name)) { if (topologyMetadata.hasNamedTopologies()) { throw new IllegalStateException("Named topologies and the CONSUMER protocol cannot be used at the same time."); } - log.info("Streams rebalance protocol enabled"); + log.info("Streams rebalance protocol enabled for thread {}", threadId); final Optional streamsRebalanceData = Optional.of( initStreamsRebalanceData( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java index 17eb27bb3d8f1..362c32592ca8e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java @@ -279,7 +279,7 @@ private void createTasks() { "clientId-StreamThread-0", 0, uuid, - new LogContext().logger(ActiveTaskCreator.class), + new LogContext(), false, false); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index fa72b116f004c..18cee43a10e80 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -3653,7 +3653,6 @@ private void setupInternalTopologyWithoutState(final StreamsConfig config) { // TODO: change return type to `StandbyTask` private Collection createStandbyTask(final StreamsConfig config) { final LogContext logContext = new LogContext("test"); - final Logger log = logContext.logger(StreamThreadTest.class); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator( @@ -3663,7 +3662,7 @@ private Collection createStandbyTask(final StreamsConfig config) { stateDirectory, new MockChangelogReader(), CLIENT_ID, - log, + logContext, false); return standbyTaskCreator.createTasks(singletonMap(new TaskId(1, 2), emptySet())); }