Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API to support start/stop accepting connections #1741

Merged
merged 3 commits into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,9 @@ public Completable closeAsync() {
public Completable closeAsyncGracefully() {
return delegate.closeAsyncGracefully();
}

@Override
public void acceptConnections(final boolean accept) {
delegate.acceptConnections(accept);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
*/
package io.servicetalk.http.api;

import io.servicetalk.transport.api.ServerListenContext;

import static java.util.Objects.requireNonNull;

/**
* A {@link HttpConnectionContext} for use in the {@link HttpService} context.
*/
public abstract class HttpServiceContext implements HttpConnectionContext {
public abstract class HttpServiceContext implements HttpConnectionContext, ServerListenContext {
private final HttpHeadersFactory headersFactory;
private final HttpResponseFactory factory;
private final StreamingHttpResponseFactory streamingFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,9 @@ public Completable onClose() {
public Completable closeAsync() {
return completed();
}

@Override
public void acceptConnections(final boolean accept) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ private H2ServerParentConnectionContext(final Channel channel, final BufferAlloc
this.listenAddress = requireNonNull(listenAddress);
}

@Override
public void acceptConnections(final boolean accept) {
channel().config().setAutoRead(accept);
}

@Override
public SocketAddress listenAddress() {
return listenAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ public SocketAddress listenAddress() {
return delegate.listenAddress();
}

@Override
public void acceptConnections(final boolean accept) {
delegate.acceptConnections(accept);
}

@Override
public ExecutionContext executionContext() {
return delegate.executionContext();
Expand Down Expand Up @@ -484,6 +489,12 @@ public Channel nettyChannel() {
return connection.nettyChannel();
}

@Override
public void acceptConnections(final boolean accept) {
assert connection.nettyChannel().parent() != null;
connection.nettyChannel().parent().config().setAutoRead(accept);
}

@Override
public String toString() {
return getClass().getSimpleName() + '(' + connection + ')';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ private void startServer() throws Exception {
.protocols(protocol)
.transportObserver(serverTransportObserver)
.enableWireLogging("servicetalk-tests-wire-logger", TRACE, () -> true);
configureServerBuilder(serverBuilder);
if (sslEnabled) {
serverBuilder.sslConfig(new ServerSslConfigBuilder(DefaultTestCerts::loadServerPem,
DefaultTestCerts::loadServerKey).build());
Expand Down Expand Up @@ -194,7 +195,10 @@ private void startServer() throws Exception {
httpConnection = httpClient.reserveConnection(httpClient.get("/")).toFuture().get();
}

private SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> newClientBuilder() {
protected void configureServerBuilder(final HttpServerBuilder serverBuilder) {
}

protected SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> newClientBuilder() {
return HttpClients.forResolvedAddress(serverHostAndPort(serverContext));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.netty;

import io.servicetalk.client.api.ConnectTimeoutException;
import io.servicetalk.http.api.DefaultStreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.HttpServerBuilder;
import io.servicetalk.http.api.StreamingHttpClient;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.transport.api.ServiceTalkSocketOptions;

import org.junit.jupiter.api.Test;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static io.netty.util.internal.PlatformDependent.normalizedOs;
import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR;
import static io.servicetalk.client.api.AutoRetryStrategyProvider.DISABLE_AUTO_RETRIES;
import static io.servicetalk.client.api.LimitingConnectionFactoryFilter.withMax;
import static io.servicetalk.concurrent.api.BlockingTestUtils.await;
import static io.servicetalk.concurrent.api.BlockingTestUtils.awaitIndefinitely;
import static io.servicetalk.http.api.DefaultHttpHeadersFactory.INSTANCE;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.api.HttpRequestMethod.GET;
import static io.servicetalk.http.api.HttpResponseStatus.OK;
import static io.servicetalk.http.netty.AbstractNettyHttpServerTest.ExecutorSupplier.CACHED;
import static io.servicetalk.http.netty.TestServiceStreaming.SVC_ECHO;
import static io.servicetalk.logging.api.LogLevel.TRACE;
import static io.servicetalk.transport.api.ServiceTalkSocketOptions.CONNECT_TIMEOUT;
import static java.lang.Boolean.TRUE;
import static java.time.Duration.ofSeconds;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;

class ConnectionAcceptingNettyHttpServerTest extends AbstractNettyHttpServerTest {

private static final boolean IS_LINUX = "linux".equals(normalizedOs());
// There is an off-by-one behavior difference between macOS & Linux.
// Linux has a greater-than check
// (see. https://github.com/torvalds/linux/blob/5bfc75d92efd494db37f5c4c173d3639d4772966/include/net/sock.h#L941)
private static final int TCP_BACKLOG = IS_LINUX ? 0 : 1;
private static final int CONNECT_TIMEOUT_MILLIS = (int) ofSeconds(5).toMillis();
private static final int VERIFY_REQUEST_AWAIT_SECS = 5;
private static final int TRY_REQUEST_AWAIT_SECS = 1;

private final StreamingHttpRequestResponseFactory reqRespFactory =
new DefaultStreamingHttpRequestResponseFactory(DEFAULT_ALLOCATOR, INSTANCE, HTTP_1_1);

@Override
protected void configureServerBuilder(final HttpServerBuilder serverBuilder) {
serverBuilder.listenSocketOption(ServiceTalkSocketOptions.SO_BACKLOG, TCP_BACKLOG);
}

@Test
void testStopAcceptingAndResume() throws Exception {
setUp(CACHED, CACHED);
final StreamingHttpRequest request = reqRespFactory.newRequest(GET, SVC_ECHO);

assertConnectionRequestSucceeds(request);

serverContext().acceptConnections(false);
// Netty will evaluate the auto-read on the next round, so the next connection will go through.
assertConnectionRequestSucceeds(request);
// This connection should get established but not accepted.
assertConnectionRequestReceiveTimesOut(request);

// Restoring auto-read will resume accepting.
serverContext().acceptConnections(true);
assertConnectionRequestSucceeds(request);
}

@Test
void testIdleTimeout() throws Exception {
setUp(CACHED, CACHED);
final StreamingHttpRequest request = reqRespFactory.newRequest(GET, SVC_ECHO);
assertConnectionRequestSucceeds(request);

serverContext().acceptConnections(false);
// Netty will evaluate the auto-read on the next round, so the next connection will go through.
assertConnectionRequestSucceeds(request);
// Connection will establish but remain in the accept-queue
// (i.e., NOT accepted by the server => occupying 1 backlog entry)
assertConnectionRequestReceiveTimesOut(request);
final StreamingHttpClient httpClient2 = newClient();
// Since we control the backlog size, this connection won't establish (i.e., NO syn-ack)
// timeout operator can be used to kill it or socket connection-timeout
final ExecutionException executionException =
assertThrows(ExecutionException.class, () -> awaitIndefinitely(httpClient2.request(request)));
assertThat(executionException.getCause(), instanceOf(ConnectTimeoutException.class));
}

private void assertConnectionRequestReceiveTimesOut(final StreamingHttpRequest request) {
final StreamingHttpClient httpClient = newClient();
assertThrows(TimeoutException.class, () -> await(httpClient.request(request), TRY_REQUEST_AWAIT_SECS, SECONDS));
}

private StreamingHttpClient newClient() {
return newClientBuilder()
.appendConnectionFactoryFilter(withMax(1))
.autoRetryStrategy(DISABLE_AUTO_RETRIES)
.enableWireLogging("servicetalk-tests-wire-logger", TRACE, TRUE::booleanValue)
// It's important to use CONNECT_TIMEOUT here to verify that connections aren't establishing.
.socketOption(CONNECT_TIMEOUT, CONNECT_TIMEOUT_MILLIS)
.buildStreaming();
}

private void assertConnectionRequestSucceeds(final StreamingHttpRequest request) throws Exception {
final StreamingHttpClient httpClient = newClientBuilder().buildStreaming();
final StreamingHttpResponse response = await(httpClient.request(request), VERIFY_REQUEST_AWAIT_SECS, SECONDS);
assert response != null;
assertResponse(response, HTTP_1_1, OK, "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ public ExecutionContext executionContext() {
return serverContext.executionContext();
}

@Override
public void acceptConnections(final boolean accept) {
serverContext.acceptConnections(accept);
}

@Override
public Completable onClose() {
return serverContext.onClose();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018, 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,7 +25,7 @@
/**
* Context for servers.
*/
public interface ServerContext extends ListenableAsyncCloseable, GracefulAutoCloseable {
public interface ServerContext extends ServerListenContext, ListenableAsyncCloseable, GracefulAutoCloseable {

/**
* Listen address for the server associated with this context.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.transport.api;

/**
* Context for controlling listen behavior.
*/
public interface ServerListenContext {
/**
* Toggles the server's channel accepting connection ability.
* <p>
* Passing a {@code false} value, will stop accepting connections on the server channel, without affecting any other
* interaction to currently open child channels (i.e., reads / writes).
* <p>
* Depending on the transport, connections may still get ESTABLISHED, see.
* {@link ServiceTalkSocketOptions#SO_BACKLOG backlog} and (in case of Linux)
* <a href="https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt">SOMAXCONN</a>.
* For instance, in case of TCP the 3-way handshake may finish, and the connection will await in the
* accept queue to be accepted. If the accept queue is full, connection SYNs will await in the
* SYN backlog (i.e., in case of Linux <a href="https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt">
* tcp_max_syn_backlog</a>). These additional parameters may affect the behavior of new flows when the service
* is not accepting.
* <p>
* Depending on how long this stays in the {@code false} state, it may affect other timeouts (i.e., connect-timeout
* or idleness) on the peer-side and/or the other flows to the peer (i.e., proxies).
* <p>
* Considerations:
* <ul>
* <li>Upon resumption, {@code accept == true}, backlogged connections will be processed first,
* which may be inactive by that time.</li>
* <li>The effect of toggling connection acceptance may be lazy evaluated (implementation detail), meaning
* that connections may still go through even after setting this to {@code false}.</li>
* </ul>
* @param accept Toggles the server's accepting connection ability.
*/
void acceptConnections(boolean accept);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@
public final class ServiceTalkSocketOptions {

/**
* The connect timeout in milliseconds.
* Connect timeout in milliseconds.
*/
public static final SocketOption<Integer> CONNECT_TIMEOUT =
new ServiceTalkSocketOption<>("CONNECT_TIMEOUT", Integer.class);

/**
* The threshold after which the the Endpoint is not writable anymore.
* The threshold after which the Endpoint is not writable anymore.
*/
public static final SocketOption<Integer> WRITE_BUFFER_THRESHOLD =
new ServiceTalkSocketOption<>("WRITE_BUFFER_THRESHOLD", Integer.class);

/**
* Allow to idle timeout in milli seconds after which the connection is closed.
* Connection idle timeout in milliseconds after which the connection is closed.
*/
public static final SocketOption<Long> IDLE_TIMEOUT = new ServiceTalkSocketOption<>("IDLE_TIMEOUT", Long.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018, 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -83,6 +83,11 @@ public SocketAddress listenAddress() {
return listenChannel.localAddress();
}

@Override
public void acceptConnections(final boolean accept) {
listenChannel.config().setAutoRead(accept);
}

@Override
public ExecutionContext executionContext() {
return executionContext;
Expand Down