Skip to content

Commit

Permalink
Do not abort server write if the CloseEvent is already registered (#1177
Browse files Browse the repository at this point in the history
)

Motivation:

If a server receives a request with `Connection: close` header, it registers
`PROTOCOL_CLOSING_INBOUND` event and transitions to the `CLOSING` state.
This event can be observed before the server connection started processing
requests. As the result, server's write stream fails and does not let the server
respond.
This use-case happens frequently when `TLSv1.3` is used, because a client
starts sending data before the server completes TLS handshake. Therefore,
the first request can be pending before the server connection is established.
A similar scenario (request received before processing starts, closing state
waits for the response) may occur with graceful closure or
`CHANNEL_CLOSED_INBOUND` event (client half-closes the connection
after a request is sent).

Modifications:

- Abort new writes only on the client-side in DefaultNettyConnection;
- Add tests that reproduce described behavior;

Result:

Server initializes request-response processing and responds to the already
received request.
  • Loading branch information
idelpivnitskiy authored Oct 14, 2020
1 parent 51c36ab commit 573c998
Show file tree
Hide file tree
Showing 17 changed files with 66 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ private void childChannelActive(Future<Http2StreamChannel> future,
HTTP_2_0,
parentContext.sslSession(),
parentContext.nettyChannel().config(),
streamObserver);
streamObserver,
true);

// In h2 a stream is 1 to 1 with a request/response life cycle. This means there is no concept of
// pipelining on a stream so we can use the non-pipelined connection which is more light weight.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ protected void initChannel(final Http2StreamChannel streamChannel) {
HTTP_2_0,
connection.sslSession(),
channel.config(),
streamObserver);
streamObserver,
false);

// ServiceTalk HTTP service handler
new NettyHttpServerConnection(streamConnection, service,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ static Single<NettyHttpServerConnection> initChannel(final Channel channel,
httpExecutionContext.bufferAllocator(), httpExecutionContext.executor(), LAST_CHUNK_PREDICATE,
closeHandler, config.tcpConfig().flushStrategy(), config.tcpConfig().idleTimeoutMs(),
initializer.andThen(getChannelInitializer(getByteBufAllocator(httpExecutionContext.bufferAllocator()),
h1Config, closeHandler)), httpExecutionContext.executionStrategy(), HTTP_1_1, observer)
h1Config, closeHandler)), httpExecutionContext.executionStrategy(), HTTP_1_1, observer, false)
.map(conn -> new NettyHttpServerConnection(conn, service, httpExecutionContext.executionStrategy(),
h1Config.headersFactory(), drainRequestPayloadBody)), HTTP_1_1, channel);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,6 @@ static Single<? extends DefaultNettyConnection<Object, Object>> createConnection
executionContext.executor(), LAST_CHUNK_PREDICATE, closeHandler, config.tcpConfig().flushStrategy(),
config.tcpConfig().idleTimeoutMs(), initializer.andThen(new HttpClientChannelInitializer(
getByteBufAllocator(executionContext.bufferAllocator()), config.h1Config(), closeHandler)),
executionContext.executionStrategy(), HTTP_1_1, connectionObserver), HTTP_1_1, channel);
executionContext.executionStrategy(), HTTP_1_1, connectionObserver, true), HTTP_1_1, channel);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,7 @@ public Completable accept(final ConnectionContext context) {
// Dummy proxy helps to emulate old intermediate systems that do not support half-closed TCP connections
proxyTunnel = new ProxyTunnel();
proxyAddress = proxyTunnel.startProxy();
serverBuilder.secure()
.protocols("TLSv1.2") // FIXME: remove after https://github.com/apple/servicetalk/pull/1156
.commit(DefaultTestCerts::loadServerPem, DefaultTestCerts::loadServerKey);
serverBuilder.secure().commit(DefaultTestCerts::loadServerPem, DefaultTestCerts::loadServerKey);
} else {
proxyTunnel = null;
}
Expand Down Expand Up @@ -187,7 +185,6 @@ public Completable accept(final ConnectionContext context) {

client = (viaProxy ? HttpClients.forSingleAddressViaProxy(serverHostAndPort(serverContext), proxyAddress)
.secure().disableHostnameVerification()
.protocols("TLSv1.2") // FIXME: remove after https://github.com/apple/servicetalk/pull/1156
.trustManager(DefaultTestCerts::loadServerCAPem)
.commit() :
HttpClients.forResolvedAddress(serverContext.listenAddress()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,7 @@ public Completable accept(final ConnectionContext context) {
// Dummy proxy helps to emulate old intermediate systems that do not support half-closed TCP connections
proxyTunnel = new ProxyTunnel();
proxyAddress = proxyTunnel.startProxy();
serverBuilder.secure()
.protocols("TLSv1.2")
.commit(DefaultTestCerts::loadServerPem, DefaultTestCerts::loadServerKey);
serverBuilder.secure().commit(DefaultTestCerts::loadServerPem, DefaultTestCerts::loadServerKey);
} else {
proxyTunnel = null;
}
Expand Down Expand Up @@ -193,7 +191,6 @@ public Completable accept(final ConnectionContext context) {

client = (viaProxy ? HttpClients.forSingleAddressViaProxy(serverHostAndPort(serverContext), proxyAddress)
.secure().disableHostnameVerification()
.protocols("TLSv1.2")
.trustManager(DefaultTestCerts::loadServerCAPem)
.commit() :
HttpClients.forResolvedAddress(serverContext.listenAddress()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ public void protocolPayloadEndOutboundShouldNotTriggerOnFailedFlush() throws Exc
channel2 -> {
serverChannelRef.compareAndSet(null, channel2);
serverChannelLatch.countDown();
}), defaultStrategy(), mock(Protocol.class), observer);
}), defaultStrategy(), mock(Protocol.class), observer, false);
},
connection -> { }).toFuture().get());
ReadOnlyHttpClientConfig cConfig = new HttpClientConfig().asReadOnly();
Expand Down Expand Up @@ -460,7 +460,7 @@ public void userEventTriggered(ChannelHandlerContext ctx,
}
}
})), defaultStrategy(), HTTP_1_1,
NoopConnectionObserver.INSTANCE);
NoopConnectionObserver.INSTANCE, true);
}
).toFuture().get());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void setUp() throws Exception {
final DefaultNettyConnection<Integer, Integer> connection =
DefaultNettyConnection.<Integer, Integer>initChannel(channel, DEFAULT_ALLOCATOR,
immediate(), obj -> true, UNSUPPORTED_PROTOCOL_CLOSE_HANDLER, defaultFlushStrategy(), null,
channel2 -> { }, defaultStrategy(), mock(Protocol.class), NoopConnectionObserver.INSTANCE)
channel2 -> { }, defaultStrategy(), mock(Protocol.class), NoopConnectionObserver.INSTANCE, true)
.toFuture().get();
requester = new NettyPipelinedConnection<>(connection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ public ServerRespondsOnClosingTest() throws Exception {
config.tcpConfig(), connectionObserver),
toStreamingHttpService(service, strategy -> strategy).adaptor(), true,
connectionObserver).toFuture().get();
serverConnection.process(true);
}

@After
Expand All @@ -104,6 +103,7 @@ public void tearDown() throws Exception {

@Test
public void protocolClosingInboundPipelinedFirstInitiatesClosure() throws Exception {
serverConnection.process(true); // Start request processing (read and write)
sendRequest("/first", true);
// The following request after "Connection: close" header violates the spec, but we want to verify that server
// discards those requests and do not respond to them:
Expand All @@ -115,6 +115,7 @@ public void protocolClosingInboundPipelinedFirstInitiatesClosure() throws Except

@Test
public void protocolClosingInboundPipelinedSecondInitiatesClosure() throws Exception {
serverConnection.process(true); // Start request processing (read and write)
sendRequest("/first", false);
sendRequest("/second", true);
handleRequests();
Expand All @@ -125,6 +126,7 @@ public void protocolClosingInboundPipelinedSecondInitiatesClosure() throws Excep

@Test
public void protocolClosingOutboundPipelinedFirstInitiatesClosure() throws Exception {
serverConnection.process(true); // Start request processing (read and write)
sendRequest("/first?serverShouldClose=true", false);
sendRequest("/second", false);
handleRequests();
Expand All @@ -136,6 +138,7 @@ public void protocolClosingOutboundPipelinedFirstInitiatesClosure() throws Excep

@Test
public void protocolClosingOutboundPipelinedSecondInitiatesClosure() throws Exception {
serverConnection.process(true); // Start request processing (read and write)
sendRequest("/first", false);
sendRequest("/second?serverShouldClose=true", false);
handleRequests();
Expand All @@ -147,6 +150,7 @@ public void protocolClosingOutboundPipelinedSecondInitiatesClosure() throws Exce

@Test
public void gracefulClosurePipelined() throws Exception {
serverConnection.process(true); // Start request processing (read and write)
sendRequest("/first", false);
sendRequest("/second", false);
serverConnection.closeAsyncGracefully().subscribe();
Expand All @@ -161,6 +165,7 @@ public void gracefulClosurePipelined() throws Exception {

@Test
public void gracefulClosurePipelinedDiscardPartialRequest() throws Exception {
serverConnection.process(true); // Start request processing (read and write)
sendRequest("/first", false);
// Send only initial line with CRLF that should hang in ByteToMessage cumulation buffer and will be discarded:
channel.writeInbound(writeAscii(PooledByteBufAllocator.DEFAULT, "GET /second HTTP/1.1"));
Expand All @@ -174,6 +179,7 @@ public void gracefulClosurePipelinedDiscardPartialRequest() throws Exception {

@Test
public void gracefulClosurePipelinedFirstResponseClosesConnection() throws Exception {
serverConnection.process(true); // Start request processing (read and write)
sendRequest("/first?serverShouldClose=true", false); // PROTOCOL_CLOSING_OUTBOUND
sendRequest("/second", false);
serverConnection.closeAsyncGracefully().subscribe();
Expand All @@ -185,6 +191,29 @@ public void gracefulClosurePipelinedFirstResponseClosesConnection() throws Excep
assertServerConnectionClosed();
}

@Test
public void protocolClosingInboundBeforeProcessingStarts() throws Exception {
sendRequest("/first", true);
// Start request processing (read and write) after request was received:
serverConnection.process(true);
handleRequests();
verifyResponse("/first");
assertServerConnectionClosed();
}

@Test
public void gracefulClosureBeforeProcessingStarts() throws Exception {
sendRequest("/first", false);
serverConnection.closeAsyncGracefully().subscribe();

// Start request processing (read and write) after request was received:
serverConnection.process(true);
handleRequests();
verifyResponse("/first");
respondWithFIN();
assertServerConnectionClosed();
}

private void sendRequest(String requestTarget, boolean addCloseHeader) {
channel.writeInbound(writeAscii(PooledByteBufAllocator.DEFAULT, "GET " + requestTarget + " HTTP/1.1\r\n" +
"Host: localhost\r\n" +
Expand Down Expand Up @@ -220,7 +249,7 @@ private void verifyResponse(String requestPath) {

private void respondWithFIN() {
assertThat("Server did not shutdown output", channel.isOutputShutdown(), is(true));
channel.shutdownInput(); // simulate FIN from the client
channel.shutdownInput().syncUninterruptibly(); // simulate FIN from the client
}

private void assertServerConnectionClosed() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
}
});
}, CLIENT_CTX.executionStrategy(), mock(Protocol.class), NoopConnectionObserver.INSTANCE))
}, CLIENT_CTX.executionStrategy(), mock(Protocol.class), NoopConnectionObserver.INSTANCE, true))
.toFuture().get();
connection.closeAsync().toFuture().get();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public Single<NettyConnection<Buffer, Buffer>> connect(ExecutionContext executio
UNSUPPORTED_PROTOCOL_CLOSE_HANDLER, config.flushStrategy(), config.idleTimeoutMs(),
new TcpClientChannelInitializer(config, connectionObserver).andThen(
channel2 -> channel2.pipeline().addLast(BufferHandler.INSTANCE)),
executionContext.executionStrategy(), TCP, connectionObserver);
executionContext.executionStrategy(), TCP, connectionObserver, true);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public ServerContext bind(ExecutionContext executionContext, int port,
UNSUPPORTED_PROTOCOL_CLOSE_HANDLER, config.flushStrategy(), config.idleTimeoutMs(),
new TcpServerChannelInitializer(config, connectionObserver)
.andThen(getChannelInitializer(service, executionContext)), executionStrategy, TCP,
connectionObserver);
connectionObserver, false);
},
serverConnection -> service.apply(serverConnection)
.beforeOnError(throwable -> LOGGER.error("Error handling a connection.", throwable))
Expand Down
Loading

0 comments on commit 573c998

Please sign in to comment.