Skip to content
4 changes: 4 additions & 0 deletions servicetalk-http-netty/gradle/spotbugs/test-exclusions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,8 @@
<Class name="io.servicetalk.http.netty.HttpClientResolvesOnNewConnectionTest$FailureCase$1"/>
<Bug pattern="NP_NONNULL_RETURN_VIOLATION"/>
</Match>
<Match>
<Class name="io.servicetalk.http.netty.WatchdogLeakDetectorTest"/>
<Bug pattern="DM_GC"/>
</Match>
</FindBugsFilter>
Original file line number Diff line number Diff line change
Expand Up @@ -35,67 +35,39 @@

import java.util.concurrent.atomic.AtomicReference;

import static io.servicetalk.http.netty.HttpMessageDiscardWatchdogServiceFilter.generifyAtomicReference;

/**
* Filter which tracks message bodies and warns if they are not discarded properly.
*/
final class HttpMessageDiscardWatchdogClientFilter implements StreamingHttpConnectionFilterFactory {
final class HttpMessageDiscardWatchdogClientFilter {

private static final ContextMap.Key<AtomicReference<Publisher<?>>> MESSAGE_PUBLISHER_KEY = ContextMap.Key
.newKey(HttpMessageDiscardWatchdogClientFilter.class.getName() + ".messagePublisher",
generifyAtomicReference());
WatchdogLeakDetector.generifyAtomicReference());

private static final Logger LOGGER = LoggerFactory.getLogger(HttpMessageDiscardWatchdogClientFilter.class);

/**
* Instance of {@link HttpMessageDiscardWatchdogClientFilter}.
*/
static final HttpMessageDiscardWatchdogClientFilter INSTANCE = new HttpMessageDiscardWatchdogClientFilter();
static final StreamingHttpConnectionFilterFactory INSTANCE;

/**
* Instance of {@link StreamingHttpClientFilterFactory} with the cleaner implementation.
*/
static final StreamingHttpClientFilterFactory CLIENT_CLEANER = new CleanerStreamingHttpClientFilterFactory();

private HttpMessageDiscardWatchdogClientFilter() {
// Singleton
}
static final StreamingHttpClientFilterFactory CLIENT_CLEANER;

@Override
public StreamingHttpConnectionFilter create(final FilterableStreamingHttpConnection connection) {
return new StreamingHttpConnectionFilter(connection) {
@Override
public Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
return delegate().request(request).map(response -> {
// always write the buffer publisher into the request context. When a downstream subscriber
// arrives, mark the message as subscribed explicitly (having a message present and no
// subscription is an indicator that it must be freed later on).
final AtomicReference<Publisher<?>> reference = request.context()
.computeIfAbsent(MESSAGE_PUBLISHER_KEY, key -> new AtomicReference<>());
assert reference != null;
if (reference.getAndSet(response.messageBody()) != null) {
// If a previous message exists, the Single<StreamingHttpResponse> got resubscribed to
// (i.e. during a retry) and so previous message body needs to be cleaned up by the
// user.
LOGGER.warn("Discovered un-drained HTTP response message body which has " +
"been dropped by user code - this is a strong indication of a bug " +
"in a user-defined filter. Response payload (message) body must " +
"be fully consumed before retrying. connectionInfo={}", connectionContext());
}

return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> {
reference.set(null);
return HttpMessageDiscardWatchdogServiceFilter.NoopSubscriber.INSTANCE;
}));
});
}
};
static {
if (WatchdogLeakDetector.strictDetection()) {
INSTANCE = new GcHttpMessageDiscardWatchdogClientFilter();
CLIENT_CLEANER = new NoopCleaner();
} else {
INSTANCE = new ContextHttpMessageDiscardWatchdogClientFilter();
CLIENT_CLEANER = new CleanerStreamingHttpClientFilterFactory();
}
}

@Override
public HttpExecutionStrategy requiredOffloads() {
return HttpExecutionStrategies.offloadNone();
private HttpMessageDiscardWatchdogClientFilter() {
// No instances
}

private static final class CleanerStreamingHttpClientFilterFactory implements StreamingHttpClientFilterFactory {
Expand Down Expand Up @@ -128,4 +100,92 @@ public HttpExecutionStrategy requiredOffloads() {
return HttpExecutionStrategies.offloadNone();
}
}

private static final class ContextHttpMessageDiscardWatchdogClientFilter
implements StreamingHttpConnectionFilterFactory {

@Override
public StreamingHttpConnectionFilter create(final FilterableStreamingHttpConnection connection) {
return new StreamingHttpConnectionFilter(connection) {
@Override
public Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
return delegate().request(request).map(response -> {
// always write the buffer publisher into the request context. When a downstream subscriber
// arrives, mark the message as subscribed explicitly (having a message present and no
// subscription is an indicator that it must be freed later on).
final AtomicReference<Publisher<?>> reference = request.context()
.computeIfAbsent(MESSAGE_PUBLISHER_KEY, key -> new AtomicReference<>());
assert reference != null;
if (reference.getAndSet(response.messageBody()) != null) {
// If a previous message exists, the Single<StreamingHttpResponse> got resubscribed to
// (i.e. during a retry) and so previous message body needs to be cleaned up by the
// user.
LOGGER.warn("Discovered un-drained HTTP response message body which has " +
"been dropped by user code - this is a strong indication of a bug " +
"in a user-defined filter. Response payload (message) body must " +
"be fully consumed before retrying. connectionInfo={}", connectionContext());
}

return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> {
reference.set(null);
return HttpMessageDiscardWatchdogServiceFilter.NoopSubscriber.INSTANCE;
}));
});
}
};
}

@Override
public HttpExecutionStrategy requiredOffloads() {
return HttpExecutionStrategies.offloadNone();
}
}

private static final class GcHttpMessageDiscardWatchdogClientFilter
implements StreamingHttpConnectionFilterFactory {

@Override
public StreamingHttpConnectionFilter create(FilterableStreamingHttpConnection connection) {
return new StreamingHttpConnectionFilter(connection) {
@Override
public Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
return delegate().request(request.transformMessageBody(publisher ->
WatchdogLeakDetector.gcLeakDetection(publisher, this::onRequestLeak)))
.map(response -> response.transformMessageBody(publisher ->
WatchdogLeakDetector.gcLeakDetection(publisher, this::onResponseLeak)));
}

private void onRequestLeak() {
LOGGER.warn("Discovered un-drained HTTP request message body which has " +
"been dropped by user code - this is a strong indication of a bug " +
"in a user-defined filter. The request payload (message) body must " +
"be fully consumed. connectionInfo={}", connectionContext());
}

private void onResponseLeak() {
LOGGER.warn("Discovered un-drained HTTP response message body which has " +
"been dropped by user code - this is a strong indication of a bug " +
"in a user-defined filter. Response payload (message) body must " +
"be fully consumed before retrying. connectionInfo={}", connectionContext());
}
};
}

@Override
public HttpExecutionStrategy requiredOffloads() {
return HttpExecutionStrategies.offloadNone();
}
}

private static final class NoopCleaner implements StreamingHttpClientFilterFactory {
@Override
public StreamingHttpClientFilter create(FilterableStreamingHttpClient client) {
return new StreamingHttpClientFilter(client) { };
}

@Override
public HttpExecutionStrategy requiredOffloads() {
return HttpExecutionStrategies.offloadNone();
}
}
}
Loading
Loading