Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions core/src/main/java/io/grpc/internal/MessageDeframer.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.grpc.Codec;
import io.grpc.Decompressor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.Closeable;
import java.io.FilterInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -519,9 +520,9 @@ private void reportCount() {

private void verifySize() {
if (count > maxMessageSize) {
throw Status.RESOURCE_EXHAUSTED
.withDescription("Decompressed gRPC message exceeds maximum size " + maxMessageSize)
.asRuntimeException();
throw new TooLongDecompressedMessageException(
Status.RESOURCE_EXHAUSTED
.withDescription("Decompressed gRPC message exceeds maximum size " + maxMessageSize));
}
}
}
Expand All @@ -541,4 +542,12 @@ public InputStream next() {
return messageToReturn;
}
}

static class TooLongDecompressedMessageException extends StatusRuntimeException {
private static final long serialVersionUID = 1L;

public TooLongDecompressedMessageException(Status status) {
super(status);
}
}
}
10 changes: 10 additions & 0 deletions core/src/main/java/io/grpc/internal/ServerCallImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
import static io.grpc.internal.MessageDeframer.TooLongDecompressedMessageException;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
Expand Down Expand Up @@ -73,6 +74,7 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
private boolean closeCalled;
private Compressor compressor;
private boolean messageSent;
private Status exceptionStatus = null;

ServerCallImpl(ServerStream stream, MethodDescriptor<ReqT, RespT> method,
Metadata inboundHeaders, Context.CancellableContext context,
Expand Down Expand Up @@ -270,6 +272,11 @@ public SecurityLevel getSecurityLevel() {
* on.
*/
private void handleInternalError(Throwable internalError) {
if (exceptionStatus != null) {
stream.close(exceptionStatus, new Metadata());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The suggestion was to just call handleInternalError that does stream.cancel but you are doing stream.close which is different.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleInternalError() results in Status{code=CANCELLED, description=RST_STREAM closed stream. HTTP/2 error code: CANCEL, cause=null}

exceptionStatus = null;
return;
}
log.log(Level.WARNING, "Cancelling the stream because of internal error", internalError);
Status status = (internalError instanceof StatusRuntimeException)
? ((StatusRuntimeException) internalError).getStatus()
Expand Down Expand Up @@ -338,6 +345,9 @@ private void messagesAvailableInternal(final MessageProducer producer) {
}
message.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

message.close() should go up

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before listener.onMessage()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while ((message = producer.next()) != null) {
  ReqT parsedMessage;
  try (InputStream ignored = message) {
    parsedMessage = call.method.parseRequest(message);
  } catch (StatusRuntimeException e) {
    GrpcUtil.closeQuietly(producer);
    call.cancelled = true;
    call.close(e.getStatus(), new Metadata());
    return;
  }
  listener.onMessage(parsedMessage);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@panchenko ,Thank you for clarifying the suggestions. I apologize for the misunderstanding; I initially assumed I should remove listener.onMessage(parsedMessage); along with the try and adding the catch to previous try , which caused failures in existing test cases. I have now corrected this and addressed the comments."

}
} catch (TooLongDecompressedMessageException e) {
this.call.exceptionStatus = e.getStatus();
this.call.handleInternalError(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The introduction of this.call.exceptionStatus is an unnecessary complication. It introduces one more field that handleInternalError checks for and ignores the exception argument in that case.
pachenko's suggestion of TooLongDecompressedMessageException was for having a separate catch in ServerImpl.JumpToApplicationThreadServerStreamListener#messagesAvailable, which we are not doing now, so it doesn't apply.
It could just have been a StatusRuntimeException with RESOURCE_EXHAUSTED and checking for the message description to distinguish too large message parsing error from other types of RESOURCE_EXHAUSTED like you suggested before, and leave handleInternalError unchanged.

There was also the suggestion to do call.canceled = true; in the exception handling for parse error due to too large size, that is missing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @kannanjgithub for review , I intentionally omitted call.canceled = true; because we handle this.call.exceptionStatus within handleInternalError() with a return. This aligns with Eric's alternative suggestion(exceptionStatus), I suspected which makes the explicit cancellation seem redundant and I have addressed this review comments as per the latest review comments from @panchenko

} catch (Throwable t) {
GrpcUtil.closeQuietly(producer);
Throwables.throwIfUnchecked(t);
Expand Down
17 changes: 17 additions & 0 deletions core/src/test/java/io/grpc/internal/MessageDeframerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.truth.Truth.assertThat;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import static io.grpc.internal.MessageDeframer.TooLongDecompressedMessageException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
Expand Down Expand Up @@ -491,6 +492,22 @@ public void sizeEnforcingInputStream_markReset() throws IOException {
stream.close();
checkSizeEnforcingInputStreamStats(tracer, 3);
}

@Test
public void testThrows_TooLongDecompressedMessageException() throws IOException {
ByteArrayInputStream in = new ByteArrayInputStream("foo".getBytes(StandardCharsets.UTF_8));
SizeEnforcingInputStream stream =
new MessageDeframer.SizeEnforcingInputStream(in, 2, statsTraceCtx);

try {
StatusRuntimeException e =
assertThrows(TooLongDecompressedMessageException.class, () -> stream.skip(4));
assertThat(e).hasMessageThat()
.isEqualTo("RESOURCE_EXHAUSTED: Decompressed gRPC message exceeds maximum size 2");
} finally {
stream.close();
}
}
}

/**
Expand Down
Loading