From 931dfaf28a18f1824f903f5cd60d1fae1c2c818d Mon Sep 17 00:00:00 2001 From: aheev Date: Wed, 20 Aug 2025 20:08:59 +0530 Subject: [PATCH 1/9] KAFKA-19487: Improving consistency of command-line arguments for consumer performance tests --- .../performance/consumer_performance.py | 4 +- .../performance/share_consumer_performance.py | 4 +- .../kafka/tools/ConsumerPerformance.java | 102 +++++++++------- .../kafka/tools/ShareConsumerPerformance.java | 114 ++++++++++-------- .../kafka/tools/ConsumerPerformanceTest.java | 38 +++++- .../tools/ShareConsumerPerformanceTest.java | 38 +++++- 6 files changed, 194 insertions(+), 106 deletions(-) diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py index 28086e8281887..7d2455030d610 100644 --- a/tests/kafkatest/services/performance/consumer_performance.py +++ b/tests/kafkatest/services/performance/consumer_performance.py @@ -40,7 +40,7 @@ class ConsumerPerformanceService(PerformanceService): "socket-buffer-size", "The size of the tcp RECV size." "new-consumer", "Use the new consumer implementation." - "consumer.config", "Consumer config properties file." + "command-config", "Consumer config properties file." """ # Root directory for persistent output @@ -115,7 +115,7 @@ def start_cmd(self, node): for key, value in self.args(node.version).items(): cmd += " --%s %s" % (key, value) - cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE + cmd += " --command-config %s" % ConsumerPerformanceService.CONFIG_FILE for key, value in self.settings.items(): cmd += " %s=%s" % (str(key), str(value)) diff --git a/tests/kafkatest/services/performance/share_consumer_performance.py b/tests/kafkatest/services/performance/share_consumer_performance.py index ccb0952458009..63aaae7b8dec6 100644 --- a/tests/kafkatest/services/performance/share_consumer_performance.py +++ b/tests/kafkatest/services/performance/share_consumer_performance.py @@ -33,7 +33,7 @@ class ShareConsumerPerformanceService(PerformanceService): "socket-buffer-size", "The size of the tcp RECV size." - "consumer.config", "Consumer config properties file." + "command-config", "Share consumer config properties file." """ # Root directory for persistent output @@ -100,7 +100,7 @@ def start_cmd(self, node): for key, value in self.args().items(): cmd += " --%s %s" % (key, value) - cmd += " --consumer.config %s" % ShareConsumerPerformanceService.CONFIG_FILE + cmd += " --command-config %s" % ShareConsumerPerformanceService.CONFIG_FILE for key, value in self.settings.items(): cmd += " %s=%s" % (str(key), str(value)) diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java index 60b4b37abe4b6..1a075b2b743a1 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java @@ -56,7 +56,7 @@ public static void main(String[] args) { try { LOG.info("Starting consumer..."); ConsumerPerfOptions options = new ConsumerPerfOptions(args); - AtomicLong totalMessagesRead = new AtomicLong(0); + AtomicLong totalRecordsRead = new AtomicLong(0); AtomicLong totalBytesRead = new AtomicLong(0); AtomicLong joinTimeMs = new AtomicLong(0); AtomicLong joinTimeMsInSingleRound = new AtomicLong(0); @@ -66,14 +66,14 @@ public static void main(String[] args) { KafkaConsumer consumer = new KafkaConsumer<>(options.props()); long bytesRead = 0L; - long messagesRead = 0L; + long recordsRead = 0L; long lastBytesRead = 0L; - long lastMessagesRead = 0L; + long lastRecordsRead = 0L; long currentTimeMs = System.currentTimeMillis(); long joinStartMs = currentTimeMs; long startMs = currentTimeMs; - consume(consumer, options, totalMessagesRead, totalBytesRead, joinTimeMs, - bytesRead, messagesRead, lastBytesRead, lastMessagesRead, + consume(consumer, options, totalRecordsRead, totalBytesRead, joinTimeMs, + bytesRead, recordsRead, lastBytesRead, lastRecordsRead, joinStartMs, joinTimeMsInSingleRound); long endMs = System.currentTimeMillis(); @@ -92,12 +92,12 @@ public static void main(String[] args) { options.dateFormat().format(endMs), totalMbRead, totalMbRead / elapsedSec, - totalMessagesRead.get(), - totalMessagesRead.get() / elapsedSec, + totalRecordsRead.get(), + totalRecordsRead.get() / elapsedSec, joinTimeMs.get(), fetchTimeInMs, totalMbRead / (fetchTimeInMs / 1000.0), - totalMessagesRead.get() / (fetchTimeInMs / 1000.0) + totalRecordsRead.get() / (fetchTimeInMs / 1000.0) ); } @@ -120,16 +120,16 @@ protected static void printHeader(boolean showDetailedStats) { private static void consume(KafkaConsumer consumer, ConsumerPerfOptions options, - AtomicLong totalMessagesRead, + AtomicLong totalRecordsRead, AtomicLong totalBytesRead, AtomicLong joinTimeMs, long bytesRead, - long messagesRead, + long recordsRead, long lastBytesRead, - long lastMessagesRead, + long lastRecordsRead, long joinStartMs, AtomicLong joinTimeMsInSingleRound) { - long numMessages = options.numMessages(); + long numRecords = options.numRecords(); long recordFetchTimeoutMs = options.recordFetchTimeoutMs(); long reportingIntervalMs = options.reportingIntervalMs(); boolean showDetailedStats = options.showDetailedStats(); @@ -142,55 +142,55 @@ private static void consume(KafkaConsumer consumer, long lastReportTimeMs = currentTimeMs; long lastConsumedTimeMs = currentTimeMs; - while (messagesRead < numMessages && currentTimeMs - lastConsumedTimeMs <= recordFetchTimeoutMs) { + while (recordsRead < numRecords && currentTimeMs - lastConsumedTimeMs <= recordFetchTimeoutMs) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); currentTimeMs = System.currentTimeMillis(); if (!records.isEmpty()) lastConsumedTimeMs = currentTimeMs; for (ConsumerRecord record : records) { - messagesRead += 1; + recordsRead += 1; if (record.key() != null) bytesRead += record.key().length; if (record.value() != null) bytesRead += record.value().length; if (currentTimeMs - lastReportTimeMs >= reportingIntervalMs) { if (showDetailedStats) - printConsumerProgress(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, + printConsumerProgress(0, bytesRead, lastBytesRead, recordsRead, lastRecordsRead, lastReportTimeMs, currentTimeMs, dateFormat, joinTimeMsInSingleRound.get()); joinTimeMsInSingleRound = new AtomicLong(0); lastReportTimeMs = currentTimeMs; - lastMessagesRead = messagesRead; + lastRecordsRead = recordsRead; lastBytesRead = bytesRead; } } } - if (messagesRead < numMessages) - System.out.printf("WARNING: Exiting before consuming the expected number of messages: timeout (%d ms) exceeded. " + + if (recordsRead < numRecords) + System.out.printf("WARNING: Exiting before consuming the expected number of records: timeout (%d ms) exceeded. " + "You can use the --timeout option to increase the timeout.%n", recordFetchTimeoutMs); - totalMessagesRead.set(messagesRead); + totalRecordsRead.set(recordsRead); totalBytesRead.set(bytesRead); } protected static void printConsumerProgress(int id, long bytesRead, long lastBytesRead, - long messagesRead, - long lastMessagesRead, + long recordsRead, + long lastRecordsRead, long startMs, long endMs, SimpleDateFormat dateFormat, long joinTimeMsInSingleRound) { - printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, dateFormat); - printExtendedProgress(bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, joinTimeMsInSingleRound); + printBasicProgress(id, bytesRead, lastBytesRead, recordsRead, lastRecordsRead, startMs, endMs, dateFormat); + printExtendedProgress(bytesRead, lastBytesRead, recordsRead, lastRecordsRead, startMs, endMs, joinTimeMsInSingleRound); System.out.println(); } private static void printBasicProgress(int id, long bytesRead, long lastBytesRead, - long messagesRead, - long lastMessagesRead, + long recordsRead, + long lastRecordsRead, long startMs, long endMs, SimpleDateFormat dateFormat) { @@ -198,25 +198,25 @@ private static void printBasicProgress(int id, double totalMbRead = (bytesRead * 1.0) / (1024 * 1024); double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024); double intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs; - double intervalMessagesPerSec = ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0; + double intervalRecordsPerSec = ((recordsRead - lastRecordsRead) / elapsedMs) * 1000.0; System.out.printf("%s, %d, %.4f, %.4f, %d, %.4f", dateFormat.format(endMs), id, - totalMbRead, intervalMbPerSec, messagesRead, intervalMessagesPerSec); + totalMbRead, intervalMbPerSec, recordsRead, intervalRecordsPerSec); } private static void printExtendedProgress(long bytesRead, long lastBytesRead, - long messagesRead, - long lastMessagesRead, + long recordsRead, + long lastRecordsRead, long startMs, long endMs, long joinTimeMsInSingleRound) { long fetchTimeMs = endMs - startMs - joinTimeMsInSingleRound; double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024); - long intervalMessagesRead = messagesRead - lastMessagesRead; + long intervalRecordsRead = recordsRead - lastRecordsRead; double intervalMbPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * intervalMbRead / fetchTimeMs; - double intervalMessagesPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * intervalMessagesRead / fetchTimeMs; + double intervalRecordsPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * intervalRecordsRead / fetchTimeMs; System.out.printf(", %d, %d, %.4f, %.4f", joinTimeMsInSingleRound, - fetchTimeMs, intervalMbPerSec, intervalMessagesPerSec); + fetchTimeMs, intervalMbPerSec, intervalRecordsPerSec); } public static class ConsumerPerfRebListener implements ConsumerRebalanceListener { @@ -250,11 +250,14 @@ protected static class ConsumerPerfOptions extends CommandDefaultOptions { private final OptionSpec fetchSizeOpt; private final OptionSpec resetBeginningOffsetOpt; private final OptionSpec socketBufferSizeOpt; + // Deprecated option, kept for backward compatibility + // and will be removed in a future version. private final OptionSpec consumerConfigOpt; + private final OptionSpec commandConfigOpt; private final OptionSpec printMetricsOpt; private final OptionSpec showDetailedStatsOpt; private final OptionSpec recordFetchTimeoutOpt; - private final OptionSpec numMessagesOpt; + private final OptionSpec numRecordsOpt; private final OptionSpec reportingIntervalOpt; private final OptionSpec dateFormatOpt; private final OptionSpec hideHeaderOpt; @@ -280,14 +283,19 @@ public ConsumerPerfOptions(String[] args) { .ofType(Integer.class) .defaultsTo(1024 * 1024); resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " + - "offset to consume from, start with the latest message present in the log rather than the earliest message."); + "offset to consume from, start with the latest record present in the log rather than the earliest record."); socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") .withRequiredArg() .describedAs("size") .ofType(Integer.class) .defaultsTo(2 * 1024 * 1024); - consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.") - .withRequiredArg() + consumerConfigOpt = parser.accepts("consumer.config", "(DEPRECATED) Consumer config properties file. " + + "This option will be removed in a future version. Use --command-config instead.") + .withOptionalArg() + .describedAs("config file") + .ofType(String.class); + commandConfigOpt = parser.accepts("command-config", "Consumer config properties file") + .withOptionalArg() .describedAs("config file") .ofType(String.class); printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics."); @@ -298,7 +306,7 @@ public ConsumerPerfOptions(String[] args) { .describedAs("milliseconds") .ofType(Long.class) .defaultsTo(10_000L); - numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to consume.") + numRecordsOpt = parser.accepts("num-records", "REQUIRED: The number of records to consume.") .withRequiredArg() .describedAs("count") .ofType(Long.class); @@ -323,7 +331,10 @@ public ConsumerPerfOptions(String[] args) { } if (options != null) { CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance."); - CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt); + CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numRecordsOpt); + if (!options.has(consumerConfigOpt)) { + CommandLineUtils.checkRequiredArgs(parser, options, commandConfigOpt); + } } } @@ -336,8 +347,15 @@ public String brokerHostsAndPorts() { } public Properties props() throws IOException { - Properties props = (options.has(consumerConfigOpt)) - ? Utils.loadProps(options.valueOf(consumerConfigOpt)) + String commandConfigFile; + if (options.has(consumerConfigOpt)) { + System.out.println("Warning: --consumer.config is deprecated. Use --command-config instead."); + commandConfigFile = options.valueOf(consumerConfigOpt); + } else { + commandConfigFile = options.valueOf(commandConfigOpt); + } + Properties props = commandConfigFile != null + ? Utils.loadProps(commandConfigFile) : new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts()); props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt)); @@ -357,8 +375,8 @@ public Set topic() { return Set.of(options.valueOf(topicOpt)); } - public long numMessages() { - return options.valueOf(numMessagesOpt); + public long numRecords() { + return options.valueOf(numRecordsOpt); } public long reportingIntervalMs() { diff --git a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java index bb0aaba89d585..6f9a968be3c47 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java @@ -62,7 +62,7 @@ public static void main(String[] args) { try { LOG.info("Starting share consumer/consumers..."); ShareConsumerPerfOptions options = new ShareConsumerPerfOptions(args); - AtomicLong totalMessagesRead = new AtomicLong(0); + AtomicLong totalRecordsRead = new AtomicLong(0); AtomicLong totalBytesRead = new AtomicLong(0); if (!options.hideHeader()) @@ -73,7 +73,7 @@ public static void main(String[] args) { shareConsumers.add(new KafkaShareConsumer<>(options.props())); } long startMs = System.currentTimeMillis(); - consume(shareConsumers, options, totalMessagesRead, totalBytesRead, startMs); + consume(shareConsumers, options, totalRecordsRead, totalBytesRead, startMs); long endMs = System.currentTimeMillis(); List> shareConsumersMetrics = new ArrayList<>(); @@ -89,7 +89,7 @@ public static void main(String[] args) { // Print final stats for share group. double elapsedSec = (endMs - startMs) / 1_000.0; long fetchTimeInMs = endMs - startMs; - printStats(totalBytesRead.get(), totalMessagesRead.get(), elapsedSec, fetchTimeInMs, startMs, endMs, + printStats(totalBytesRead.get(), totalRecordsRead.get(), elapsedSec, fetchTimeInMs, startMs, endMs, options.dateFormat(), -1); shareConsumersMetrics.forEach(ToolsUtils::printMetrics); @@ -108,15 +108,15 @@ protected static void printHeader() { private static void consume(List> shareConsumers, ShareConsumerPerfOptions options, - AtomicLong totalMessagesRead, + AtomicLong totalRecordsRead, AtomicLong totalBytesRead, long startMs) throws ExecutionException, InterruptedException, TimeoutException { - long numMessages = options.numMessages(); + long numRecords = options.numRecords(); long recordFetchTimeoutMs = options.recordFetchTimeoutMs(); shareConsumers.forEach(shareConsumer -> shareConsumer.subscribe(options.topic())); // Now start the benchmark. - AtomicLong messagesRead = new AtomicLong(0); + AtomicLong recordsRead = new AtomicLong(0); AtomicLong bytesRead = new AtomicLong(0); List shareConsumersConsumptionDetails = new ArrayList<>(); @@ -128,7 +128,7 @@ private static void consume(List> shareConsum ShareConsumerConsumption shareConsumerConsumption = new ShareConsumerConsumption(0, 0); futures.add(executorService.submit(() -> { try { - consumeMessagesForSingleShareConsumer(shareConsumers.get(index), messagesRead, bytesRead, options, + consumeRecordsForSingleShareConsumer(shareConsumers.get(index), recordsRead, bytesRead, options, shareConsumerConsumption, index + 1); } catch (InterruptedException e) { throw new RuntimeException(e); @@ -166,22 +166,22 @@ private static void consume(List> shareConsum // Print stats for share consumer. double elapsedSec = (endMs - startMs) / 1_000.0; long fetchTimeInMs = endMs - startMs; - long messagesReadByConsumer = shareConsumersConsumptionDetails.get(index).messagesConsumed(); + long recordsReadByConsumer = shareConsumersConsumptionDetails.get(index).recordsConsumed(); long bytesReadByConsumer = shareConsumersConsumptionDetails.get(index).bytesConsumed(); - printStats(bytesReadByConsumer, messagesReadByConsumer, elapsedSec, fetchTimeInMs, startMs, endMs, options.dateFormat(), index + 1); + printStats(bytesReadByConsumer, recordsReadByConsumer, elapsedSec, fetchTimeInMs, startMs, endMs, options.dateFormat(), index + 1); } } - if (messagesRead.get() < numMessages) { - System.out.printf("WARNING: Exiting before consuming the expected number of messages: timeout (%d ms) exceeded. " + + if (recordsRead.get() < numRecords) { + System.out.printf("WARNING: Exiting before consuming the expected number of records: timeout (%d ms) exceeded. " + "You can use the --timeout option to increase the timeout.%n", recordFetchTimeoutMs); } - totalMessagesRead.set(messagesRead.get()); + totalRecordsRead.set(recordsRead.get()); totalBytesRead.set(bytesRead.get()); } - private static void consumeMessagesForSingleShareConsumer(KafkaShareConsumer shareConsumer, - AtomicLong totalMessagesRead, + private static void consumeRecordsForSingleShareConsumer(KafkaShareConsumer shareConsumer, + AtomicLong totalRecordsRead, AtomicLong totalBytesRead, ShareConsumerPerfOptions options, ShareConsumerConsumption shareConsumerConsumption, @@ -192,17 +192,17 @@ private static void consumeMessagesForSingleShareConsumer(KafkaShareConsumer records = shareConsumer.poll(Duration.ofMillis(100)); currentTimeMs = System.currentTimeMillis(); if (!records.isEmpty()) lastConsumedTimeMs = currentTimeMs; for (ConsumerRecord record : records) { - messagesReadByConsumer += 1; - totalMessagesRead.addAndGet(1); + recordsReadByConsumer += 1; + totalRecordsRead.addAndGet(1); if (record.key() != null) { bytesReadByConsumer += record.key().length; totalBytesRead.addAndGet(record.key().length); @@ -213,13 +213,13 @@ private static void consumeMessagesForSingleShareConsumer(KafkaShareConsumer= options.reportingIntervalMs()) { if (options.showDetailedStats()) - printShareConsumerProgress(bytesReadByConsumer, lastBytesRead, messagesReadByConsumer, lastMessagesRead, + printShareConsumerProgress(bytesReadByConsumer, lastBytesRead, recordsReadByConsumer, lastRecordsRead, lastReportTimeMs, currentTimeMs, dateFormat, index); lastReportTimeMs = currentTimeMs; - lastMessagesRead = messagesReadByConsumer; + lastRecordsRead = recordsReadByConsumer; lastBytesRead = bytesReadByConsumer; } - shareConsumerConsumption.updateMessagesConsumed(messagesReadByConsumer); + shareConsumerConsumption.updateRecordsConsumed(recordsReadByConsumer); shareConsumerConsumption.updateBytesConsumed(bytesReadByConsumer); } } @@ -227,8 +227,8 @@ private static void consumeMessagesForSingleShareConsumer(KafkaShareConsumer= 1. private static void printStats(long bytesRead, - long messagesRead, + long recordsRead, double elapsedSec, long fetchTimeInMs, long startMs, @@ -263,8 +263,8 @@ private static void printStats(long bytesRead, dateFormat.format(endMs), totalMbRead, totalMbRead / elapsedSec, - messagesRead / elapsedSec, - messagesRead, + recordsRead / elapsedSec, + recordsRead, fetchTimeInMs ); return; @@ -274,8 +274,8 @@ private static void printStats(long bytesRead, dateFormat.format(endMs), totalMbRead, totalMbRead / elapsedSec, - messagesRead / elapsedSec, - messagesRead, + recordsRead / elapsedSec, + recordsRead, fetchTimeInMs ); } @@ -286,11 +286,14 @@ protected static class ShareConsumerPerfOptions extends CommandDefaultOptions { private final OptionSpec groupIdOpt; private final OptionSpec fetchSizeOpt; private final OptionSpec socketBufferSizeOpt; + // Deprecated option, kept for backward compatibility + // and will be removed in a future version. private final OptionSpec consumerConfigOpt; + private final OptionSpec commandConfigOpt; private final OptionSpec printMetricsOpt; private final OptionSpec showDetailedStatsOpt; private final OptionSpec recordFetchTimeoutOpt; - private final OptionSpec numMessagesOpt; + private final OptionSpec numRecordsOpt; private final OptionSpec reportingIntervalOpt; private final OptionSpec dateFormatOpt; private final OptionSpec hideHeaderOpt; @@ -322,8 +325,13 @@ public ShareConsumerPerfOptions(String[] args) { .describedAs("size") .ofType(Integer.class) .defaultsTo(2 * 1024 * 1024); - consumerConfigOpt = parser.accepts("consumer.config", "Share consumer config properties file.") - .withRequiredArg() + consumerConfigOpt = parser.accepts("consumer.config", "(DEPRECATED) Share consumer config properties file. " + + "This option will be removed in a future version. Use --command-config instead.") + .withOptionalArg() + .describedAs("config file") + .ofType(String.class); + commandConfigOpt = parser.accepts("command-config", "Share consumer config properties file.") + .withOptionalArg() .describedAs("config file") .ofType(String.class); printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics."); @@ -334,7 +342,7 @@ public ShareConsumerPerfOptions(String[] args) { .describedAs("milliseconds") .ofType(Long.class) .defaultsTo(10_000L); - numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to consume.") + numRecordsOpt = parser.accepts("num-records", "REQUIRED: The number of records to consume.") .withRequiredArg() .describedAs("count") .ofType(Long.class); @@ -366,7 +374,10 @@ public ShareConsumerPerfOptions(String[] args) { } if (options != null) { CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the share consumer performance."); - CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt); + CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numRecordsOpt); + if (!options.has(consumerConfigOpt)) { + CommandLineUtils.checkRequiredArgs(parser, options, commandConfigOpt); + } } } @@ -379,8 +390,15 @@ public String brokerHostsAndPorts() { } public Properties props() throws IOException { - Properties props = (options.has(consumerConfigOpt)) - ? Utils.loadProps(options.valueOf(consumerConfigOpt)) + String commandConfigFile; + if (options.has(consumerConfigOpt)) { + System.out.println("Warning: --consumer.config is deprecated. Use --command-config instead."); + commandConfigFile = options.valueOf(consumerConfigOpt); + } else { + commandConfigFile = options.valueOf(commandConfigOpt); + } + Properties props = commandConfigFile != null + ? Utils.loadProps(commandConfigFile) : new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts()); props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt)); @@ -398,8 +416,8 @@ public Set topic() { return Set.of(options.valueOf(topicOpt)); } - public long numMessages() { - return options.valueOf(numMessagesOpt); + public long numRecords() { + return options.valueOf(numRecordsOpt); } public int threads() { @@ -434,26 +452,26 @@ public long recordFetchTimeoutMs() { } } - // Helper class to know the final messages and bytes consumer by share consumer at the end of consumption. + // Helper class to know the final records and bytes consumed by share consumer at the end of consumption. private static class ShareConsumerConsumption { - private long messagesConsumed; + private long recordsConsumed; private long bytesConsumed; - public ShareConsumerConsumption(long messagesConsumed, long bytesConsumed) { - this.messagesConsumed = messagesConsumed; + public ShareConsumerConsumption(long recordsConsumed, long bytesConsumed) { + this.recordsConsumed = recordsConsumed; this.bytesConsumed = bytesConsumed; } - public long messagesConsumed() { - return messagesConsumed; + public long recordsConsumed() { + return recordsConsumed; } public long bytesConsumed() { return bytesConsumed; } - public void updateMessagesConsumed(long messagesConsumed) { - this.messagesConsumed = messagesConsumed; + public void updateRecordsConsumed(long recordsConsumed) { + this.recordsConsumed = recordsConsumed; } public void updateBytesConsumed(long bytesConsumed) { diff --git a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java index 270fab2cf805f..204197c5ea298 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java @@ -68,7 +68,7 @@ public void testConfigBootStrapServer() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10", + "--num-records", "10", "--print-metrics" }; @@ -76,7 +76,7 @@ public void testConfigBootStrapServer() { assertEquals("localhost:9092", config.brokerHostsAndPorts()); assertTrue(config.topic().contains("test")); - assertEquals(10, config.numMessages()); + assertEquals(10, config.numRecords()); } @Test @@ -84,7 +84,7 @@ public void testConfigWithUnrecognizedOption() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10", + "--num-records", "10", "--new-consumer" }; @@ -95,7 +95,9 @@ public void testConfigWithUnrecognizedOption() { @Test public void testClientIdOverride() throws IOException { - File tempFile = Files.createFile(tempDir.resolve("test_consumer_config.conf")).toFile(); + Path configPath = tempDir.resolve("test_consumer_config.conf"); + Files.deleteIfExists(configPath); + File tempFile = Files.createFile(configPath).toFile(); try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) { output.println("client.id=consumer-1"); output.flush(); @@ -104,13 +106,37 @@ public void testClientIdOverride() throws IOException { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10", + "--num-records", "10", + "--command-config", tempFile.getAbsolutePath() + }; + + ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args); + + assertEquals("consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); + Files.deleteIfExists(configPath); + } + + @Test + public void testConsumerConfigDeprecated() throws IOException { + Path configPath = tempDir.resolve("test_consumer_config.conf"); + Files.deleteIfExists(configPath); + File tempFile = Files.createFile(configPath).toFile(); + try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) { + output.println("client.id=consumer-1"); + output.flush(); + } + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--num-records", "10", "--consumer.config", tempFile.getAbsolutePath() }; ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args); assertEquals("consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); + Files.deleteIfExists(configPath); } @Test @@ -118,7 +144,7 @@ public void testDefaultClientId() throws IOException { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10" + "--num-records", "10" }; ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args); diff --git a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java index 853d79a7e3b35..c31b9fa96e9df 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java @@ -62,7 +62,7 @@ public void testConfigBootStrapServer() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10", + "--num-records", "10", "--print-metrics" }; @@ -70,7 +70,7 @@ public void testConfigBootStrapServer() { assertEquals("localhost:9092", config.brokerHostsAndPorts()); assertTrue(config.topic().contains("test")); - assertEquals(10, config.numMessages()); + assertEquals(10, config.numRecords()); } @Test @@ -78,7 +78,7 @@ public void testConfigWithUnrecognizedOption() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10", + "--num-records", "10", "--new-share-consumer" }; @@ -89,7 +89,9 @@ public void testConfigWithUnrecognizedOption() { @Test public void testClientIdOverride() throws IOException { - File tempFile = Files.createFile(tempDir.resolve("test_share_consumer_config.conf")).toFile(); + Path configPath = tempDir.resolve("test_share_consumer_config.conf"); + Files.deleteIfExists(configPath); + File tempFile = Files.createFile(configPath).toFile(); try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) { output.println("client.id=share-consumer-1"); output.flush(); @@ -98,13 +100,37 @@ public void testClientIdOverride() throws IOException { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10", + "--num-records", "10", + "--command-config", tempFile.getAbsolutePath() + }; + + ShareConsumerPerformance.ShareConsumerPerfOptions config = new ShareConsumerPerformance.ShareConsumerPerfOptions(args); + + assertEquals("share-consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); + Files.deleteIfExists(configPath); + } + + @Test + public void testConsumerConfigDeprecated() throws IOException { + Path configPath = tempDir.resolve("test_share_consumer_config.conf"); + Files.deleteIfExists(configPath); + File tempFile = Files.createFile(configPath).toFile(); + try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) { + output.println("client.id=share-consumer-1"); + output.flush(); + } + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--num-records", "10", "--consumer.config", tempFile.getAbsolutePath() }; ShareConsumerPerformance.ShareConsumerPerfOptions config = new ShareConsumerPerformance.ShareConsumerPerfOptions(args); assertEquals("share-consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); + Files.deleteIfExists(configPath); } @Test @@ -112,7 +138,7 @@ public void testDefaultClientId() throws IOException { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10" + "--num-records", "10" }; ShareConsumerPerformance.ShareConsumerPerfOptions config = new ShareConsumerPerformance.ShareConsumerPerfOptions(args); From 34651a9de21da9f8f549c2db326b74943298d413 Mon Sep 17 00:00:00 2001 From: ally heev Date: Mon, 25 Aug 2025 11:56:56 +0530 Subject: [PATCH 2/9] Add deprecation cycle for num-records --- .../kafka/tools/ConsumerPerformance.java | 20 +++++++++++++----- .../kafka/tools/ShareConsumerPerformance.java | 21 +++++++++++++------ .../kafka/tools/ConsumerPerformanceTest.java | 12 +++++++++++ .../tools/ShareConsumerPerformanceTest.java | 12 +++++++++++ 4 files changed, 54 insertions(+), 11 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java index 45ccbda41752a..f91abb5f3e0d0 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java @@ -265,6 +265,9 @@ protected static class ConsumerPerfOptions extends CommandDefaultOptions { private final OptionSpec printMetricsOpt; private final OptionSpec showDetailedStatsOpt; private final OptionSpec recordFetchTimeoutOpt; + // Deprecated option, kept for backward compatibility + // and will be removed in a future version. + private final OptionSpec numMessagesOpt; private final OptionSpec numRecordsOpt; private final OptionSpec reportingIntervalOpt; private final OptionSpec dateFormatOpt; @@ -302,12 +305,12 @@ public ConsumerPerfOptions(String[] args) { .ofType(Integer.class) .defaultsTo(2 * 1024 * 1024); consumerConfigOpt = parser.accepts("consumer.config", "(DEPRECATED) Consumer config properties file. " + - "This option will be removed in a future version. Use --command-config instead.") - .withOptionalArg() + "This option will be removed in a future version. Use --command-config instead") + .withRequiredArg() .describedAs("config file") .ofType(String.class); commandConfigOpt = parser.accepts("command-config", "Consumer config properties file") - .withOptionalArg() + .withRequiredArg() .describedAs("config file") .ofType(String.class); printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics."); @@ -318,6 +321,11 @@ public ConsumerPerfOptions(String[] args) { .describedAs("milliseconds") .ofType(Long.class) .defaultsTo(10_000L); + numMessagesOpt = parser.accepts("messages", "(DEPRECATED) REQUIRED: The number of messages to consume. " + + "This option will be removed in a future version. Use --num-records instead") + .withRequiredArg() + .describedAs("count") + .ofType(Long.class); numRecordsOpt = parser.accepts("num-records", "REQUIRED: The number of records to consume.") .withRequiredArg() .describedAs("count") @@ -343,7 +351,7 @@ public ConsumerPerfOptions(String[] args) { } if (options != null) { CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance."); - CommandLineUtils.checkRequiredArgs(parser, options, numRecordsOpt); + CommandLineUtils.checkOneOfArgs(parser, options, numMessagesOpt, numRecordsOpt); CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt); CommandLineUtils.checkOneOfArgs(parser, options, consumerConfigOpt, commandConfigOpt); } @@ -395,7 +403,9 @@ public Optional include() { } public long numRecords() { - return options.valueOf(numRecordsOpt); + return options.has(numMessagesOpt) + ? options.valueOf(numMessagesOpt) + : options.valueOf(numRecordsOpt); } public long reportingIntervalMs() { diff --git a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java index 6f9a968be3c47..207b76920e681 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java @@ -293,6 +293,9 @@ protected static class ShareConsumerPerfOptions extends CommandDefaultOptions { private final OptionSpec printMetricsOpt; private final OptionSpec showDetailedStatsOpt; private final OptionSpec recordFetchTimeoutOpt; + // Deprecated option, kept for backward compatibility + // and will be removed in a future version. + private final OptionSpec numMessagesOpt; private final OptionSpec numRecordsOpt; private final OptionSpec reportingIntervalOpt; private final OptionSpec dateFormatOpt; @@ -327,11 +330,11 @@ public ShareConsumerPerfOptions(String[] args) { .defaultsTo(2 * 1024 * 1024); consumerConfigOpt = parser.accepts("consumer.config", "(DEPRECATED) Share consumer config properties file. " + "This option will be removed in a future version. Use --command-config instead.") - .withOptionalArg() + .withRequiredArg() .describedAs("config file") .ofType(String.class); commandConfigOpt = parser.accepts("command-config", "Share consumer config properties file.") - .withOptionalArg() + .withRequiredArg() .describedAs("config file") .ofType(String.class); printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics."); @@ -342,6 +345,11 @@ public ShareConsumerPerfOptions(String[] args) { .describedAs("milliseconds") .ofType(Long.class) .defaultsTo(10_000L); + numMessagesOpt = parser.accepts("messages", "(DEPRECATED) REQUIRED: The number of records to consume. " + + "This option will be removed in a future version. Use --num-records instead.") + .withRequiredArg() + .describedAs("count") + .ofType(Long.class); numRecordsOpt = parser.accepts("num-records", "REQUIRED: The number of records to consume.") .withRequiredArg() .describedAs("count") @@ -375,9 +383,8 @@ public ShareConsumerPerfOptions(String[] args) { if (options != null) { CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the share consumer performance."); CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numRecordsOpt); - if (!options.has(consumerConfigOpt)) { - CommandLineUtils.checkRequiredArgs(parser, options, commandConfigOpt); - } + CommandLineUtils.checkOneOfArgs(parser, options, numMessagesOpt, numRecordsOpt); + CommandLineUtils.checkOneOfArgs(parser, options, consumerConfigOpt, commandConfigOpt); } } @@ -417,7 +424,9 @@ public Set topic() { } public long numRecords() { - return options.valueOf(numRecordsOpt); + return options.has(numMessagesOpt) + ? options.valueOf(numMessagesOpt) + : options.valueOf(numRecordsOpt); } public int threads() { diff --git a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java index 509321ac4a303..70f2565bfff8e 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java @@ -79,6 +79,18 @@ public void testConfigBootStrapServer() { assertEquals(10, config.numRecords()); } + @Test + public void testNumOfRecordsDeprecated() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--messages", "10" + }; + + ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args); + assertEquals(10, config.numRecords()); + } + @Test public void testConfigWithUnrecognizedOption() { String[] args = new String[]{ diff --git a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java index c31b9fa96e9df..0ec981a7a6742 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java @@ -73,6 +73,18 @@ public void testConfigBootStrapServer() { assertEquals(10, config.numRecords()); } + @Test + public void testNumOfRecordsDeprecated() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--messages", "10" + }; + + ShareConsumerPerformance.ShareConsumerPerfOptions config = new ShareConsumerPerformance.ShareConsumerPerfOptions(args); + assertEquals(10, config.numRecords()); + } + @Test public void testConfigWithUnrecognizedOption() { String[] args = new String[]{ From d82ce4febae8d6bf544ec77630d109ee488e1509 Mon Sep 17 00:00:00 2001 From: ally heev Date: Tue, 26 Aug 2025 16:40:22 +0530 Subject: [PATCH 3/9] Add deprecation signatures to old options; Remove `consumer config` and `messages` traces from the tools --- .../org/apache/kafka/tools/ConsumerPerformance.java | 10 ++++------ .../apache/kafka/tools/ShareConsumerPerformance.java | 10 ++++------ 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java index f91abb5f3e0d0..d8392cac8e81f 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java @@ -258,15 +258,13 @@ protected static class ConsumerPerfOptions extends CommandDefaultOptions { private final OptionSpec fetchSizeOpt; private final OptionSpec resetBeginningOffsetOpt; private final OptionSpec socketBufferSizeOpt; - // Deprecated option, kept for backward compatibility - // and will be removed in a future version. + @Deprecated(since = "4.2", forRemoval = true) private final OptionSpec consumerConfigOpt; private final OptionSpec commandConfigOpt; private final OptionSpec printMetricsOpt; private final OptionSpec showDetailedStatsOpt; private final OptionSpec recordFetchTimeoutOpt; - // Deprecated option, kept for backward compatibility - // and will be removed in a future version. + @Deprecated(since = "4.2", forRemoval = true) private final OptionSpec numMessagesOpt; private final OptionSpec numRecordsOpt; private final OptionSpec reportingIntervalOpt; @@ -309,7 +307,7 @@ public ConsumerPerfOptions(String[] args) { .withRequiredArg() .describedAs("config file") .ofType(String.class); - commandConfigOpt = parser.accepts("command-config", "Consumer config properties file") + commandConfigOpt = parser.accepts("command-config", "Config properties file") .withRequiredArg() .describedAs("config file") .ofType(String.class); @@ -321,7 +319,7 @@ public ConsumerPerfOptions(String[] args) { .describedAs("milliseconds") .ofType(Long.class) .defaultsTo(10_000L); - numMessagesOpt = parser.accepts("messages", "(DEPRECATED) REQUIRED: The number of messages to consume. " + + numMessagesOpt = parser.accepts("messages", "(DEPRECATED) The number of records to consume. " + "This option will be removed in a future version. Use --num-records instead") .withRequiredArg() .describedAs("count") diff --git a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java index 207b76920e681..e9f6c96d9f542 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java @@ -286,15 +286,13 @@ protected static class ShareConsumerPerfOptions extends CommandDefaultOptions { private final OptionSpec groupIdOpt; private final OptionSpec fetchSizeOpt; private final OptionSpec socketBufferSizeOpt; - // Deprecated option, kept for backward compatibility - // and will be removed in a future version. + @Deprecated(since = "4.2", forRemoval = true) private final OptionSpec consumerConfigOpt; private final OptionSpec commandConfigOpt; private final OptionSpec printMetricsOpt; private final OptionSpec showDetailedStatsOpt; private final OptionSpec recordFetchTimeoutOpt; - // Deprecated option, kept for backward compatibility - // and will be removed in a future version. + @Deprecated(since = "4.2", forRemoval = true) private final OptionSpec numMessagesOpt; private final OptionSpec numRecordsOpt; private final OptionSpec reportingIntervalOpt; @@ -333,7 +331,7 @@ public ShareConsumerPerfOptions(String[] args) { .withRequiredArg() .describedAs("config file") .ofType(String.class); - commandConfigOpt = parser.accepts("command-config", "Share consumer config properties file.") + commandConfigOpt = parser.accepts("command-config", "Config properties file.") .withRequiredArg() .describedAs("config file") .ofType(String.class); @@ -345,7 +343,7 @@ public ShareConsumerPerfOptions(String[] args) { .describedAs("milliseconds") .ofType(Long.class) .defaultsTo(10_000L); - numMessagesOpt = parser.accepts("messages", "(DEPRECATED) REQUIRED: The number of records to consume. " + + numMessagesOpt = parser.accepts("messages", "(DEPRECATED) The number of records to consume. " + "This option will be removed in a future version. Use --num-records instead.") .withRequiredArg() .describedAs("count") From c4cd51e452ccd8f6fbd3d0f917158815b20345ee Mon Sep 17 00:00:00 2001 From: ally heev Date: Wed, 27 Aug 2025 01:30:12 +0530 Subject: [PATCH 4/9] Add checks for deprecated options --- .../kafka/tools/ConsumerPerformance.java | 14 ++++-- .../kafka/tools/ShareConsumerPerformance.java | 16 ++++-- .../kafka/tools/ConsumerPerformanceTest.java | 44 +++++++++++++++- .../tools/ShareConsumerPerformanceTest.java | 50 +++++++++++++++++-- 4 files changed, 114 insertions(+), 10 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java index d8392cac8e81f..c66740c97f65a 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java @@ -349,9 +349,18 @@ public ConsumerPerfOptions(String[] args) { } if (options != null) { CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance."); - CommandLineUtils.checkOneOfArgs(parser, options, numMessagesOpt, numRecordsOpt); CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt); - CommandLineUtils.checkOneOfArgs(parser, options, consumerConfigOpt, commandConfigOpt); + + CommandLineUtils.checkOneOfArgs(parser, options, numMessagesOpt, numRecordsOpt); + CommandLineUtils.checkInvalidArgs(parser, options, consumerConfigOpt, commandConfigOpt); + + if (options.has(numMessagesOpt)) { + System.out.println("Warning: --messages is deprecated. Use --num-records instead."); + } + + if (options.has(consumerConfigOpt)) { + System.out.println("Warning: --consumer.config is deprecated. Use --command-config instead."); + } } } @@ -366,7 +375,6 @@ public String brokerHostsAndPorts() { public Properties props() throws IOException { String commandConfigFile; if (options.has(consumerConfigOpt)) { - System.out.println("Warning: --consumer.config is deprecated. Use --command-config instead."); commandConfigFile = options.valueOf(consumerConfigOpt); } else { commandConfigFile = options.valueOf(commandConfigOpt); diff --git a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java index e9f6c96d9f542..b41c0de78b11a 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java @@ -380,9 +380,20 @@ public ShareConsumerPerfOptions(String[] args) { } if (options != null) { CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the share consumer performance."); - CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numRecordsOpt); + CommandLineUtils.checkRequiredArgs(parser, options, topicOpt); + CommandLineUtils.checkOneOfArgs(parser, options, numMessagesOpt, numRecordsOpt); - CommandLineUtils.checkOneOfArgs(parser, options, consumerConfigOpt, commandConfigOpt); + CommandLineUtils.checkInvalidArgs(parser, options, consumerConfigOpt, commandConfigOpt); + + if (options.has(numMessagesOpt)) { + System.out.println("Warning: --messages is deprecated. Use --num-records instead."); + } + + if (options.has(consumerConfigOpt)) { + System.out.println("Warning: --consumer.config is deprecated. Use --command-config instead."); + } + + } } @@ -397,7 +408,6 @@ public String brokerHostsAndPorts() { public Properties props() throws IOException { String commandConfigFile; if (options.has(consumerConfigOpt)) { - System.out.println("Warning: --consumer.config is deprecated. Use --command-config instead."); commandConfigFile = options.valueOf(consumerConfigOpt); } else { commandConfigFile = options.valueOf(commandConfigOpt); diff --git a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java index 70f2565bfff8e..e0d12f16f157b 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java @@ -79,6 +79,18 @@ public void testConfigBootStrapServer() { assertEquals(10, config.numRecords()); } + @Test + public void testNumOfRecordsNotPresent() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ConsumerPerformance.ConsumerPerfOptions(args)); + assertTrue(err.contains("Exactly one of the following arguments is required:")); + } + @Test public void testNumOfRecordsDeprecated() { String[] args = new String[]{ @@ -91,6 +103,20 @@ public void testNumOfRecordsDeprecated() { assertEquals(10, config.numRecords()); } + @Test + public void testNumOfRecordsWithMessagesPresent() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--messages", "10", + "--num-records", "20" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ConsumerPerformance.ConsumerPerfOptions(args)); + assertTrue(err.contains("Exactly one of the following arguments is required")); + } + @Test public void testConfigWithUnrecognizedOption() { String[] args = new String[]{ @@ -170,7 +196,7 @@ public void testClientIdOverride() throws IOException { } @Test - public void testConsumerConfigDeprecated() throws IOException { + public void testCommandConfigDeprecated() throws IOException { Path configPath = tempDir.resolve("test_consumer_config.conf"); Files.deleteIfExists(configPath); File tempFile = Files.createFile(configPath).toFile(); @@ -192,6 +218,22 @@ public void testConsumerConfigDeprecated() throws IOException { Files.deleteIfExists(configPath); } + @Test + public void testCommandConfigWithConsumerConfigPresent() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--num-records", "10", + "--consumer.config", "some-path", + "--command-config", "some-path" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ConsumerPerformance.ConsumerPerfOptions(args)); + assertTrue(err.contains(String.format("Option \"%s\" can't be used with option \"%s\"", + "[consumer.config]", "[command-config]"))); + } + @Test public void testDefaultClientId() throws IOException { String[] args = new String[]{ diff --git a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java index 0ec981a7a6742..7ec97c353afee 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java @@ -73,6 +73,18 @@ public void testConfigBootStrapServer() { assertEquals(10, config.numRecords()); } + @Test + public void testNumOfRecordsNotPresent() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ShareConsumerPerformance.ShareConsumerPerfOptions(args)); + assertTrue(err.contains("Exactly one of the following arguments is required:")); + } + @Test public void testNumOfRecordsDeprecated() { String[] args = new String[]{ @@ -81,10 +93,25 @@ public void testNumOfRecordsDeprecated() { "--messages", "10" }; - ShareConsumerPerformance.ShareConsumerPerfOptions config = new ShareConsumerPerformance.ShareConsumerPerfOptions(args); + ShareConsumerPerformance.ShareConsumerPerfOptions config = + new ShareConsumerPerformance.ShareConsumerPerfOptions(args); assertEquals(10, config.numRecords()); } + @Test + public void testNumOfRecordsWithMessagesPresent() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--messages", "10", + "--num-records", "20" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ShareConsumerPerformance.ShareConsumerPerfOptions(args)); + assertTrue(err.contains("Exactly one of the following arguments is required")); + } + @Test public void testConfigWithUnrecognizedOption() { String[] args = new String[]{ @@ -123,7 +150,7 @@ public void testClientIdOverride() throws IOException { } @Test - public void testConsumerConfigDeprecated() throws IOException { + public void testCommandConfigDeprecated() throws IOException { Path configPath = tempDir.resolve("test_share_consumer_config.conf"); Files.deleteIfExists(configPath); File tempFile = Files.createFile(configPath).toFile(); @@ -139,12 +166,29 @@ public void testConsumerConfigDeprecated() throws IOException { "--consumer.config", tempFile.getAbsolutePath() }; - ShareConsumerPerformance.ShareConsumerPerfOptions config = new ShareConsumerPerformance.ShareConsumerPerfOptions(args); + ShareConsumerPerformance.ShareConsumerPerfOptions config = + new ShareConsumerPerformance.ShareConsumerPerfOptions(args); assertEquals("share-consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); Files.deleteIfExists(configPath); } + @Test + public void testCommandConfigWithConsumerConfigPresent() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--num-records", "10", + "--consumer.config", "some-path", + "--command-config", "some-path" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ShareConsumerPerformance.ShareConsumerPerfOptions(args)); + assertTrue(err.contains(String.format("Option \"%s\" can't be used with option \"%s\"", + "[consumer.config]", "[command-config]"))); + } + @Test public void testDefaultClientId() throws IOException { String[] args = new String[]{ From 5c8fd2b2f694b5d85ff0698115546aae57ab205a Mon Sep 17 00:00:00 2001 From: ally heev Date: Wed, 27 Aug 2025 14:55:55 +0530 Subject: [PATCH 5/9] Rename tests to old config names --- .../java/org/apache/kafka/tools/ShareConsumerPerformance.java | 2 -- .../java/org/apache/kafka/tools/ConsumerPerformanceTest.java | 4 ++-- .../org/apache/kafka/tools/ShareConsumerPerformanceTest.java | 4 ++-- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java index b41c0de78b11a..6f0a707c09bdc 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java @@ -392,8 +392,6 @@ public ShareConsumerPerfOptions(String[] args) { if (options.has(consumerConfigOpt)) { System.out.println("Warning: --consumer.config is deprecated. Use --command-config instead."); } - - } } diff --git a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java index e0d12f16f157b..ea8411a5848a9 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java @@ -92,7 +92,7 @@ public void testNumOfRecordsNotPresent() { } @Test - public void testNumOfRecordsDeprecated() { + public void testMessagesDeprecated() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", @@ -196,7 +196,7 @@ public void testClientIdOverride() throws IOException { } @Test - public void testCommandConfigDeprecated() throws IOException { + public void testConsumerConfigDeprecated() throws IOException { Path configPath = tempDir.resolve("test_consumer_config.conf"); Files.deleteIfExists(configPath); File tempFile = Files.createFile(configPath).toFile(); diff --git a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java index 7ec97c353afee..bdc691da40ef0 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java @@ -86,7 +86,7 @@ public void testNumOfRecordsNotPresent() { } @Test - public void testNumOfRecordsDeprecated() { + public void testMessagesDeprecated() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", @@ -150,7 +150,7 @@ public void testClientIdOverride() throws IOException { } @Test - public void testCommandConfigDeprecated() throws IOException { + public void testConsumerConfigDeprecated() throws IOException { Path configPath = tempDir.resolve("test_share_consumer_config.conf"); Files.deleteIfExists(configPath); File tempFile = Files.createFile(configPath).toFile(); From 9d9a8c63e77c856720b2ab77b3d1f835e61f5cd6 Mon Sep 17 00:00:00 2001 From: ally heev Date: Wed, 27 Aug 2025 15:36:33 +0530 Subject: [PATCH 6/9] Rename consumer config files to unique prefix-suffixes --- .../org/apache/kafka/tools/ConsumerPerformanceTest.java | 6 +++--- .../apache/kafka/tools/ShareConsumerPerformanceTest.java | 8 +++----- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java index ea8411a5848a9..e1ea5954aaf2b 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java @@ -174,7 +174,7 @@ public void testConfigWithoutTopicAndInclude() { @Test public void testClientIdOverride() throws IOException { - Path configPath = tempDir.resolve("test_consumer_config.conf"); + Path configPath = tempDir.resolve("test_client_id_override_consumer_perf.conf"); Files.deleteIfExists(configPath); File tempFile = Files.createFile(configPath).toFile(); try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) { @@ -197,7 +197,7 @@ public void testClientIdOverride() throws IOException { @Test public void testConsumerConfigDeprecated() throws IOException { - Path configPath = tempDir.resolve("test_consumer_config.conf"); + Path configPath = tempDir.resolve("test_consumer_config_deprecated_consumer_perf.conf"); Files.deleteIfExists(configPath); File tempFile = Files.createFile(configPath).toFile(); try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) { @@ -219,7 +219,7 @@ public void testConsumerConfigDeprecated() throws IOException { } @Test - public void testCommandConfigWithConsumerConfigPresent() throws IOException { + public void testCommandConfigWithConsumerConfigPresent() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", diff --git a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java index bdc691da40ef0..49e38745951f3 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java @@ -128,7 +128,7 @@ public void testConfigWithUnrecognizedOption() { @Test public void testClientIdOverride() throws IOException { - Path configPath = tempDir.resolve("test_share_consumer_config.conf"); + Path configPath = tempDir.resolve("test_client_id_override_share_consumer_perf.conf"); Files.deleteIfExists(configPath); File tempFile = Files.createFile(configPath).toFile(); try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) { @@ -146,12 +146,11 @@ public void testClientIdOverride() throws IOException { ShareConsumerPerformance.ShareConsumerPerfOptions config = new ShareConsumerPerformance.ShareConsumerPerfOptions(args); assertEquals("share-consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); - Files.deleteIfExists(configPath); } @Test public void testConsumerConfigDeprecated() throws IOException { - Path configPath = tempDir.resolve("test_share_consumer_config.conf"); + Path configPath = tempDir.resolve("test_consumer_config_deprecated_share_consumer_perf.conf"); Files.deleteIfExists(configPath); File tempFile = Files.createFile(configPath).toFile(); try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) { @@ -170,11 +169,10 @@ public void testConsumerConfigDeprecated() throws IOException { new ShareConsumerPerformance.ShareConsumerPerfOptions(args); assertEquals("share-consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); - Files.deleteIfExists(configPath); } @Test - public void testCommandConfigWithConsumerConfigPresent() throws IOException { + public void testCommandConfigWithConsumerConfigPresent() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", From 9e53ae513a5ec477a763fc71a69feccd4591471a Mon Sep 17 00:00:00 2001 From: ally heev Date: Thu, 28 Aug 2025 20:15:04 +0530 Subject: [PATCH 7/9] replace consumer config -> config in performance service files --- tests/kafkatest/services/performance/consumer_performance.py | 2 +- .../services/performance/share_consumer_performance.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py index 7d2455030d610..c49390f1d69f6 100644 --- a/tests/kafkatest/services/performance/consumer_performance.py +++ b/tests/kafkatest/services/performance/consumer_performance.py @@ -40,7 +40,7 @@ class ConsumerPerformanceService(PerformanceService): "socket-buffer-size", "The size of the tcp RECV size." "new-consumer", "Use the new consumer implementation." - "command-config", "Consumer config properties file." + "command-config", "Config properties file." """ # Root directory for persistent output diff --git a/tests/kafkatest/services/performance/share_consumer_performance.py b/tests/kafkatest/services/performance/share_consumer_performance.py index 63aaae7b8dec6..1e68e583705d0 100644 --- a/tests/kafkatest/services/performance/share_consumer_performance.py +++ b/tests/kafkatest/services/performance/share_consumer_performance.py @@ -33,7 +33,7 @@ class ShareConsumerPerformanceService(PerformanceService): "socket-buffer-size", "The size of the tcp RECV size." - "command-config", "Share consumer config properties file." + "command-config", "Config properties file." """ # Root directory for persistent output From 7020430e21636f0183680aad52c4916c1490e049 Mon Sep 17 00:00:00 2001 From: ally heev Date: Thu, 28 Aug 2025 21:16:58 +0530 Subject: [PATCH 8/9] add required check on bootstrap server opt --- .../org/apache/kafka/tools/ConsumerPerformance.java | 11 ++++++----- .../apache/kafka/tools/ShareConsumerPerformance.java | 6 +++--- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java index c66740c97f65a..734e6d33cef9c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java @@ -303,24 +303,24 @@ public ConsumerPerfOptions(String[] args) { .ofType(Integer.class) .defaultsTo(2 * 1024 * 1024); consumerConfigOpt = parser.accepts("consumer.config", "(DEPRECATED) Consumer config properties file. " + - "This option will be removed in a future version. Use --command-config instead") + "This option will be removed in a future version. Use --command-config instead.") .withRequiredArg() .describedAs("config file") .ofType(String.class); - commandConfigOpt = parser.accepts("command-config", "Config properties file") + commandConfigOpt = parser.accepts("command-config", "Config properties file.") .withRequiredArg() .describedAs("config file") .ofType(String.class); printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics."); showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " + - "interval as configured by reporting-interval"); + "interval as configured by reporting-interval."); recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum allowed time in milliseconds between returned records.") .withOptionalArg() .describedAs("milliseconds") .ofType(Long.class) .defaultsTo(10_000L); numMessagesOpt = parser.accepts("messages", "(DEPRECATED) The number of records to consume. " + - "This option will be removed in a future version. Use --num-records instead") + "This option will be removed in a future version. Use --num-records instead.") .withRequiredArg() .describedAs("count") .ofType(Long.class); @@ -340,7 +340,7 @@ public ConsumerPerfOptions(String[] args) { .describedAs("date format") .ofType(String.class) .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS"); - hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats"); + hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats."); try { options = parser.parse(args); } catch (OptionException e) { @@ -349,6 +349,7 @@ public ConsumerPerfOptions(String[] args) { } if (options != null) { CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance."); + CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt); CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt); CommandLineUtils.checkOneOfArgs(parser, options, numMessagesOpt, numRecordsOpt); diff --git a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java index 6f0a707c09bdc..4a2c1c3f50729 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java @@ -337,7 +337,7 @@ public ShareConsumerPerfOptions(String[] args) { .ofType(String.class); printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics."); showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " + - "interval as configured by reporting-interval"); + "interval as configured by reporting-interval."); recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum allowed time in milliseconds between returned records.") .withOptionalArg() .describedAs("milliseconds") @@ -364,7 +364,7 @@ public ShareConsumerPerfOptions(String[] args) { .describedAs("date format") .ofType(String.class) .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS"); - hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats"); + hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats."); numThreadsOpt = parser.accepts("threads", "The number of share consumers to use for sharing the load.") .withRequiredArg() .describedAs("count") @@ -380,7 +380,7 @@ public ShareConsumerPerfOptions(String[] args) { } if (options != null) { CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the share consumer performance."); - CommandLineUtils.checkRequiredArgs(parser, options, topicOpt); + CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, bootstrapServerOpt); CommandLineUtils.checkOneOfArgs(parser, options, numMessagesOpt, numRecordsOpt); CommandLineUtils.checkInvalidArgs(parser, options, consumerConfigOpt, commandConfigOpt); From a99f389dd5eaa30b2e2f40b567d3fca4d44cfa6d Mon Sep 17 00:00:00 2001 From: ally heev Date: Thu, 28 Aug 2025 21:24:23 +0530 Subject: [PATCH 9/9] add required check on bootstrap server opt tests --- .../apache/kafka/tools/ConsumerPerformanceTest.java | 11 +++++++++++ .../kafka/tools/ShareConsumerPerformanceTest.java | 11 +++++++++++ 2 files changed, 22 insertions(+) diff --git a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java index e1ea5954aaf2b..badec8aaf7199 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java @@ -79,6 +79,17 @@ public void testConfigBootStrapServer() { assertEquals(10, config.numRecords()); } + @Test + public void testBootstrapServerNotPresent() { + String[] args = new String[]{ + "--topic", "test" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ConsumerPerformance.ConsumerPerfOptions(args)); + assertTrue(err.contains("Missing required argument \"[bootstrap-server]\"")); + } + @Test public void testNumOfRecordsNotPresent() { String[] args = new String[]{ diff --git a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java index 49e38745951f3..e3f7a62d9dac3 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java @@ -73,6 +73,17 @@ public void testConfigBootStrapServer() { assertEquals(10, config.numRecords()); } + @Test + public void testBootstrapServerNotPresent() { + String[] args = new String[]{ + "--topic", "test" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ShareConsumerPerformance.ShareConsumerPerfOptions(args)); + assertTrue(err.contains("Missing required argument \"[bootstrap-server]\"")); + } + @Test public void testNumOfRecordsNotPresent() { String[] args = new String[]{