From ff88da7ce7bb2823d1283d7f13d1517729513c36 Mon Sep 17 00:00:00 2001 From: Uladzislau Blok <blokv75@gmail.com> Date: Thu, 10 Apr 2025 20:58:52 +0200 Subject: [PATCH 1/3] fix: create log without context in create method --- .../apache/kafka/streams/processor/internals/StreamThread.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index a96d968ca27f0..a4ece227b6f14 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -70,6 +70,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Arrays; @@ -394,7 +395,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final String logPrefix = String.format("stream-thread [%s] ", threadId); final LogContext logContext = new LogContext(logPrefix); final LogContext restorationLogContext = stateUpdaterEnabled ? new LogContext(String.format("state-updater [%s] ", restorationThreadId)) : logContext; - final Logger log = logContext.logger(StreamThread.class); + final Logger log = LoggerFactory.getLogger(StreamThread.class); final ReferenceContainer referenceContainer = new ReferenceContainer(); referenceContainer.adminClient = adminClient; From 726554e7ce524a1bfee26c46c16e86ca2af25b4a Mon Sep 17 00:00:00 2001 From: Uladzislau Blok <blokv75@gmail.com> Date: Tue, 15 Apr 2025 18:57:27 +0200 Subject: [PATCH 2/3] Mark logger calls --- .../processor/internals/ActiveTaskCreator.java | 7 ++----- .../processor/internals/StandbyTaskCreator.java | 6 +++--- .../streams/processor/internals/StreamThread.java | 13 +++++++------ 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index 93953d815bee4..448853c63670b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -77,7 +77,7 @@ class ActiveTaskCreator { final String threadId, final int threadIdx, final UUID processId, - final Logger log, + final LogContext logContext, final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { this.topologyMetadata = topologyMetadata; @@ -91,15 +91,12 @@ class ActiveTaskCreator { this.threadId = threadId; this.threadIdx = threadIdx; this.processId = processId; - this.log = log; + this.log = logContext.logger(getClass()); this.stateUpdaterEnabled = stateUpdaterEnabled; this.processingThreadsEnabled = processingThreadsEnabled; createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics); - final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName()); - final LogContext logContext = new LogContext(threadIdPrefix); - streamsProducer = new StreamsProducer( producer(), processingMode(applicationConfig), diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java index 668bd832c86d8..2a750949005b9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java @@ -52,20 +52,20 @@ class StandbyTaskCreator { final StateDirectory stateDirectory, final ChangelogReader storeChangelogReader, final String threadId, - final Logger log, + final LogContext logContext, final boolean stateUpdaterEnabled) { this.topologyMetadata = topologyMetadata; this.applicationConfig = applicationConfig; this.streamsMetrics = streamsMetrics; this.stateDirectory = stateDirectory; this.storeChangelogReader = storeChangelogReader; - this.log = log; + this.log = logContext.logger(getClass()); this.stateUpdaterEnabled = stateUpdaterEnabled; createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics); dummyCache = new ThreadCache( - new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())), + logContext, 0, streamsMetrics ); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index a4ece227b6f14..61025d29c73ef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -403,7 +403,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, referenceContainer.time = time; referenceContainer.clientTags = config.getClientTags(); - log.info("Creating restore consumer client"); + log.info("Creating restore consumer client for thread {}", threadId); final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(restoreConsumerClientId(restorationThreadId)); final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs); @@ -432,7 +432,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, threadId, threadIdx, processId, - log, + logContext, stateUpdaterEnabled, proceessingThreadsEnabled ); @@ -443,7 +443,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, stateDirectory, changelogReader, threadId, - log, + logContext, stateUpdaterEnabled); final Tasks tasks = new Tasks(new LogContext(logPrefix)); @@ -481,7 +481,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, ); referenceContainer.taskManager = taskManager; - log.info("Creating consumer client"); + log.info("Creating consumer client for thread {}", threadId); final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, consumerClientId(threadId), threadIdx); consumerConfigs.put(StreamsConfig.InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer); @@ -492,7 +492,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); } - final MainConsumerSetup mainConsumerSetup = setupMainConsumer(topologyMetadata, config, clientSupplier, processId, log, consumerConfigs); + final MainConsumerSetup mainConsumerSetup = setupMainConsumer(topologyMetadata, config, clientSupplier, processId, log, threadId, consumerConfigs); taskManager.setMainConsumer(mainConsumerSetup.mainConsumer); referenceContainer.mainConsumer = mainConsumerSetup.mainConsumer; @@ -534,12 +534,13 @@ private static MainConsumerSetup setupMainConsumer(final TopologyMetadata topolo final KafkaClientSupplier clientSupplier, final UUID processId, final Logger log, + final String threadId, final Map<String, Object> consumerConfigs) { if (config.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name)) { if (topologyMetadata.hasNamedTopologies()) { throw new IllegalStateException("Named topologies and the CONSUMER protocol cannot be used at the same time."); } - log.info("Streams rebalance protocol enabled"); + log.info("Streams rebalance protocol enabled for thread {}", threadId); final Optional<StreamsRebalanceData> streamsRebalanceData = Optional.of( initStreamsRebalanceData( From ec68f5a2ce193486fb90cd58e014264305b78b03 Mon Sep 17 00:00:00 2001 From: Uladzislau Blok <blokv75@gmail.com> Date: Sat, 19 Apr 2025 18:42:19 +0200 Subject: [PATCH 3/3] update tests --- .../apache/kafka/streams/processor/internals/StreamThread.java | 2 +- .../streams/processor/internals/ActiveTaskCreatorTest.java | 2 +- .../kafka/streams/processor/internals/StreamThreadTest.java | 3 +-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 61025d29c73ef..2d182365983bc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -446,7 +446,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, logContext, stateUpdaterEnabled); - final Tasks tasks = new Tasks(new LogContext(logPrefix)); + final Tasks tasks = new Tasks(logContext); final boolean processingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java index 17eb27bb3d8f1..362c32592ca8e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java @@ -279,7 +279,7 @@ private void createTasks() { "clientId-StreamThread-0", 0, uuid, - new LogContext().logger(ActiveTaskCreator.class), + new LogContext(), false, false); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index fa72b116f004c..18cee43a10e80 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -3653,7 +3653,6 @@ private void setupInternalTopologyWithoutState(final StreamsConfig config) { // TODO: change return type to `StandbyTask` private Collection<Task> createStandbyTask(final StreamsConfig config) { final LogContext logContext = new LogContext("test"); - final Logger log = logContext.logger(StreamThreadTest.class); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator( @@ -3663,7 +3662,7 @@ private Collection<Task> createStandbyTask(final StreamsConfig config) { stateDirectory, new MockChangelogReader(), CLIENT_ID, - log, + logContext, false); return standbyTaskCreator.createTasks(singletonMap(new TaskId(1, 2), emptySet())); }