diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py index 28086e8281887..c49390f1d69f6 100644 --- a/tests/kafkatest/services/performance/consumer_performance.py +++ b/tests/kafkatest/services/performance/consumer_performance.py @@ -40,7 +40,7 @@ class ConsumerPerformanceService(PerformanceService): "socket-buffer-size", "The size of the tcp RECV size." "new-consumer", "Use the new consumer implementation." - "consumer.config", "Consumer config properties file." + "command-config", "Config properties file." """ # Root directory for persistent output @@ -115,7 +115,7 @@ def start_cmd(self, node): for key, value in self.args(node.version).items(): cmd += " --%s %s" % (key, value) - cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE + cmd += " --command-config %s" % ConsumerPerformanceService.CONFIG_FILE for key, value in self.settings.items(): cmd += " %s=%s" % (str(key), str(value)) diff --git a/tests/kafkatest/services/performance/share_consumer_performance.py b/tests/kafkatest/services/performance/share_consumer_performance.py index ccb0952458009..1e68e583705d0 100644 --- a/tests/kafkatest/services/performance/share_consumer_performance.py +++ b/tests/kafkatest/services/performance/share_consumer_performance.py @@ -33,7 +33,7 @@ class ShareConsumerPerformanceService(PerformanceService): "socket-buffer-size", "The size of the tcp RECV size." - "consumer.config", "Consumer config properties file." + "command-config", "Config properties file." """ # Root directory for persistent output @@ -100,7 +100,7 @@ def start_cmd(self, node): for key, value in self.args().items(): cmd += " --%s %s" % (key, value) - cmd += " --consumer.config %s" % ShareConsumerPerformanceService.CONFIG_FILE + cmd += " --command-config %s" % ShareConsumerPerformanceService.CONFIG_FILE for key, value in self.settings.items(): cmd += " %s=%s" % (str(key), str(value)) diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java index 0892693801ad3..734e6d33cef9c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java @@ -58,7 +58,7 @@ public static void main(String[] args) { try { LOG.info("Starting consumer..."); ConsumerPerfOptions options = new ConsumerPerfOptions(args); - AtomicLong totalMessagesRead = new AtomicLong(0); + AtomicLong totalRecordsRead = new AtomicLong(0); AtomicLong totalBytesRead = new AtomicLong(0); AtomicLong joinTimeMs = new AtomicLong(0); AtomicLong joinTimeMsInSingleRound = new AtomicLong(0); @@ -68,14 +68,14 @@ public static void main(String[] args) { KafkaConsumer consumer = new KafkaConsumer<>(options.props()); long bytesRead = 0L; - long messagesRead = 0L; + long recordsRead = 0L; long lastBytesRead = 0L; - long lastMessagesRead = 0L; + long lastRecordsRead = 0L; long currentTimeMs = System.currentTimeMillis(); long joinStartMs = currentTimeMs; long startMs = currentTimeMs; - consume(consumer, options, totalMessagesRead, totalBytesRead, joinTimeMs, - bytesRead, messagesRead, lastBytesRead, lastMessagesRead, + consume(consumer, options, totalRecordsRead, totalBytesRead, joinTimeMs, + bytesRead, recordsRead, lastBytesRead, lastRecordsRead, joinStartMs, joinTimeMsInSingleRound); long endMs = System.currentTimeMillis(); @@ -94,12 +94,12 @@ public static void main(String[] args) { options.dateFormat().format(endMs), totalMbRead, totalMbRead / elapsedSec, - totalMessagesRead.get(), - totalMessagesRead.get() / elapsedSec, + totalRecordsRead.get(), + totalRecordsRead.get() / elapsedSec, joinTimeMs.get(), fetchTimeInMs, totalMbRead / (fetchTimeInMs / 1000.0), - totalMessagesRead.get() / (fetchTimeInMs / 1000.0) + totalRecordsRead.get() / (fetchTimeInMs / 1000.0) ); } @@ -122,16 +122,16 @@ protected static void printHeader(boolean showDetailedStats) { private static void consume(KafkaConsumer consumer, ConsumerPerfOptions options, - AtomicLong totalMessagesRead, + AtomicLong totalRecordsRead, AtomicLong totalBytesRead, AtomicLong joinTimeMs, long bytesRead, - long messagesRead, + long recordsRead, long lastBytesRead, - long lastMessagesRead, + long lastRecordsRead, long joinStartMs, AtomicLong joinTimeMsInSingleRound) { - long numMessages = options.numMessages(); + long numRecords = options.numRecords(); long recordFetchTimeoutMs = options.recordFetchTimeoutMs(); long reportingIntervalMs = options.reportingIntervalMs(); boolean showDetailedStats = options.showDetailedStats(); @@ -149,55 +149,55 @@ private static void consume(KafkaConsumer consumer, long lastReportTimeMs = currentTimeMs; long lastConsumedTimeMs = currentTimeMs; - while (messagesRead < numMessages && currentTimeMs - lastConsumedTimeMs <= recordFetchTimeoutMs) { + while (recordsRead < numRecords && currentTimeMs - lastConsumedTimeMs <= recordFetchTimeoutMs) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); currentTimeMs = System.currentTimeMillis(); if (!records.isEmpty()) lastConsumedTimeMs = currentTimeMs; for (ConsumerRecord record : records) { - messagesRead += 1; + recordsRead += 1; if (record.key() != null) bytesRead += record.key().length; if (record.value() != null) bytesRead += record.value().length; if (currentTimeMs - lastReportTimeMs >= reportingIntervalMs) { if (showDetailedStats) - printConsumerProgress(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, + printConsumerProgress(0, bytesRead, lastBytesRead, recordsRead, lastRecordsRead, lastReportTimeMs, currentTimeMs, dateFormat, joinTimeMsInSingleRound.get()); joinTimeMsInSingleRound = new AtomicLong(0); lastReportTimeMs = currentTimeMs; - lastMessagesRead = messagesRead; + lastRecordsRead = recordsRead; lastBytesRead = bytesRead; } } } - if (messagesRead < numMessages) - System.out.printf("WARNING: Exiting before consuming the expected number of messages: timeout (%d ms) exceeded. " + + if (recordsRead < numRecords) + System.out.printf("WARNING: Exiting before consuming the expected number of records: timeout (%d ms) exceeded. " + "You can use the --timeout option to increase the timeout.%n", recordFetchTimeoutMs); - totalMessagesRead.set(messagesRead); + totalRecordsRead.set(recordsRead); totalBytesRead.set(bytesRead); } protected static void printConsumerProgress(int id, long bytesRead, long lastBytesRead, - long messagesRead, - long lastMessagesRead, + long recordsRead, + long lastRecordsRead, long startMs, long endMs, SimpleDateFormat dateFormat, long joinTimeMsInSingleRound) { - printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, dateFormat); - printExtendedProgress(bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, joinTimeMsInSingleRound); + printBasicProgress(id, bytesRead, lastBytesRead, recordsRead, lastRecordsRead, startMs, endMs, dateFormat); + printExtendedProgress(bytesRead, lastBytesRead, recordsRead, lastRecordsRead, startMs, endMs, joinTimeMsInSingleRound); System.out.println(); } private static void printBasicProgress(int id, long bytesRead, long lastBytesRead, - long messagesRead, - long lastMessagesRead, + long recordsRead, + long lastRecordsRead, long startMs, long endMs, SimpleDateFormat dateFormat) { @@ -205,25 +205,25 @@ private static void printBasicProgress(int id, double totalMbRead = (bytesRead * 1.0) / (1024 * 1024); double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024); double intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs; - double intervalMessagesPerSec = ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0; + double intervalRecordsPerSec = ((recordsRead - lastRecordsRead) / elapsedMs) * 1000.0; System.out.printf("%s, %d, %.4f, %.4f, %d, %.4f", dateFormat.format(endMs), id, - totalMbRead, intervalMbPerSec, messagesRead, intervalMessagesPerSec); + totalMbRead, intervalMbPerSec, recordsRead, intervalRecordsPerSec); } private static void printExtendedProgress(long bytesRead, long lastBytesRead, - long messagesRead, - long lastMessagesRead, + long recordsRead, + long lastRecordsRead, long startMs, long endMs, long joinTimeMsInSingleRound) { long fetchTimeMs = endMs - startMs - joinTimeMsInSingleRound; double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024); - long intervalMessagesRead = messagesRead - lastMessagesRead; + long intervalRecordsRead = recordsRead - lastRecordsRead; double intervalMbPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * intervalMbRead / fetchTimeMs; - double intervalMessagesPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * intervalMessagesRead / fetchTimeMs; + double intervalRecordsPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * intervalRecordsRead / fetchTimeMs; System.out.printf(", %d, %d, %.4f, %.4f", joinTimeMsInSingleRound, - fetchTimeMs, intervalMbPerSec, intervalMessagesPerSec); + fetchTimeMs, intervalMbPerSec, intervalRecordsPerSec); } public static class ConsumerPerfRebListener implements ConsumerRebalanceListener { @@ -258,11 +258,15 @@ protected static class ConsumerPerfOptions extends CommandDefaultOptions { private final OptionSpec fetchSizeOpt; private final OptionSpec resetBeginningOffsetOpt; private final OptionSpec socketBufferSizeOpt; + @Deprecated(since = "4.2", forRemoval = true) private final OptionSpec consumerConfigOpt; + private final OptionSpec commandConfigOpt; private final OptionSpec printMetricsOpt; private final OptionSpec showDetailedStatsOpt; private final OptionSpec recordFetchTimeoutOpt; + @Deprecated(since = "4.2", forRemoval = true) private final OptionSpec numMessagesOpt; + private final OptionSpec numRecordsOpt; private final OptionSpec reportingIntervalOpt; private final OptionSpec dateFormatOpt; private final OptionSpec hideHeaderOpt; @@ -292,25 +296,35 @@ public ConsumerPerfOptions(String[] args) { .ofType(Integer.class) .defaultsTo(1024 * 1024); resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " + - "offset to consume from, start with the latest message present in the log rather than the earliest message."); + "offset to consume from, start with the latest record present in the log rather than the earliest record."); socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") .withRequiredArg() .describedAs("size") .ofType(Integer.class) .defaultsTo(2 * 1024 * 1024); - consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.") + consumerConfigOpt = parser.accepts("consumer.config", "(DEPRECATED) Consumer config properties file. " + + "This option will be removed in a future version. Use --command-config instead.") + .withRequiredArg() + .describedAs("config file") + .ofType(String.class); + commandConfigOpt = parser.accepts("command-config", "Config properties file.") .withRequiredArg() .describedAs("config file") .ofType(String.class); printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics."); showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " + - "interval as configured by reporting-interval"); + "interval as configured by reporting-interval."); recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum allowed time in milliseconds between returned records.") .withOptionalArg() .describedAs("milliseconds") .ofType(Long.class) .defaultsTo(10_000L); - numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to consume.") + numMessagesOpt = parser.accepts("messages", "(DEPRECATED) The number of records to consume. " + + "This option will be removed in a future version. Use --num-records instead.") + .withRequiredArg() + .describedAs("count") + .ofType(Long.class); + numRecordsOpt = parser.accepts("num-records", "REQUIRED: The number of records to consume.") .withRequiredArg() .describedAs("count") .ofType(Long.class); @@ -326,7 +340,7 @@ public ConsumerPerfOptions(String[] args) { .describedAs("date format") .ofType(String.class) .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS"); - hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats"); + hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats."); try { options = parser.parse(args); } catch (OptionException e) { @@ -335,8 +349,19 @@ public ConsumerPerfOptions(String[] args) { } if (options != null) { CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance."); - CommandLineUtils.checkRequiredArgs(parser, options, numMessagesOpt); + CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt); CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt); + + CommandLineUtils.checkOneOfArgs(parser, options, numMessagesOpt, numRecordsOpt); + CommandLineUtils.checkInvalidArgs(parser, options, consumerConfigOpt, commandConfigOpt); + + if (options.has(numMessagesOpt)) { + System.out.println("Warning: --messages is deprecated. Use --num-records instead."); + } + + if (options.has(consumerConfigOpt)) { + System.out.println("Warning: --consumer.config is deprecated. Use --command-config instead."); + } } } @@ -349,8 +374,14 @@ public String brokerHostsAndPorts() { } public Properties props() throws IOException { - Properties props = (options.has(consumerConfigOpt)) - ? Utils.loadProps(options.valueOf(consumerConfigOpt)) + String commandConfigFile; + if (options.has(consumerConfigOpt)) { + commandConfigFile = options.valueOf(consumerConfigOpt); + } else { + commandConfigFile = options.valueOf(commandConfigOpt); + } + Properties props = commandConfigFile != null + ? Utils.loadProps(commandConfigFile) : new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts()); props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt)); @@ -378,8 +409,10 @@ public Optional include() { : Optional.empty(); } - public long numMessages() { - return options.valueOf(numMessagesOpt); + public long numRecords() { + return options.has(numMessagesOpt) + ? options.valueOf(numMessagesOpt) + : options.valueOf(numRecordsOpt); } public long reportingIntervalMs() { diff --git a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java index bb0aaba89d585..4a2c1c3f50729 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java @@ -62,7 +62,7 @@ public static void main(String[] args) { try { LOG.info("Starting share consumer/consumers..."); ShareConsumerPerfOptions options = new ShareConsumerPerfOptions(args); - AtomicLong totalMessagesRead = new AtomicLong(0); + AtomicLong totalRecordsRead = new AtomicLong(0); AtomicLong totalBytesRead = new AtomicLong(0); if (!options.hideHeader()) @@ -73,7 +73,7 @@ public static void main(String[] args) { shareConsumers.add(new KafkaShareConsumer<>(options.props())); } long startMs = System.currentTimeMillis(); - consume(shareConsumers, options, totalMessagesRead, totalBytesRead, startMs); + consume(shareConsumers, options, totalRecordsRead, totalBytesRead, startMs); long endMs = System.currentTimeMillis(); List> shareConsumersMetrics = new ArrayList<>(); @@ -89,7 +89,7 @@ public static void main(String[] args) { // Print final stats for share group. double elapsedSec = (endMs - startMs) / 1_000.0; long fetchTimeInMs = endMs - startMs; - printStats(totalBytesRead.get(), totalMessagesRead.get(), elapsedSec, fetchTimeInMs, startMs, endMs, + printStats(totalBytesRead.get(), totalRecordsRead.get(), elapsedSec, fetchTimeInMs, startMs, endMs, options.dateFormat(), -1); shareConsumersMetrics.forEach(ToolsUtils::printMetrics); @@ -108,15 +108,15 @@ protected static void printHeader() { private static void consume(List> shareConsumers, ShareConsumerPerfOptions options, - AtomicLong totalMessagesRead, + AtomicLong totalRecordsRead, AtomicLong totalBytesRead, long startMs) throws ExecutionException, InterruptedException, TimeoutException { - long numMessages = options.numMessages(); + long numRecords = options.numRecords(); long recordFetchTimeoutMs = options.recordFetchTimeoutMs(); shareConsumers.forEach(shareConsumer -> shareConsumer.subscribe(options.topic())); // Now start the benchmark. - AtomicLong messagesRead = new AtomicLong(0); + AtomicLong recordsRead = new AtomicLong(0); AtomicLong bytesRead = new AtomicLong(0); List shareConsumersConsumptionDetails = new ArrayList<>(); @@ -128,7 +128,7 @@ private static void consume(List> shareConsum ShareConsumerConsumption shareConsumerConsumption = new ShareConsumerConsumption(0, 0); futures.add(executorService.submit(() -> { try { - consumeMessagesForSingleShareConsumer(shareConsumers.get(index), messagesRead, bytesRead, options, + consumeRecordsForSingleShareConsumer(shareConsumers.get(index), recordsRead, bytesRead, options, shareConsumerConsumption, index + 1); } catch (InterruptedException e) { throw new RuntimeException(e); @@ -166,22 +166,22 @@ private static void consume(List> shareConsum // Print stats for share consumer. double elapsedSec = (endMs - startMs) / 1_000.0; long fetchTimeInMs = endMs - startMs; - long messagesReadByConsumer = shareConsumersConsumptionDetails.get(index).messagesConsumed(); + long recordsReadByConsumer = shareConsumersConsumptionDetails.get(index).recordsConsumed(); long bytesReadByConsumer = shareConsumersConsumptionDetails.get(index).bytesConsumed(); - printStats(bytesReadByConsumer, messagesReadByConsumer, elapsedSec, fetchTimeInMs, startMs, endMs, options.dateFormat(), index + 1); + printStats(bytesReadByConsumer, recordsReadByConsumer, elapsedSec, fetchTimeInMs, startMs, endMs, options.dateFormat(), index + 1); } } - if (messagesRead.get() < numMessages) { - System.out.printf("WARNING: Exiting before consuming the expected number of messages: timeout (%d ms) exceeded. " + + if (recordsRead.get() < numRecords) { + System.out.printf("WARNING: Exiting before consuming the expected number of records: timeout (%d ms) exceeded. " + "You can use the --timeout option to increase the timeout.%n", recordFetchTimeoutMs); } - totalMessagesRead.set(messagesRead.get()); + totalRecordsRead.set(recordsRead.get()); totalBytesRead.set(bytesRead.get()); } - private static void consumeMessagesForSingleShareConsumer(KafkaShareConsumer shareConsumer, - AtomicLong totalMessagesRead, + private static void consumeRecordsForSingleShareConsumer(KafkaShareConsumer shareConsumer, + AtomicLong totalRecordsRead, AtomicLong totalBytesRead, ShareConsumerPerfOptions options, ShareConsumerConsumption shareConsumerConsumption, @@ -192,17 +192,17 @@ private static void consumeMessagesForSingleShareConsumer(KafkaShareConsumer records = shareConsumer.poll(Duration.ofMillis(100)); currentTimeMs = System.currentTimeMillis(); if (!records.isEmpty()) lastConsumedTimeMs = currentTimeMs; for (ConsumerRecord record : records) { - messagesReadByConsumer += 1; - totalMessagesRead.addAndGet(1); + recordsReadByConsumer += 1; + totalRecordsRead.addAndGet(1); if (record.key() != null) { bytesReadByConsumer += record.key().length; totalBytesRead.addAndGet(record.key().length); @@ -213,13 +213,13 @@ private static void consumeMessagesForSingleShareConsumer(KafkaShareConsumer= options.reportingIntervalMs()) { if (options.showDetailedStats()) - printShareConsumerProgress(bytesReadByConsumer, lastBytesRead, messagesReadByConsumer, lastMessagesRead, + printShareConsumerProgress(bytesReadByConsumer, lastBytesRead, recordsReadByConsumer, lastRecordsRead, lastReportTimeMs, currentTimeMs, dateFormat, index); lastReportTimeMs = currentTimeMs; - lastMessagesRead = messagesReadByConsumer; + lastRecordsRead = recordsReadByConsumer; lastBytesRead = bytesReadByConsumer; } - shareConsumerConsumption.updateMessagesConsumed(messagesReadByConsumer); + shareConsumerConsumption.updateRecordsConsumed(recordsReadByConsumer); shareConsumerConsumption.updateBytesConsumed(bytesReadByConsumer); } } @@ -227,8 +227,8 @@ private static void consumeMessagesForSingleShareConsumer(KafkaShareConsumer= 1. private static void printStats(long bytesRead, - long messagesRead, + long recordsRead, double elapsedSec, long fetchTimeInMs, long startMs, @@ -263,8 +263,8 @@ private static void printStats(long bytesRead, dateFormat.format(endMs), totalMbRead, totalMbRead / elapsedSec, - messagesRead / elapsedSec, - messagesRead, + recordsRead / elapsedSec, + recordsRead, fetchTimeInMs ); return; @@ -274,8 +274,8 @@ private static void printStats(long bytesRead, dateFormat.format(endMs), totalMbRead, totalMbRead / elapsedSec, - messagesRead / elapsedSec, - messagesRead, + recordsRead / elapsedSec, + recordsRead, fetchTimeInMs ); } @@ -286,11 +286,15 @@ protected static class ShareConsumerPerfOptions extends CommandDefaultOptions { private final OptionSpec groupIdOpt; private final OptionSpec fetchSizeOpt; private final OptionSpec socketBufferSizeOpt; + @Deprecated(since = "4.2", forRemoval = true) private final OptionSpec consumerConfigOpt; + private final OptionSpec commandConfigOpt; private final OptionSpec printMetricsOpt; private final OptionSpec showDetailedStatsOpt; private final OptionSpec recordFetchTimeoutOpt; + @Deprecated(since = "4.2", forRemoval = true) private final OptionSpec numMessagesOpt; + private final OptionSpec numRecordsOpt; private final OptionSpec reportingIntervalOpt; private final OptionSpec dateFormatOpt; private final OptionSpec hideHeaderOpt; @@ -322,19 +326,29 @@ public ShareConsumerPerfOptions(String[] args) { .describedAs("size") .ofType(Integer.class) .defaultsTo(2 * 1024 * 1024); - consumerConfigOpt = parser.accepts("consumer.config", "Share consumer config properties file.") + consumerConfigOpt = parser.accepts("consumer.config", "(DEPRECATED) Share consumer config properties file. " + + "This option will be removed in a future version. Use --command-config instead.") + .withRequiredArg() + .describedAs("config file") + .ofType(String.class); + commandConfigOpt = parser.accepts("command-config", "Config properties file.") .withRequiredArg() .describedAs("config file") .ofType(String.class); printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics."); showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " + - "interval as configured by reporting-interval"); + "interval as configured by reporting-interval."); recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum allowed time in milliseconds between returned records.") .withOptionalArg() .describedAs("milliseconds") .ofType(Long.class) .defaultsTo(10_000L); - numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to consume.") + numMessagesOpt = parser.accepts("messages", "(DEPRECATED) The number of records to consume. " + + "This option will be removed in a future version. Use --num-records instead.") + .withRequiredArg() + .describedAs("count") + .ofType(Long.class); + numRecordsOpt = parser.accepts("num-records", "REQUIRED: The number of records to consume.") .withRequiredArg() .describedAs("count") .ofType(Long.class); @@ -350,7 +364,7 @@ public ShareConsumerPerfOptions(String[] args) { .describedAs("date format") .ofType(String.class) .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS"); - hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats"); + hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats."); numThreadsOpt = parser.accepts("threads", "The number of share consumers to use for sharing the load.") .withRequiredArg() .describedAs("count") @@ -366,7 +380,18 @@ public ShareConsumerPerfOptions(String[] args) { } if (options != null) { CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the share consumer performance."); - CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt); + CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, bootstrapServerOpt); + + CommandLineUtils.checkOneOfArgs(parser, options, numMessagesOpt, numRecordsOpt); + CommandLineUtils.checkInvalidArgs(parser, options, consumerConfigOpt, commandConfigOpt); + + if (options.has(numMessagesOpt)) { + System.out.println("Warning: --messages is deprecated. Use --num-records instead."); + } + + if (options.has(consumerConfigOpt)) { + System.out.println("Warning: --consumer.config is deprecated. Use --command-config instead."); + } } } @@ -379,8 +404,14 @@ public String brokerHostsAndPorts() { } public Properties props() throws IOException { - Properties props = (options.has(consumerConfigOpt)) - ? Utils.loadProps(options.valueOf(consumerConfigOpt)) + String commandConfigFile; + if (options.has(consumerConfigOpt)) { + commandConfigFile = options.valueOf(consumerConfigOpt); + } else { + commandConfigFile = options.valueOf(commandConfigOpt); + } + Properties props = commandConfigFile != null + ? Utils.loadProps(commandConfigFile) : new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts()); props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt)); @@ -398,8 +429,10 @@ public Set topic() { return Set.of(options.valueOf(topicOpt)); } - public long numMessages() { - return options.valueOf(numMessagesOpt); + public long numRecords() { + return options.has(numMessagesOpt) + ? options.valueOf(numMessagesOpt) + : options.valueOf(numRecordsOpt); } public int threads() { @@ -434,26 +467,26 @@ public long recordFetchTimeoutMs() { } } - // Helper class to know the final messages and bytes consumer by share consumer at the end of consumption. + // Helper class to know the final records and bytes consumed by share consumer at the end of consumption. private static class ShareConsumerConsumption { - private long messagesConsumed; + private long recordsConsumed; private long bytesConsumed; - public ShareConsumerConsumption(long messagesConsumed, long bytesConsumed) { - this.messagesConsumed = messagesConsumed; + public ShareConsumerConsumption(long recordsConsumed, long bytesConsumed) { + this.recordsConsumed = recordsConsumed; this.bytesConsumed = bytesConsumed; } - public long messagesConsumed() { - return messagesConsumed; + public long recordsConsumed() { + return recordsConsumed; } public long bytesConsumed() { return bytesConsumed; } - public void updateMessagesConsumed(long messagesConsumed) { - this.messagesConsumed = messagesConsumed; + public void updateRecordsConsumed(long recordsConsumed) { + this.recordsConsumed = recordsConsumed; } public void updateBytesConsumed(long bytesConsumed) { diff --git a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java index d78b65e54a31d..badec8aaf7199 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java @@ -68,7 +68,7 @@ public void testConfigBootStrapServer() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10", + "--num-records", "10", "--print-metrics" }; @@ -76,15 +76,64 @@ public void testConfigBootStrapServer() { assertEquals("localhost:9092", config.brokerHostsAndPorts()); assertTrue(config.topic().get().contains("test")); - assertEquals(10, config.numMessages()); + assertEquals(10, config.numRecords()); } @Test - public void testConfigWithUnrecognizedOption() { + public void testBootstrapServerNotPresent() { + String[] args = new String[]{ + "--topic", "test" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ConsumerPerformance.ConsumerPerfOptions(args)); + assertTrue(err.contains("Missing required argument \"[bootstrap-server]\"")); + } + + @Test + public void testNumOfRecordsNotPresent() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ConsumerPerformance.ConsumerPerfOptions(args)); + assertTrue(err.contains("Exactly one of the following arguments is required:")); + } + + @Test + public void testMessagesDeprecated() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--messages", "10" + }; + + ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args); + assertEquals(10, config.numRecords()); + } + + @Test + public void testNumOfRecordsWithMessagesPresent() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", "--messages", "10", + "--num-records", "20" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ConsumerPerformance.ConsumerPerfOptions(args)); + assertTrue(err.contains("Exactly one of the following arguments is required")); + } + + @Test + public void testConfigWithUnrecognizedOption() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--num-records", "10", "--new-consumer" }; @@ -98,14 +147,14 @@ public void testConfigWithInclude() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--include", "test.*", - "--messages", "10" + "--num-records", "10" }; ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args); assertEquals("localhost:9092", config.brokerHostsAndPorts()); assertTrue(config.include().get().toString().contains("test.*")); - assertEquals(10, config.numMessages()); + assertEquals(10, config.numRecords()); } @Test @@ -114,7 +163,7 @@ public void testConfigWithTopicAndInclude() { "--bootstrap-server", "localhost:9092", "--topic", "test", "--include", "test.*", - "--messages", "10" + "--num-records", "10" }; String err = ToolsTestUtils.captureStandardErr(() -> new ConsumerPerformance.ConsumerPerfOptions(args)); @@ -126,7 +175,7 @@ public void testConfigWithTopicAndInclude() { public void testConfigWithoutTopicAndInclude() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", - "--messages", "10" + "--num-records", "10" }; String err = ToolsTestUtils.captureStandardErr(() -> new ConsumerPerformance.ConsumerPerfOptions(args)); @@ -136,7 +185,9 @@ public void testConfigWithoutTopicAndInclude() { @Test public void testClientIdOverride() throws IOException { - File tempFile = Files.createFile(tempDir.resolve("test_consumer_config.conf")).toFile(); + Path configPath = tempDir.resolve("test_client_id_override_consumer_perf.conf"); + Files.deleteIfExists(configPath); + File tempFile = Files.createFile(configPath).toFile(); try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) { output.println("client.id=consumer-1"); output.flush(); @@ -145,13 +196,53 @@ public void testClientIdOverride() throws IOException { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10", + "--num-records", "10", + "--command-config", tempFile.getAbsolutePath() + }; + + ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args); + + assertEquals("consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); + Files.deleteIfExists(configPath); + } + + @Test + public void testConsumerConfigDeprecated() throws IOException { + Path configPath = tempDir.resolve("test_consumer_config_deprecated_consumer_perf.conf"); + Files.deleteIfExists(configPath); + File tempFile = Files.createFile(configPath).toFile(); + try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) { + output.println("client.id=consumer-1"); + output.flush(); + } + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--num-records", "10", "--consumer.config", tempFile.getAbsolutePath() }; ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args); assertEquals("consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); + Files.deleteIfExists(configPath); + } + + @Test + public void testCommandConfigWithConsumerConfigPresent() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--num-records", "10", + "--consumer.config", "some-path", + "--command-config", "some-path" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ConsumerPerformance.ConsumerPerfOptions(args)); + assertTrue(err.contains(String.format("Option \"%s\" can't be used with option \"%s\"", + "[consumer.config]", "[command-config]"))); } @Test @@ -159,7 +250,7 @@ public void testDefaultClientId() throws IOException { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10" + "--num-records", "10" }; ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args); diff --git a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java index 853d79a7e3b35..e3f7a62d9dac3 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java @@ -62,7 +62,7 @@ public void testConfigBootStrapServer() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10", + "--num-records", "10", "--print-metrics" }; @@ -70,15 +70,65 @@ public void testConfigBootStrapServer() { assertEquals("localhost:9092", config.brokerHostsAndPorts()); assertTrue(config.topic().contains("test")); - assertEquals(10, config.numMessages()); + assertEquals(10, config.numRecords()); } @Test - public void testConfigWithUnrecognizedOption() { + public void testBootstrapServerNotPresent() { + String[] args = new String[]{ + "--topic", "test" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ShareConsumerPerformance.ShareConsumerPerfOptions(args)); + assertTrue(err.contains("Missing required argument \"[bootstrap-server]\"")); + } + + @Test + public void testNumOfRecordsNotPresent() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ShareConsumerPerformance.ShareConsumerPerfOptions(args)); + assertTrue(err.contains("Exactly one of the following arguments is required:")); + } + + @Test + public void testMessagesDeprecated() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--messages", "10" + }; + + ShareConsumerPerformance.ShareConsumerPerfOptions config = + new ShareConsumerPerformance.ShareConsumerPerfOptions(args); + assertEquals(10, config.numRecords()); + } + + @Test + public void testNumOfRecordsWithMessagesPresent() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", "--messages", "10", + "--num-records", "20" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ShareConsumerPerformance.ShareConsumerPerfOptions(args)); + assertTrue(err.contains("Exactly one of the following arguments is required")); + } + + @Test + public void testConfigWithUnrecognizedOption() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--num-records", "10", "--new-share-consumer" }; @@ -89,7 +139,9 @@ public void testConfigWithUnrecognizedOption() { @Test public void testClientIdOverride() throws IOException { - File tempFile = Files.createFile(tempDir.resolve("test_share_consumer_config.conf")).toFile(); + Path configPath = tempDir.resolve("test_client_id_override_share_consumer_perf.conf"); + Files.deleteIfExists(configPath); + File tempFile = Files.createFile(configPath).toFile(); try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) { output.println("client.id=share-consumer-1"); output.flush(); @@ -98,8 +150,8 @@ public void testClientIdOverride() throws IOException { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10", - "--consumer.config", tempFile.getAbsolutePath() + "--num-records", "10", + "--command-config", tempFile.getAbsolutePath() }; ShareConsumerPerformance.ShareConsumerPerfOptions config = new ShareConsumerPerformance.ShareConsumerPerfOptions(args); @@ -107,12 +159,51 @@ public void testClientIdOverride() throws IOException { assertEquals("share-consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); } + @Test + public void testConsumerConfigDeprecated() throws IOException { + Path configPath = tempDir.resolve("test_consumer_config_deprecated_share_consumer_perf.conf"); + Files.deleteIfExists(configPath); + File tempFile = Files.createFile(configPath).toFile(); + try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) { + output.println("client.id=share-consumer-1"); + output.flush(); + } + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--num-records", "10", + "--consumer.config", tempFile.getAbsolutePath() + }; + + ShareConsumerPerformance.ShareConsumerPerfOptions config = + new ShareConsumerPerformance.ShareConsumerPerfOptions(args); + + assertEquals("share-consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); + } + + @Test + public void testCommandConfigWithConsumerConfigPresent() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--num-records", "10", + "--consumer.config", "some-path", + "--command-config", "some-path" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ShareConsumerPerformance.ShareConsumerPerfOptions(args)); + assertTrue(err.contains(String.format("Option \"%s\" can't be used with option \"%s\"", + "[consumer.config]", "[command-config]"))); + } + @Test public void testDefaultClientId() throws IOException { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10" + "--num-records", "10" }; ShareConsumerPerformance.ShareConsumerPerfOptions config = new ShareConsumerPerformance.ShareConsumerPerfOptions(args);