Skip to content

Commit c6d452b

Browse files
committed
KAFKA-17561: add processId tag to thread-state metric (#18581)
Part of KIP-1091. Reviewers: Bill Bejeck <bill@confluent.io>
1 parent fa0475b commit c6d452b

7 files changed

Lines changed: 82 additions & 13 deletions

File tree

docs/ops.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2826,7 +2826,7 @@ <h5 class="anchor-heading"><a id="kafka_streams_thread_monitoring" class="anchor
28262826
<tr>
28272827
<td>thread-state</td>
28282828
<td>The state of the thread as a number (<code>ordinal()</code> of the corresponding enum).</td>
2829-
<td>kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)</td>
2829+
<td>kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+),process-id=([-.\w]+)</td>
28302830
</tr>
28312831
<tr>
28322832
<td>commit-latency-avg</td>

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
497497
stateUpdater,
498498
streamsMetrics,
499499
topologyMetadata,
500+
processId,
500501
threadId,
501502
logContext,
502503
referenceContainer.assignmentErrorCode,
@@ -574,6 +575,7 @@ public StreamThread(final Time time,
574575
final StateUpdater stateUpdater,
575576
final StreamsMetricsImpl streamsMetrics,
576577
final TopologyMetadata topologyMetadata,
578+
final UUID processId,
577579
final String threadId,
578580
final LogContext logContext,
579581
final AtomicInteger assignmentErrorCode,
@@ -618,6 +620,7 @@ public StreamThread(final Time time,
618620
time.milliseconds()
619621
);
620622
ThreadMetrics.addThreadStateTelemetryMetric(
623+
processId.toString(),
621624
threadId,
622625
streamsMetrics,
623626
(metricConfig, now) -> this.state().ordinal());

streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,15 @@ public <T> void addThreadLevelMutableMetric(final String name,
232232
final String description,
233233
final String threadId,
234234
final Gauge<T> valueProvider) {
235+
addThreadLevelMutableMetric(name, description, threadId, Collections.emptyMap(), valueProvider);
236+
}
237+
public <T> void addThreadLevelMutableMetric(final String name,
238+
final String description,
239+
final String threadId,
240+
final Map<String, String> additionalTags,
241+
final Gauge<T> valueProvider) {
235242
final MetricName metricName = metrics.metricName(
236-
name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId));
243+
name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId, additionalTags));
237244
synchronized (threadLevelMetrics) {
238245
threadLevelMetrics.computeIfAbsent(
239246
threadSensorPrefix(threadId),
@@ -279,7 +286,11 @@ public Map<String, String> clientLevelTagMap() {
279286
}
280287

281288
public Map<String, String> threadLevelTagMap(final String threadId) {
282-
final Map<String, String> tagMap = new LinkedHashMap<>();
289+
return threadLevelTagMap(threadId, Collections.emptyMap());
290+
}
291+
292+
public Map<String, String> threadLevelTagMap(final String threadId, final Map<String, String> additionalTags) {
293+
final Map<String, String> tagMap = new LinkedHashMap<>(additionalTags);
283294
tagMap.put(THREAD_ID_TAG, threadId);
284295
return tagMap;
285296
}
@@ -325,32 +336,40 @@ public final void removeAllThreadLevelMetrics(final String threadId) {
325336
}
326337

327338
public Map<String, String> taskLevelTagMap(final String threadId, final String taskId) {
328-
final Map<String, String> tagMap = threadLevelTagMap(threadId);
339+
final Map<String, String> tagMap = new LinkedHashMap<>();
340+
tagMap.put(THREAD_ID_TAG, threadId);
329341
tagMap.put(TASK_ID_TAG, taskId);
330342
return tagMap;
331343
}
332344

333345
public Map<String, String> nodeLevelTagMap(final String threadId,
334-
final String taskName,
346+
final String taskId,
335347
final String processorNodeName) {
336-
final Map<String, String> tagMap = taskLevelTagMap(threadId, taskName);
348+
final Map<String, String> tagMap = new LinkedHashMap<>();
349+
tagMap.put(THREAD_ID_TAG, threadId);
350+
tagMap.put(TASK_ID_TAG, taskId);
337351
tagMap.put(PROCESSOR_NODE_ID_TAG, processorNodeName);
338352
return tagMap;
339353
}
340354

341355
public Map<String, String> topicLevelTagMap(final String threadId,
342-
final String taskName,
356+
final String taskId,
343357
final String processorNodeName,
344358
final String topicName) {
345-
final Map<String, String> tagMap = nodeLevelTagMap(threadId, taskName, processorNodeName);
359+
final Map<String, String> tagMap = new LinkedHashMap<>();
360+
tagMap.put(THREAD_ID_TAG, threadId);
361+
tagMap.put(TASK_ID_TAG, taskId);
362+
tagMap.put(PROCESSOR_NODE_ID_TAG, processorNodeName);
346363
tagMap.put(TOPIC_NAME_TAG, topicName);
347364
return tagMap;
348365
}
349366

350-
public Map<String, String> storeLevelTagMap(final String taskName,
367+
public Map<String, String> storeLevelTagMap(final String taskId,
351368
final String storeType,
352369
final String storeName) {
353-
final Map<String, String> tagMap = taskLevelTagMap(Thread.currentThread().getName(), taskName);
370+
final Map<String, String> tagMap = new LinkedHashMap<>();
371+
tagMap.put(THREAD_ID_TAG, Thread.currentThread().getName());
372+
tagMap.put(TASK_ID_TAG, taskId);
354373
tagMap.put(storeType + "-" + STORE_ID_TAG, storeName);
355374
return tagMap;
356375
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import org.apache.kafka.streams.processor.internals.StreamThread;
2323
import org.apache.kafka.streams.processor.internals.StreamThreadTotalBlockedTime;
2424

25+
import java.util.Collections;
2526
import java.util.Map;
2627

2728
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
29+
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESS_ID_TAG;
2830
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_DESCRIPTION;
2931
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
3032
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_SUFFIX;
@@ -296,13 +298,15 @@ public static void addThreadStartTimeMetric(final String threadId,
296298
);
297299
}
298300

299-
public static void addThreadStateTelemetryMetric(final String threadId,
301+
public static void addThreadStateTelemetryMetric(final String processId,
302+
final String threadId,
300303
final StreamsMetricsImpl streamsMetrics,
301304
final Gauge<Integer> threadStateProvider) {
302305
streamsMetrics.addThreadLevelMutableMetric(
303306
THREAD_STATE,
304307
THREAD_STATE_DESCRIPTION,
305308
threadId,
309+
Collections.singletonMap(PROCESS_ID_TAG, processId),
306310
threadStateProvider
307311
);
308312
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1440,6 +1440,7 @@ public long restore(final Map<TaskId, Task> tasks) {
14401440
null,
14411441
streamsMetrics,
14421442
new TopologyMetadata(internalTopologyBuilder, config),
1443+
PROCESS_ID,
14431444
CLIENT_ID,
14441445
new LogContext(""),
14451446
new AtomicInteger(),
@@ -2666,6 +2667,7 @@ public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath(final boolea
26662667
null,
26672668
streamsMetrics,
26682669
topologyMetadata,
2670+
PROCESS_ID,
26692671
CLIENT_ID,
26702672
new LogContext(""),
26712673
new AtomicInteger(),
@@ -2724,6 +2726,7 @@ public void shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHan
27242726
null,
27252727
streamsMetrics,
27262728
topologyMetadata,
2729+
PROCESS_ID,
27272730
CLIENT_ID,
27282731
new LogContext(""),
27292732
new AtomicInteger(),
@@ -2791,6 +2794,7 @@ public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath(final
27912794
null,
27922795
streamsMetrics,
27932796
topologyMetadata,
2797+
PROCESS_ID,
27942798
CLIENT_ID,
27952799
new LogContext(""),
27962800
new AtomicInteger(),
@@ -2854,6 +2858,7 @@ public void shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveT
28542858
null,
28552859
streamsMetrics,
28562860
topologyMetadata,
2861+
PROCESS_ID,
28572862
CLIENT_ID,
28582863
new LogContext(""),
28592864
new AtomicInteger(),
@@ -2914,6 +2919,7 @@ public void shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInac
29142919
null,
29152920
streamsMetrics,
29162921
topologyMetadata,
2922+
PROCESS_ID,
29172923
CLIENT_ID,
29182924
new LogContext(""),
29192925
new AtomicInteger(),
@@ -3147,6 +3153,7 @@ public void shouldConstructAdminMetrics(final boolean stateUpdaterEnabled, final
31473153
null,
31483154
streamsMetrics,
31493155
topologyMetadata,
3156+
PROCESS_ID,
31503157
CLIENT_ID,
31513158
new LogContext(""),
31523159
new AtomicInteger(),
@@ -3203,6 +3210,7 @@ public void runAndVerifyFailedStreamThreadRecording(final boolean shouldFail, fi
32033210
null,
32043211
streamsMetrics,
32053212
topologyMetadata,
3213+
PROCESS_ID,
32063214
CLIENT_ID,
32073215
new LogContext(""),
32083216
new AtomicInteger(),
@@ -3582,6 +3590,7 @@ private StreamThread setUpThread(final Properties streamsConfigProps) {
35823590
null,
35833591
new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime),
35843592
topologyMetadata,
3593+
PROCESS_ID,
35853594
"thread-id",
35863595
new LogContext(),
35873596
null,
@@ -3703,6 +3712,7 @@ private StreamThread buildStreamThread(final Consumer<byte[], byte[]> consumer,
37033712
null,
37043713
streamsMetrics,
37053714
topologyMetadata,
3715+
PROCESS_ID,
37063716
CLIENT_ID,
37073717
new LogContext(""),
37083718
new AtomicInteger(),

streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1281,7 +1281,35 @@ public void shouldAddThreadLevelMutableMetric() {
12811281
final MetricName name = metrics.metricName(
12821282
"foobar",
12831283
THREAD_LEVEL_GROUP,
1284-
Collections.singletonMap("thread-id", "t1")
1284+
mkMap(
1285+
mkEntry("thread-id", "t1")
1286+
)
1287+
);
1288+
assertThat(metrics.metric(name), notNullValue());
1289+
assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue));
1290+
}
1291+
1292+
@Test
1293+
public void shouldAddThreadLevelMutableMetricWithAdditionalTags() {
1294+
final int measuredValue = 123;
1295+
final StreamsMetricsImpl streamsMetrics
1296+
= new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
1297+
1298+
streamsMetrics.addThreadLevelMutableMetric(
1299+
"foobar",
1300+
"test metric",
1301+
"t1",
1302+
Collections.singletonMap("additional-tag", "additional-value"),
1303+
(c, t) -> measuredValue
1304+
);
1305+
1306+
final MetricName name = metrics.metricName(
1307+
"foobar",
1308+
THREAD_LEVEL_GROUP,
1309+
mkMap(
1310+
mkEntry("thread-id", "t1"),
1311+
mkEntry("additional-tag", "additional-value")
1312+
)
12851313
);
12861314
assertThat(metrics.metric(name), notNullValue());
12871315
assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue));
@@ -1325,7 +1353,9 @@ public void shouldAddThreadLevelImmutableMetric() {
13251353
final MetricName name = metrics.metricName(
13261354
"foobar",
13271355
THREAD_LEVEL_GROUP,
1328-
Collections.singletonMap("thread-id", "t1")
1356+
mkMap(
1357+
mkEntry("thread-id", "t1")
1358+
)
13291359
);
13301360
assertThat(metrics.metric(name), notNullValue());
13311361
assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue));

streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
public class ThreadMetricsTest {
4343

44+
private static final String PROCESS_ID = "process-id";
4445
private static final String THREAD_ID = "thread-id";
4546
private static final String THREAD_LEVEL_GROUP = "stream-thread-metrics";
4647

@@ -418,6 +419,7 @@ public void shouldAddThreadStartTimeMetric() {
418419
public void shouldAddThreadStateTelemetryMetric() {
419420
final Gauge<Integer> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.ordinal();
420421
ThreadMetrics.addThreadStateTelemetryMetric(
422+
PROCESS_ID,
421423
THREAD_ID,
422424
streamsMetrics,
423425
threadStateProvider
@@ -426,6 +428,7 @@ public void shouldAddThreadStateTelemetryMetric() {
426428
"thread-state",
427429
"The current state of the thread",
428430
THREAD_ID,
431+
Collections.singletonMap("process-id", PROCESS_ID),
429432
threadStateProvider
430433
);
431434
}

0 commit comments

Comments
 (0)