Skip to content

Commit

Permalink
MultiAddressClientBuilder execution strategy control (#2166)
Browse files Browse the repository at this point in the history
Motivation:
The MultiAddressClientBuilder does not currently always respect
overrides of execution strategy made by the single address
initializer
Modification:
The computed execution strategy is a combination of the multiaddress
client builder strategy, the API strategy and the client strategy.
Result:
More predictable and controllable behavior for MultiAddressClientBuilder

Co-authored-by: Idel Pivnitskiy <[email protected]>
  • Loading branch information
bondolo and idelpivnitskiy authored Jun 15, 2022
1 parent f34e21a commit 0f1ac15
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,42 @@ default SingleAddressInitializer<U, R> append(SingleAddressInitializer<U, R> toA
@Override
MultiAddressHttpClientBuilder<U, R> executor(Executor executor);

/**
* {@inheritDoc}
*
* <p>Provides the base execution strategy for all clients created from this builder and the default strategy for
* the {@link SingleAddressHttpClientBuilder} used to construct client instances. The
* {@link #initializer(SingleAddressInitializer)} may be used for some customization of the execution strategy for a
* specific single address client instance, but may not reduce the offloading to be performed. Specifically, the
* initializer may introduce additional offloading via
* {@link SingleAddressHttpClientBuilder#executionStrategy(HttpExecutionStrategy)} and may add filters which
* influence the computed execution strategy.
*
* <p>Specifying an execution strategy will affect the offloading used during the execution of client requests:
*
* <dl>
* <dt>Unspecified or {@link HttpExecutionStrategies#defaultStrategy()}
* <dd>The resulting client instances will use the default safe strategy for each API variant and
* {@link SingleAddressHttpClientBuilder} instances generated will also have default strategy.
*
* <dt>{@link HttpExecutionStrategies#offloadNone()}
* (or deprecated {@link HttpExecutionStrategies#offloadNever()})
* <dd>{@link SingleAddressHttpClientBuilder} instances created by the client will have a strategy of
* {@link HttpExecutionStrategies#offloadNone()}. {@link HttpExecutionStrategies#offloadNone()} execution
* strategy requires that filters and asynchronous callbacks
* <strong style="text-transform: uppercase;">must not</strong> ever block during the execution of client
* requests. An {@link #initializer(SingleAddressInitializer) initializer} may override to add offloads using
* {@link SingleAddressHttpClientBuilder#executionStrategy(HttpExecutionStrategy)}.
*
* <dt>A custom execution strategy ({@link HttpExecutionStrategies#customStrategyBuilder()}) or
* {@link HttpExecutionStrategies#offloadAll()}
* <dd>{@link SingleAddressHttpClientBuilder} instances created by the client will start with the provided
* strategy and may add additional offloading as required by added filters.
* </dl>
*
* @param strategy {@inheritDoc}
* @return {@inheritDoc}
*/
@Override
MultiAddressHttpClientBuilder<U, R> executionStrategy(HttpExecutionStrategy strategy);

Expand All @@ -90,7 +126,10 @@ default MultiAddressHttpClientBuilder<U, R> headersFactory(HttpHeadersFactory he
/**
* Set a function which can customize options for each {@link StreamingHttpClient} that is built.
* @param initializer Initializes the {@link SingleAddressHttpClientBuilder} used to build new
* {@link StreamingHttpClient}s.
* {@link StreamingHttpClient}s. See {@link #executionStrategy(HttpExecutionStrategy)} for discussion of
* restrictions on the use of {@link SingleAddressHttpClientBuilder#executionStrategy(HttpExecutionStrategy)}
* within an initializer.
*
* @return {@code this}
*/
MultiAddressHttpClientBuilder<U, R> initializer(SingleAddressInitializer<U, R> initializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,42 @@ SingleAddressHttpClientBuilder<U, R> appendConnectionFilter(
@Override
SingleAddressHttpClientBuilder<U, R> executor(Executor executor);

/**
* {@inheritDoc}
*
* <p>Specifying an execution strategy affects the offloading used during execution of client requests:
*
* <dl>
* <dt>Unspecified or {@link HttpExecutionStrategies#defaultStrategy()}
* <dd>Execution of client requests will use a safe (non-blocking) execution strategy appropriate for the
* client API used and the filters added. Blocking is always safe as all potentially blocking paths are
* offloaded. Each client API variant (async/blocking streaming/aggregate) requires a specific execution
* strategy to avoid blocking the event-loop and filters added via
* {@link #appendClientFilter(StreamingHttpClientFilterFactory)},
* {@link #appendConnectionFilter(StreamingHttpConnectionFilterFactory)}, or
* {@link #appendConnectionFactoryFilter(ConnectionFactoryFilter)}, etc. may also require offloading.
* The execution strategy for execution of client requests will be computed based on the client API in use and
* {@link HttpExecutionStrategyInfluencer#requiredOffloads()} of added the filters.
*
* <dt>{@link HttpExecutionStrategies#offloadNone()}
* (or deprecated {@link HttpExecutionStrategies#offloadNever()})
* <dd>No offloading will be used during execution of client requests regardless of the client API used or the
* influence of added filters. Filters and asynchronous callbacks
* <strong style="text-transform: uppercase;">must not</strong> ever block during the execution of client
* requests.
*
* <dt>A custom execution strategy ({@link HttpExecutionStrategies#customStrategyBuilder()}) or
* {@link HttpExecutionStrategies#offloadAll()}
* <dd>The specified execution strategy will be used for executing client requests rather than the client
* API's default safe strategy. Like with the default strategy, the actual execution strategy used is computed
* from the provided strategy and the execution strategies required by added filters. Filters and asynchronous
* callbacks <strong style="text-transform: uppercase;">MAY</strong> only block during the offloaded portions of
* the client request execution.
* </dl>
*
* @param strategy {@inheritDoc}
* @return {@inheritDoc}
*/
@Override
SingleAddressHttpClientBuilder<U, R> executionStrategy(HttpExecutionStrategy strategy);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ final class StreamingHttpClientToBlockingHttpClient implements BlockingHttpClien
private final HttpRequestResponseFactory reqRespFactory;

StreamingHttpClientToBlockingHttpClient(final StreamingHttpClient client, final HttpExecutionStrategy strategy) {
this.strategy = defaultStrategy() == strategy ?
DEFAULT_BLOCKING_CONNECTION_STRATEGY : strategy;
this.strategy = defaultStrategy() == strategy ? DEFAULT_BLOCKING_CONNECTION_STRATEGY : strategy;
this.client = client;
context = new DelegatingHttpExecutionContext(client.executionContext()) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static io.servicetalk.http.api.RequestResponseFactories.toAggregated;
import static io.servicetalk.http.api.StreamingHttpConnectionToHttpConnection.DEFAULT_CONNECTION_STRATEGY;
import static io.servicetalk.http.api.StreamingHttpConnectionToHttpConnection.DEFAULT_ASYNC_CONNECTION_STRATEGY;
import static java.util.Objects.requireNonNull;

final class StreamingHttpClientToHttpClient implements HttpClient {
Expand All @@ -32,7 +32,7 @@ final class StreamingHttpClientToHttpClient implements HttpClient {
private final HttpRequestResponseFactory reqRespFactory;

StreamingHttpClientToHttpClient(final StreamingHttpClient client, final HttpExecutionStrategy strategy) {
this.strategy = defaultStrategy() == strategy ? DEFAULT_CONNECTION_STRATEGY : strategy;
this.strategy = defaultStrategy() == strategy ? DEFAULT_ASYNC_CONNECTION_STRATEGY : strategy;
this.client = client;
context = new DelegatingHttpExecutionContext(client.executionContext()) {
@Override
Expand Down Expand Up @@ -118,7 +118,7 @@ static final class ReservedStreamingHttpConnectionToReservedHttpConnection imple

ReservedStreamingHttpConnectionToReservedHttpConnection(final ReservedStreamingHttpConnection connection,
final HttpExecutionStrategy strategy) {
this(connection, defaultStrategy() == strategy ? DEFAULT_CONNECTION_STRATEGY : strategy,
this(connection, defaultStrategy() == strategy ? DEFAULT_ASYNC_CONNECTION_STRATEGY : strategy,
toAggregated(connection));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ final class StreamingHttpConnectionToHttpConnection implements HttpConnection {
* For aggregation, we invoke the user callback (Single from client#request()) after the payload is completed,
* hence we need to offload data.
*/
static final HttpExecutionStrategy DEFAULT_CONNECTION_STRATEGY = OFFLOAD_RECEIVE_DATA_EVENT_STRATEGY;
static final HttpExecutionStrategy DEFAULT_ASYNC_CONNECTION_STRATEGY = OFFLOAD_RECEIVE_DATA_EVENT_STRATEGY;
private final StreamingHttpConnection connection;
private final HttpExecutionStrategy strategy;
private final HttpConnectionContext context;
Expand All @@ -37,7 +37,7 @@ final class StreamingHttpConnectionToHttpConnection implements HttpConnection {

StreamingHttpConnectionToHttpConnection(final StreamingHttpConnection connection,
final HttpExecutionStrategy strategy) {
this.strategy = defaultStrategy() == strategy ? DEFAULT_CONNECTION_STRATEGY : strategy;
this.strategy = defaultStrategy() == strategy ? DEFAULT_ASYNC_CONNECTION_STRATEGY : strategy;
this.connection = connection;
final HttpConnectionContext originalCtx = connection.connectionContext();
executionContext = new DelegatingHttpExecutionContext(connection.executionContext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.SubscribableCompletable;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.DefaultHttpHeadersFactory;
import io.servicetalk.http.api.DefaultStreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.FilterableReservedStreamingHttpConnection;
import io.servicetalk.http.api.FilterableStreamingHttpClient;
import io.servicetalk.http.api.HttpContextKeys;
import io.servicetalk.http.api.HttpExecutionContext;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpHeadersFactory;
import io.servicetalk.http.api.HttpRequestMetaData;
Expand All @@ -37,8 +40,11 @@
import io.servicetalk.http.api.RedirectConfig;
import io.servicetalk.http.api.SingleAddressHttpClientBuilder;
import io.servicetalk.http.api.StreamingHttpClient;
import io.servicetalk.http.api.StreamingHttpClientFilter;
import io.servicetalk.http.api.StreamingHttpClientFilterFactory;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpRequester;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.utils.RedirectingHttpRequesterFilter;
Expand All @@ -63,6 +69,10 @@
import static io.servicetalk.concurrent.api.AsyncCloseables.toListenableAsyncCloseable;
import static io.servicetalk.concurrent.api.Single.defer;
import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverCompleteFromSource;
import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static io.servicetalk.http.api.HttpExecutionStrategies.offloadAll;
import static io.servicetalk.http.api.HttpExecutionStrategies.offloadNone;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.netty.DefaultSingleAddressHttpClientBuilder.setExecutionContext;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -106,12 +116,12 @@ public StreamingHttpClient buildStreaming() {
final CompositeCloseable closeables = newCompositeCloseable();
try {
final HttpExecutionContext executionContext = executionContextBuilder.build();
final ClientFactory clientFactory = new ClientFactory(builderFactory, executionContext,
singleAddressInitializer);
final ClientFactory clientFactory =
new ClientFactory(builderFactory, executionContext, singleAddressInitializer);
final CachingKeyFactory keyFactory = closeables.prepend(new CachingKeyFactory());
final HttpHeadersFactory headersFactory = this.headersFactory;
FilterableStreamingHttpClient urlClient = closeables.prepend(
new StreamingUrlHttpClient(executionContext, clientFactory, keyFactory,
new StreamingUrlHttpClient(executionContext, keyFactory, clientFactory,
new DefaultStreamingHttpRequestResponseFactory(executionContext.bufferAllocator(),
headersFactory != null ? headersFactory : DefaultHttpHeadersFactory.INSTANCE,
HTTP_1_1)));
Expand Down Expand Up @@ -240,6 +250,8 @@ public StreamingHttpClient apply(final UrlKey urlKey) {
builder.sslConfig(DEFAULT_CLIENT_SSL_CONFIG);
}

builder.appendClientFilter(HttpExecutionStrategyUpdater.INSTANCE);

if (singleAddressInitializer != null) {
singleAddressInitializer.initialize(urlKey.scheme, urlKey.hostAndPort, builder);
}
Expand All @@ -248,6 +260,63 @@ public StreamingHttpClient apply(final UrlKey urlKey) {
}
}

private static void singleClientStrategyUpdate(ContextMap context, HttpExecutionStrategy singleStrategy) {
HttpExecutionStrategy requestStrategy = context.getOrDefault(HTTP_EXECUTION_STRATEGY_KEY, defaultStrategy());
assert null != requestStrategy : "Request strategy unexpectedly null";
HttpExecutionStrategy useStrategy = defaultStrategy() == requestStrategy ?
// For all apis except async streaming default conversion has already been done.
// This is the default to required strategy resolution for the async streaming client.
offloadAll() :
defaultStrategy() == singleStrategy || !singleStrategy.hasOffloads() ?
// single client is default or has no *additional* offloads
requestStrategy :
// add single client offloads to existing strategy
requestStrategy.merge(singleStrategy);

if (useStrategy != requestStrategy) {
LOGGER.debug("Request strategy {} changes to {}. SingleAddressClient strategy: {}",
requestStrategy, useStrategy, singleStrategy);
context.put(HTTP_EXECUTION_STRATEGY_KEY, useStrategy);
}
}

/**
* When request transitions from the multi-address level to the single-address level, this filter will make sure
* that any missing offloading required by the selected single-address client will be applied for the request
* execution. This filter never reduces offloading, it can only add missing offloading flags. Users who want to
* execute a request without offloading must specify {@link HttpExecutionStrategies#offloadNone()} strategy at the
* {@link MultiAddressHttpClientBuilder} or explicitly set the required strategy at request context with
* {@link HttpContextKeys#HTTP_EXECUTION_STRATEGY_KEY}.
*/
private static final class HttpExecutionStrategyUpdater implements StreamingHttpClientFilterFactory {

static final StreamingHttpClientFilterFactory INSTANCE = new HttpExecutionStrategyUpdater();

private HttpExecutionStrategyUpdater() {
// Singleton
}

@Override
public StreamingHttpClientFilter create(final FilterableStreamingHttpClient client) {
return new StreamingHttpClientFilter(client) {
@Override
protected Single<StreamingHttpResponse> request(
final StreamingHttpRequester delegate, final StreamingHttpRequest request) {
return defer(() -> {
singleClientStrategyUpdate(request.context(), client.executionContext().executionStrategy());

return delegate.request(request);
});
}
};
}

@Override
public HttpExecutionStrategy requiredOffloads() {
return offloadNone();
}
}

private static final class StreamingUrlHttpClient implements FilterableStreamingHttpClient {
private final HttpExecutionContext executionContext;
private final StreamingHttpRequestResponseFactory reqRespFactory;
Expand All @@ -256,8 +325,7 @@ private static final class StreamingUrlHttpClient implements FilterableStreaming
private final ListenableAsyncCloseable closeable;

StreamingUrlHttpClient(final HttpExecutionContext executionContext,
final Function<UrlKey, FilterableStreamingHttpClient> clientFactory,
final CachingKeyFactory keyFactory,
final CachingKeyFactory keyFactory, final ClientFactory clientFactory,
final StreamingHttpRequestResponseFactory reqRespFactory) {
this.reqRespFactory = requireNonNull(reqRespFactory);
this.group = ClientGroup.from(clientFactory);
Expand All @@ -278,7 +346,10 @@ public Single<? extends FilterableReservedStreamingHttpConnection> reserveConnec
final HttpRequestMetaData metaData) {
return defer(() -> {
try {
return selectClient(metaData).reserveConnection(metaData).shareContextOnSubscribe();
FilterableStreamingHttpClient singleClient = selectClient(metaData);
singleClientStrategyUpdate(metaData.context(), singleClient.executionContext().executionStrategy());

return singleClient.reserveConnection(metaData).shareContextOnSubscribe();
} catch (Throwable t) {
return Single.<FilterableReservedStreamingHttpConnection>failed(t).shareContextOnSubscribe();
}
Expand Down
Loading

0 comments on commit 0f1ac15

Please sign in to comment.