Skip to content

Commit a4c7d9a

Browse files
author
Xudong Ma
committed
Redo b1e2aae, with some changes to prevent test flaky.
1 parent e1bd6ef commit a4c7d9a

File tree

2 files changed

+62
-64
lines changed

2 files changed

+62
-64
lines changed

okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.google.common.base.Preconditions;
3838
import com.google.common.base.Stopwatch;
3939
import com.google.common.base.Ticker;
40+
import com.google.common.util.concurrent.SettableFuture;
4041

4142
import com.squareup.okhttp.ConnectionSpec;
4243
import com.squareup.okhttp.OkHttpTlsUpgrader;
@@ -154,13 +155,15 @@ class OkHttpClientTransport implements ClientTransport {
154155
private SSLSocketFactory sslSocketFactory;
155156
private Socket socket;
156157
@GuardedBy("lock")
157-
private int maxConcurrentStreams = Integer.MAX_VALUE;
158+
private int maxConcurrentStreams = 0;
158159
@GuardedBy("lock")
159160
private LinkedList<OkHttpClientStream> pendingStreams = new LinkedList<OkHttpClientStream>();
160161
private final ConnectionSpec connectionSpec;
161162
private FrameWriter testFrameWriter;
162-
// Used by test only.
163-
Runnable connectedCallback;
163+
164+
// The following fields should only be used for test.
165+
Runnable connectingCallback;
166+
SettableFuture<Void> connectedFuture;
164167

165168
OkHttpClientTransport(String host, int port, String authorityHost, Executor executor,
166169
@Nullable SSLSocketFactory sslSocketFactory, ConnectionSpec connectionSpec) {
@@ -183,7 +186,8 @@ class OkHttpClientTransport implements ClientTransport {
183186
*/
184187
@VisibleForTesting
185188
OkHttpClientTransport(Executor executor, FrameReader frameReader, FrameWriter testFrameWriter,
186-
int nextStreamId, Socket socket, Ticker ticker, Runnable connectedCallback) {
189+
int nextStreamId, Socket socket, Ticker ticker,
190+
@Nullable Runnable connectingCallback, SettableFuture<Void> connectedFuture) {
187191
host = null;
188192
port = 0;
189193
authorityHost = null;
@@ -196,7 +200,8 @@ class OkHttpClientTransport implements ClientTransport {
196200
this.nextStreamId = nextStreamId;
197201
this.ticker = ticker;
198202
this.connectionSpec = null;
199-
this.connectedCallback = Preconditions.checkNotNull(connectedCallback);
203+
this.connectingCallback = connectingCallback;
204+
this.connectedFuture = Preconditions.checkNotNull(connectedFuture);
200205
}
201206

202207
private boolean isForTest() {
@@ -319,12 +324,20 @@ public void start(Listener listener) {
319324
@Override
320325
public void run() {
321326
if (isForTest()) {
327+
if (connectingCallback != null) {
328+
connectingCallback.run();
329+
}
322330
clientFrameHandler = new ClientFrameHandler(testFrameReader);
323331
executor.execute(clientFrameHandler);
324-
connectedCallback.run();
332+
synchronized (lock) {
333+
maxConcurrentStreams = Integer.MAX_VALUE;
334+
}
325335
frameWriter.becomeConnected(testFrameWriter, socket);
336+
startPendingStreams();
337+
connectedFuture.set(null);
326338
return;
327339
}
340+
328341
BufferedSource source;
329342
BufferedSink sink;
330343
Socket sock;
@@ -355,6 +368,7 @@ public void run() {
355368
return;
356369
}
357370
socket = sock;
371+
maxConcurrentStreams = Integer.MAX_VALUE;
358372
}
359373

360374
Variant variant = new Http2();
@@ -377,6 +391,7 @@ public void run() {
377391
OkHttpClientTransport.this.listener.transportReady();
378392
}
379393
executor.execute(clientFrameHandler);
394+
startPendingStreams();
380395
}
381396
});
382397
}

okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java

Lines changed: 41 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@
107107
import java.util.concurrent.Executors;
108108
import java.util.concurrent.TimeUnit;
109109

110+
import javax.annotation.Nullable;
111+
110112
/**
111113
* Tests for {@link OkHttpClientTransport}.
112114
*/
@@ -128,10 +130,10 @@ public class OkHttpClientTransportTest {
128130
private ClientTransport.Listener transportListener;
129131
private OkHttpClientTransport clientTransport;
130132
private MockFrameReader frameReader;
131-
private ClientFrameHandler frameHandler;
132133
private ExecutorService executor;
133134
private long nanoTime; // backs a ticker, for testing ping round-trip time measurement
134-
private ConnectedCallback connectedCallback;
135+
private SettableFuture<Void> connectedFuture;
136+
private DelayConnectedCallback delayConnectedCallback;
135137

136138
/** Set up for test. */
137139
@Before
@@ -144,12 +146,22 @@ public void setUp() {
144146
frameReader = new MockFrameReader();
145147
}
146148

147-
private void initTransport() {
148-
initTransport(3, new ConnectedCallback(false));
149+
private void initTransport() throws Exception {
150+
startTransport(3, null, true);
151+
}
152+
153+
private void initTransport(int startId) throws Exception {
154+
startTransport(startId, null, true);
155+
}
156+
157+
private void initTransportAndDelayConnected() throws Exception {
158+
delayConnectedCallback = new DelayConnectedCallback();
159+
startTransport(3, delayConnectedCallback, false);
149160
}
150161

151-
private void initTransport(int startId, ConnectedCallback connectedCallback) {
152-
this.connectedCallback = connectedCallback;
162+
private void startTransport(int startId, @Nullable Runnable connectingCallback,
163+
boolean waitingForConnected) throws Exception {
164+
connectedFuture = SettableFuture.create();
153165
Ticker ticker = new Ticker() {
154166
@Override
155167
public long read() {
@@ -158,8 +170,11 @@ public long read() {
158170
};
159171
clientTransport = new OkHttpClientTransport(
160172
executor, frameReader, frameWriter, startId,
161-
new MockSocket(frameReader), ticker, connectedCallback);
173+
new MockSocket(frameReader), ticker, connectingCallback, connectedFuture);
162174
clientTransport.start(transportListener);
175+
if (waitingForConnected) {
176+
connectedFuture.get(TIME_OUT_MS, TimeUnit.MILLISECONDS);
177+
}
163178
}
164179

165180
/** Final test checks and clean up. */
@@ -601,7 +616,7 @@ public void receiveGoAway() throws Exception {
601616
@Test
602617
public void streamIdExhausted() throws Exception {
603618
int startId = Integer.MAX_VALUE - 2;
604-
initTransport(startId, new ConnectedCallback(false));
619+
initTransport(startId);
605620

606621
MockStreamListener listener = new MockStreamListener();
607622
clientTransport.newStream(method, new Metadata.Headers(), listener).request(1);
@@ -728,7 +743,7 @@ public void pendingStreamFailedByShutdown() throws Exception {
728743
@Test
729744
public void pendingStreamFailedByIdExhausted() throws Exception {
730745
int startId = Integer.MAX_VALUE - 4;
731-
initTransport(startId, new ConnectedCallback(false));
746+
initTransport(startId);
732747
setMaxConcurrentStreams(1);
733748

734749
final MockStreamListener listener1 = new MockStreamListener();
@@ -1057,7 +1072,7 @@ public void ping_failsIfTransportFails() throws Exception {
10571072

10581073
@Test
10591074
public void writeBeforeConnected() throws Exception {
1060-
initTransport(3, new ConnectedCallback(true));
1075+
initTransportAndDelayConnected();
10611076
final String message = "Hello Server";
10621077
MockStreamListener listener = new MockStreamListener();
10631078
OkHttpClientStream stream = clientTransport.newStream(method, new Metadata.Headers(), listener);
@@ -1067,7 +1082,7 @@ public void writeBeforeConnected() throws Exception {
10671082
// The message should be queued.
10681083
verifyNoMoreInteractions(frameWriter);
10691084

1070-
connectedCallback.allowConnected();
1085+
allowTransportConnected();
10711086

10721087
// The queued message should be sent out.
10731088
ArgumentCaptor<Buffer> captor = ArgumentCaptor.forClass(Buffer.class);
@@ -1080,7 +1095,7 @@ public void writeBeforeConnected() throws Exception {
10801095

10811096
@Test
10821097
public void cancelBeforeConnected() throws Exception {
1083-
initTransport(3, new ConnectedCallback(true));
1098+
initTransportAndDelayConnected();
10841099
final String message = "Hello Server";
10851100
MockStreamListener listener = new MockStreamListener();
10861101
OkHttpClientStream stream = clientTransport.newStream(method, new Metadata.Headers(), listener);
@@ -1090,41 +1105,24 @@ public void cancelBeforeConnected() throws Exception {
10901105
stream.cancel(Status.CANCELLED);
10911106
verifyNoMoreInteractions(frameWriter);
10921107

1093-
connectedCallback.allowConnected();
1094-
1095-
// There should be 4 pending operations
1096-
verify(frameWriter, timeout(TIME_OUT_MS)).synStream(
1097-
eq(false), eq(false), eq(3), eq(0), Matchers.<List<Header>>any());
1098-
verify(frameWriter, timeout(TIME_OUT_MS)).flush();
1099-
verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
1100-
1101-
// TODO(madongfly): Is this really what we want, we may just throw away the messages of
1102-
// a cancelled stream.
1103-
verify(frameWriter, timeout(TIME_OUT_MS))
1104-
.data(eq(false), eq(3), any(Buffer.class), eq(12 + HEADER_LENGTH));
1108+
allowTransportConnected();
1109+
verifyNoMoreInteractions(frameWriter);
11051110
}
11061111

11071112
@Test
11081113
public void shutdownDuringConnecting() throws Exception {
1109-
initTransport(3, new ConnectedCallback(true));
1114+
initTransportAndDelayConnected();
11101115
final String message = "Hello Server";
11111116
MockStreamListener listener = new MockStreamListener();
11121117
OkHttpClientStream stream = clientTransport.newStream(method, new Metadata.Headers(), listener);
11131118

11141119
clientTransport.shutdown();
1115-
connectedCallback.allowConnected();
1120+
allowTransportConnected();
11161121

1117-
// The new stream should be failed, but the started stream should not be affected.
1122+
// The new stream should be failed, as well as the pending stream.
11181123
assertNewStreamFail();
1119-
InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8));
1120-
stream.writeMessage(input);
1121-
stream.flush();
1122-
ArgumentCaptor<Buffer> captor = ArgumentCaptor.forClass(Buffer.class);
1123-
verify(frameWriter, timeout(TIME_OUT_MS))
1124-
.data(eq(false), eq(3), captor.capture(), eq(12 + HEADER_LENGTH));
1125-
Buffer sentFrame = captor.getValue();
1126-
assertEquals(createMessageFrame(message), sentFrame);
1127-
stream.cancel(Status.CANCELLED);
1124+
listener.waitUntilStreamClosed();
1125+
assertEquals(Status.UNAVAILABLE.getCode(), listener.status.getCode());
11281126
}
11291127

11301128
private int activeStreamCount() {
@@ -1140,11 +1138,7 @@ void assertContainStream(int streamId) {
11401138
}
11411139

11421140
private ClientFrameHandler frameHandler() throws Exception {
1143-
if (frameHandler == null) {
1144-
connectedCallback.waitUntilConnected();
1145-
frameHandler = clientTransport.getHandler();
1146-
}
1147-
return frameHandler;
1141+
return clientTransport.getHandler();
11481142
}
11491143

11501144
private void waitForStreamPending(int expected) throws Exception {
@@ -1353,31 +1347,20 @@ public void pingFailed(Throwable cause) {
13531347
}
13541348
}
13551349

1356-
private class ConnectedCallback implements Runnable {
1357-
SettableFuture<Void> connected;
1358-
SettableFuture<Void> delayed;
1350+
private void allowTransportConnected() {
1351+
delayConnectedCallback.allowConnected();
1352+
}
13591353

1360-
private ConnectedCallback(boolean delayConnection) {
1361-
connected = SettableFuture.create();
1362-
if (delayConnection) {
1363-
delayed = SettableFuture.create();
1364-
}
1365-
}
1354+
private class DelayConnectedCallback implements Runnable {
1355+
SettableFuture<Void> delayed = SettableFuture.create();
13661356

13671357
@Override
13681358
public void run() {
1369-
if (delayed != null) {
1370-
Futures.getUnchecked(delayed);
1371-
}
1372-
connected.set(null);
1359+
Futures.getUnchecked(delayed);
13731360
}
13741361

13751362
void allowConnected() {
13761363
delayed.set(null);
13771364
}
1378-
1379-
void waitUntilConnected() throws Exception {
1380-
connected.get(TIME_OUT_MS, TimeUnit.MILLISECONDS);
1381-
}
13821365
}
13831366
}

0 commit comments

Comments
 (0)