1919import  java .util .ArrayList ;
2020import  java .util .Arrays ;
2121import  java .util .List ;
22+ import  java .util .Map ;
2223import  java .util .concurrent .CountDownLatch ;
2324import  java .util .concurrent .TimeUnit ;
2425import  java .util .concurrent .atomic .AtomicInteger ;
3738import  io .micrometer .tracing .handler .PropagatingSenderTracingObservationHandler ;
3839import  io .micrometer .tracing .propagation .Propagator ;
3940import  io .micrometer .tracing .test .simple .SimpleTracer ;
41+ import  org .apache .kafka .clients .consumer .ConsumerConfig ;
4042import  org .apache .kafka .clients .consumer .ConsumerRecord ;
4143import  org .jspecify .annotations .Nullable ;
4244import  org .junit .jupiter .api .Test ;
6466 * Tests for batch individual record observation functionality. 
6567 * 
6668 * @author Igor Quintanilha 
69+  * @author Artem Bilan 
70+  * 
6771 * @since 4.0 
6872 */ 
6973@ SpringJUnitConfig 
@@ -78,8 +82,8 @@ public class BatchIndividualRecordObservationTests {
7882
7983	@ Test 
8084	void  batchIndividualRecordObservationCreatesObservationPerRecord (@ Autowired  BatchListener  listener ,
81- 																		 @ Autowired  KafkaTemplate <Integer , String > template , @ Autowired  TestObservationHandler  observationHandler ,
82- 																		 @ Autowired  SimpleTracer  tracer )
85+ 			@ Autowired  KafkaTemplate <Integer , String > template , @ Autowired  TestObservationHandler  observationHandler ,
86+ 			@ Autowired  SimpleTracer  tracer )
8387			throws  InterruptedException  {
8488
8589		// Clear any existing observations and spans 
@@ -147,8 +151,9 @@ void batchIndividualRecordObservationCreatesObservationPerRecord(@Autowired Batc
147151	}
148152
149153	@ Test 
150- 	void  batchIndividualRecordObservationDisabledCreatesNoIndividualObservations (@ Autowired  BatchListenerWithoutIndividualObservation  batchListener ,
151- 																					@ Autowired  KafkaTemplate <Integer , String > template , @ Autowired  TestObservationHandler  observationHandler )
154+ 	void  batchIndividualRecordObservationDisabledCreatesNoIndividualObservations (
155+ 			@ Autowired  BatchListenerWithoutIndividualObservation  batchListener ,
156+ 			@ Autowired  KafkaTemplate <Integer , String > template , @ Autowired  TestObservationHandler  observationHandler )
152157			throws  InterruptedException  {
153158
154159		// Clear any existing observations 
@@ -181,12 +186,16 @@ ProducerFactory<Integer, String> producerFactory(EmbeddedKafkaBroker broker) {
181186
182187		@ Bean 
183188		ConsumerFactory <Integer , String > consumerFactory (EmbeddedKafkaBroker  broker ) {
184- 			return  new  DefaultKafkaConsumerFactory <>(
185- 					KafkaTestUtils .consumerProps (broker , "batch-tests" , false ));
189+ 			Map <String , Object > configs  = KafkaTestUtils .consumerProps (broker , "batch-tests" , false );
190+ 			configs .put (ConsumerConfig .FETCH_MIN_BYTES_CONFIG , 54 );
191+ 			configs .put (ConsumerConfig .FETCH_MAX_WAIT_MS_CONFIG , 1000 );
192+ 			return  new  DefaultKafkaConsumerFactory <>(configs );
186193		}
187194
188195		@ Bean 
189- 		KafkaTemplate <Integer , String > template (ProducerFactory <Integer , String > pf , ObservationRegistry  observationRegistry ) {
196+ 		KafkaTemplate <Integer , String > template (ProducerFactory <Integer , String > pf ,
197+ 				ObservationRegistry  observationRegistry ) {
198+ 
190199			KafkaTemplate <Integer , String > template  = new  KafkaTemplate <>(pf );
191200			template .setObservationEnabled (true );
192201			template .setObservationRegistry (observationRegistry );
@@ -196,6 +205,7 @@ KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> pf, Obs
196205		@ Bean 
197206		ConcurrentKafkaListenerContainerFactory <Integer , String > kafkaListenerContainerFactory (
198207				ConsumerFactory <Integer , String > cf ) {
208+ 
199209			ConcurrentKafkaListenerContainerFactory <Integer , String > factory  =
200210					new  ConcurrentKafkaListenerContainerFactory <>();
201211			factory .setConsumerFactory (cf );
@@ -208,6 +218,7 @@ ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerF
208218		@ Bean 
209219		ConcurrentKafkaListenerContainerFactory <Integer , String > observationListenerContainerFactory (
210220				ConsumerFactory <Integer , String > cf , ObservationRegistry  observationRegistry ) {
221+ 
211222			ConcurrentKafkaListenerContainerFactory <Integer , String > factory  =
212223					new  ConcurrentKafkaListenerContainerFactory <>();
213224			factory .setConsumerFactory (cf );
@@ -221,6 +232,7 @@ ConcurrentKafkaListenerContainerFactory<Integer, String> observationListenerCont
221232		@ Bean 
222233		ConcurrentKafkaListenerContainerFactory <Integer , String > batchOnlyObservationListenerContainerFactory (
223234				ConsumerFactory <Integer , String > cf , ObservationRegistry  observationRegistry ) {
235+ 
224236			ConcurrentKafkaListenerContainerFactory <Integer , String > factory  =
225237					new  ConcurrentKafkaListenerContainerFactory <>();
226238			factory .setConsumerFactory (cf );
@@ -283,7 +295,9 @@ public <C> Span.Builder extract(C carrier, Getter<C> getter) {
283295		}
284296
285297		@ Bean 
286- 		ObservationRegistry  observationRegistry (Tracer  tracer , Propagator  propagator , MeterRegistry  meterRegistry , TestObservationHandler  testObservationHandler ) {
298+ 		ObservationRegistry  observationRegistry (Tracer  tracer , Propagator  propagator , MeterRegistry  meterRegistry ,
299+ 				TestObservationHandler  testObservationHandler ) {
300+ 
287301			ObservationRegistry  observationRegistry  = ObservationRegistry .create ();
288302			observationRegistry .observationConfig ()
289303					.observationHandler (
@@ -309,34 +323,43 @@ BatchListener batchListener() {
309323		BatchListenerWithoutIndividualObservation  batchListenerWithoutIndividualObservation () {
310324			return  new  BatchListenerWithoutIndividualObservation ();
311325		}
326+ 
312327	}
313328
314329	static  class  BatchListener  {
330+ 
315331		final  CountDownLatch  latch  = new  CountDownLatch (1 );
316332
317333		final  List <String > processedRecords  = new  ArrayList <>();
318334
319- 		@ KafkaListener (topics  = BATCH_INDIVIDUAL_OBSERVATION_TOPIC , containerFactory  = "observationListenerContainerFactory" )
335+ 		@ KafkaListener (topics  = BATCH_INDIVIDUAL_OBSERVATION_TOPIC ,
336+ 				containerFactory  = "observationListenerContainerFactory" )
320337		public  void  listen (List <ConsumerRecord <Integer , String >> records ) {
338+ 
321339			for  (ConsumerRecord <Integer , String > record  : records ) {
322340				processedRecords .add (record .value ());
323341			}
324342			latch .countDown ();
325343		}
344+ 
326345	}
327346
328347	static  class  BatchListenerWithoutIndividualObservation  {
348+ 
329349		final  CountDownLatch  latch  = new  CountDownLatch (1 );
330350
331351		final  List <String > processedRecords  = new  ArrayList <>();
332352
333- 		@ KafkaListener (topics  = BATCH_ONLY_OBSERVATION_TOPIC , containerFactory  = "batchOnlyObservationListenerContainerFactory" )
353+ 		@ KafkaListener (topics  = BATCH_ONLY_OBSERVATION_TOPIC ,
354+ 				containerFactory  = "batchOnlyObservationListenerContainerFactory" )
355+ 
334356		public  void  listen (List <ConsumerRecord <Integer , String >> records ) {
335357			for  (ConsumerRecord <Integer , String > record  : records ) {
336358				processedRecords .add (record .value ());
337359			}
338360			latch .countDown ();
339361		}
362+ 
340363	}
341364
342365	static  class  TestObservationHandler  implements  ObservationHandler <Observation .Context > {
@@ -373,6 +396,7 @@ public int getStartedObservations() {
373396		public  void  clear () {
374397			startedObservations .set (0 );
375398		}
399+ 
376400	}
377401
378402}
0 commit comments