Skip to content

Commit b24054e

Browse files
authored
Merge pull request #1300 from confluentinc/KNET-12362
[KNET-12362] Produce V3 API - Improve visibility around produce-record errors
2 parents 8178d14 + 16369e5 commit b24054e

File tree

8 files changed

+518
-37
lines changed

8 files changed

+518
-37
lines changed

kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestApplication.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,10 @@ private static RequestLog createRequestLog(
130130
return new CustomLog(
131131
requestLogWriter,
132132
requestLogFormat,
133-
new String[] {CustomLogRequestAttributes.REST_ERROR_CODE});
133+
new String[] {
134+
CustomLogRequestAttributes.REST_ERROR_CODE,
135+
CustomLogRequestAttributes.REST_PRODUCE_RECORD_ERROR_CODE_COUNTS
136+
});
134137
}
135138
// Return null, as Application's ctor would set-up a default request-logger.
136139
return null;

kafka-rest/src/main/java/io/confluent/kafkarest/requestlog/CustomLog.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515

1616
package io.confluent.kafkarest.requestlog;
1717

18+
import java.util.Map;
19+
import java.util.TreeMap;
20+
import java.util.stream.Collectors;
1821
import org.eclipse.jetty.server.CustomRequestLog;
1922
import org.eclipse.jetty.server.Request;
2023
import org.eclipse.jetty.server.RequestLog;
@@ -39,6 +42,8 @@ public class CustomLog extends AbstractLifeCycle implements RequestLog {
3942

4043
private final String[] requestAttributesToLog;
4144

45+
public static final String PRODUCE_ERROR_CODE_LOG_PREFIX = "Codes=";
46+
4247
public CustomLog(RequestLog.Writer writer, String formatString, String[] requestAttributesToLog) {
4348
for (String attr : requestAttributesToLog) {
4449
// Add format-specifier to log request-attributes as response-headers in Jetty's
@@ -63,6 +68,27 @@ protected void doStop() throws Exception {
6368
}
6469
}
6570

71+
/**
72+
* This class aggregates error-codes for produce-records within a single (http)produce-request.
73+
* This implements toString() method which is used by CustomLog to get a message to log for the
74+
* aggregated error counts.
75+
*/
76+
public static class ProduceRecordErrorCounter {
77+
private final Map<Integer, Integer> produceErrorCodeCountMap = new TreeMap<>();
78+
79+
public synchronized void incrementErrorCount(int httpErrorCode) {
80+
produceErrorCodeCountMap.merge(httpErrorCode, 1, Integer::sum);
81+
}
82+
83+
@Override
84+
public synchronized String toString() {
85+
return PRODUCE_ERROR_CODE_LOG_PREFIX
86+
+ produceErrorCodeCountMap.entrySet().stream()
87+
.map(entry -> entry.getKey() + ":" + entry.getValue())
88+
.collect(Collectors.joining(","));
89+
}
90+
}
91+
6692
@Override
6793
public void log(Request request, Response response) {
6894
// The configured request-attributes are converted to response-headers so Jetty can log them.

kafka-rest/src/main/java/io/confluent/kafkarest/requestlog/CustomLogRequestAttributes.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,6 @@ public final class CustomLogRequestAttributes {
2424
private CustomLogRequestAttributes() {}
2525

2626
public static final String REST_ERROR_CODE = "REST_ERROR_CODE";
27+
public static final String REST_PRODUCE_RECORD_ERROR_CODE_COUNTS =
28+
"REST_PRODUCE_RECORD_ERROR_CODE_COUNTS";
2729
}

kafka-rest/src/main/java/io/confluent/kafkarest/resources/v3/ProduceAction.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import io.confluent.kafkarest.extension.ResourceAccesslistFeature.ResourceName;
3939
import io.confluent.kafkarest.ratelimit.DoNotRateLimit;
4040
import io.confluent.kafkarest.ratelimit.RateLimitExceededException;
41+
import io.confluent.kafkarest.requestlog.CustomLog.ProduceRecordErrorCounter;
42+
import io.confluent.kafkarest.requestlog.CustomLogRequestAttributes;
4143
import io.confluent.kafkarest.resources.v3.V3ResourcesModule.ProduceResponseThreadPool;
4244
import io.confluent.kafkarest.response.JsonStream;
4345
import io.confluent.kafkarest.response.StreamingResponseFactory;
@@ -147,13 +149,19 @@ public void produce(
147149
throw Errors.invalidPayloadException("Request body is empty. Data is required.");
148150
}
149151

152+
ProduceRecordErrorCounter produceRecordErrorCounter = new ProduceRecordErrorCounter();
153+
150154
ProduceController controller = produceControllerProvider.get();
151155
streamingResponseFactory
152156
.from(requests)
153157
.compose(
154158
request ->
155159
produce(clusterId, topicName, request, controller, producerMetricsProvider.get()))
156-
.resume(asyncResponse);
160+
.resume(asyncResponse, produceRecordErrorCounter);
161+
162+
httpServletRequest.setAttribute(
163+
CustomLogRequestAttributes.REST_PRODUCE_RECORD_ERROR_CODE_COUNTS,
164+
produceRecordErrorCounter);
157165
}
158166

159167
private CompletableFuture<ProduceResponse> produce(

kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponse.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.confluent.kafkarest.exceptions.StatusCodeException;
3434
import io.confluent.kafkarest.exceptions.v3.ErrorResponse;
3535
import io.confluent.kafkarest.exceptions.v3.V3ExceptionMapper;
36+
import io.confluent.kafkarest.requestlog.CustomLog.ProduceRecordErrorCounter;
3637
import io.confluent.rest.entities.ErrorMessage;
3738
import io.confluent.rest.exceptions.KafkaExceptionMapper;
3839
import io.confluent.rest.exceptions.RestConstraintViolationException;
@@ -159,7 +160,8 @@ public final <O> StreamingResponse<O> compose(
159160
* <p>This method will block until all requests are read in. The responses are computed and
160161
* written to {@code asyncResponse} asynchronously.
161162
*/
162-
public final void resume(AsyncResponse asyncResponse) {
163+
public final void resume(
164+
AsyncResponse asyncResponse, ProduceRecordErrorCounter produceRecordErrorCounter) {
163165
log.debug("Resuming StreamingResponse");
164166
AsyncResponseQueue responseQueue = new AsyncResponseQueue(chunkedOutputFactory);
165167
responseQueue.asyncResume(asyncResponse);
@@ -186,7 +188,11 @@ public final void resume(AsyncResponse asyncResponse) {
186188
"Streaming connection open for longer than allowed",
187189
"Connection will be closed.")))));
188190
} else if (!closingStarted) {
189-
responseQueue.push(next().handle(this::handleNext));
191+
responseQueue.push(
192+
next()
193+
.handle(
194+
(result, exception) ->
195+
handleNext(result, exception, produceRecordErrorCounter)));
190196
} else {
191197
break;
192198
}
@@ -218,11 +224,17 @@ private void closeAll(AsyncResponseQueue responseQueue) {
218224
responseQueue.close();
219225
}
220226

221-
private ResultOrError handleNext(T result, @Nullable Throwable error) {
227+
private ResultOrError handleNext(
228+
T result, @Nullable Throwable error, ProduceRecordErrorCounter produceRecordErrorCounter) {
222229
if (error == null) {
223230
return ResultOrError.result(result);
224231
} else {
225232
log.debug("Error processing streaming operation.", error);
233+
if (error.getCause() == null) {
234+
throw new IllegalArgumentException("Error cause is null", error);
235+
}
236+
int errorCode = EXCEPTION_MAPPER.toErrorResponse(error.getCause()).getErrorCode();
237+
produceRecordErrorCounter.incrementErrorCount(errorCode);
226238
return ResultOrError.error(EXCEPTION_MAPPER.toErrorResponse(error.getCause()));
227239
}
228240
}

0 commit comments

Comments (0)