Skip to content

Commit 560e74f

Browse files
committed
core: Workaround retry causing memory leak
Data is getting orphaned sitting in MessageFramer. This hack thus always flushes data out of the framer so no data can remain sitting there. See #9340
1 parent 1d0054f commit 560e74f

File tree

2 files changed

+69
-12
lines changed

2 files changed

+69
-12
lines changed

core/src/main/java/io/grpc/internal/RetriableStream.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,10 @@ class SendMessageEntry implements BufferEntry {
550550
@Override
551551
public void runWith(Substream substream) {
552552
substream.stream.writeMessage(method.streamRequest(message));
553+
// TODO(ejona): Workaround Netty memory leak. Message writes always need to be followed by
554+
// flushes (or half close), but retry appears to have a code path that the flushes may
555+
// not happen. The code needs to be fixed and this removed. See #9340.
556+
substream.stream.flush();
553557
}
554558
}
555559

core/src/test/java/io/grpc/internal/RetriableStreamTest.java

Lines changed: 65 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -268,10 +268,14 @@ public Void answer(InvocationOnMock in) {
268268
retriableStream.sendMessage("msg3");
269269
retriableStream.request(456);
270270

271-
inOrder.verify(mockStream1, times(2)).writeMessage(any(InputStream.class));
271+
inOrder.verify(mockStream1).writeMessage(any(InputStream.class));
272+
inOrder.verify(mockStream1).flush(); // Memory leak workaround
273+
inOrder.verify(mockStream1).writeMessage(any(InputStream.class));
274+
inOrder.verify(mockStream1).flush(); // Memory leak workaround
272275
inOrder.verify(mockStream1).request(345);
273276
inOrder.verify(mockStream1, times(2)).flush();
274277
inOrder.verify(mockStream1).writeMessage(any(InputStream.class));
278+
inOrder.verify(mockStream1).flush(); // Memory leak workaround
275279
inOrder.verify(mockStream1).request(456);
276280
inOrder.verifyNoMoreInteractions();
277281

@@ -304,12 +308,19 @@ public Void answer(InvocationOnMock in) {
304308
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
305309
ArgumentCaptor.forClass(ClientStreamListener.class);
306310
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
307-
inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class));
311+
inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
312+
inOrder.verify(mockStream2).flush(); // Memory leak workaround
313+
inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
314+
inOrder.verify(mockStream2).flush(); // Memory leak workaround
308315
inOrder.verify(mockStream2).request(345);
309316
inOrder.verify(mockStream2, times(2)).flush();
310317
inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
318+
inOrder.verify(mockStream2).flush(); // Memory leak workaround
311319
inOrder.verify(mockStream2).request(456);
312-
inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class));
320+
inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
321+
inOrder.verify(mockStream2).flush(); // Memory leak workaround
322+
inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
323+
inOrder.verify(mockStream2).flush(); // Memory leak workaround
313324
inOrder.verify(mockStream2).isReady();
314325
inOrder.verifyNoMoreInteractions();
315326

@@ -319,7 +330,10 @@ public Void answer(InvocationOnMock in) {
319330

320331
// mockStream1 is closed so it is not in the drainedSubstreams
321332
verifyNoMoreInteractions(mockStream1);
322-
inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class));
333+
inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
334+
inOrder.verify(mockStream2).flush(); // Memory leak workaround
335+
inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
336+
inOrder.verify(mockStream2).flush(); // Memory leak workaround
323337

324338
// retry2
325339
doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(2);
@@ -353,12 +367,19 @@ public Void answer(InvocationOnMock in) {
353367
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
354368
ArgumentCaptor.forClass(ClientStreamListener.class);
355369
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
356-
inOrder.verify(mockStream3, times(2)).writeMessage(any(InputStream.class));
370+
inOrder.verify(mockStream3).writeMessage(any(InputStream.class));
371+
inOrder.verify(mockStream3).flush(); // Memory leak workaround
372+
inOrder.verify(mockStream3).writeMessage(any(InputStream.class));
373+
inOrder.verify(mockStream3).flush(); // Memory leak workaround
357374
inOrder.verify(mockStream3).request(345);
358375
inOrder.verify(mockStream3, times(2)).flush();
359376
inOrder.verify(mockStream3).writeMessage(any(InputStream.class));
377+
inOrder.verify(mockStream3).flush(); // Memory leak workaround
360378
inOrder.verify(mockStream3).request(456);
361-
inOrder.verify(mockStream3, times(7)).writeMessage(any(InputStream.class));
379+
for (int i = 0; i < 7; i++) {
380+
inOrder.verify(mockStream3).writeMessage(any(InputStream.class));
381+
inOrder.verify(mockStream3).flush(); // Memory leak workaround
382+
}
362383
inOrder.verify(mockStream3).isReady();
363384
inOrder.verifyNoMoreInteractions();
364385

@@ -1958,10 +1979,14 @@ public Void answer(InvocationOnMock in) {
19581979
hedgingStream.sendMessage("msg3");
19591980
hedgingStream.request(456);
19601981

1961-
inOrder.verify(mockStream1, times(2)).writeMessage(any(InputStream.class));
1982+
inOrder.verify(mockStream1).writeMessage(any(InputStream.class));
1983+
inOrder.verify(mockStream1).flush(); // Memory leak workaround
1984+
inOrder.verify(mockStream1).writeMessage(any(InputStream.class));
1985+
inOrder.verify(mockStream1).flush(); // Memory leak workaround
19621986
inOrder.verify(mockStream1).request(345);
19631987
inOrder.verify(mockStream1, times(2)).flush();
19641988
inOrder.verify(mockStream1).writeMessage(any(InputStream.class));
1989+
inOrder.verify(mockStream1).flush(); // Memory leak workaround
19651990
inOrder.verify(mockStream1).request(456);
19661991
inOrder.verifyNoMoreInteractions();
19671992

@@ -1984,10 +2009,14 @@ public Void answer(InvocationOnMock in) {
19842009
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
19852010
ArgumentCaptor.forClass(ClientStreamListener.class);
19862011
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
1987-
inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class));
2012+
inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
2013+
inOrder.verify(mockStream2).flush(); // Memory leak workaround
2014+
inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
2015+
inOrder.verify(mockStream2).flush(); // Memory leak workaround
19882016
inOrder.verify(mockStream2).request(345);
19892017
inOrder.verify(mockStream2, times(2)).flush();
19902018
inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
2019+
inOrder.verify(mockStream2).flush(); // Memory leak workaround
19912020
inOrder.verify(mockStream2).request(456);
19922021
inOrder.verify(mockStream1).isReady();
19932022
inOrder.verify(mockStream2).isReady();
@@ -1998,9 +2027,13 @@ public Void answer(InvocationOnMock in) {
19982027
hedgingStream.sendMessage("msg2 after hedge2 starts");
19992028

20002029
inOrder.verify(mockStream1).writeMessage(any(InputStream.class));
2030+
inOrder.verify(mockStream1).flush(); // Memory leak workaround
20012031
inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
2032+
inOrder.verify(mockStream2).flush(); // Memory leak workaround
20022033
inOrder.verify(mockStream1).writeMessage(any(InputStream.class));
2034+
inOrder.verify(mockStream1).flush(); // Memory leak workaround
20032035
inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
2036+
inOrder.verify(mockStream2).flush(); // Memory leak workaround
20042037
inOrder.verifyNoMoreInteractions();
20052038

20062039

@@ -2022,12 +2055,19 @@ public Void answer(InvocationOnMock in) {
20222055
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
20232056
ArgumentCaptor.forClass(ClientStreamListener.class);
20242057
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
2025-
inOrder.verify(mockStream3, times(2)).writeMessage(any(InputStream.class));
2058+
inOrder.verify(mockStream3).writeMessage(any(InputStream.class));
2059+
inOrder.verify(mockStream3).flush(); // Memory leak workaround
2060+
inOrder.verify(mockStream3).writeMessage(any(InputStream.class));
2061+
inOrder.verify(mockStream3).flush(); // Memory leak workaround
20262062
inOrder.verify(mockStream3).request(345);
20272063
inOrder.verify(mockStream3, times(2)).flush();
20282064
inOrder.verify(mockStream3).writeMessage(any(InputStream.class));
2065+
inOrder.verify(mockStream3).flush(); // Memory leak workaround
20292066
inOrder.verify(mockStream3).request(456);
2030-
inOrder.verify(mockStream3, times(2)).writeMessage(any(InputStream.class));
2067+
inOrder.verify(mockStream3).writeMessage(any(InputStream.class));
2068+
inOrder.verify(mockStream3).flush(); // Memory leak workaround
2069+
inOrder.verify(mockStream3).writeMessage(any(InputStream.class));
2070+
inOrder.verify(mockStream3).flush(); // Memory leak workaround
20312071
inOrder.verify(mockStream1).isReady();
20322072
inOrder.verify(mockStream2).isReady();
20332073
inOrder.verify(mockStream3).isReady();
@@ -2036,8 +2076,11 @@ public Void answer(InvocationOnMock in) {
20362076
// send one more message
20372077
hedgingStream.sendMessage("msg1 after hedge3 starts");
20382078
inOrder.verify(mockStream1).writeMessage(any(InputStream.class));
2079+
inOrder.verify(mockStream1).flush(); // Memory leak workaround
20392080
inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
2081+
inOrder.verify(mockStream2).flush(); // Memory leak workaround
20402082
inOrder.verify(mockStream3).writeMessage(any(InputStream.class));
2083+
inOrder.verify(mockStream3).flush(); // Memory leak workaround
20412084

20422085
// hedge3 receives nonFatalStatus
20432086
sublistenerCaptor3.getValue().closed(
@@ -2047,7 +2090,9 @@ public Void answer(InvocationOnMock in) {
20472090
// send one more message
20482091
hedgingStream.sendMessage("msg1 after hedge3 fails");
20492092
inOrder.verify(mockStream1).writeMessage(any(InputStream.class));
2093+
inOrder.verify(mockStream1).flush(); // Memory leak workaround
20502094
inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
2095+
inOrder.verify(mockStream2).flush(); // Memory leak workaround
20512096

20522097
// the hedge mockStream4 starts
20532098
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@@ -2067,12 +2112,19 @@ public Void answer(InvocationOnMock in) {
20672112
ArgumentCaptor<ClientStreamListener> sublistenerCaptor4 =
20682113
ArgumentCaptor.forClass(ClientStreamListener.class);
20692114
inOrder.verify(mockStream4).start(sublistenerCaptor4.capture());
2070-
inOrder.verify(mockStream4, times(2)).writeMessage(any(InputStream.class));
2115+
inOrder.verify(mockStream4).writeMessage(any(InputStream.class));
2116+
inOrder.verify(mockStream4).flush(); // Memory leak workaround
2117+
inOrder.verify(mockStream4).writeMessage(any(InputStream.class));
2118+
inOrder.verify(mockStream4).flush(); // Memory leak workaround
20712119
inOrder.verify(mockStream4).request(345);
20722120
inOrder.verify(mockStream4, times(2)).flush();
20732121
inOrder.verify(mockStream4).writeMessage(any(InputStream.class));
2122+
inOrder.verify(mockStream4).flush(); // Memory leak workaround
20742123
inOrder.verify(mockStream4).request(456);
2075-
inOrder.verify(mockStream4, times(4)).writeMessage(any(InputStream.class));
2124+
for (int i = 0; i < 4; i++) {
2125+
inOrder.verify(mockStream4).writeMessage(any(InputStream.class));
2126+
inOrder.verify(mockStream4).flush(); // Memory leak workaround
2127+
}
20762128
inOrder.verify(mockStream1).isReady();
20772129
inOrder.verify(mockStream2).isReady();
20782130
inOrder.verify(mockStream4).isReady();
@@ -2190,6 +2242,7 @@ public void hedging_maxAttempts() {
21902242

21912243
hedgingStream.sendMessage("msg1 after commit");
21922244
inOrder.verify(mockStream3).writeMessage(any(InputStream.class));
2245+
inOrder.verify(mockStream3).flush(); // Memory leak workaround
21932246
inOrder.verifyNoMoreInteractions();
21942247

21952248
Metadata heders = new Metadata();

0 commit comments

Comments
 (0)