|
7 | 7 | import java.util.List; |
8 | 8 | import java.util.concurrent.CompletableFuture; |
9 | 9 | import java.util.concurrent.ExecutionException; |
10 | | -import java.util.concurrent.atomic.AtomicInteger; |
11 | 10 | import lombok.Getter; |
12 | 11 | import lombok.SneakyThrows; |
13 | 12 | import lombok.extern.slf4j.Slf4j; |
14 | 13 | import org.apache.kafka.clients.consumer.ConsumerRecord; |
15 | 14 | import org.springframework.beans.factory.annotation.Autowired; |
16 | | -import org.springframework.beans.factory.annotation.Value; |
17 | 15 | import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
18 | 16 | import org.springframework.cloud.context.scope.refresh.RefreshScopeRefreshedEvent; |
19 | 17 | import org.springframework.context.annotation.Configuration; |
20 | | -import org.springframework.context.event.ContextClosedEvent; |
21 | 18 | import org.springframework.context.event.ContextRefreshedEvent; |
22 | 19 | import org.springframework.context.event.EventListener; |
23 | 20 | import org.springframework.core.task.TaskExecutor; |
@@ -54,17 +51,6 @@ public class DynamicKafkaListener { |
54 | 51 |
|
55 | 52 | @Autowired private CommonErrorHandler topicTransactionErrorHandler; |
56 | 53 |
|
57 | | - private final AtomicInteger inFlightRecords = new AtomicInteger(0); |
58 | | - |
59 | | - @Value("${kafka.general.consumer-shutdown-timeout-in-sec:30}") |
60 | | - private int shutdownTimeoutInSeconds; |
61 | | - |
62 | | - @EventListener(ContextClosedEvent.class) |
63 | | - public void onContextClosed() { |
64 | | - log.info("Application context closing, performing graceful Kafka shutdown"); |
65 | | - performGracefulShutdown(); |
66 | | - } |
67 | | - |
68 | 54 | @EventListener |
69 | 55 | public void handleEvent(ContextRefreshedEvent event) { |
70 | 56 | log.info("Initializing Kafka Consumers.."); |
@@ -142,122 +128,58 @@ private Object determineMessageListenerForTransactions(KafkaProperties.Consumer |
142 | 128 | */ |
143 | 129 | private Object getMultithreadedBatchAcknowledgingMessageListener() { |
144 | 130 | return new BatchAcknowledgingMessageListener<String, String>() { |
| 131 | + |
145 | 132 | @SneakyThrows |
146 | 133 | @Override |
147 | 134 | public void onMessage( |
148 | 135 | List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment) { |
149 | 136 | log.debug("Consumer got assigned with a Batch of size : {}", consumerRecords.size()); |
150 | 137 |
|
151 | | - // Track the number of records we're processing |
152 | | - inFlightRecords.addAndGet(consumerRecords.size()); |
153 | | - |
154 | 138 | List<CompletableFuture<Void>> transactionSubmissionTasks = new ArrayList<>(); |
155 | 139 |
|
156 | 140 | // Dispatch workers for asynchronously processing Individual records |
157 | 141 | for (ConsumerRecord<String, String> message : consumerRecords) { |
158 | 142 | transactionSubmissionTasks.add( |
159 | 143 | CompletableFuture.runAsync( |
160 | 144 | () -> { |
161 | | - try { |
162 | | - transactionConsumer.listen(message); |
163 | | - } finally { |
164 | | - // No need to decrement here as we'll do it after all tasks complete or fail |
165 | | - } |
| 145 | + transactionConsumer.listen(message); |
166 | 146 | }, |
167 | 147 | defaultTaskExecutor)); |
168 | 148 | } |
169 | 149 |
|
170 | | - boolean batchSuccess = true; |
171 | | - int failedIndex = -1; |
172 | | - |
173 | 150 | for (int i = 0; i < transactionSubmissionTasks.size(); i++) { |
174 | 151 | try { |
175 | 152 | transactionSubmissionTasks.get(i).get(); |
176 | 153 | } catch (InterruptedException | ExecutionException e) { |
177 | | - batchSuccess = false; |
178 | | - failedIndex = i; |
179 | 154 |
|
180 | 155 | final Throwable cause = e.getCause(); |
181 | 156 |
|
182 | 157 | if (cause instanceof ServiceException) { |
183 | 158 | log.error( |
184 | 159 | "One of the Consumer Record in Async Batch Processor failed with message {}", |
185 | 160 | cause.getMessage()); |
| 161 | + throw new BatchListenerFailedException( |
| 162 | + "Failed to process a Consumer Record from the Batch", i); |
186 | 163 | } |
187 | 164 |
|
188 | 165 | if (cause instanceof InterruptedException) { |
189 | 166 | throw e; |
190 | 167 | } |
191 | 168 | } |
192 | 169 | } |
193 | | - |
194 | | - // Always decrement the counter for all records in the batch |
195 | | - inFlightRecords.addAndGet(-consumerRecords.size()); |
196 | | - |
197 | 170 | // If the entire Records were processed successfully, Ack & commit the entire Batch |
198 | | - if (batchSuccess) { |
199 | | - acknowledgment.acknowledge(); |
200 | | - } else { |
201 | | - throw new BatchListenerFailedException( |
202 | | - "Failed to process a Consumer Record from the Batch", failedIndex); |
203 | | - } |
| 171 | + acknowledgment.acknowledge(); |
204 | 172 | } |
205 | 173 | }; |
206 | 174 | } |
207 | 175 |
|
208 | 176 | private Object getPerRecordAcknowledgingListener() { |
| 177 | + |
209 | 178 | return (AcknowledgingMessageListener<String, String>) |
210 | 179 | (message, acknowledgment) -> { |
211 | | - try { |
212 | | - // Increment counter before processing |
213 | | - inFlightRecords.incrementAndGet(); |
214 | | - |
215 | | - transactionConsumer.listen(message); |
216 | | - // Manually ack the single Record |
217 | | - acknowledgment.acknowledge(); |
218 | | - } finally { |
219 | | - // Always decrement counter, even if exception occurred |
220 | | - inFlightRecords.decrementAndGet(); |
221 | | - } |
| 180 | + transactionConsumer.listen(message); |
| 181 | + // Manually ack the single Record |
| 182 | + acknowledgment.acknowledge(); |
222 | 183 | }; |
223 | 184 | } |
224 | | - |
225 | | - private void performGracefulShutdown() { |
226 | | - log.info("Starting graceful shutdown of Kafka consumers"); |
227 | | - |
228 | | - // Stop all containers from polling new messages |
229 | | - if (!CollectionUtils.isEmpty(existingContainers)) { |
230 | | - existingContainers.forEach( |
231 | | - container -> { |
232 | | - log.info("Stopping container: {}", container.metrics().keySet().iterator().next()); |
233 | | - container.stop(); |
234 | | - }); |
235 | | - } |
236 | | - |
237 | | - // Wait for in-flight messages to be processed |
238 | | - log.info( |
239 | | - "All Kafka containers stopped from polling. Waiting for {} in-flight records to be processed...", |
240 | | - inFlightRecords.get()); |
241 | | - |
242 | | - long startTime = System.currentTimeMillis(); |
243 | | - |
244 | | - try { |
245 | | - while (inFlightRecords.get() > 0 |
246 | | - && System.currentTimeMillis() - startTime < (shutdownTimeoutInSeconds * 1000L)) { |
247 | | - log.info("Still waiting for {} records to be acknowledged", inFlightRecords.get()); |
248 | | - Thread.sleep(500); |
249 | | - } |
250 | | - } catch (InterruptedException e) { |
251 | | - Thread.currentThread().interrupt(); |
252 | | - log.error("Interrupted during shutdown wait", e); |
253 | | - } |
254 | | - |
255 | | - if (inFlightRecords.get() > 0) { |
256 | | - log.warn("{} records were not acknowledged before shutdown timeout", inFlightRecords.get()); |
257 | | - } else { |
258 | | - log.info("All records successfully processed and acknowledged"); |
259 | | - } |
260 | | - |
261 | | - log.info("Kafka consumer graceful shutdown completed"); |
262 | | - } |
263 | 185 | } |
0 commit comments