Skip to content

Commit aa8c576

Browse files
KAFKA-18618: Improve leader change handling of acknowledgements [1/N] (apache#18672)
Reviewers: Apoorv Mittal <[email protected]>, ShivsundarR <[email protected]>, Manikumar Reddy <[email protected]>
1 parent b2b2408 commit aa8c576

21 files changed

+537
-183
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgementCommitCallback.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
import org.apache.kafka.common.TopicIdPartition;
2121
import org.apache.kafka.common.annotation.InterfaceStability;
2222
import org.apache.kafka.common.errors.AuthorizationException;
23+
import org.apache.kafka.common.errors.DisconnectException;
2324
import org.apache.kafka.common.errors.InterruptException;
2425
import org.apache.kafka.common.errors.InvalidRecordStateException;
26+
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
2527
import org.apache.kafka.common.errors.WakeupException;
2628

2729
import java.util.Map;
@@ -42,12 +44,16 @@ public interface AcknowledgementCommitCallback {
4244
*
4345
* @param exception The exception thrown during processing of the request, or null if the acknowledgement completed successfully.
4446
* <p><ul>
47+
* <li> {@link AuthorizationException} if not authorized to the topic or group
4548
* <li> {@link InvalidRecordStateException} if the record state is invalid
46-
* <li> {@link AuthorizationException} if not authorized to the topic of group
49+
* <li> {@link NotLeaderOrFollowerException} if the leader had changed by the time the acknowledgements were sent
50+
* <li> {@link DisconnectException} if the broker disconnected before the request could be completed
4751
* <li> {@link WakeupException} if {@link KafkaShareConsumer#wakeup()} is called before or while this function is called
4852
* <li> {@link InterruptException} if the calling thread is interrupted before or while this function is called
4953
* <li> {@link KafkaException} for any other unrecoverable errors
5054
* </ul>
55+
* <p>Note that even if the exception is a retriable exception, the acknowledgement could not be completed and the
56+
* records need to be fetched again. The callback is called after any retries have been performed.
5157
*/
5258
void onComplete(Map<TopicIdPartition, Set<Long>> offsets, Exception exception);
5359
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandler.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.kafka.clients.consumer.internals;
1818

1919
import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
20+
import org.apache.kafka.common.KafkaException;
2021
import org.apache.kafka.common.TopicIdPartition;
2122

2223
import org.slf4j.Logger;
@@ -45,10 +46,7 @@ public boolean hasEnteredCallback() {
4546
void onComplete(List<Map<TopicIdPartition, Acknowledgements>> acknowledgementsMapList) {
4647
final ArrayList<Throwable> exceptions = new ArrayList<>();
4748
acknowledgementsMapList.forEach(acknowledgementsMap -> acknowledgementsMap.forEach((partition, acknowledgements) -> {
48-
Exception exception = null;
49-
if (acknowledgements.getAcknowledgeErrorCode() != null) {
50-
exception = acknowledgements.getAcknowledgeErrorCode().exception();
51-
}
49+
KafkaException exception = acknowledgements.getAcknowledgeException();
5250
Set<Long> offsets = acknowledgements.getAcknowledgementsTypeMap().keySet();
5351
Set<Long> offsetsCopy = Collections.unmodifiableSet(offsets);
5452
enteredCallback = true;

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java

+21-14
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.kafka.clients.consumer.internals;
1818

1919
import org.apache.kafka.clients.consumer.AcknowledgeType;
20+
import org.apache.kafka.common.KafkaException;
2021
import org.apache.kafka.common.protocol.Errors;
2122

2223
import java.util.ArrayList;
@@ -35,15 +36,20 @@ public class Acknowledgements {
3536
// The acknowledgements keyed by offset. If the record is a gap, the AcknowledgeType will be null.
3637
private final Map<Long, AcknowledgeType> acknowledgements;
3738

38-
// When the broker responds to the acknowledgements, this is the error code returned.
39-
private Errors acknowledgeErrorCode;
39+
// When the broker responds to the acknowledgements, this is the exception thrown.
40+
private KafkaException acknowledgeException;
41+
42+
// Set when the broker has responded to the acknowledgements.
43+
private boolean completed;
4044

4145
public static Acknowledgements empty() {
4246
return new Acknowledgements(new TreeMap<>());
4347
}
4448

4549
private Acknowledgements(Map<Long, AcknowledgeType> acknowledgements) {
4650
this.acknowledgements = acknowledgements;
51+
this.acknowledgeException = null;
52+
this.completed = false;
4753
}
4854

4955
/**
@@ -115,25 +121,26 @@ public int size() {
115121
* @return Whether the acknowledgements were sent to the broker and a response received
116122
*/
117123
public boolean isCompleted() {
118-
return acknowledgeErrorCode != null;
124+
return completed;
119125
}
120126

121127
/**
122-
* Set the acknowledgement error code when the response has been received from the broker.
128+
* Completes the acknowledgements when the response has been received from the broker.
123129
*
124-
* @param acknowledgeErrorCode the error code
130+
* @param acknowledgeException the exception (will be null if successful)
125131
*/
126-
public void setAcknowledgeErrorCode(Errors acknowledgeErrorCode) {
127-
this.acknowledgeErrorCode = acknowledgeErrorCode;
132+
public void complete(KafkaException acknowledgeException) {
133+
this.acknowledgeException = acknowledgeException;
134+
completed = true;
128135
}
129136

130137
/**
131-
* Get the acknowledgement error code when the response has been received from the broker.
138+
* Get the acknowledgement exception when the response has been received from the broker.
132139
*
133140
* @return the acknowledgement exception, or null if the acknowledgements completed successfully
134141
*/
135-
public Errors getAcknowledgeErrorCode() {
136-
return acknowledgeErrorCode;
142+
public KafkaException getAcknowledgeException() {
143+
return acknowledgeException;
137144
}
138145

139146
/**
@@ -301,10 +308,10 @@ private boolean canOptimiseForSingleAcknowledgeType(AcknowledgementBatch acknowl
301308
public String toString() {
302309
StringBuilder sb = new StringBuilder("Acknowledgements(");
303310
sb.append(acknowledgements);
304-
if (acknowledgeErrorCode != null) {
305-
sb.append(", errorCode=");
306-
sb.append(acknowledgeErrorCode.code());
307-
}
311+
sb.append(", acknowledgeException=");
312+
sb.append(acknowledgeException != null ? Errors.forException(acknowledgeException) : "null");
313+
sb.append(", completed=");
314+
sb.append(completed);
308315
sb.append(")");
309316
return sb.toString();
310317
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.consumer.internals;
18+
19+
import java.util.Objects;
20+
21+
/**
22+
* This class combines Acknowledgements with the id of the node to use for acknowledging.
23+
*/
24+
public class NodeAcknowledgements {
25+
private final int nodeId;
26+
private final Acknowledgements acknowledgements;
27+
28+
public NodeAcknowledgements(int nodeId, Acknowledgements acknowledgements) {
29+
this.nodeId = nodeId;
30+
this.acknowledgements = Objects.requireNonNull(acknowledgements);
31+
}
32+
33+
public int nodeId() {
34+
return nodeId;
35+
}
36+
37+
public Acknowledgements acknowledgements() {
38+
return acknowledgements;
39+
}
40+
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
* to keep track of aborted transactions or the need to keep track of fetch position.
5656
*/
5757
public class ShareCompletedFetch {
58-
58+
final int nodeId;
5959
final TopicIdPartition partition;
6060
final ShareFetchResponseData.PartitionData partitionData;
6161
final short requestVersion;
@@ -79,12 +79,14 @@ public class ShareCompletedFetch {
7979

8080
ShareCompletedFetch(final LogContext logContext,
8181
final BufferSupplier decompressionBufferSupplier,
82+
final int nodeId,
8283
final TopicIdPartition partition,
8384
final ShareFetchResponseData.PartitionData partitionData,
8485
final ShareFetchMetricsAggregator metricAggregator,
8586
final short requestVersion) {
8687
this.log = logContext.logger(org.apache.kafka.clients.consumer.internals.ShareCompletedFetch.class);
8788
this.decompressionBufferSupplier = decompressionBufferSupplier;
89+
this.nodeId = nodeId;
8890
this.partition = partition;
8991
this.partitionData = partitionData;
9092
this.metricAggregator = metricAggregator;
@@ -156,7 +158,7 @@ <K, V> ShareInFlightBatch<K, V> fetchRecords(final Deserializers<K, V> deseriali
156158
final int maxRecords,
157159
final boolean checkCrcs) {
158160
// Creating an empty ShareInFlightBatch
159-
ShareInFlightBatch<K, V> inFlightBatch = new ShareInFlightBatch<>(partition);
161+
ShareInFlightBatch<K, V> inFlightBatch = new ShareInFlightBatch<>(nodeId, partition);
160162

161163
if (cachedBatchException != null) {
162164
// In the event that a CRC check fails, reject the entire record batch because it is corrupt.

0 commit comments

Comments
 (0)