diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpClient.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpClient.java index b9e0b3bb78..9a38fe58aa 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpClient.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpClient.java @@ -15,16 +15,13 @@ */ package io.servicetalk.http.api; -import io.servicetalk.concurrent.GracefulAutoCloseable; import io.servicetalk.concurrent.api.Single; -import static io.servicetalk.concurrent.internal.FutureUtils.awaitTermination; - /** * Provides a means to issue requests against HTTP service. The implementation is free to maintain a collection of * {@link HttpConnection} instances and distribute calls to {@link #request(HttpRequest)} amongst this collection. */ -public interface HttpClient extends HttpRequester, GracefulAutoCloseable { +public interface HttpClient extends HttpRequester { /** * Reserve an {@link HttpConnection} based on provided {@link HttpRequestMetaData}. *
@@ -63,14 +60,4 @@ default BlockingStreamingHttpClient asBlockingStreamingClient() { default BlockingHttpClient asBlockingClient() { return asStreamingClient().asBlockingClient(); } - - @Override - default void close() throws Exception { - awaitTermination(closeAsync().toFuture()); - } - - @Override - default void closeGracefully() throws Exception { - awaitTermination(closeAsyncGracefully().toFuture()); - } } diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpConnection.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpConnection.java index d249122c6e..de98a5f0f9 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpConnection.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpConnection.java @@ -15,16 +15,13 @@ */ package io.servicetalk.http.api; -import io.servicetalk.concurrent.GracefulAutoCloseable; import io.servicetalk.concurrent.PublisherSource; import io.servicetalk.concurrent.api.Publisher; -import static io.servicetalk.concurrent.internal.FutureUtils.awaitTermination; - /** * Represents a single fixed connection to a HTTP server. */ -public interface HttpConnection extends HttpRequester, GracefulAutoCloseable { +public interface HttpConnection extends HttpRequester { /** * Get the {@link HttpConnectionContext}. * @@ -69,14 +66,4 @@ default BlockingStreamingHttpConnection asBlockingStreamingConnection() { default BlockingHttpConnection asBlockingConnection() { return asStreamingConnection().asBlockingConnection(); } - - @Override - default void close() throws Exception { - awaitTermination(closeAsync().toFuture()); - } - - @Override - default void closeGracefully() throws Exception { - awaitTermination(closeAsyncGracefully().toFuture()); - } } diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpRequester.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpRequester.java index b27d323865..34a47594de 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpRequester.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpRequester.java @@ -15,13 +15,16 @@ */ package io.servicetalk.http.api; +import io.servicetalk.concurrent.GracefulAutoCloseable; import io.servicetalk.concurrent.api.ListenableAsyncCloseable; import io.servicetalk.concurrent.api.Single; +import static io.servicetalk.concurrent.internal.FutureUtils.awaitTermination; + /** * Provides a means to make a HTTP request. */ -public interface HttpRequester extends HttpRequestFactory, ListenableAsyncCloseable { +public interface HttpRequester extends HttpRequestFactory, ListenableAsyncCloseable, GracefulAutoCloseable { /** * Send a {@code request}. * @@ -46,4 +49,14 @@ public interface HttpRequester extends HttpRequestFactory, ListenableAsyncClosea * @return a {@link HttpResponseFactory}. */ HttpResponseFactory httpResponseFactory(); + + @Override + default void close() throws Exception { + awaitTermination(closeAsync().toFuture()); + } + + @Override + default void closeGracefully() throws Exception { + awaitTermination(closeAsyncGracefully().toFuture()); + } } diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClient.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClient.java index 0bbead3d26..490973d2b1 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClient.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClient.java @@ -15,16 +15,13 @@ */ package io.servicetalk.http.api; -import io.servicetalk.concurrent.GracefulAutoCloseable; import io.servicetalk.concurrent.api.Single; -import static io.servicetalk.concurrent.internal.FutureUtils.awaitTermination; - /** * The equivalent of {@link HttpClient} but that accepts {@link StreamingHttpRequest} and returns * {@link StreamingHttpResponse}. */ -public interface StreamingHttpClient extends FilterableStreamingHttpClient, GracefulAutoCloseable { +public interface StreamingHttpClient extends FilterableStreamingHttpClient { /** * Reserve a {@link StreamingHttpConnection} based on provided {@link HttpRequestMetaData}. *
@@ -66,14 +63,4 @@ public interface StreamingHttpClient extends FilterableStreamingHttpClient, Grac * @return a {@link BlockingHttpClient} representation of this {@link StreamingHttpClient}. */ BlockingHttpClient asBlockingClient(); - - @Override - default void close() throws Exception { - awaitTermination(closeAsync().toFuture()); - } - - @Override - default void closeGracefully() throws Exception { - awaitTermination(closeAsyncGracefully().toFuture()); - } } diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpConnection.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpConnection.java index bc019b2c28..ba9f87238f 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpConnection.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpConnection.java @@ -15,15 +15,11 @@ */ package io.servicetalk.http.api; -import io.servicetalk.concurrent.GracefulAutoCloseable; - -import static io.servicetalk.concurrent.internal.FutureUtils.awaitTermination; - /** * The equivalent of {@link HttpConnection} but that accepts {@link StreamingHttpRequest} and returns * {@link StreamingHttpResponse}. */ -public interface StreamingHttpConnection extends FilterableStreamingHttpConnection, GracefulAutoCloseable { +public interface StreamingHttpConnection extends FilterableStreamingHttpConnection { /** * Convert this {@link StreamingHttpConnection} to the {@link HttpConnection} API. *
@@ -50,14 +46,4 @@ public interface StreamingHttpConnection extends FilterableStreamingHttpConnecti
* @return a {@link BlockingHttpConnection} representation of this {@link StreamingHttpConnection}.
*/
BlockingHttpConnection asBlockingConnection();
-
- @Override
- default void close() throws Exception {
- awaitTermination(closeAsync().toFuture());
- }
-
- @Override
- default void closeGracefully() throws Exception {
- awaitTermination(closeAsyncGracefully().toFuture());
- }
}
diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpRequester.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpRequester.java
index 1fbafdfaf0..02c52abee3 100644
--- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpRequester.java
+++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpRequester.java
@@ -15,14 +15,18 @@
*/
package io.servicetalk.http.api;
+import io.servicetalk.concurrent.GracefulAutoCloseable;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Single;
+import static io.servicetalk.concurrent.internal.FutureUtils.awaitTermination;
+
/**
* The equivalent of {@link HttpRequester} but that accepts {@link StreamingHttpRequest} and returns
* {@link StreamingHttpResponse}.
*/
-public interface StreamingHttpRequester extends StreamingHttpRequestFactory, ListenableAsyncCloseable {
+public interface StreamingHttpRequester extends StreamingHttpRequestFactory, ListenableAsyncCloseable,
+ GracefulAutoCloseable {
/**
* Send a {@code request}.
*
@@ -47,4 +51,14 @@ public interface StreamingHttpRequester extends StreamingHttpRequestFactory, Lis
* @return a {@link StreamingHttpResponseFactory}.
*/
StreamingHttpResponseFactory httpResponseFactory();
+
+ @Override
+ default void close() throws Exception {
+ awaitTermination(closeAsync().toFuture());
+ }
+
+ @Override
+ default void closeGracefully() throws Exception {
+ awaitTermination(closeAsyncGracefully().toFuture());
+ }
}
diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java
index d6973c9cf3..c6f06434d5 100644
--- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java
+++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java
@@ -45,6 +45,7 @@
import io.servicetalk.transport.api.ConnectionObserver.StreamObserver;
import io.servicetalk.transport.api.IoThreadFactory;
import io.servicetalk.transport.api.SslConfig;
+import io.servicetalk.transport.netty.internal.ChannelCloseUtils;
import io.servicetalk.transport.netty.internal.ChannelInitializer;
import io.servicetalk.transport.netty.internal.CloseHandler;
import io.servicetalk.transport.netty.internal.DefaultNettyConnection;
@@ -86,7 +87,6 @@
import static io.servicetalk.http.netty.AbstractStreamingHttpConnection.ZERO_MAX_CONCURRENCY_EVENT;
import static io.servicetalk.http.netty.HeaderUtils.OBJ_EXPECT_CONTINUE;
import static io.servicetalk.http.netty.HttpDebugUtils.showPipeline;
-import static io.servicetalk.transport.netty.internal.ChannelCloseUtils.close;
import static io.servicetalk.transport.netty.internal.ChannelSet.CHANNEL_CLOSEABLE_KEY;
import static io.servicetalk.transport.netty.internal.CloseHandler.forNonPipelined;
import static io.servicetalk.transport.netty.internal.NettyPipelineSslUtils.extractSslSession;
@@ -145,14 +145,14 @@ protected void handleSubscribe(final Subscriber super H2ClientParentConnection
delayedCancellable, shouldWaitForSslHandshake(sslSession, sslConfig),
allowDropTrailersReadFromTransport, config.headersFactory(), reqRespFactory, observer);
} catch (Throwable cause) {
- close(channel, cause);
+ ChannelCloseUtils.close(channel, cause);
deliverErrorFromSource(subscriber, cause);
return;
}
try {
subscriber.onSubscribe(delayedCancellable);
} catch (Throwable cause) {
- close(channel, cause);
+ ChannelCloseUtils.close(channel, cause);
handleExceptionFromOnSubscribe(subscriber, cause);
return;
}
@@ -215,7 +215,7 @@ void tryCompleteSubscriber() {
@Override
boolean tryFailSubscriber(Throwable cause) {
if (subscriber != null) {
- close(parentContext.nettyChannel(), cause);
+ ChannelCloseUtils.close(parentContext.nettyChannel(), cause);
Subscriber super H2ClientParentConnection> subscriberCopy = subscriber;
subscriber = null;
subscriberCopy.onError(cause);
@@ -391,7 +391,7 @@ private void childChannelActive(Future