Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Demonstrate the capacity limiter lockup effect with a demo #3033

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions servicetalk-examples/grpc/helloworld/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies {
implementation project(":servicetalk-grpc-netty")
implementation project(":servicetalk-grpc-protoc")
implementation project(":servicetalk-grpc-protobuf")
implementation project(":servicetalk-traffic-resilience-http")

implementation "org.slf4j:slf4j-api:$slf4jVersion"
runtimeOnly "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,139 @@
*/
package io.servicetalk.examples.grpc.helloworld.blocking;

import io.servicetalk.capacity.limiter.api.CapacityLimiter;
import io.servicetalk.capacity.limiter.api.CapacityLimiters;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.grpc.api.GrpcServerContext;
import io.servicetalk.grpc.netty.GrpcClients;
import io.servicetalk.grpc.netty.GrpcServers;

import io.grpc.examples.helloworld.Greeter;
import io.grpc.examples.helloworld.Greeter.BlockingGreeterClient;
import io.grpc.examples.helloworld.Greeter.ClientFactory;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.servicetalk.http.utils.JavaNetSoTimeoutHttpConnectionFilter;
import io.servicetalk.traffic.resilience.http.TrafficResilienceHttpClientFilter;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static io.servicetalk.concurrent.api.Single.succeeded;

public final class BlockingHelloWorldClient {
public static void main(String[] args) throws Exception {
try (BlockingGreeterClient client = GrpcClients.forAddress("localhost", 8080)
.buildBlocking(new ClientFactory())) {
HelloReply reply = client.sayHello(HelloRequest.newBuilder().setName("World").build());
System.out.println(reply);

final double fraction = 0.1;

GrpcServerContext serverContext = GrpcServers.forPort(8080)
.listenAndAwait((Greeter.GreeterService) (ctx, request) -> {
Completable timer;
if (ThreadLocalRandom.current().nextDouble() < fraction) {
timer = io.servicetalk.concurrent.api.Executors.global()

.timer(Duration.ofMillis(50));
} else {
timer = Completable.completed();
}

return timer.concat(succeeded(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build()));
});

// Share the limiter.
CapacityLimiter limiter = CapacityLimiters.fixedCapacity(1).build();
final TrafficResilienceHttpClientFilter resilienceHttpClientFilter =
new TrafficResilienceHttpClientFilter.Builder(() -> limiter).build();

final int numClients = 60;
final CountDownLatch finishedLatch = new CountDownLatch(numClients);
final CountDownLatch startRequestsLatch = new CountDownLatch(1);
final AtomicBoolean finished = new AtomicBoolean();
ExecutorService executor = Executors.newCachedThreadPool();
final AtomicInteger consecutiveFailures = new AtomicInteger();
final int failureLimit = 600;
final Output output = new Output();
for (int i = 0; i < numClients; i++) {
final int ii = i;
executor.execute(() -> {
try {
System.out.println("Creating new client " + ii);
try (BlockingGreeterClient client = GrpcClients.forAddress("localhost", 8080)
.defaultTimeout(Duration.ofMillis(50))
.initializeHttp(http -> {
http.appendConnectionFilter(
new JavaNetSoTimeoutHttpConnectionFilter(Duration.ofMillis(50)));
http.appendClientFilter(resilienceHttpClientFilter);
})
.buildBlocking(new ClientFactory())) {
startRequestsLatch.await();
while (!finished.get()) {
try {
HelloReply reply = client.sayHello(HelloRequest.newBuilder().setName("World").build());
reply.getMessage();
output.success();
consecutiveFailures.set(0);
} catch (Exception ex) {
output.failed();
if (consecutiveFailures.incrementAndGet() >= failureLimit && finished.compareAndSet(false, true)) {
System.gc(); // hopefully this will hit the finalizers and emit log statements.
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
// noop
}
System.gc();
System.out.printf("\nConsecutive failure threshold reached (%d). Terminating.\n", failureLimit);
}
}
Thread.sleep(100);
}
finishedLatch.countDown();
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {

}
});
Thread.sleep(100); // give our clients time to startup and emit their config.
}

System.out.println("Starting main loop.");
startRequestsLatch.countDown();
finishedLatch.await();
serverContext.close();
executor.shutdown();
System.out.println("Terminating.");
}

private static class Output {
private long counter;

private final String clear = "\r \r";

void success() {
emit('.');
}

void failed() {
emit('!');
}

private synchronized void emit(char result) {
if (newline()) {
System.out.print(clear);
}
System.out.print(result);
}

private boolean newline() {
return ++counter % 100 == 0;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<Logger name="io.servicetalk.http.netty.H2ServerParentConnectionContext" level="DEBUG"/>

<!-- Prints default subscriber errors-->
<Logger name="io.servicetalk.concurrent.api" level="DEBUG"/>
<Logger name="io.servicetalk.concurrent.api" level="INFO"/>

<!-- Use `-Dservicetalk.logger.level=DEBUG` to change the root logger level via command line -->
<Root level="${sys:servicetalk.logger.level:-INFO}">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import io.servicetalk.concurrent.api.CompositeCloseable;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.TerminalSignalConsumer;
import io.servicetalk.http.api.DefaultHttpLoadBalancerFactory;
import io.servicetalk.http.api.DefaultStreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.DelegatingHttpExecutionContext;
Expand All @@ -48,10 +50,14 @@
import io.servicetalk.http.api.StreamingHttpClient;
import io.servicetalk.http.api.StreamingHttpClientFilter;
import io.servicetalk.http.api.StreamingHttpClientFilterFactory;
import io.servicetalk.http.api.StreamingHttpConnectionFilter;
import io.servicetalk.http.api.StreamingHttpConnectionFilterFactory;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpRequester;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.netty.ReservableRequestConcurrencyControllers.InternalRetryingHttpClientFilter;
import io.servicetalk.http.utils.BeforeFinallyHttpOperator;
import io.servicetalk.http.utils.HostHeaderHttpRequesterFilter;
import io.servicetalk.http.utils.IdleTimeoutConnectionFilter;
import io.servicetalk.loadbalancer.RoundRobinLoadBalancers;
Expand Down Expand Up @@ -142,7 +148,7 @@ final class DefaultSingleAddressHttpClientBuilder<U, R> implements SingleAddress
strategyComputation = new ClientStrategyInfluencerChainBuilder();
this.loadBalancerFactory = defaultLoadBalancer();
this.serviceDiscoverer = new CastedServiceDiscoverer<>(serviceDiscoverer);
clientFilterFactory = appendFilter(clientFilterFactory, HttpMessageDiscardWatchdogClientFilter.CLIENT_CLEANER);
clientFilterFactory = appendFilter(appendFilter(clientFilterFactory, new LeakFilter("first")), HttpMessageDiscardWatchdogClientFilter.CLIENT_CLEANER);
}

private DefaultSingleAddressHttpClientBuilder(final U address,
Expand Down Expand Up @@ -265,12 +271,30 @@ public HttpExecutionStrategy executionStrategy() {

StreamingHttpConnectionFilterFactory connectionFilterFactory =
ctx.builder.addIdleTimeoutConnectionFilter ?
appendConnectionFilter(ctx.builder.connectionFilterFactory, DEFAULT_IDLE_TIMEOUT_FILTER) :
appendStreamingHttpConnectionFilterFactory(ctx.builder.connectionFilterFactory, DEFAULT_IDLE_TIMEOUT_FILTER) :
ctx.builder.connectionFilterFactory;

connectionFilterFactory = appendConnectionFilter(connectionFilterFactory,
connectionFilterFactory = appendStreamingHttpConnectionFilterFactory(connectionFilterFactory,
HttpMessageDiscardWatchdogClientFilter.INSTANCE);

connectionFilterFactory = appendStreamingHttpConnectionFilterFactory(connectionFilterFactory, new StreamingHttpConnectionFilterFactory() {
private final LeakFilter leakFilter = new LeakFilter("connection-filter");
@Override
public StreamingHttpConnectionFilter create(FilterableStreamingHttpConnection connection) {
return new StreamingHttpConnectionFilter(connection) {
@Override
public Single<StreamingHttpResponse> request(StreamingHttpRequest request) {
return leakFilter.doRequest(connection, request);
}
};
}

@Override
public String toString() {
return "ConnectionLeakFilter";
}
});

if (roConfig.isH2PriorKnowledge() &&
// Direct connection or HTTP proxy
(!roConfig.hasProxy() || sslContext == null)) {
Expand Down Expand Up @@ -329,6 +353,10 @@ connectionFilterFactory, new AlpnReqRespFactoryFunc(

// Internal retries must be one of the last filters in the chain.
currClientFilterFactory = appendFilter(currClientFilterFactory, InternalRetryingHttpClientFilter.INSTANCE);

// See if we're leaking here.
currClientFilterFactory = appendFilter(currClientFilterFactory, new LeakFilter("end"));

FilterableStreamingHttpClient wrappedClient =
currClientFilterFactory.create(lbClient, lb.eventStream(), ctx.sdStatus);

Expand All @@ -352,9 +380,75 @@ connectionFilterFactory, new AlpnReqRespFactoryFunc(
}
}

/**
 * Debugging client filter that attaches {@link Tracker}s to every exchange. A tracker that is finalized
 * without having been marked as finished is reported through {@code LOGGER} as a leak.
 */
private static class LeakFilter implements StreamingHttpClientFilterFactory {

    // Label included in the leak log line to tell filter instances apart.
    private final String name;

    LeakFilter(final String name) {
        this.name = name;
    }

    /**
     * Sends {@code request} through {@code delegate} and instruments the response with two trackers: one
     * handed to a {@link BeforeFinallyHttpOperator}, and one created per response that is marked finished
     * as soon as the message body is subscribed to.
     */
    Single<StreamingHttpResponse> doRequest(StreamingHttpRequester delegate, StreamingHttpRequest request) {
        final Single<StreamingHttpResponse> exchange = delegate.request(request);
        final Tracker exchangeTracker = new Tracker("liftSync(..)");
        return exchange
                .liftSync(new BeforeFinallyHttpOperator(exchangeTracker))
                .map(this::trackBodySubscription);
    }

    // Also a more fine-grained version: watches whether the message body ever gets subscribed to.
    private StreamingHttpResponse trackBodySubscription(final StreamingHttpResponse response) {
        final Tracker bodyTracker = new Tracker("response.map(..)");
        return response.transformMessageBody(
                body -> body.whenOnSubscribe(subscription -> bodyTracker.onComplete()));
    }

    @Override
    public StreamingHttpClientFilter create(FilterableStreamingHttpClient client) {
        return new StreamingHttpClientFilter(client) {
            @Override
            protected Single<StreamingHttpResponse> request(StreamingHttpRequester delegate,
                                                            StreamingHttpRequest request) {
                return doRequest(delegate, request);
            }
        };
    }

    /**
     * Records whether any terminal signal (complete, error or cancel) was observed and, if not, logs an
     * error when the instance is finalized.
     */
    private final class Tracker implements TerminalSignalConsumer {

        // Where in the pipeline this tracker was installed; part of the leak log line.
        private final String position;
        private volatile boolean finished;

        Tracker(final String position) {
            this.position = position;
        }

        @Override
        public void onComplete() {
            markFinished();
        }

        @Override
        public void onError(Throwable throwable) {
            markFinished();
        }

        @Override
        public void cancel() {
            markFinished();
        }

        private void markFinished() {
            finished = true;
        }

        @Override
        protected void finalize() throws Throwable {
            final boolean leaked = !finished;
            if (leaked) {
                LOGGER.error("{} - {}: Leaked tracker detected", name, position);
                markFinished();
            }
            super.finalize();
        }
    }
}

private static <R> ConnectionFactoryFilter<R, FilterableStreamingHttpConnection> appendConnectionFilter(
final ConnectionFactoryFilter<R, FilterableStreamingHttpConnection> first,
final ConnectionFactoryFilter<R, FilterableStreamingHttpConnection> second) {
System.out.println("Appending connection factory filter: " + second);
// For now this delegates to the deprecated method but needs to be fixed up once the deprecated append
// method is being removed.
return first.append(second);
Expand Down Expand Up @@ -403,6 +497,7 @@ private static <U, R> String targetResource(final HttpClientBuildContext<U, R> c
private static ContextAwareStreamingHttpClientFilterFactory appendFilter(
@Nullable final ContextAwareStreamingHttpClientFilterFactory currClientFilterFactory,
final StreamingHttpClientFilterFactory appendClientFilterFactory) {
System.out.println("Appending client filter factory: " + appendClientFilterFactory);
if (appendClientFilterFactory instanceof RetryingHttpRequesterFilter) {
if (currClientFilterFactory == null) {
return (client, lbEventStream, sdStatus) -> {
Expand Down Expand Up @@ -508,7 +603,7 @@ public DefaultSingleAddressHttpClientBuilder<U, R> protocols(final HttpProtocolC
public DefaultSingleAddressHttpClientBuilder<U, R> appendConnectionFilter(
final StreamingHttpConnectionFilterFactory factory) {
requireNonNull(factory);
connectionFilterFactory = appendConnectionFilter(connectionFilterFactory, factory);
connectionFilterFactory = appendStreamingHttpConnectionFilterFactory(connectionFilterFactory, factory);
strategyComputation.add(factory);
checkIfHostHeaderHttpRequesterFilter(factory);
checkIfIdleTimeoutConnectionFilter(factory);
Expand Down Expand Up @@ -537,9 +632,10 @@ private void checkIfIdleTimeoutConnectionFilter(final StreamingHttpConnectionFil
}

// Use another method to keep final references and avoid StackOverflowError
private static StreamingHttpConnectionFilterFactory appendConnectionFilter(
private static StreamingHttpConnectionFilterFactory appendStreamingHttpConnectionFilterFactory(
@Nullable final StreamingHttpConnectionFilterFactory current,
final StreamingHttpConnectionFilterFactory next) {
System.out.println("Appending StreamingHttpConnectionFilterFactory: " + next);
return current == null ? next : connection -> current.create(next.create(connection));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public Single<StreamingHttpResponse> request(final StreamingHttpRequest request)
// If a previous message exists, the Single<StreamingHttpResponse> got resubscribed to
// (i.e. during a retry) and so previous message body needs to be cleaned up by the
// user.
LOGGER.warn("Discovered un-drained HTTP response message body which has " +
LOGGER.warn("non-cleaner: Discovered un-drained HTTP response message body which has " +
"been dropped by user code - this is a strong indication of a bug " +
"in a user-defined filter. Response payload (message) body must " +
"be fully consumed before retrying.");
Expand Down Expand Up @@ -112,17 +112,31 @@ protected Single<StreamingHttpResponse> request(final StreamingHttpRequester del
if (maybePublisher != null && maybePublisher.getAndSet(null) != null) {
// No-one subscribed to the message (or there is none), so if there is a message
// tell the user to clean it up.
LOGGER.warn("Discovered un-drained HTTP response message body which has " +
// TODO: in this filter, this is the pathway that is failing with a TimeoutException
LOGGER.warn("cleaner: Discovered un-drained HTTP response message body which has " +
"been dropped by user code - this is a strong indication of a bug " +
"in a user-defined filter. Response payload (message) body must " +
"be fully consumed before discarding.");
"be fully consumed before discarding. cause: {}", toStr(cause));
LOGGER.warn("cleaner stack trace", new Exception());
}
return Single.<StreamingHttpResponse>failed(cause).shareContextOnSubscribe();
});
}
};
}

/**
 * Renders a throwable as multi-line text: its message, the simple name of its class, and then one line
 * per stack trace element, each terminated by a newline.
 *
 * @param t the throwable to render.
 * @return the rendered text.
 */
String toStr(Throwable t) {
    final StringBuilder dump = new StringBuilder();
    dump.append(t.getMessage()).append('\n');
    dump.append(t.getClass().getSimpleName()).append('\n');
    for (final StackTraceElement frame : t.getStackTrace()) {
        dump.append(frame).append('\n');
    }
    return dump.toString();
}

@Override
public HttpExecutionStrategy requiredOffloads() {
return HttpExecutionStrategies.offloadNone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public void onExchangeFinally() {
if (maybePublisher != null && maybePublisher.get() != null) {
// No-one subscribed to the message (or there is none), so if there is a message
// tell the user to clean it up.
LOGGER.warn("Discovered un-drained HTTP response message body which has " +
LOGGER.trace("Discovered un-drained HTTP response message body which has " +
"been dropped by user code - this is a strong indication of a bug " +
"in a user-defined filter. Responses (or their message body) must " +
"be fully consumed before discarding.");
Expand Down
Loading