Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ public List<MetricSample> queryMetrics(final MetricsQueryParams params) {
Set<String> metricFilter = params.metricNames() != null
? new HashSet<>(params.metricNames()) : null;

LOG.debugf("Scraping %d pod target(s), metric filter: %s",
targets.size(),
metricFilter != null ? metricFilter : "none");

List<MetricSample> allSamples = new ArrayList<>();

for (PodTarget target : targets) {
Expand All @@ -78,21 +82,26 @@ public List<MetricSample> queryMetrics(final MetricsQueryParams params) {
String body = kubernetesClient.raw(proxyUrl);
if (body != null && !body.isEmpty()) {
List<MetricSample> samples = PrometheusTextParser.parse(body, metricFilter);
LOG.debugf("Scraped %d sample(s) from pod %s/%s",
samples.size(), target.namespace(), target.podName());
for (MetricSample sample : samples) {
allSamples.add(MetricSample.of(sample.name(),
MetricLabelFilter.filterLabels(sample.labels()),
sample.value(), sample.timestamp()));
}
} else {
LOG.warnf("Empty metrics response from pod %s/%s",
LOG.debugf("Empty metrics response from pod %s/%s",
target.namespace(), target.podName());
}
} catch (Exception e) {
LOG.warnf("Error scraping metrics from pod %s/%s: %s",
LOG.debugf("Error scraping metrics from pod %s/%s: %s",
target.namespace(), target.podName(), e.getMessage());
}
}

LOG.debugf("Scraping complete: collected %d total sample(s) from %d target(s)",
allSamples.size(), targets.size());

return allSamples;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,17 @@ public static final class ComponentTypes {
*/
public static final String KAFKA = "kafka";

/**
* Component type value for Kafka Exporter components.
*/
public static final String KAFKA_EXPORTER = "kafka-exporter";

/**
* Component types representing Kafka broker pods.
*/
public static final java.util.Set<String> BROKER_TYPES =
java.util.Set.of(KAFKA);

private ComponentTypes() {
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ public final class StrimziToolsPrompts {
+ " Defaults to 'reconciliation' if no category"
+ " or metric names are provided.";

/**
* Kafka Exporter metrics category parameter description.
*/
public static final String EXPORTER_METRICS_CATEGORY_DESC =
"Metric category: 'consumer_lag', 'partitions',"
+ " or 'resources'."
+ " Defaults to 'consumer_lag' if omitted.";

/**
* Metric names parameter description.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright StreamsHub authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.streamshub.mcp.strimzi.config.metrics;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Curated metric name categories for Kafka Exporter metrics.
* Maps human-friendly category names to lists of Prometheus metric names,
* and provides interpretation guides for each category.
*/
public final class KafkaExporterMetricCategories {

private static final Map<String, List<String>> CATEGORIES = Map.of(
"consumer_lag", List.of(
"kafka_consumergroup_current_offset",
"kafka_consumergroup_lag",
"kafka_consumergroup_lag_seconds"
),
"partitions", List.of(
"kafka_topic_partitions",
"kafka_topic_partition_current_offset",
"kafka_topic_partition_oldest_offset",
"kafka_topic_partition_in_sync_replica",
"kafka_topic_partition_under_replicated_partition",
"kafka_topic_partition_replicas"
),
"resources", List.of(
"jvm_memory_used_bytes",
"jvm_memory_max_bytes",
"jvm_gc_collection_seconds_count",
"jvm_gc_collection_seconds_sum",
"process_cpu_seconds_total",
"jvm_threads_current"
)
);

private static final Map<String, String> DESCRIPTIONS = Map.of(
"consumer_lag",
"**[HIGH - CONSUMER GROUP LAG]**\n\n"
+ "kafka_consumergroup_lag: Number of messages a consumer group is behind the latest offset. "
+ "**THRESHOLDS**: 0 = fully caught up, <1000 = healthy, 1000-10000 = monitor, "
+ ">10000 = consumers falling behind (scale consumers or investigate slowness). "
+ "Sustained growth = consumers cannot keep up with producer throughput.\n\n"
+ "kafka_consumergroup_lag_seconds: Estimated time in seconds for the consumer group "
+ "to catch up. >60s = significant delay, >300s = investigate consumer health.\n\n"
+ "kafka_consumergroup_current_offset: Current committed offset per consumer group/partition. "
+ "Stalled offset = consumer is stuck or dead. Compare with partition end offset to calculate lag.",
"partitions",
"**[HIGH - PARTITION HEALTH]**\n\n"
+ "kafka_topic_partition_under_replicated_partition: Partitions where ISR count < replica count. "
+ "Should be 0. >0 means replicas are lagging — check broker health and disk I/O. "
+ "**TIME-SENSITIVE**: Transient during rolling restarts, persistent = data loss risk.\n\n"
+ "kafka_topic_partition_in_sync_replica: Number of in-sync replicas per partition. "
+ "Should equal kafka_topic_partition_replicas. Drop below min.insync.replicas = "
+ "producers with acks=all will fail.\n\n"
+ "kafka_topic_partition_replicas: Configured replica count per partition.\n\n"
+ "**[MEDIUM - PARTITION OFFSETS]**\n\n"
+ "kafka_topic_partition_current_offset: Latest offset (end of log) per partition. "
+ "Rate of change = write throughput per partition.\n\n"
+ "kafka_topic_partition_oldest_offset: Earliest available offset per partition. "
+ "Gap between oldest and current = retained data volume. "
+ "Oldest offset advancing = log segments being cleaned/compacted.\n\n"
+ "kafka_topic_partitions: Number of partitions per topic.",
"resources",
MetricsDescriptions.jvmDescription("get_kafka_cluster_pods",
"exporter scraping failures or missing metrics")
);

private KafkaExporterMetricCategories() {
// Utility class — no instantiation
}

/**
* Resolves a category name to its list of metric names.
*
* @param category the category name (case-insensitive)
* @return the list of metric names, or an empty list if the category is unknown
*/
public static List<String> resolve(final String category) {
if (category == null) {
return List.of();
}
return CATEGORIES.getOrDefault(category.toLowerCase(java.util.Locale.ROOT), List.of());
}

/**
* Returns all available category names.
*
* @return the set of category names
*/
public static Set<String> allCategories() {
return CATEGORIES.keySet();
}

/**
* Returns an interpretation guide for the given categories.
*
* @param categories the category names to get descriptions for
* @return a combined interpretation guide, or null if no categories match
*/
public static String interpretation(final List<String> categories) {
if (categories == null || categories.isEmpty()) {
return null;
}
String result = categories.stream()
.map(c -> c.toLowerCase(java.util.Locale.ROOT))
.filter(DESCRIPTIONS::containsKey)
.map(DESCRIPTIONS::get)
.collect(Collectors.joining("\n\n"));
return result.isEmpty() ? null : result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright StreamsHub authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.streamshub.mcp.strimzi.dto.metrics;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.streamshub.mcp.common.dto.metrics.MetricSample;
import io.streamshub.mcp.common.dto.metrics.MetricTimeSeries;

import java.time.Instant;
import java.util.List;

/**
* Response containing metrics data from a Kafka Exporter.
* For instant queries, metrics are returned as a flat list in {@code metrics}.
* For range queries, metrics are grouped into compact time series in {@code timeSeries}.
*
* @param clusterName the Kafka cluster name
* @param namespace the Kubernetes namespace
* @param provider the metrics provider used
* @param categories the metric categories requested
* @param metricCount the number of distinct metric names in the response
* @param sampleCount the total number of metric samples
* @param metrics the flat list of metric samples (instant queries)
* @param timeSeries the grouped time series (range queries)
* @param interpretation brief guide for interpreting the returned metrics
* @param timestamp the time this result was generated
* @param message a human-readable summary of the result
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public record KafkaExporterMetricsResponse(
@JsonProperty("cluster_name") String clusterName,
@JsonProperty("namespace") String namespace,
@JsonProperty("provider") String provider,
@JsonProperty("categories") List<String> categories,
@JsonProperty("metric_count") long metricCount,
@JsonProperty("sample_count") int sampleCount,
@JsonProperty("metrics") List<MetricSample> metrics,
@JsonProperty("time_series") List<MetricTimeSeries> timeSeries,
@JsonProperty("interpretation") String interpretation,
@JsonProperty("timestamp") Instant timestamp,
@JsonProperty("message") String message
) {

/**
* Creates a response with metric data. If samples have timestamps (range query),
* groups them into compact time series. Otherwise returns a flat metrics list.
*
* @param clusterName the Kafka cluster name
* @param namespace the Kubernetes namespace
* @param provider the metrics provider name
* @param categories the requested categories
* @param samples the metric samples
* @param interpretation brief guide for interpreting the returned metrics
* @return a response with the metric data
*/
public static KafkaExporterMetricsResponse of(final String clusterName, final String namespace,
final String provider, final List<String> categories,
final List<MetricSample> samples,
final String interpretation) {
long metricCount = samples.stream()
.map(MetricSample::name)
.distinct()
.count();

String msg = String.format("Retrieved %d samples across %d metrics from Kafka Exporter for cluster '%s'",
samples.size(), metricCount, clusterName);

boolean isRangeData = !samples.isEmpty() && samples.getFirst().timestamp() != null;

if (isRangeData) {
List<MetricTimeSeries> series = MetricTimeSeries.fromSamples(samples);
return new KafkaExporterMetricsResponse(clusterName, namespace, provider, categories,
metricCount, samples.size(), null, series, interpretation, Instant.now(), msg);
}

return new KafkaExporterMetricsResponse(clusterName, namespace, provider, categories,
metricCount, samples.size(), samples, null, interpretation, Instant.now(), msg);
}

/**
* Creates an empty response when no metrics are available.
*
* @param clusterName the Kafka cluster name
* @param namespace the Kubernetes namespace
* @param message descriptive message explaining why no metrics are available
* @return an empty response
*/
public static KafkaExporterMetricsResponse empty(final String clusterName, final String namespace,
final String message) {
return new KafkaExporterMetricsResponse(clusterName, namespace, null, List.of(),
0, 0, List.of(), null, null, Instant.now(), message);
}
}
Loading
Loading