diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DelegatingHttpServiceContext.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DelegatingHttpServiceContext.java index e30119ce62..e9e77dbdd6 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DelegatingHttpServiceContext.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DelegatingHttpServiceContext.java @@ -100,4 +100,9 @@ public Completable closeAsync() { public Completable closeAsyncGracefully() { return delegate.closeAsyncGracefully(); } + + @Override + public void acceptConnections(final boolean accept) { + delegate.acceptConnections(accept); + } } diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpServiceContext.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpServiceContext.java index ad9dc5ed09..820319e7e0 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpServiceContext.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpServiceContext.java @@ -15,12 +15,14 @@ */ package io.servicetalk.http.api; +import io.servicetalk.transport.api.ServerListenContext; + import static java.util.Objects.requireNonNull; /** * A {@link HttpConnectionContext} for use in the {@link HttpService} context. */ -public abstract class HttpServiceContext implements HttpConnectionContext { +public abstract class HttpServiceContext implements HttpConnectionContext, ServerListenContext { private final HttpHeadersFactory headersFactory; private final HttpResponseFactory factory; private final StreamingHttpResponseFactory streamingFactory; diff --git a/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestHttpServiceContext.java b/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestHttpServiceContext.java index 19ca93eabb..ed1e75d874 100644 --- a/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestHttpServiceContext.java +++ b/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestHttpServiceContext.java @@ -102,4 +102,9 @@ public Completable onClose() { public Completable closeAsync() { return completed(); } + + @Override + public void acceptConnections(final boolean accept) { + throw new UnsupportedOperationException(); + } } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java index 0aa4ba9217..a52e4e8635 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java @@ -70,6 +70,11 @@ private H2ServerParentConnectionContext(final Channel channel, final HttpExecuti this.listenAddress = requireNonNull(listenAddress); } + @Override + public void acceptConnections(final boolean accept) { + channel().parent().config().setAutoRead(accept); + } + @Override public SocketAddress listenAddress() { return listenAddress; diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java index 6491d855cf..d713334efa 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java @@ -206,6 +206,11 @@ public SocketAddress listenAddress() { return delegate.listenAddress(); } + @Override + public void acceptConnections(final boolean accept) { + delegate.acceptConnections(accept); + } + @Override public ExecutionContext executionContext() { return delegate.executionContext(); @@ -481,6 +486,12 @@ public Channel nettyChannel() { return connection.nettyChannel(); } + @Override + public void acceptConnections(final boolean accept) { + assert connection.nettyChannel().parent() != null; + connection.nettyChannel().parent().config().setAutoRead(accept); + } + @Override public String toString() { return connection.toString(); diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AbstractNettyHttpServerTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AbstractNettyHttpServerTest.java index aededc3e5c..f995efdbd7 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AbstractNettyHttpServerTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AbstractNettyHttpServerTest.java @@ -163,6 +163,7 @@ private void startServer() throws Exception { .protocols(protocol) .transportObserver(serverTransportObserver) .enableWireLogging("servicetalk-tests-wire-logger", TRACE, () -> true); + configureServerBuilder(serverBuilder); if (sslEnabled) { serverBuilder.sslConfig(new ServerSslConfigBuilder(DefaultTestCerts::loadServerPem, DefaultTestCerts::loadServerKey).build()); @@ -198,7 +199,10 @@ private void startServer() throws Exception { httpConnection = httpClient.reserveConnection(httpClient.get("/")).toFuture().get(); } - private SingleAddressHttpClientBuilder newClientBuilder() { + protected void configureServerBuilder(final HttpServerBuilder serverBuilder) { + } + + protected SingleAddressHttpClientBuilder newClientBuilder() { return HttpClients.forResolvedAddress(serverHostAndPort(serverContext)); } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionAcceptingNettyHttpServerTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionAcceptingNettyHttpServerTest.java new file mode 100644 index 0000000000..b883756a83 --- /dev/null +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionAcceptingNettyHttpServerTest.java @@ -0,0 +1,131 @@ +/* + * Copyright © 2021 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.http.netty; + +import io.servicetalk.client.api.ConnectTimeoutException; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.http.api.HttpServerBuilder; +import io.servicetalk.http.api.SingleAddressHttpClientBuilder; +import io.servicetalk.http.api.StreamingHttpRequest; +import io.servicetalk.http.api.StreamingHttpResponse; +import io.servicetalk.transport.api.HostAndPort; + +import org.junit.jupiter.api.Test; + +import java.net.InetSocketAddress; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static io.netty.util.internal.PlatformDependent.normalizedOs; +import static io.servicetalk.client.api.AutoRetryStrategyProvider.DISABLE_AUTO_RETRIES; +import static io.servicetalk.client.api.LimitingConnectionFactoryFilter.withMax; +import static io.servicetalk.concurrent.api.BlockingTestUtils.await; +import static io.servicetalk.concurrent.api.BlockingTestUtils.awaitIndefinitely; +import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1; +import static io.servicetalk.http.api.HttpRequestMethod.GET; +import static io.servicetalk.http.api.HttpResponseStatus.OK; +import static io.servicetalk.http.netty.AbstractNettyHttpServerTest.ExecutorSupplier.CACHED; +import static io.servicetalk.http.netty.TestServiceStreaming.SVC_ECHO; +import static io.servicetalk.logging.api.LogLevel.TRACE; +import static io.servicetalk.transport.api.ServiceTalkSocketOptions.CONNECT_TIMEOUT; +import static io.servicetalk.transport.api.ServiceTalkSocketOptions.SO_BACKLOG; +import static java.lang.Boolean.TRUE; +import static java.time.Duration.ofSeconds; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class ConnectionAcceptingNettyHttpServerTest extends AbstractNettyHttpServerTest { + + private static final boolean IS_LINUX = "linux".equals(normalizedOs()); + // There is an off-by-one behavior difference between macOS & Linux. + // Linux has a greater-than check + // (see. https://github.com/torvalds/linux/blob/5bfc75d92efd494db37f5c4c173d3639d4772966/include/net/sock.h#L941) + private static final int TCP_BACKLOG = IS_LINUX ? 0 : 1; + private static final int CONNECT_TIMEOUT_MILLIS = (int) ofSeconds(1).toMillis(); + private static final int VERIFY_REQUEST_AWAIT_MILLIS = 500; + private static final int TRY_REQUEST_AWAIT_MILLIS = 500; + + @Override + protected void configureServerBuilder(final HttpServerBuilder serverBuilder) { + serverBuilder.listenSocketOption(SO_BACKLOG, TCP_BACKLOG); + } + + @Override + protected SingleAddressHttpClientBuilder newClientBuilder() { + return super.newClientBuilder() + .appendConnectionFactoryFilter(withMax(5)) + .autoRetryStrategy(DISABLE_AUTO_RETRIES) + .enableWireLogging("servicetalk-tests-wire-logger", TRACE, TRUE::booleanValue) + // It's important to use CONNECT_TIMEOUT here to verify that connections aren't establishing. + .socketOption(CONNECT_TIMEOUT, CONNECT_TIMEOUT_MILLIS); + } + + @Test + void testStopAcceptingAndResume() throws Exception { + setUp(CACHED, CACHED); + final StreamingHttpRequest request = streamingHttpClient().newRequest(GET, SVC_ECHO); + + assertConnectionRequestSucceeds(request); + + serverContext().acceptConnections(false); + // Netty will evaluate the auto-read on the next round, so the next connection will go through. + assertConnectionRequestSucceeds(request); + // This connection should get established but not accepted. + assertConnectionRequestReceiveTimesOut(request); + + // Restoring auto-read will resume accepting. + serverContext().acceptConnections(true); + assertConnectionRequestSucceeds(request); + } + + @Test + void testIdleTimeout() throws Exception { + setUp(CACHED, CACHED); + final StreamingHttpRequest request = streamingHttpClient().newRequest(GET, SVC_ECHO); + + assertConnectionRequestSucceeds(request); + + serverContext().acceptConnections(false); + // Netty will evaluate the auto-read on the next round, so the next connection will go through. + assertConnectionRequestSucceeds(request); + // Connection will establish but remain in the accept-queue + // (i.e., NOT accepted by the server => occupying 1 backlog entry) + assertConnectionRequestReceiveTimesOut(request); + final Single response = + streamingHttpClient().reserveConnection(request).flatMap(conn -> conn.request(request)); + // Since we control the backlog size, this connection won't establish (i.e., NO syn-ack) + // timeout operator can be used to kill it or socket connection-timeout + final ExecutionException executionException = + assertThrows(ExecutionException.class, () -> awaitIndefinitely(response)); + assertThat(executionException.getCause(), instanceOf(ConnectTimeoutException.class)); + } + + private void assertConnectionRequestReceiveTimesOut(final StreamingHttpRequest request) { + assertThrows(TimeoutException.class, + () -> await(streamingHttpClient().reserveConnection(request).flatMap(conn -> conn.request(request)), + TRY_REQUEST_AWAIT_MILLIS, MILLISECONDS)); + } + + private void assertConnectionRequestSucceeds(final StreamingHttpRequest request) throws Exception { + final StreamingHttpResponse response = + await(streamingHttpClient().reserveConnection(request).flatMap(conn -> conn.request(request)), + VERIFY_REQUEST_AWAIT_MILLIS, MILLISECONDS); + assert response != null; + assertResponse(response, HTTP_1_1, OK, ""); + } +} diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionDrainTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionDrainTest.java index 3acf37acac..0e4c53bd23 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionDrainTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionDrainTest.java @@ -169,6 +169,11 @@ public ExecutionContext executionContext() { return serverContext.executionContext(); } + @Override + public void acceptConnections(final boolean accept) { + serverContext.acceptConnections(accept); + } + @Override public Completable onClose() { return serverContext.onClose(); diff --git a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerBinder.java b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerBinder.java index 947b20fdbb..4c63e6e499 100644 --- a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerBinder.java +++ b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerBinder.java @@ -107,8 +107,16 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) { ((ReferenceCounted) msg).release(); } } - if (msg instanceof Channel && !channelSet.addIfAbsent((Channel) msg)) { - LOGGER.warn("Channel ({}) not added to ChannelSet", msg); + if (msg instanceof Channel) { + final Channel channel = (Channel) msg; + if (!channel.isActive()) { + channel.close(); + LOGGER.debug("Channel ({}) is accepted, but was already inactive", msg); + return; + } else if (!channelSet.addIfAbsent(channel)) { + LOGGER.warn("Channel ({}) not added to ChannelSet", msg); + return; + } } ctx.fireChannelRead(msg); } diff --git a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerContext.java b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerContext.java index e08401b5f9..ae707ceb32 100644 --- a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerContext.java +++ b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerContext.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018 Apple Inc. and the ServiceTalk project authors + * Copyright © 2018, 2021 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ /** * Context for servers. */ -public interface ServerContext extends ListenableAsyncCloseable, GracefulAutoCloseable { +public interface ServerContext extends ServerListenContext, ListenableAsyncCloseable, GracefulAutoCloseable { /** * Listen address for the server associated with this context. diff --git a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerListenContext.java b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerListenContext.java new file mode 100644 index 0000000000..c9e372958c --- /dev/null +++ b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerListenContext.java @@ -0,0 +1,54 @@ +/* + * Copyright © 2021 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.transport.api; + +/** + * Context for controlling listen behavior. + */ +public interface ServerListenContext { + /** + * Toggles the server's ability to accept new connections. + *

+ * Passing a {@code false} value will signal the server to stop accepting new connections. + * It won't affect any other interactions to currently open connections (i.e., reads / writes). + *

+ * Depending on the transport, connections may still get ESTABLISHED, see + * {@link ServiceTalkSocketOptions#SO_BACKLOG backlog} or OS wide settings: + *

+ * For instance, in case of TCP the 3-way handshake may finish, and the connection will await in the + * accept queue to be accepted. If the accept queue is full, connection SYNs will await in the + * SYN backlog (in the case of linux). This can be tuned: + * tcp_max_syn_backlog + * These additional parameters may affect the behavior of new flows when the service is not accepting. + *

+ * Depending on how long this stays in the {@code false} state, it may affect other timeouts (i.e., connect-timeout + * or idleness) on the peer-side and/or the other flows to the peer (i.e., proxies). + *

+ * Considerations: + *

    + *
  • Upon resumption, {@code accept == true}, backlogged connections will be processed first, + * which may be inactive by that time.
  • + *
  • The effect of toggling connection acceptance may be lazy evaluated (implementation detail), meaning + * that connections may still go through even after setting this to {@code false}.
  • + *
+ * @param accept Toggles the server's accepting connection ability. + */ + void acceptConnections(boolean accept); +} diff --git a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServiceTalkSocketOptions.java b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServiceTalkSocketOptions.java index 1f955dab2b..d1ac762a7a 100644 --- a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServiceTalkSocketOptions.java +++ b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServiceTalkSocketOptions.java @@ -28,19 +28,19 @@ public final class ServiceTalkSocketOptions { /** - * The connect timeout in milliseconds. + * Connect timeout in milliseconds. */ public static final SocketOption CONNECT_TIMEOUT = new ServiceTalkSocketOption<>("CONNECT_TIMEOUT", Integer.class); /** - * The threshold after which the the Endpoint is not writable anymore. + * The threshold after which the Endpoint is not writable anymore. */ public static final SocketOption WRITE_BUFFER_THRESHOLD = new ServiceTalkSocketOption<>("WRITE_BUFFER_THRESHOLD", Integer.class); /** - * Allow to idle timeout in milli seconds after which the connection is closed. + * Connection idle timeout in milliseconds after which the connection is closed. */ public static final SocketOption IDLE_TIMEOUT = new ServiceTalkSocketOption<>("IDLE_TIMEOUT", Long.class); diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyServerContext.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyServerContext.java index 75f92c202f..f6c7ffa406 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyServerContext.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyServerContext.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018 Apple Inc. and the ServiceTalk project authors + * Copyright © 2018, 2021 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -83,6 +83,11 @@ public SocketAddress listenAddress() { return listenChannel.localAddress(); } + @Override + public void acceptConnections(final boolean accept) { + listenChannel.config().setAutoRead(accept); + } + @Override public ExecutionContext executionContext() { return executionContext;