@@ -76,7 +76,12 @@ void start(String[] args) throws IOException {
7676 // not thread-safe, do not share with other threads
7777 SplittableRandom random = new SplittableRandom (0 );
7878 ProducerRecord <byte [], byte []> record ;
79- stats = new Stats (config .numRecords , 5000 );
79+
80+ if (config .warmupRecords > 0 ) {
81+ System .out .println ("Warmup first " + config .warmupRecords + " records. Steady state results will print after the complete test summary." );
82+ }
83+ boolean isSteadyState = false ;
84+ stats = new Stats (config .numRecords , isSteadyState );
8085 long startMs = System .currentTimeMillis ();
8186
8287 ThroughputThrottler throttler = new ThroughputThrottler (config .throughput , startMs );
@@ -95,7 +100,11 @@ void start(String[] args) throws IOException {
95100 record = new ProducerRecord <>(config .topicName , payload );
96101
97102 long sendStartMs = System .currentTimeMillis ();
98- cb = new PerfCallback (sendStartMs , payload .length , stats );
103+ if ((isSteadyState = config .warmupRecords > 0 ) && i == config .warmupRecords ) {
104+ steadyStateStats = new Stats (config .numRecords - config .warmupRecords , isSteadyState );
105+ stats .suppressPrinting ();
106+ }
107+ cb = new PerfCallback (sendStartMs , payload .length , stats , steadyStateStats );
99108 producer .send (record , cb );
100109
101110 currentTransactionSize ++;
@@ -117,6 +126,10 @@ record = new ProducerRecord<>(config.topicName, payload);
117126
118127 /* print final results */
119128 stats .printTotal ();
129+ /* print steady-state stats if relevant */
130+ if (steadyStateStats != null ) {
131+ steadyStateStats .printTotal ();
132+ }
120133 } else {
121134 // Make sure all messages are sent before printing out the stats and the metrics
122135 // We need to do this in a different branch for now since tests/kafkatest/sanity_checks/test_performance_services.py
@@ -125,6 +138,10 @@ record = new ProducerRecord<>(config.topicName, payload);
125138
126139 /* print final results */
127140 stats .printTotal ();
141+ /* print steady-state stats if relevant */
142+ if (steadyStateStats != null ) {
143+ steadyStateStats .printTotal ();
144+ }
128145
129146 /* print out metrics */
130147 ToolsUtils .printMetrics (producer .metrics ());
@@ -147,8 +164,8 @@ KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
147164 }
148165
149166 Callback cb ;
150-
151167 Stats stats ;
168+ Stats steadyStateStats ;
152169
153170 static byte [] generateRandomPayload (Integer recordSize , List <byte []> payloadByteList , byte [] payload ,
154171 SplittableRandom random , boolean payloadMonotonic , long recordValue ) {
@@ -164,7 +181,7 @@ static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByte
164181 }
165182 return payload ;
166183 }
167-
184+
168185 static Properties readProps (List <String > producerProps , String producerConfig ) throws IOException {
169186 Properties props = new Properties ();
170187 if (producerConfig != null ) {
@@ -331,6 +348,16 @@ static ArgumentParser argParser() {
331348 "--producer.config, or --transactional-id but --transaction-duration-ms is not specified, " +
332349 "the default value will be 3000." );
333350
351+ parser .addArgument ("--warmup-records" )
352+ .action (store ())
353+ .required (false )
354+ .type (Long .class )
355+ .metavar ("WARMUP-RECORDS" )
356+ .dest ("warmupRecords" )
357+ .setDefault (0L )
358+ .help ("The number of records to treat as warmup; these initial records will not be included in steady-state statistics. " +
359+ "An additional summary line will be printed describing the steady-state statistics. (default: 0)." );
360+
334361 return parser ;
335362 }
336363
@@ -351,8 +378,10 @@ static class Stats {
351378 private long windowTotalLatency ;
352379 private long windowBytes ;
353380 private long windowStart ;
381+ private final boolean isSteadyState ;
382+ private boolean suppressPrint ;
354383
355- public Stats (long numRecords , int reportingInterval ) {
384+ public Stats (long numRecords , boolean isSteadyState ) {
356385 this .start = System .currentTimeMillis ();
357386 this .windowStart = System .currentTimeMillis ();
358387 this .iteration = 0 ;
@@ -365,7 +394,9 @@ public Stats(long numRecords, int reportingInterval) {
365394 this .windowTotalLatency = 0 ;
366395 this .windowBytes = 0 ;
367396 this .totalLatency = 0 ;
368- this .reportingInterval = reportingInterval ;
397+ this .reportingInterval = 5000 ;
398+ this .isSteadyState = isSteadyState ;
399+ this .suppressPrint = false ;
369400 }
370401
371402 public void record (int latency , int bytes , long time ) {
@@ -383,9 +414,15 @@ public void record(int latency, int bytes, long time) {
383414 }
384415 /* maybe report the recent perf */
385416 if (time - windowStart >= reportingInterval ) {
386- printWindow ();
417+ if (this .isSteadyState && count == windowCount ) {
418+ System .out .println ("In steady state." );
419+ }
420+ if (!this .suppressPrint ) {
421+ printWindow ();
422+ }
387423 newWindow ();
388424 }
425+ this .iteration ++;
389426 }
390427
391428 public long totalCount () {
@@ -433,8 +470,9 @@ public void printTotal() {
433470 double recsPerSec = 1000.0 * count / (double ) elapsed ;
434471 double mbPerSec = 1000.0 * this .bytes / (double ) elapsed / (1024.0 * 1024.0 );
435472 int [] percs = percentiles (this .latencies , index , 0.5 , 0.95 , 0.99 , 0.999 );
436- System .out .printf ("%d records sent, %.1f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n" ,
473+ System .out .printf ("%d%s records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n" ,
437474 count ,
475+ this .isSteadyState ? " steady state" : "" ,
438476 recsPerSec ,
439477 mbPerSec ,
440478 totalLatency / (double ) count ,
@@ -455,16 +493,22 @@ private static int[] percentiles(int[] latencies, int count, double... percentil
455493 }
456494 return values ;
457495 }
496+
497+ public void suppressPrinting () {
498+ this .suppressPrint = true ;
499+ }
458500 }
459501
460502 static final class PerfCallback implements Callback {
461503 private final long start ;
462504 private final int bytes ;
463505 private final Stats stats ;
506+ private final Stats steadyStateStats ;
464507
465- public PerfCallback (long start , int bytes , Stats stats ) {
508+ public PerfCallback (long start , int bytes , Stats stats , Stats steadyStateStats ) {
466509 this .start = start ;
467510 this .stats = stats ;
511+ this .steadyStateStats = steadyStateStats ;
468512 this .bytes = bytes ;
469513 }
470514
@@ -475,7 +519,9 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
475519 // magically printed when the sending fails.
476520 if (exception == null ) {
477521 this .stats .record (latency , bytes , now );
478- this .stats .iteration ++;
522+ if (steadyStateStats != null ) {
523+ this .steadyStateStats .record (latency , bytes , now );
524+ }
479525 }
480526 if (exception != null )
481527 exception .printStackTrace ();
@@ -484,7 +530,8 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
484530
485531 static final class ConfigPostProcessor {
486532 final String topicName ;
487- final Long numRecords ;
533+ final long numRecords ;
534+ final long warmupRecords ;
488535 final Integer recordSize ;
489536 final double throughput ;
490537 final boolean payloadMonotonic ;
@@ -498,6 +545,7 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept
498545 Namespace namespace = parser .parseArgs (args );
499546 this .topicName = namespace .getString ("topic" );
500547 this .numRecords = namespace .getLong ("numRecords" );
548+ this .warmupRecords = Math .max (namespace .getLong ("warmupRecords" ), 0 );
501549 this .recordSize = namespace .getInt ("recordSize" );
502550 this .throughput = namespace .getDouble ("throughput" );
503551 this .payloadMonotonic = namespace .getBoolean ("payloadMonotonic" );
@@ -508,9 +556,12 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept
508556 String payloadFilePath = namespace .getString ("payloadFile" );
509557 Long transactionDurationMsArg = namespace .getLong ("transactionDurationMs" );
510558 String transactionIdArg = namespace .getString ("transactionalId" );
511- if (numRecords != null && numRecords <= 0 ) {
559+ if (numRecords <= 0 ) {
512560 throw new ArgumentParserException ("--num-records should be greater than zero" , parser );
513561 }
562+ if (warmupRecords >= numRecords ) {
563+ throw new ArgumentParserException ("The value for --warmup-records must be strictly fewer than the number of records in the test, --num-records." , parser );
564+ }
514565 if (recordSize != null && recordSize <= 0 ) {
515566 throw new ArgumentParserException ("--record-size should be greater than zero" , parser );
516567 }
0 commit comments