| 
16 | 16 | 
 
  | 
17 | 17 | package org.springframework.kafka.listener;  | 
18 | 18 | 
 
  | 
19 |  | -import java.lang.reflect.Type;  | 
20 |  | -import java.util.Arrays;  | 
 | 19 | +import java.util.ArrayList;  | 
21 | 20 | import java.util.List;  | 
22 |  | - | 
23 |  | -import org.apache.kafka.clients.consumer.ConsumerRecord;  | 
 | 21 | +import java.util.Map;  | 
 | 22 | +import java.util.concurrent.CountDownLatch;  | 
 | 23 | +import java.util.concurrent.TimeUnit;  | 
 | 24 | + | 
 | 25 | +import org.apache.kafka.clients.consumer.ConsumerConfig;  | 
 | 26 | +import org.apache.kafka.clients.producer.ProducerConfig;  | 
 | 27 | +import org.apache.kafka.common.serialization.ByteArrayDeserializer;  | 
 | 28 | +import org.apache.kafka.common.serialization.ByteArraySerializer;  | 
 | 29 | +import org.apache.kafka.common.serialization.IntegerDeserializer;  | 
 | 30 | +import org.apache.kafka.common.serialization.IntegerSerializer;  | 
24 | 31 | import org.junit.jupiter.api.Test;  | 
25 | 32 | 
 
  | 
 | 33 | +import org.springframework.beans.factory.annotation.Autowired;  | 
 | 34 | +import org.springframework.context.annotation.Bean;  | 
 | 35 | +import org.springframework.context.annotation.Configuration;  | 
 | 36 | +import org.springframework.kafka.annotation.EnableKafka;  | 
 | 37 | +import org.springframework.kafka.annotation.KafkaListener;  | 
 | 38 | +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;  | 
 | 39 | +import org.springframework.kafka.config.KafkaListenerContainerFactory;  | 
 | 40 | +import org.springframework.kafka.core.DefaultKafkaConsumerFactory;  | 
 | 41 | +import org.springframework.kafka.core.DefaultKafkaProducerFactory;  | 
 | 42 | +import org.springframework.kafka.core.KafkaTemplate;  | 
 | 43 | +import org.springframework.kafka.core.ProducerFactory;  | 
26 | 44 | import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;  | 
27 | 45 | import org.springframework.kafka.support.converter.MessagingMessageConverter;  | 
 | 46 | +import org.springframework.kafka.test.EmbeddedKafkaBroker;  | 
 | 47 | +import org.springframework.kafka.test.context.EmbeddedKafka;  | 
 | 48 | +import org.springframework.kafka.test.utils.KafkaTestUtils;  | 
28 | 49 | import org.springframework.messaging.Message;  | 
29 | 50 | import org.springframework.messaging.MessageHeaders;  | 
30 | 51 | import org.springframework.messaging.converter.SmartMessageConverter;  | 
31 | 52 | import org.springframework.messaging.support.MessageBuilder;  | 
 | 53 | +import org.springframework.test.annotation.DirtiesContext;  | 
 | 54 | +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;  | 
32 | 55 | 
 
  | 
33 | 56 | import static org.assertj.core.api.Assertions.assertThat;  | 
34 | 57 | 
 
  | 
35 | 58 | /**  | 
36 |  | - * Tests for SmartMessageConverter support in batch listeners.  | 
37 |  | - * Reproduces the issue described in GH-4097.  | 
 | 59 | + * Integration tests for SmartMessageConverter support in batch listeners.  | 
 | 60 | + * Reproduces and verifies the fix for the issue described in GH-4097.  | 
38 | 61 |  *  | 
39 |  | - * @author Jujuwryy  | 
 | 62 | + * @author George Mahfoud  | 
40 | 63 |  * @since 3.3.11  | 
41 | 64 |  */  | 
 | 65 | +@SpringJUnitConfig  | 
 | 66 | +@DirtiesContext  | 
 | 67 | +@EmbeddedKafka(partitions = 1, topics = { "smartBatchTopic" })  | 
42 | 68 | class BatchSmartMessageConverterTests {  | 
43 | 69 | 
 
  | 
44 |  | -	@Test  | 
45 |  | -	void testSmartMessageConverterWorksInBatchConversion() {  | 
46 |  | -		// Given: A BatchMessagingMessageConverter with a record converter and SmartMessageConverter  | 
47 |  | -		MessagingMessageConverter recordConverter = new MessagingMessageConverter();  | 
48 |  | -		BatchMessagingMessageConverter batchConverter = new BatchMessagingMessageConverter(recordConverter);  | 
49 |  | - | 
50 |  | -		// Set up SmartMessageConverter that converts byte[] to String  | 
51 |  | -		TestStringMessageConverter smartConverter = new TestStringMessageConverter();  | 
52 |  | -		batchConverter.setMessagingConverter(smartConverter);  | 
53 |  | - | 
54 |  | -		// Create test records with byte[] values that need conversion to String  | 
55 |  | -		List<ConsumerRecord<?, ?>> records = Arrays.asList(  | 
56 |  | -				new ConsumerRecord<>("topic", 0, 0, "key", "hello".getBytes()),  | 
57 |  | -				new ConsumerRecord<>("topic", 0, 1, "key", "world".getBytes())  | 
58 |  | -		);  | 
59 |  | - | 
60 |  | -		// When: Convert batch with List<String> target type  | 
61 |  | -		Type targetType = new TestParameterizedType(List.class, new Type[]{String.class});  | 
62 |  | -		Message<?> result = batchConverter.toMessage(records, null, null, targetType);  | 
63 |  | - | 
64 |  | -		// Then: Verify the SmartMessageConverter was applied and byte[] was converted to String  | 
65 |  | -		assertThat(result).isNotNull();  | 
66 |  | -		assertThat(result.getPayload()).isInstanceOf(List.class);  | 
67 |  | - | 
68 |  | -		List<?> payloads = (List<?>) result.getPayload();  | 
69 |  | -		assertThat(payloads).hasSize(2);  | 
70 |  | -		assertThat(payloads.get(0)).isEqualTo("hello");  | 
71 |  | -		assertThat(payloads.get(1)).isEqualTo("world");  | 
72 |  | -	}  | 
 | 70 | +	@Autowired  | 
 | 71 | +	private KafkaTemplate<Integer, byte[]> template;  | 
 | 72 | + | 
 | 73 | +	@Autowired  | 
 | 74 | +	private Config config;  | 
73 | 75 | 
 
  | 
74 | 76 | 	@Test  | 
75 |  | -	void testBatchConversionWithoutSmartMessageConverter() {  | 
76 |  | -		// Given: A BatchMessagingMessageConverter without SmartMessageConverter  | 
77 |  | -		MessagingMessageConverter recordConverter = new MessagingMessageConverter();  | 
78 |  | -		BatchMessagingMessageConverter batchConverter = new BatchMessagingMessageConverter(recordConverter);  | 
79 |  | - | 
80 |  | -		// Create test records with byte[] values  | 
81 |  | -		List<ConsumerRecord<?, ?>> records = Arrays.asList(  | 
82 |  | -				new ConsumerRecord<>("topic", 0, 0, "key", "test".getBytes())  | 
83 |  | -		);  | 
84 |  | - | 
85 |  | -		// When: Convert batch  | 
86 |  | -		Type targetType = new TestParameterizedType(List.class, new Type[]{String.class});  | 
87 |  | -		Message<?> result = batchConverter.toMessage(records, null, null, targetType);  | 
88 |  | - | 
89 |  | -		// Then: Should work but payloads remain as byte[]  | 
90 |  | -		assertThat(result).isNotNull();  | 
91 |  | -		List<?> payloads = (List<?>) result.getPayload();  | 
92 |  | -		assertThat(payloads.get(0)).isInstanceOf(byte[].class);  | 
 | 77 | +	void testContentTypeConverterWithBatchListener() throws Exception {  | 
 | 78 | +		// Given: A batch listener with contentTypeConverter configured  | 
 | 79 | +		BatchListener listener = this.config.batchListener();  | 
 | 80 | + | 
 | 81 | +		// When: Send byte[] messages that should be converted to String  | 
 | 82 | +		this.template.send("smartBatchTopic", "hello".getBytes());  | 
 | 83 | +		this.template.send("smartBatchTopic", "world".getBytes());  | 
 | 84 | + | 
 | 85 | +		// Then: SmartMessageConverter should convert byte[] to String for batch listener  | 
 | 86 | +		assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue();  | 
 | 87 | +		assertThat(listener.received).hasSize(2).containsExactly("hello", "world");  | 
93 | 88 | 	}  | 
94 | 89 | 
 
  | 
95 |  | -	/**  | 
96 |  | -	 * Test SmartMessageConverter that converts byte[] to String.  | 
97 |  | -	 */  | 
98 |  | -	static class TestStringMessageConverter implements SmartMessageConverter {  | 
 | 90 | +	@Configuration  | 
 | 91 | +	@EnableKafka  | 
 | 92 | +	public static class Config {  | 
 | 93 | + | 
 | 94 | +		@Bean  | 
 | 95 | +		public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(EmbeddedKafkaBroker embeddedKafka) {  | 
 | 96 | +			ConcurrentKafkaListenerContainerFactory<Integer, byte[]> factory =  | 
 | 97 | +					new ConcurrentKafkaListenerContainerFactory<>();  | 
 | 98 | +			factory.setConsumerFactory(consumerFactory(embeddedKafka));  | 
 | 99 | +			factory.setBatchListener(true);  | 
 | 100 | +			// Set up batch converter with record converter - framework will propagate SmartMessageConverter  | 
 | 101 | +			factory.setBatchMessageConverter(new BatchMessagingMessageConverter(new MessagingMessageConverter()));  | 
 | 102 | +			return factory;  | 
 | 103 | +		}  | 
99 | 104 | 
 
  | 
100 |  | -		@Override  | 
101 |  | -		public Object fromMessage(Message<?> message, Class<?> targetClass) {  | 
102 |  | -			return convertPayload(message.getPayload());  | 
 | 105 | +		@Bean  | 
 | 106 | +		public DefaultKafkaConsumerFactory<Integer, byte[]> consumerFactory(EmbeddedKafkaBroker embeddedKafka) {  | 
 | 107 | +			return new DefaultKafkaConsumerFactory<>(consumerConfigs(embeddedKafka));  | 
103 | 108 | 		}  | 
104 | 109 | 
 
  | 
105 |  | -		@Override  | 
106 |  | -		public Object fromMessage(Message<?> message, Class<?> targetClass, Object conversionHint) {  | 
107 |  | -			return convertPayload(message.getPayload());  | 
 | 110 | +		@Bean  | 
 | 111 | +		public Map<String, Object> consumerConfigs(EmbeddedKafkaBroker embeddedKafka) {  | 
 | 112 | +			Map<String, Object> consumerProps =  | 
 | 113 | +					KafkaTestUtils.consumerProps(embeddedKafka, "smartBatchGroup", false);  | 
 | 114 | +			consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);  | 
 | 115 | +			consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);  | 
 | 116 | +			return consumerProps;  | 
108 | 117 | 		}  | 
109 | 118 | 
 
  | 
110 |  | -		@Override  | 
111 |  | -		public Message<?> toMessage(Object payload, MessageHeaders headers) {  | 
112 |  | -			return MessageBuilder.withPayload(payload).copyHeaders(headers).build();  | 
 | 119 | +		@Bean  | 
 | 120 | +		public KafkaTemplate<Integer, byte[]> template(EmbeddedKafkaBroker embeddedKafka) {  | 
 | 121 | +			return new KafkaTemplate<>(producerFactory(embeddedKafka));  | 
113 | 122 | 		}  | 
114 | 123 | 
 
  | 
115 |  | -		@Override  | 
116 |  | -		public Message<?> toMessage(Object payload, MessageHeaders headers, Object conversionHint) {  | 
117 |  | -			return toMessage(payload, headers);  | 
 | 124 | +		@Bean  | 
 | 125 | +		public ProducerFactory<Integer, byte[]> producerFactory(EmbeddedKafkaBroker embeddedKafka) {  | 
 | 126 | +			return new DefaultKafkaProducerFactory<>(producerConfigs(embeddedKafka));  | 
 | 127 | +		}  | 
 | 128 | + | 
 | 129 | +		@Bean  | 
 | 130 | +		public Map<String, Object> producerConfigs(EmbeddedKafkaBroker embeddedKafka) {  | 
 | 131 | +			Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);  | 
 | 132 | +			props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);  | 
 | 133 | +			props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);  | 
 | 134 | +			return props;  | 
 | 135 | +		}  | 
 | 136 | + | 
 | 137 | +		@Bean  | 
 | 138 | +		public SmartMessageConverter byteArrayToStringConverter() {  | 
 | 139 | +			return new ByteArrayToStringConverter();  | 
118 | 140 | 		}  | 
119 | 141 | 
 
  | 
120 |  | -		private Object convertPayload(Object payload) {  | 
121 |  | -			// Convert byte[] to String - this is the core functionality being tested  | 
122 |  | -			if (payload instanceof byte[] bytes) {  | 
123 |  | -				return new String(bytes);  | 
124 |  | -			}  | 
125 |  | -			return payload;  | 
 | 142 | +		@Bean  | 
 | 143 | +		public BatchListener batchListener() {  | 
 | 144 | +			return new BatchListener();  | 
126 | 145 | 		}  | 
 | 146 | + | 
127 | 147 | 	}  | 
128 | 148 | 
 
  | 
129 |  | -	/**  | 
130 |  | -	 * Helper class for creating parameterized types for testing.  | 
131 |  | -	 */  | 
132 |  | -	static class TestParameterizedType implements java.lang.reflect.ParameterizedType {  | 
 | 149 | +	public static class BatchListener {  | 
 | 150 | + | 
 | 151 | +		private final CountDownLatch latch = new CountDownLatch(2);  | 
133 | 152 | 
 
  | 
134 |  | -		private final Type rawType;  | 
 | 153 | +		private final List<String> received = new ArrayList<>();  | 
135 | 154 | 
 
  | 
136 |  | -		private final Type[] typeArguments;  | 
 | 155 | +		@KafkaListener(  | 
 | 156 | +				id = "batchSmartListener",  | 
 | 157 | +				topics = "smartBatchTopic",  | 
 | 158 | +				groupId = "smartBatchGroup",  | 
 | 159 | +				contentTypeConverter = "byteArrayToStringConverter",  | 
 | 160 | +				batch = "true"  | 
 | 161 | +		)  | 
 | 162 | +		public void listen(List<String> messages) {  | 
 | 163 | +			messages.forEach(message -> {  | 
 | 164 | +				this.received.add(message);  | 
 | 165 | +				this.latch.countDown();  | 
 | 166 | +			});  | 
 | 167 | +		}  | 
 | 168 | + | 
 | 169 | +	}  | 
 | 170 | + | 
 | 171 | +	/**  | 
 | 172 | +	 * Simple SmartMessageConverter for testing that converts byte[] to String.  | 
 | 173 | +	 */  | 
 | 174 | +	static class ByteArrayToStringConverter implements SmartMessageConverter {  | 
137 | 175 | 
 
  | 
138 |  | -		TestParameterizedType(Type rawType, Type[] typeArguments) {  | 
139 |  | -			this.rawType = rawType;  | 
140 |  | -			this.typeArguments = typeArguments;  | 
 | 176 | +		@Override  | 
 | 177 | +		public Object fromMessage(Message<?> message, Class<?> targetClass) {  | 
 | 178 | +			Object payload = message.getPayload();  | 
 | 179 | +			return (payload instanceof byte[] bytes) ? new String(bytes) : payload;  | 
141 | 180 | 		}  | 
142 | 181 | 
 
  | 
143 |  | -		public Type[] getActualTypeArguments() {  | 
144 |  | -			return typeArguments;  | 
 | 182 | +		@Override  | 
 | 183 | +		public Object fromMessage(Message<?> message, Class<?> targetClass, Object conversionHint) {  | 
 | 184 | +			return fromMessage(message, targetClass);  | 
145 | 185 | 		}  | 
146 | 186 | 
 
  | 
147 |  | -		public Type getRawType() {  | 
148 |  | -			return rawType;  | 
 | 187 | +		@Override  | 
 | 188 | +		public Message<?> toMessage(Object payload, MessageHeaders headers) {  | 
 | 189 | +			return MessageBuilder.withPayload(payload).copyHeaders(headers).build();  | 
149 | 190 | 		}  | 
150 | 191 | 
 
  | 
151 |  | -		public Type getOwnerType() {  | 
152 |  | -			return null;  | 
 | 192 | +		@Override  | 
 | 193 | +		public Message<?> toMessage(Object payload, MessageHeaders headers, Object conversionHint) {  | 
 | 194 | +			return toMessage(payload, headers);  | 
153 | 195 | 		}  | 
 | 196 | + | 
154 | 197 | 	}  | 
155 | 198 | }  | 
0 commit comments