3434import org .apache .kafka .common .KafkaFuture ;
3535import org .apache .kafka .common .TopicPartition ;
3636import org .apache .kafka .common .errors .InterruptException ;
37+ import org .apache .kafka .common .header .Header ;
3738import org .apache .kafka .common .serialization .ByteArrayDeserializer ;
3839import org .apache .kafka .common .serialization .StringDeserializer ;
3940import org .apache .kafka .common .utils .ThreadUtils ;
@@ -92,10 +93,10 @@ public ConsumerService(String bootstrapServer, Properties properties) {
9293 * @param config consumer configuration
9394 * @return the number of consumers created
9495 */
95- public int createConsumers (List <Topic > topics , ConsumersConfig config ) {
96+ public int createConsumers (List <Topic > topics , ConsumersConfig config , Stats stats ) {
9697 int count = 0 ;
9798 for (int g = 0 ; g < config .groupsPerTopic ; g ++) {
98- Group group = new Group (g , config .consumersPerGroup , topics , config );
99+ Group group = new Group (g , config .consumersPerGroup , topics , config , stats );
99100 groups .add (group );
100101 count += group .consumerCount ();
101102 }
@@ -104,40 +105,36 @@ public int createConsumers(List<Topic> topics, ConsumersConfig config) {
104105
105106 public void start (ConsumerCallback callback , int pollRate ) {
106107 BlockingBucket bucket = rateLimitBucket (pollRate );
107- ConsumerCallback callbackWithRateLimit = (tp , p , st ) -> {
108- callback .messageReceived (tp , p , st );
109- bucket .consume (1 );
110- };
111108 CompletableFuture .allOf (
112109 groups .stream ()
113- .map (group -> group .start (callbackWithRateLimit ))
110+ .map (group -> group .start (callback , bucket ))
114111 .toArray (CompletableFuture []::new )
115112 ).join ();
116113 }
117114
118115 public void pause () {
119116 groups .forEach (Group ::pause );
120117 }
121-
118+
122119 /**
123120 * Resume all consumer groups
124121 */
125122 public void resume () {
126123 groups .forEach (Group ::resume );
127124 }
128-
125+
129126 /**
130127 * Resume only a percentage of consumer groups
131- *
128+ *
132129 * @param percentage The percentage of consumers to resume (0-100)
133130 */
134131 public void resume (int percentage ) {
135132 int size = groups .size ();
136133 int consumersToResume = (int ) Math .ceil (size * (percentage / 100.0 ));
137134 consumersToResume = Math .max (1 , Math .min (size , consumersToResume )); // Ensure at least 1 and at most size
138-
135+
139136 LOGGER .info ("Resuming {}% of consumers ({} out of {})" , percentage , consumersToResume , size );
140-
137+
141138 for (int i = 0 ; i < consumersToResume ; i ++) {
142139 groups .get (i ).resume ();
143140 }
@@ -177,7 +174,7 @@ public void resumeTopics(Collection<String> topics) {
177174
178175 /**
179176 * Reset consumer offsets for catch-up reading.
180- *
177+ *
181178 * @param startMillis The timestamp to start seeking from
182179 * @param intervalMillis The interval between group starts
183180 * @param percentage The percentage of consumers to activate (0-100)
@@ -187,21 +184,21 @@ public void resetOffset(long startMillis, long intervalMillis, int percentage) {
187184 int size = groups .size ();
188185 int consumersToActivate = (int ) Math .ceil (size * (percentage / 100.0 ));
189186 consumersToActivate = Math .max (1 , Math .min (size , consumersToActivate )); // Ensure at least 1 and at most size
190-
187+
191188 LOGGER .info ("Activating {}% of consumers ({} out of {})" , percentage , consumersToActivate , size );
192-
189+
193190 for (int i = 0 ; i < consumersToActivate ; i ++) {
194191 Group group = groups .get (i );
195192 group .seek (timestamp .getAndAdd (intervalMillis ));
196193 LOGGER .info ("Reset consumer group offsets: {}/{}" , i + 1 , consumersToActivate );
197194 }
198-
195+
199196 // Keep the remaining consumers paused
200197 if (consumersToActivate < size ) {
201198 LOGGER .info ("Keeping {} consumer groups paused during catch-up" , size - consumersToActivate );
202199 }
203200 }
204-
201+
205202 /**
206203 * Reset all consumer offsets (100% consumers)
207204 * @param startMillis The timestamp to start seeking from
@@ -238,10 +235,8 @@ public interface ConsumerCallback {
238235 * Called when a message is received.
239236 *
240237 * @param topicPartition the topic partition of the received message
241- * @param payload the received message payload
242- * @param sendTimeNanos the time in nanoseconds when the message was sent
243238 */
244- void messageReceived (TopicPartition topicPartition , byte [] payload , long sendTimeNanos ) throws InterruptedException ;
239+ void messageReceived (TopicPartition topicPartition ) throws InterruptedException ;
245240 }
246241
247242 public static class ConsumersConfig {
@@ -263,22 +258,22 @@ private class Group implements AutoCloseable {
263258 private final int index ;
264259 private final Map <Topic , List <Consumer >> consumers = new HashMap <>();
265260
266- public Group (int index , int consumersPerGroup , List <Topic > topics , ConsumersConfig config ) {
261+ public Group (int index , int consumersPerGroup , List <Topic > topics , ConsumersConfig config , Stats stats ) {
267262 this .index = index ;
268263
269264 Properties common = toProperties (config );
270265 for (Topic topic : topics ) {
271266 List <Consumer > topicConsumers = new ArrayList <>();
272267 for (int c = 0 ; c < consumersPerGroup ; c ++) {
273- Consumer consumer = newConsumer (topic , common );
268+ Consumer consumer = newConsumer (topic , common , stats );
274269 topicConsumers .add (consumer );
275270 }
276271 consumers .put (topic , topicConsumers );
277272 }
278273 }
279274
280- public CompletableFuture <Void > start (ConsumerCallback callback ) {
281- consumers ().forEach (consumer -> consumer .start (callback ));
275+ public CompletableFuture <Void > start (ConsumerCallback callback , BlockingBucket bucket ) {
276+ consumers ().forEach (consumer -> consumer .start (callback , bucket ));
282277
283278 // wait for all consumers to join the group
284279 return CompletableFuture .allOf (consumers ()
@@ -336,11 +331,11 @@ private Properties toProperties(ConsumersConfig config) {
336331 return properties ;
337332 }
338333
339- private Consumer newConsumer (Topic topic , Properties common ) {
334+ private Consumer newConsumer (Topic topic , Properties common , Stats stats ) {
340335 Properties properties = new Properties ();
341336 properties .putAll (common );
342337 properties .put (ConsumerConfig .GROUP_ID_CONFIG , groupId (topic ));
343- return new Consumer (properties , topic .name );
338+ return new Consumer (properties , topic .name , stats );
344339 }
345340
346341 private Stream <Consumer > consumers () {
@@ -369,16 +364,17 @@ private static class Consumer {
369364 private final CompletableFuture <Void > started = new CompletableFuture <>();
370365 private boolean paused = false ;
371366 private volatile boolean closing = false ;
367+ private final Stats stats ;
372368
373- public Consumer (Properties properties , String topic ) {
369+ public Consumer (Properties properties , String topic , Stats stats ) {
374370 this .consumer = new KafkaConsumer <>(properties );
375371 this .executor = Executors .newSingleThreadExecutor (ThreadUtils .createThreadFactory ("perf-consumer" , false ));
376-
372+ this . stats = stats ;
377373 consumer .subscribe (List .of (topic ), subscribeListener ());
378374 }
379375
380- public void start (ConsumerCallback callback ) {
381- this .task = this .executor .submit (() -> pollRecords (consumer , callback ));
376+ public void start (ConsumerCallback callback , BlockingBucket bucket ) {
377+ this .task = this .executor .submit (() -> pollRecords (consumer , callback , bucket ));
382378 }
383379
384380 public CompletableFuture <Void > started () {
@@ -408,18 +404,28 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
408404 };
409405 }
410406
411- private void pollRecords (KafkaConsumer <String , byte []> consumer , ConsumerCallback callback ) {
407+ private void pollRecords (KafkaConsumer <String , byte []> consumer , ConsumerCallback callback , BlockingBucket bucket ) {
412408 while (!closing ) {
413409 try {
414410 while (paused ) {
415411 Thread .sleep (PAUSE_INTERVAL );
416412 }
417413 ConsumerRecords <String , byte []> records = consumer .poll (POLL_TIMEOUT );
414+ int numMessages = records .count ();
415+ if (numMessages == 0 ) {
416+ continue ;
417+ }
418+ ConsumerRecord <String , byte []> firstRecord = records .iterator ().next ();
419+ Header header = firstRecord .headers ().lastHeader (HEADER_KEY_SEND_TIME_NANOS );
420+ long bytes = 0 ;
421+ long sendTimeNanos = Longs .fromByteArray (header .value ());
418422 for (ConsumerRecord <String , byte []> record : records ) {
419- long sendTimeNanos = Longs .fromByteArray (record .headers ().lastHeader (HEADER_KEY_SEND_TIME_NANOS ).value ());
420423 TopicPartition topicPartition = new TopicPartition (record .topic (), record .partition ());
421- callback .messageReceived (topicPartition , record .value (), sendTimeNanos );
424+ bytes += record .value ().length ;
425+ callback .messageReceived (topicPartition );
422426 }
427+ stats .messageReceived (numMessages , bytes , sendTimeNanos );
428+ bucket .consume (records .count ());
423429 } catch (InterruptException | InterruptedException e ) {
424430 // ignore, as we are closing
425431 } catch (Exception e ) {
0 commit comments