Skip to content

Commit

Permalink
API to support start/stop accepting connections (#1741)
Browse files Browse the repository at this point in the history
API to support start/stop accepting connections

Motivation:

Expose a way a user can yield accepting connection on server side, and resume on demand.?

Modifications:

ServerContext now supports an additional `acceptConnections(bool)` API that can be used to
hint to the server the need for start/stop accepting.

Result:

More ways to control the service on-demand.
  • Loading branch information
tkountis authored Aug 26, 2021
1 parent dfd5900 commit 6beebe8
Show file tree
Hide file tree
Showing 13 changed files with 245 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,9 @@ public Completable closeAsync() {
public Completable closeAsyncGracefully() {
return delegate.closeAsyncGracefully();
}

@Override
public void acceptConnections(final boolean accept) {
delegate.acceptConnections(accept);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
*/
package io.servicetalk.http.api;

import io.servicetalk.transport.api.ServerListenContext;

import static java.util.Objects.requireNonNull;

/**
* A {@link HttpConnectionContext} for use in the {@link HttpService} context.
*/
public abstract class HttpServiceContext implements HttpConnectionContext {
public abstract class HttpServiceContext implements HttpConnectionContext, ServerListenContext {
private final HttpHeadersFactory headersFactory;
private final HttpResponseFactory factory;
private final StreamingHttpResponseFactory streamingFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,9 @@ public Completable onClose() {
public Completable closeAsync() {
return completed();
}

@Override
public void acceptConnections(final boolean accept) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ private H2ServerParentConnectionContext(final Channel channel, final HttpExecuti
this.listenAddress = requireNonNull(listenAddress);
}

@Override
public void acceptConnections(final boolean accept) {
channel().parent().config().setAutoRead(accept);
}

@Override
public SocketAddress listenAddress() {
return listenAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ public SocketAddress listenAddress() {
return delegate.listenAddress();
}

@Override
public void acceptConnections(final boolean accept) {
delegate.acceptConnections(accept);
}

@Override
public ExecutionContext executionContext() {
return delegate.executionContext();
Expand Down Expand Up @@ -481,6 +486,12 @@ public Channel nettyChannel() {
return connection.nettyChannel();
}

@Override
public void acceptConnections(final boolean accept) {
assert connection.nettyChannel().parent() != null;
connection.nettyChannel().parent().config().setAutoRead(accept);
}

@Override
public String toString() {
return connection.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ private void startServer() throws Exception {
.protocols(protocol)
.transportObserver(serverTransportObserver)
.enableWireLogging("servicetalk-tests-wire-logger", TRACE, () -> true);
configureServerBuilder(serverBuilder);
if (sslEnabled) {
serverBuilder.sslConfig(new ServerSslConfigBuilder(DefaultTestCerts::loadServerPem,
DefaultTestCerts::loadServerKey).build());
Expand Down Expand Up @@ -198,7 +199,10 @@ private void startServer() throws Exception {
httpConnection = httpClient.reserveConnection(httpClient.get("/")).toFuture().get();
}

private SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> newClientBuilder() {
protected void configureServerBuilder(final HttpServerBuilder serverBuilder) {
}

protected SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> newClientBuilder() {
return HttpClients.forResolvedAddress(serverHostAndPort(serverContext));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.netty;

import io.servicetalk.client.api.ConnectTimeoutException;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.http.api.HttpServerBuilder;
import io.servicetalk.http.api.SingleAddressHttpClientBuilder;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.transport.api.HostAndPort;

import org.junit.jupiter.api.Test;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static io.netty.util.internal.PlatformDependent.normalizedOs;
import static io.servicetalk.client.api.AutoRetryStrategyProvider.DISABLE_AUTO_RETRIES;
import static io.servicetalk.client.api.LimitingConnectionFactoryFilter.withMax;
import static io.servicetalk.concurrent.api.BlockingTestUtils.await;
import static io.servicetalk.concurrent.api.BlockingTestUtils.awaitIndefinitely;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.api.HttpRequestMethod.GET;
import static io.servicetalk.http.api.HttpResponseStatus.OK;
import static io.servicetalk.http.netty.AbstractNettyHttpServerTest.ExecutorSupplier.CACHED;
import static io.servicetalk.http.netty.TestServiceStreaming.SVC_ECHO;
import static io.servicetalk.logging.api.LogLevel.TRACE;
import static io.servicetalk.transport.api.ServiceTalkSocketOptions.CONNECT_TIMEOUT;
import static io.servicetalk.transport.api.ServiceTalkSocketOptions.SO_BACKLOG;
import static java.lang.Boolean.TRUE;
import static java.time.Duration.ofSeconds;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;

class ConnectionAcceptingNettyHttpServerTest extends AbstractNettyHttpServerTest {

private static final boolean IS_LINUX = "linux".equals(normalizedOs());
// There is an off-by-one behavior difference between macOS & Linux.
// Linux has a greater-than check
// (see. https://github.com/torvalds/linux/blob/5bfc75d92efd494db37f5c4c173d3639d4772966/include/net/sock.h#L941)
private static final int TCP_BACKLOG = IS_LINUX ? 0 : 1;
private static final int CONNECT_TIMEOUT_MILLIS = (int) ofSeconds(1).toMillis();
private static final int VERIFY_REQUEST_AWAIT_MILLIS = 500;
private static final int TRY_REQUEST_AWAIT_MILLIS = 500;

@Override
protected void configureServerBuilder(final HttpServerBuilder serverBuilder) {
serverBuilder.listenSocketOption(SO_BACKLOG, TCP_BACKLOG);
}

@Override
protected SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> newClientBuilder() {
return super.newClientBuilder()
.appendConnectionFactoryFilter(withMax(5))
.autoRetryStrategy(DISABLE_AUTO_RETRIES)
.enableWireLogging("servicetalk-tests-wire-logger", TRACE, TRUE::booleanValue)
// It's important to use CONNECT_TIMEOUT here to verify that connections aren't establishing.
.socketOption(CONNECT_TIMEOUT, CONNECT_TIMEOUT_MILLIS);
}

@Test
void testStopAcceptingAndResume() throws Exception {
setUp(CACHED, CACHED);
final StreamingHttpRequest request = streamingHttpClient().newRequest(GET, SVC_ECHO);

assertConnectionRequestSucceeds(request);

serverContext().acceptConnections(false);
// Netty will evaluate the auto-read on the next round, so the next connection will go through.
assertConnectionRequestSucceeds(request);
// This connection should get established but not accepted.
assertConnectionRequestReceiveTimesOut(request);

// Restoring auto-read will resume accepting.
serverContext().acceptConnections(true);
assertConnectionRequestSucceeds(request);
}

@Test
void testIdleTimeout() throws Exception {
setUp(CACHED, CACHED);
final StreamingHttpRequest request = streamingHttpClient().newRequest(GET, SVC_ECHO);

assertConnectionRequestSucceeds(request);

serverContext().acceptConnections(false);
// Netty will evaluate the auto-read on the next round, so the next connection will go through.
assertConnectionRequestSucceeds(request);
// Connection will establish but remain in the accept-queue
// (i.e., NOT accepted by the server => occupying 1 backlog entry)
assertConnectionRequestReceiveTimesOut(request);
final Single<StreamingHttpResponse> response =
streamingHttpClient().reserveConnection(request).flatMap(conn -> conn.request(request));
// Since we control the backlog size, this connection won't establish (i.e., NO syn-ack)
// timeout operator can be used to kill it or socket connection-timeout
final ExecutionException executionException =
assertThrows(ExecutionException.class, () -> awaitIndefinitely(response));
assertThat(executionException.getCause(), instanceOf(ConnectTimeoutException.class));
}

private void assertConnectionRequestReceiveTimesOut(final StreamingHttpRequest request) {
assertThrows(TimeoutException.class,
() -> await(streamingHttpClient().reserveConnection(request).flatMap(conn -> conn.request(request)),
TRY_REQUEST_AWAIT_MILLIS, MILLISECONDS));
}

private void assertConnectionRequestSucceeds(final StreamingHttpRequest request) throws Exception {
final StreamingHttpResponse response =
await(streamingHttpClient().reserveConnection(request).flatMap(conn -> conn.request(request)),
VERIFY_REQUEST_AWAIT_MILLIS, MILLISECONDS);
assert response != null;
assertResponse(response, HTTP_1_1, OK, "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ public ExecutionContext executionContext() {
return serverContext.executionContext();
}

@Override
public void acceptConnections(final boolean accept) {
serverContext.acceptConnections(accept);
}

@Override
public Completable onClose() {
return serverContext.onClose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,16 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
((ReferenceCounted) msg).release();
}
}
if (msg instanceof Channel && !channelSet.addIfAbsent((Channel) msg)) {
LOGGER.warn("Channel ({}) not added to ChannelSet", msg);
if (msg instanceof Channel) {
final Channel channel = (Channel) msg;
if (!channel.isActive()) {
channel.close();
LOGGER.debug("Channel ({}) is accepted, but was already inactive", msg);
return;
} else if (!channelSet.addIfAbsent(channel)) {
LOGGER.warn("Channel ({}) not added to ChannelSet", msg);
return;
}
}
ctx.fireChannelRead(msg);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018, 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,7 +25,7 @@
/**
* Context for servers.
*/
public interface ServerContext extends ListenableAsyncCloseable, GracefulAutoCloseable {
public interface ServerContext extends ServerListenContext, ListenableAsyncCloseable, GracefulAutoCloseable {

/**
* Listen address for the server associated with this context.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.transport.api;

/**
* Context for controlling listen behavior.
*/
public interface ServerListenContext {
/**
* Toggles the server's ability to accept new connections.
* <p>
* Passing a {@code false} value will signal the server to stop accepting new connections.
* It won't affect any other interactions to currently open connections (i.e., reads / writes).
* <p>
* Depending on the transport, connections may still get ESTABLISHED, see
* {@link ServiceTalkSocketOptions#SO_BACKLOG backlog} or OS wide settings:
* <ul>
* <li>Linux: <a href="https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt">SOMAXCONN</a></li>
* <li>MacOS/BSD: <a href="https://docs.freebsd.org/en/books/handbook/config/#configtuning-kernel-limits">
* kern.ipc.somaxconn / kern.ipc.soacceptqueue</a></li>
* </ul>
* For instance, in case of TCP the 3-way handshake may finish, and the connection will await in the
* accept queue to be accepted. If the accept queue is full, connection SYNs will await in the
* SYN backlog (in the case of linux). This can be tuned:
* <a href="https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt">tcp_max_syn_backlog</a>
* These additional parameters may affect the behavior of new flows when the service is not accepting.
* <p>
* Depending on how long this stays in the {@code false} state, it may affect other timeouts (i.e., connect-timeout
* or idleness) on the peer-side and/or the other flows to the peer (i.e., proxies).
* <p>
* Considerations:
* <ul>
* <li>Upon resumption, {@code accept == true}, backlogged connections will be processed first,
* which may be inactive by that time.</li>
* <li>The effect of toggling connection acceptance may be lazy evaluated (implementation detail), meaning
* that connections may still go through even after setting this to {@code false}.</li>
* </ul>
* @param accept Toggles the server's accepting connection ability.
*/
void acceptConnections(boolean accept);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@
public final class ServiceTalkSocketOptions {

/**
* The connect timeout in milliseconds.
* Connect timeout in milliseconds.
*/
public static final SocketOption<Integer> CONNECT_TIMEOUT =
new ServiceTalkSocketOption<>("CONNECT_TIMEOUT", Integer.class);

/**
* The threshold after which the the Endpoint is not writable anymore.
* The threshold after which the Endpoint is not writable anymore.
*/
public static final SocketOption<Integer> WRITE_BUFFER_THRESHOLD =
new ServiceTalkSocketOption<>("WRITE_BUFFER_THRESHOLD", Integer.class);

/**
* Allow to idle timeout in milli seconds after which the connection is closed.
* Connection idle timeout in milliseconds after which the connection is closed.
*/
public static final SocketOption<Long> IDLE_TIMEOUT = new ServiceTalkSocketOption<>("IDLE_TIMEOUT", Long.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018, 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -83,6 +83,11 @@ public SocketAddress listenAddress() {
return listenChannel.localAddress();
}

@Override
public void acceptConnections(final boolean accept) {
listenChannel.config().setAutoRead(accept);
}

@Override
public ExecutionContext executionContext() {
return executionContext;
Expand Down

0 comments on commit 6beebe8

Please sign in to comment.