Skip to content

KAFKA-19624: Improving consistency of command-line arguments for consumer performance tests #20385

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tests/kafkatest/services/performance/consumer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ConsumerPerformanceService(PerformanceService):
"socket-buffer-size", "The size of the tcp RECV size."

"new-consumer", "Use the new consumer implementation."
"consumer.config", "Consumer config properties file."
"command-config", "Consumer config properties file."
"""

# Root directory for persistent output
Expand Down Expand Up @@ -115,7 +115,7 @@ def start_cmd(self, node):
for key, value in self.args(node.version).items():
cmd += " --%s %s" % (key, value)

cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE
cmd += " --command-config %s" % ConsumerPerformanceService.CONFIG_FILE

for key, value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ShareConsumerPerformanceService(PerformanceService):

"socket-buffer-size", "The size of the tcp RECV size."

"consumer.config", "Consumer config properties file."
"command-config", "Share consumer config properties file."
"""

# Root directory for persistent output
Expand Down Expand Up @@ -100,7 +100,7 @@ def start_cmd(self, node):
for key, value in self.args().items():
cmd += " --%s %s" % (key, value)

cmd += " --consumer.config %s" % ShareConsumerPerformanceService.CONFIG_FILE
cmd += " --command-config %s" % ShareConsumerPerformanceService.CONFIG_FILE

for key, value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))
Expand Down
102 changes: 60 additions & 42 deletions tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static void main(String[] args) {
try {
LOG.info("Starting consumer...");
ConsumerPerfOptions options = new ConsumerPerfOptions(args);
AtomicLong totalMessagesRead = new AtomicLong(0);
AtomicLong totalRecordsRead = new AtomicLong(0);
AtomicLong totalBytesRead = new AtomicLong(0);
AtomicLong joinTimeMs = new AtomicLong(0);
AtomicLong joinTimeMsInSingleRound = new AtomicLong(0);
Expand All @@ -66,14 +66,14 @@ public static void main(String[] args) {

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(options.props());
long bytesRead = 0L;
long messagesRead = 0L;
long recordsRead = 0L;
long lastBytesRead = 0L;
long lastMessagesRead = 0L;
long lastRecordsRead = 0L;
long currentTimeMs = System.currentTimeMillis();
long joinStartMs = currentTimeMs;
long startMs = currentTimeMs;
consume(consumer, options, totalMessagesRead, totalBytesRead, joinTimeMs,
bytesRead, messagesRead, lastBytesRead, lastMessagesRead,
consume(consumer, options, totalRecordsRead, totalBytesRead, joinTimeMs,
bytesRead, recordsRead, lastBytesRead, lastRecordsRead,
joinStartMs, joinTimeMsInSingleRound);
long endMs = System.currentTimeMillis();

Expand All @@ -92,12 +92,12 @@ public static void main(String[] args) {
options.dateFormat().format(endMs),
totalMbRead,
totalMbRead / elapsedSec,
totalMessagesRead.get(),
totalMessagesRead.get() / elapsedSec,
totalRecordsRead.get(),
totalRecordsRead.get() / elapsedSec,
joinTimeMs.get(),
fetchTimeInMs,
totalMbRead / (fetchTimeInMs / 1000.0),
totalMessagesRead.get() / (fetchTimeInMs / 1000.0)
totalRecordsRead.get() / (fetchTimeInMs / 1000.0)
);
}

Expand All @@ -120,16 +120,16 @@ protected static void printHeader(boolean showDetailedStats) {

private static void consume(KafkaConsumer<byte[], byte[]> consumer,
ConsumerPerfOptions options,
AtomicLong totalMessagesRead,
AtomicLong totalRecordsRead,
AtomicLong totalBytesRead,
AtomicLong joinTimeMs,
long bytesRead,
long messagesRead,
long recordsRead,
long lastBytesRead,
long lastMessagesRead,
long lastRecordsRead,
long joinStartMs,
AtomicLong joinTimeMsInSingleRound) {
long numMessages = options.numMessages();
long numRecords = options.numRecords();
long recordFetchTimeoutMs = options.recordFetchTimeoutMs();
long reportingIntervalMs = options.reportingIntervalMs();
boolean showDetailedStats = options.showDetailedStats();
Expand All @@ -142,81 +142,81 @@ private static void consume(KafkaConsumer<byte[], byte[]> consumer,
long lastReportTimeMs = currentTimeMs;
long lastConsumedTimeMs = currentTimeMs;

while (messagesRead < numMessages && currentTimeMs - lastConsumedTimeMs <= recordFetchTimeoutMs) {
while (recordsRead < numRecords && currentTimeMs - lastConsumedTimeMs <= recordFetchTimeoutMs) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
currentTimeMs = System.currentTimeMillis();
if (!records.isEmpty())
lastConsumedTimeMs = currentTimeMs;
for (ConsumerRecord<byte[], byte[]> record : records) {
messagesRead += 1;
recordsRead += 1;
if (record.key() != null)
bytesRead += record.key().length;
if (record.value() != null)
bytesRead += record.value().length;
if (currentTimeMs - lastReportTimeMs >= reportingIntervalMs) {
if (showDetailedStats)
printConsumerProgress(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead,
printConsumerProgress(0, bytesRead, lastBytesRead, recordsRead, lastRecordsRead,
lastReportTimeMs, currentTimeMs, dateFormat, joinTimeMsInSingleRound.get());
joinTimeMsInSingleRound = new AtomicLong(0);
lastReportTimeMs = currentTimeMs;
lastMessagesRead = messagesRead;
lastRecordsRead = recordsRead;
lastBytesRead = bytesRead;
}
}
}

if (messagesRead < numMessages)
System.out.printf("WARNING: Exiting before consuming the expected number of messages: timeout (%d ms) exceeded. " +
if (recordsRead < numRecords)
System.out.printf("WARNING: Exiting before consuming the expected number of records: timeout (%d ms) exceeded. " +
"You can use the --timeout option to increase the timeout.%n", recordFetchTimeoutMs);
totalMessagesRead.set(messagesRead);
totalRecordsRead.set(recordsRead);
totalBytesRead.set(bytesRead);
}

protected static void printConsumerProgress(int id,
long bytesRead,
long lastBytesRead,
long messagesRead,
long lastMessagesRead,
long recordsRead,
long lastRecordsRead,
long startMs,
long endMs,
SimpleDateFormat dateFormat,
long joinTimeMsInSingleRound) {
printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, dateFormat);
printExtendedProgress(bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, joinTimeMsInSingleRound);
printBasicProgress(id, bytesRead, lastBytesRead, recordsRead, lastRecordsRead, startMs, endMs, dateFormat);
printExtendedProgress(bytesRead, lastBytesRead, recordsRead, lastRecordsRead, startMs, endMs, joinTimeMsInSingleRound);
System.out.println();
}

private static void printBasicProgress(int id,
long bytesRead,
long lastBytesRead,
long messagesRead,
long lastMessagesRead,
long recordsRead,
long lastRecordsRead,
long startMs,
long endMs,
SimpleDateFormat dateFormat) {
double elapsedMs = endMs - startMs;
double totalMbRead = (bytesRead * 1.0) / (1024 * 1024);
double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024);
double intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs;
double intervalMessagesPerSec = ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0;
double intervalRecordsPerSec = ((recordsRead - lastRecordsRead) / elapsedMs) * 1000.0;
System.out.printf("%s, %d, %.4f, %.4f, %d, %.4f", dateFormat.format(endMs), id,
totalMbRead, intervalMbPerSec, messagesRead, intervalMessagesPerSec);
totalMbRead, intervalMbPerSec, recordsRead, intervalRecordsPerSec);
}

private static void printExtendedProgress(long bytesRead,
long lastBytesRead,
long messagesRead,
long lastMessagesRead,
long recordsRead,
long lastRecordsRead,
long startMs,
long endMs,
long joinTimeMsInSingleRound) {
long fetchTimeMs = endMs - startMs - joinTimeMsInSingleRound;
double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024);
long intervalMessagesRead = messagesRead - lastMessagesRead;
long intervalRecordsRead = recordsRead - lastRecordsRead;
double intervalMbPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * intervalMbRead / fetchTimeMs;
double intervalMessagesPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * intervalMessagesRead / fetchTimeMs;
double intervalRecordsPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * intervalRecordsRead / fetchTimeMs;
System.out.printf(", %d, %d, %.4f, %.4f", joinTimeMsInSingleRound,
fetchTimeMs, intervalMbPerSec, intervalMessagesPerSec);
fetchTimeMs, intervalMbPerSec, intervalRecordsPerSec);
}

public static class ConsumerPerfRebListener implements ConsumerRebalanceListener {
Expand Down Expand Up @@ -250,11 +250,14 @@ protected static class ConsumerPerfOptions extends CommandDefaultOptions {
private final OptionSpec<Integer> fetchSizeOpt;
private final OptionSpec<Void> resetBeginningOffsetOpt;
private final OptionSpec<Integer> socketBufferSizeOpt;
// Deprecated option, kept for backward compatibility
// and will be removed in a future version.
private final OptionSpec<String> consumerConfigOpt;
private final OptionSpec<String> commandConfigOpt;
private final OptionSpec<Void> printMetricsOpt;
private final OptionSpec<Void> showDetailedStatsOpt;
private final OptionSpec<Long> recordFetchTimeoutOpt;
private final OptionSpec<Long> numMessagesOpt;
private final OptionSpec<Long> numRecordsOpt;
private final OptionSpec<Long> reportingIntervalOpt;
private final OptionSpec<String> dateFormatOpt;
private final OptionSpec<Void> hideHeaderOpt;
Expand All @@ -280,14 +283,19 @@ public ConsumerPerfOptions(String[] args) {
.ofType(Integer.class)
.defaultsTo(1024 * 1024);
resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
"offset to consume from, start with the latest message present in the log rather than the earliest message.");
"offset to consume from, start with the latest record present in the log rather than the earliest record.");
socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
.withRequiredArg()
.describedAs("size")
.ofType(Integer.class)
.defaultsTo(2 * 1024 * 1024);
consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
.withRequiredArg()
consumerConfigOpt = parser.accepts("consumer.config", "(DEPRECATED) Consumer config properties file. " +
"This option will be removed in a future version. Use --command-config instead.")
.withOptionalArg()
.describedAs("config file")
.ofType(String.class);
commandConfigOpt = parser.accepts("command-config", "Consumer config properties file")
.withOptionalArg()
.describedAs("config file")
.ofType(String.class);
printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics.");
Expand All @@ -298,7 +306,7 @@ public ConsumerPerfOptions(String[] args) {
.describedAs("milliseconds")
.ofType(Long.class)
.defaultsTo(10_000L);
numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to consume.")
numRecordsOpt = parser.accepts("num-records", "REQUIRED: The number of records to consume.")
.withRequiredArg()
.describedAs("count")
.ofType(Long.class);
Expand All @@ -323,7 +331,10 @@ public ConsumerPerfOptions(String[] args) {
}
if (options != null) {
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance.");
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt);
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numRecordsOpt);
if (!options.has(consumerConfigOpt)) {
CommandLineUtils.checkRequiredArgs(parser, options, commandConfigOpt);
}
}
}

Expand All @@ -336,8 +347,15 @@ public String brokerHostsAndPorts() {
}

public Properties props() throws IOException {
Properties props = (options.has(consumerConfigOpt))
? Utils.loadProps(options.valueOf(consumerConfigOpt))
String commandConfigFile;
if (options.has(consumerConfigOpt)) {
System.out.println("Warning: --consumer.config is deprecated. Use --command-config instead.");
commandConfigFile = options.valueOf(consumerConfigOpt);
} else {
commandConfigFile = options.valueOf(commandConfigOpt);
}
Properties props = commandConfigFile != null
? Utils.loadProps(commandConfigFile)
: new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts());
props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt));
Expand All @@ -357,8 +375,8 @@ public Set<String> topic() {
return Set.of(options.valueOf(topicOpt));
}

public long numMessages() {
return options.valueOf(numMessagesOpt);
public long numRecords() {
return options.valueOf(numRecordsOpt);
}

public long reportingIntervalMs() {
Expand Down
Loading