Skip to content

Commit

Permalink
Align HttpRequester/StreamingHttpRequester with blocking variants (
Browse files Browse the repository at this point in the history
…#3201)

Motivation:

`BlockingHttpRequester` and `BlockingStreamingHttpRequester` implement
`GracefulAutoCloseable` interface and in result can be used in
try-with-resources. For some reason, our `HttpRequester` and
`StreamingHttpRequester` don't but their client and connection
interfaces do.

Modifications:

- Promote `GracefulAutoCloseable` interface from `HttpClient`
and `HttpConnection` to `HttpRequester`.
- Promote `GracefulAutoCloseable` interface from `StreamingHttpClient`
and `StreamingHttpConnection` to `StreamingHttpRequester`.

Result:

All requester/client/connection interfaces are consistent across all 4
API variants.
  • Loading branch information
idelpivnitskiy authored Mar 7, 2025
1 parent 7bf74af commit 8fa5aab
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@
*/
package io.servicetalk.http.api;

import io.servicetalk.concurrent.GracefulAutoCloseable;
import io.servicetalk.concurrent.api.Single;

import static io.servicetalk.concurrent.internal.FutureUtils.awaitTermination;

/**
* Provides a means to issue requests against HTTP service. The implementation is free to maintain a collection of
* {@link HttpConnection} instances and distribute calls to {@link #request(HttpRequest)} amongst this collection.
*/
public interface HttpClient extends HttpRequester, GracefulAutoCloseable {
public interface HttpClient extends HttpRequester {
/**
* Reserve an {@link HttpConnection} based on provided {@link HttpRequestMetaData}.
* <p>
Expand Down Expand Up @@ -63,14 +60,4 @@ default BlockingStreamingHttpClient asBlockingStreamingClient() {
default BlockingHttpClient asBlockingClient() {
return asStreamingClient().asBlockingClient();
}

@Override
default void close() throws Exception {
awaitTermination(closeAsync().toFuture());
}

@Override
default void closeGracefully() throws Exception {
awaitTermination(closeAsyncGracefully().toFuture());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@
*/
package io.servicetalk.http.api;

import io.servicetalk.concurrent.GracefulAutoCloseable;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.Publisher;

import static io.servicetalk.concurrent.internal.FutureUtils.awaitTermination;

/**
* Represents a single fixed connection to a HTTP server.
*/
public interface HttpConnection extends HttpRequester, GracefulAutoCloseable {
public interface HttpConnection extends HttpRequester {
/**
* Get the {@link HttpConnectionContext}.
*
Expand Down Expand Up @@ -69,14 +66,4 @@ default BlockingStreamingHttpConnection asBlockingStreamingConnection() {
default BlockingHttpConnection asBlockingConnection() {
return asStreamingConnection().asBlockingConnection();
}

@Override
default void close() throws Exception {
awaitTermination(closeAsync().toFuture());
}

@Override
default void closeGracefully() throws Exception {
awaitTermination(closeAsyncGracefully().toFuture());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
*/
package io.servicetalk.http.api;

import io.servicetalk.concurrent.GracefulAutoCloseable;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Single;

import static io.servicetalk.concurrent.internal.FutureUtils.awaitTermination;

/**
* Provides a means to make a HTTP request.
*/
public interface HttpRequester extends HttpRequestFactory, ListenableAsyncCloseable {
public interface HttpRequester extends HttpRequestFactory, ListenableAsyncCloseable, GracefulAutoCloseable {
/**
* Send a {@code request}.
*
Expand All @@ -46,4 +49,14 @@ public interface HttpRequester extends HttpRequestFactory, ListenableAsyncClosea
* @return a {@link HttpResponseFactory}.
*/
HttpResponseFactory httpResponseFactory();

@Override
default void close() throws Exception {
awaitTermination(closeAsync().toFuture());
}

@Override
default void closeGracefully() throws Exception {
awaitTermination(closeAsyncGracefully().toFuture());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@
*/
package io.servicetalk.http.api;

import io.servicetalk.concurrent.GracefulAutoCloseable;
import io.servicetalk.concurrent.api.Single;

import static io.servicetalk.concurrent.internal.FutureUtils.awaitTermination;

/**
* The equivalent of {@link HttpClient} but that accepts {@link StreamingHttpRequest} and returns
* {@link StreamingHttpResponse}.
*/
public interface StreamingHttpClient extends FilterableStreamingHttpClient, GracefulAutoCloseable {
public interface StreamingHttpClient extends FilterableStreamingHttpClient {
/**
* Reserve a {@link StreamingHttpConnection} based on provided {@link HttpRequestMetaData}.
* <p>
Expand Down Expand Up @@ -66,14 +63,4 @@ public interface StreamingHttpClient extends FilterableStreamingHttpClient, Grac
* @return a {@link BlockingHttpClient} representation of this {@link StreamingHttpClient}.
*/
BlockingHttpClient asBlockingClient();

@Override
default void close() throws Exception {
awaitTermination(closeAsync().toFuture());
}

@Override
default void closeGracefully() throws Exception {
awaitTermination(closeAsyncGracefully().toFuture());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,11 @@
*/
package io.servicetalk.http.api;

import io.servicetalk.concurrent.GracefulAutoCloseable;

import static io.servicetalk.concurrent.internal.FutureUtils.awaitTermination;

/**
* The equivalent of {@link HttpConnection} but that accepts {@link StreamingHttpRequest} and returns
* {@link StreamingHttpResponse}.
*/
public interface StreamingHttpConnection extends FilterableStreamingHttpConnection, GracefulAutoCloseable {
public interface StreamingHttpConnection extends FilterableStreamingHttpConnection {
/**
* Convert this {@link StreamingHttpConnection} to the {@link HttpConnection} API.
* <p>
Expand All @@ -50,14 +46,4 @@ public interface StreamingHttpConnection extends FilterableStreamingHttpConnecti
* @return a {@link BlockingHttpConnection} representation of this {@link StreamingHttpConnection}.
*/
BlockingHttpConnection asBlockingConnection();

@Override
default void close() throws Exception {
awaitTermination(closeAsync().toFuture());
}

@Override
default void closeGracefully() throws Exception {
awaitTermination(closeAsyncGracefully().toFuture());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
*/
package io.servicetalk.http.api;

import io.servicetalk.concurrent.GracefulAutoCloseable;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Single;

import static io.servicetalk.concurrent.internal.FutureUtils.awaitTermination;

/**
* The equivalent of {@link HttpRequester} but that accepts {@link StreamingHttpRequest} and returns
* {@link StreamingHttpResponse}.
*/
public interface StreamingHttpRequester extends StreamingHttpRequestFactory, ListenableAsyncCloseable {
public interface StreamingHttpRequester extends StreamingHttpRequestFactory, ListenableAsyncCloseable,
GracefulAutoCloseable {
/**
* Send a {@code request}.
*
Expand All @@ -47,4 +51,14 @@ public interface StreamingHttpRequester extends StreamingHttpRequestFactory, Lis
* @return a {@link StreamingHttpResponseFactory}.
*/
StreamingHttpResponseFactory httpResponseFactory();

@Override
default void close() throws Exception {
awaitTermination(closeAsync().toFuture());
}

@Override
default void closeGracefully() throws Exception {
awaitTermination(closeAsyncGracefully().toFuture());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.servicetalk.transport.api.ConnectionObserver.StreamObserver;
import io.servicetalk.transport.api.IoThreadFactory;
import io.servicetalk.transport.api.SslConfig;
import io.servicetalk.transport.netty.internal.ChannelCloseUtils;
import io.servicetalk.transport.netty.internal.ChannelInitializer;
import io.servicetalk.transport.netty.internal.CloseHandler;
import io.servicetalk.transport.netty.internal.DefaultNettyConnection;
Expand Down Expand Up @@ -86,7 +87,6 @@
import static io.servicetalk.http.netty.AbstractStreamingHttpConnection.ZERO_MAX_CONCURRENCY_EVENT;
import static io.servicetalk.http.netty.HeaderUtils.OBJ_EXPECT_CONTINUE;
import static io.servicetalk.http.netty.HttpDebugUtils.showPipeline;
import static io.servicetalk.transport.netty.internal.ChannelCloseUtils.close;
import static io.servicetalk.transport.netty.internal.ChannelSet.CHANNEL_CLOSEABLE_KEY;
import static io.servicetalk.transport.netty.internal.CloseHandler.forNonPipelined;
import static io.servicetalk.transport.netty.internal.NettyPipelineSslUtils.extractSslSession;
Expand Down Expand Up @@ -145,14 +145,14 @@ protected void handleSubscribe(final Subscriber<? super H2ClientParentConnection
delayedCancellable, shouldWaitForSslHandshake(sslSession, sslConfig),
allowDropTrailersReadFromTransport, config.headersFactory(), reqRespFactory, observer);
} catch (Throwable cause) {
close(channel, cause);
ChannelCloseUtils.close(channel, cause);
deliverErrorFromSource(subscriber, cause);
return;
}
try {
subscriber.onSubscribe(delayedCancellable);
} catch (Throwable cause) {
close(channel, cause);
ChannelCloseUtils.close(channel, cause);
handleExceptionFromOnSubscribe(subscriber, cause);
return;
}
Expand Down Expand Up @@ -215,7 +215,7 @@ void tryCompleteSubscriber() {
@Override
boolean tryFailSubscriber(Throwable cause) {
if (subscriber != null) {
close(parentContext.nettyChannel(), cause);
ChannelCloseUtils.close(parentContext.nettyChannel(), cause);
Subscriber<? super H2ClientParentConnection> subscriberCopy = subscriber;
subscriber = null;
subscriberCopy.onError(cause);
Expand Down Expand Up @@ -391,7 +391,7 @@ private void childChannelActive(Future<Http2StreamChannel> future,
} catch (Throwable cause) {
if (streamChannel != null) {
try {
close(streamChannel, cause);
ChannelCloseUtils.close(streamChannel, cause);
} catch (Throwable unexpected) {
addSuppressed(unexpected, cause);
LOGGER.warn("Unexpected exception while handling the original cause", unexpected);
Expand Down

0 comments on commit 8fa5aab

Please sign in to comment.