-
Notifications
You must be signed in to change notification settings - Fork 3.9k
11246 :: Unexpected error when server expands a compressed message to learn it is too large #12360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 3 commits
fe36e6d
61fbe27
bb929aa
1024a7f
5410196
14adfdb
631edd3
92b75a8
894dc66
65fbebb
3b9ebbe
55741e4
26095ef
f16cd6b
a511a77
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -56,6 +56,7 @@ | |
| import io.grpc.ServerServiceDefinition; | ||
| import io.grpc.ServerTransportFilter; | ||
| import io.grpc.Status; | ||
| import io.grpc.StatusRuntimeException; | ||
| import io.perfmark.Link; | ||
| import io.perfmark.PerfMark; | ||
| import io.perfmark.Tag; | ||
|
|
@@ -811,7 +812,13 @@ void setListener(ServerStreamListener listener) { | |
| private void internalClose(Throwable t) { | ||
| // TODO(ejona86): this is not thread-safe :) | ||
| String description = "Application error processing RPC"; | ||
| stream.close(Status.UNKNOWN.withDescription(description).withCause(t), new Metadata()); | ||
| Status statusToPropagate = Status.UNKNOWN.withDescription(description).withCause(t); | ||
| if (t instanceof StatusRuntimeException) { | ||
| if (((StatusRuntimeException) t).getStatus().getCode() == Status.Code.RESOURCE_EXHAUSTED) { | ||
| statusToPropagate = ((StatusRuntimeException) t).getStatus().withCause(t); | ||
| } | ||
| } | ||
|
||
| stream.close(statusToPropagate, new Metadata()); | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -77,6 +77,7 @@ | |
| import io.grpc.ServiceDescriptor; | ||
| import io.grpc.Status; | ||
| import io.grpc.Status.Code; | ||
| import io.grpc.StatusRuntimeException; | ||
| import io.grpc.StringMarshaller; | ||
| import io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener; | ||
| import io.grpc.internal.ServerImplBuilder.ClientTransportServersBuilder; | ||
|
|
@@ -1542,6 +1543,99 @@ public void channelz_transport_membershp() throws Exception { | |
| assertTrue(after.end); | ||
| } | ||
|
|
||
| @Test | ||
| public void testInternalClose_nonProtocolStatusRuntimeExceptionBecomesUnknown() { | ||
| JumpToApplicationThreadServerStreamListener listener | ||
| = new JumpToApplicationThreadServerStreamListener( | ||
| executor.getScheduledExecutorService(), | ||
| executor.getScheduledExecutorService(), | ||
| stream, | ||
| Context.ROOT.withCancellation(), | ||
| PerfMark.createTag()); | ||
| ServerStreamListener mockListener = mock(ServerStreamListener.class); | ||
| listener.setListener(mockListener); | ||
|
|
||
| StatusRuntimeException statusRuntimeException | ||
| = new StatusRuntimeException(Status.PERMISSION_DENIED.withDescription("denied")); | ||
| doThrow(statusRuntimeException).when(mockListener).onReady(); | ||
| listener.onReady(); | ||
| try { | ||
| executor.runDueTasks(); | ||
| fail("Expected exception"); | ||
| } catch (RuntimeException t) { | ||
| assertSame(statusRuntimeException, t); | ||
| ensureServerStateNotLeaked(); | ||
| } | ||
| verify(stream).close(statusCaptor.capture(), metadataCaptor.capture()); | ||
| Status status = statusCaptor.getValue(); | ||
| assertEquals(Code.UNKNOWN, status.getCode()); | ||
| assertEquals("Application error processing RPC", status.getDescription()); | ||
| assertEquals(statusRuntimeException, status.getCause()); | ||
| assertTrue(metadataCaptor.getValue().keys().isEmpty()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testInternalClose_otherExceptionBecomesUnknown() { | ||
| JumpToApplicationThreadServerStreamListener listener | ||
| = new JumpToApplicationThreadServerStreamListener( | ||
| executor.getScheduledExecutorService(), | ||
| executor.getScheduledExecutorService(), | ||
| stream, | ||
| Context.ROOT.withCancellation(), | ||
| PerfMark.createTag()); | ||
| ServerStreamListener mockListener = mock(ServerStreamListener.class); | ||
| listener.setListener(mockListener); | ||
|
|
||
| RuntimeException expectedT = new RuntimeException(); | ||
|
||
| doThrow(expectedT).when(mockListener) | ||
| .messagesAvailable(any(StreamListener.MessageProducer.class)); | ||
| listener.messagesAvailable(mock(StreamListener.MessageProducer.class)); | ||
| try { | ||
| executor.runDueTasks(); | ||
| fail("Expected exception"); | ||
| } catch (RuntimeException t) { | ||
| assertSame(expectedT, t); | ||
| ensureServerStateNotLeaked(); | ||
| } | ||
| verify(stream).close(statusCaptor.capture(), metadataCaptor.capture()); | ||
| Status status = statusCaptor.getValue(); | ||
| assertEquals(Code.UNKNOWN, status.getCode()); | ||
| assertEquals("Application error processing RPC", status.getDescription()); | ||
| assertEquals(expectedT, status.getCause()); | ||
| assertTrue(metadataCaptor.getValue().keys().isEmpty()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testInternalClose_propagatesResourceExhausted() { | ||
| JumpToApplicationThreadServerStreamListener listener | ||
| = new JumpToApplicationThreadServerStreamListener( | ||
| executor.getScheduledExecutorService(), | ||
| executor.getScheduledExecutorService(), | ||
| stream, | ||
| Context.ROOT.withCancellation(), | ||
| PerfMark.createTag()); | ||
| ServerStreamListener mockListener = mock(ServerStreamListener.class); | ||
| listener.setListener(mockListener); | ||
|
|
||
| StatusRuntimeException statusRuntimeException | ||
| = new StatusRuntimeException(Status.RESOURCE_EXHAUSTED.withDescription("exhausted")); | ||
| doThrow(statusRuntimeException).when(mockListener) | ||
| .messagesAvailable(any(StreamListener.MessageProducer.class)); | ||
| listener.messagesAvailable(mock(StreamListener.MessageProducer.class)); | ||
| try { | ||
| executor.runDueTasks(); | ||
| fail("Expected exception"); | ||
| } catch (RuntimeException t) { | ||
| assertSame(statusRuntimeException, t); | ||
| } | ||
| verify(stream).close(statusCaptor.capture(), metadataCaptor.capture()); | ||
| Status status = statusCaptor.getValue(); | ||
| assertEquals(Status.Code.RESOURCE_EXHAUSTED, status.getCode()); | ||
| assertEquals("exhausted", status.getDescription()); | ||
| assertEquals(statusRuntimeException, status.getCause()); | ||
| assertTrue(metadataCaptor.getValue().keys().isEmpty()); | ||
| } | ||
|
|
||
| private void createAndStartServer() throws IOException { | ||
| createServer(); | ||
| server.start(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there no other cases where we can get "RESOURCE_EXHAUSTED" ? As, this is not tied specifically to marshaling the intended behavior can change if and when new "RESOURCE_EXHAUSTED" are added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@AgraVator , I've addressed the review comments and changed the logic to handle both StatusRuntimeException and StatusException , I also maintained the existing exception propagation as same for other status code except RESOURCE_EXHAUSTED, Please review and share your feedback.