From 82736282a81a1708bf807e561dc74feb01472560 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Thu, 16 Jan 2025 19:27:48 -0800 Subject: [PATCH] MINOR: Improve Producer error message for failed metadata update We should provide the same informative error message for both timeout cases. --- .../kafka/clients/producer/KafkaProducer.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 608bde98b6d46..648de3ab4b90a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -1104,8 +1104,7 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long metadata.awaitUpdate(version, remainingWaitMs); } catch (TimeoutException ex) { // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs - final String errorMessage = String.format("Topic %s not present in metadata after %d ms.", - topic, maxWaitMs); + final String errorMessage = getErrorMessage(partitionsCount, topic, partition, maxWaitMs); if (metadata.getError(topic) != null) { throw new TimeoutException(errorMessage, metadata.getError(topic).exception()); } @@ -1114,11 +1113,7 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long cluster = metadata.fetch(); elapsed = time.milliseconds() - nowMs; if (elapsed >= maxWaitMs) { - final String errorMessage = partitionsCount == null ? - String.format("Topic %s not present in metadata after %d ms.", - topic, maxWaitMs) : - String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.", - partition, topic, partitionsCount, maxWaitMs); + final String errorMessage = getErrorMessage(partitionsCount, topic, partition, maxWaitMs); if (metadata.getError(topic) != null && metadata.getError(topic).exception() instanceof RetriableException) { throw new TimeoutException(errorMessage, metadata.getError(topic).exception()); } @@ -1134,6 +1129,13 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long return new ClusterAndWaitTime(cluster, elapsed); } + private String getErrorMessage(Integer partitionsCount, String topic, Integer partition, long maxWaitMs) { + return partitionsCount == null ? + String.format("Topic %s not present in metadata after %d ms.", + topic, maxWaitMs) : + String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.", + partition, topic, partitionsCount, maxWaitMs); + } /** * Validate that the record size isn't too large */