From 5c7c559f93b80f42a589e3d42a2ef81476fa9738 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Tue, 27 Jul 2021 12:25:24 +0200 Subject: [PATCH 01/12] unhealthy host health checking WIP --- .../loadbalancer/RoundRobinLoadBalancer.java | 147 +++++++++++++++- .../RoundRobinLoadBalancerFactory.java | 35 +++- .../EagerRoundRobinLoadBalancerTest.java | 10 +- .../LingeringRoundRobinLoadBalancerTest.java | 10 +- .../RoundRobinLoadBalancerTest.java | 161 +++++++++++++++++- 5 files changed, 331 insertions(+), 32 deletions(-) diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index 7adf4fceeb..f4a115cc9f 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -22,12 +22,16 @@ import io.servicetalk.client.api.LoadBalancerFactory; import io.servicetalk.client.api.NoAvailableHostException; import io.servicetalk.client.api.ServiceDiscovererEvent; +import io.servicetalk.concurrent.Cancellable; import io.servicetalk.concurrent.PublisherSource.Processor; import io.servicetalk.concurrent.PublisherSource.Subscriber; import io.servicetalk.concurrent.PublisherSource.Subscription; import io.servicetalk.concurrent.api.AsyncCloseable; import io.servicetalk.concurrent.api.Completable; import io.servicetalk.concurrent.api.CompositeCloseable; +import io.servicetalk.concurrent.api.DefaultThreadFactory; +import io.servicetalk.concurrent.api.Executor; +import io.servicetalk.concurrent.api.Executors; import io.servicetalk.concurrent.api.ListenableAsyncCloseable; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; @@ -43,7 +47,9 @@ import java.util.List; import java.util.Map.Entry; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; import java.util.function.Predicate; @@ -81,6 +87,10 @@ public final class RoundRobinLoadBalancer implements LoadBalancer { + static final String BACKGROUND_PROCESSING_EXECUTOR_NAME = "round-robin-load-balancer-executor"; + static final Executor SHARED_EXECUTOR = Executors.newFixedSizeExecutor(1, + new DefaultThreadFactory(BACKGROUND_PROCESSING_EXECUTOR_NAME)); + private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinLoadBalancer.class); private static final List CLOSED_LIST = new ArrayList<>(0); private static final Object[] EMPTY_ARRAY = new Object[0]; @@ -116,6 +126,7 @@ public final class RoundRobinLoadBalancer eventStream; private final SequentialCancellable discoveryCancellable = new SequentialCancellable(); private final ConnectionFactory connectionFactory; + private final Executor healthCheckExecutor; private final ListenableAsyncCloseable asyncCloseable; /** @@ -129,7 +140,7 @@ public final class RoundRobinLoadBalancer> eventPublisher, final ConnectionFactory connectionFactory) { - this(eventPublisher, connectionFactory, EAGER_CONNECTION_SHUTDOWN_ENABLED); + this(eventPublisher, connectionFactory, EAGER_CONNECTION_SHUTDOWN_ENABLED, SHARED_EXECUTOR); } /** @@ -144,10 +155,12 @@ public RoundRobinLoadBalancer(final Publisher> eventPublisher, final ConnectionFactory connectionFactory, - final boolean eagerConnectionShutdown) { + final boolean eagerConnectionShutdown, + final Executor healthCheckExecutor) { Processor eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32); this.eventStream = fromSource(eventStreamProcessor); this.connectionFactory = requireNonNull(connectionFactory); + this.healthCheckExecutor = healthCheckExecutor; toSource(eventPublisher).subscribe(new Subscriber>() { @@ -367,8 +380,10 @@ private Single selectConnection0(Predicate selector) { } } - // don't open new connections for expired hosts, try a different one - if (host.isActive()) { + // Don't open new connections for expired or unhealthy hosts, try a different one. + // Unhealthy hosts can in fact have open connections – that's why we don't fail earlier. + // When a host accepts a limited amount of connections, we will try to use those that have been established. + if (host.isActiveAndHealthy()) { pickedHost = host; break; } @@ -383,6 +398,8 @@ private Single selectConnection0(Predicate selector) { // This LB implementation does not automatically provide TransportObserver. Therefore, we pass "null" here. // Users can apply a ConnectionFactoryFilter if they need to override this "null" value with TransportObserver. return connectionFactory.newConnection(host.address, null) + // Schedule health check before returning + .beforeOnError(t -> host.registerHealthCheck(healthCheckExecutor, connectionFactory, selector)) .flatMap(newCnx -> { // Invoke the selector before adding the connection to the pool, otherwise, connection can be // used concurrently and hence a new connection can be rejected by the selector. @@ -432,7 +449,8 @@ public static final class RoundRobinLoadBalancerFactory LoadBalancer newLoadBalancer( final Publisher> eventPublisher, final ConnectionFactory connectionFactory) { - return new RoundRobinLoadBalancer<>(eventPublisher, connectionFactory, EAGER_CONNECTION_SHUTDOWN_ENABLED); + return new RoundRobinLoadBalancer<>(eventPublisher, connectionFactory, + EAGER_CONNECTION_SHUTDOWN_ENABLED, SHARED_EXECUTOR); } } @@ -441,7 +459,7 @@ List>> usedAddresses() { return usedHosts.stream().map(Host::asEntry).collect(toList()); } - private static final class Host implements ListenableAsyncCloseable { + private static final class Host implements ListenableAsyncCloseable { private static final ConnState EMPTY_CONN_STATE = new ConnState(EMPTY_ARRAY, State.ACTIVE); private static final ConnState CLOSED_CONN_STATE = new ConnState(EMPTY_ARRAY, State.CLOSED); @@ -451,6 +469,7 @@ private static final class Host implem private enum State { ACTIVE, + UNHEALTHY, // only active hosts can be unhealthy, expired hosts should not attempt opening new connections EXPIRED, CLOSED } @@ -470,6 +489,8 @@ private static final class ConnState { private volatile ConnState connState = EMPTY_CONN_STATE; + private final AtomicReference healthCheck = new AtomicReference<>(); + Host(Addr address) { this.address = requireNonNull(address); this.closeable = toAsyncCloseable(graceful -> @@ -477,18 +498,21 @@ private static final class ConnState { } boolean tryToMarkActive() { - return connStateUpdater.updateAndGet(this, oldConnState -> { + final State newState = connStateUpdater.updateAndGet(this, oldConnState -> { if (oldConnState.state == State.EXPIRED) { return new ConnState(oldConnState.connections, State.ACTIVE); } // If oldConnState.state == State.ACTIVE this could mean either a duplicate event, // or a repeated CAS operation. We could issue a warning, but as we don't know, we don't log anything. + // UNHEALTHY state is treated similarly to ACTIVE return oldConnState; - }).state == State.ACTIVE; + }).state; + return newState == State.ACTIVE || newState == State.UNHEALTHY; } void markClosed() { final Object[] toRemove = connStateUpdater.getAndSet(this, CLOSED_CONN_STATE).connections; + cancelHealthCheck(); LOGGER.debug("Closing {} connection(s) gracefully to closed address: {}", toRemove.length, address); for (Object conn : toRemove) { @SuppressWarnings("unchecked") @@ -501,13 +525,41 @@ void markExpired() { final ConnState newState = connStateUpdater.updateAndGet(this, oldConnState -> oldConnState.connections.length == 0 ? CLOSED_CONN_STATE : new ConnState(oldConnState.connections, State.EXPIRED)); + if (newState.state == State.EXPIRED) { + cancelHealthCheck(); + } if (newState == CLOSED_CONN_STATE) { // Trigger the callback to remove the host from usedHosts array. this.closeAsync().subscribe(); } } - boolean isActive() { + void markHealthy(boolean healthy) { + connStateUpdater.updateAndGet(this, oldConnState -> { + final State oldState = oldConnState.state; + + // Prevent EXPIRED and CLOSED state changes. Only ACTIVE hosts can become UNHEALTHY and vice versa. + if (oldState == State.EXPIRED || oldState == State.CLOSED) { + return oldConnState; + } + + return new ConnState(oldConnState.connections, healthy ? State.ACTIVE : State.UNHEALTHY); + }); + + if (healthy) { + // TODO(dj): fix a race condition: + // t1: schedule healthcheck, mark unhealthy + // t1: connection opened, update state to ACTIVE + // t2: failed connection, try to schedule health check + // -> fail CAS (check not null yet) + // -> doesn't mark unhealthy + // t1: cancel health check, ignoring signal from t2 + // next fail should schedule a proper check anyway - perhaps we can ignore this race? + cancelHealthCheck(); + } + } + + boolean isActiveAndHealthy() { return connState.state == State.ACTIVE; } @@ -599,11 +651,36 @@ public Completable onClose() { private Completable doClose(final Function closeFunction) { return Completable.defer(() -> { final Object[] connections = connStateUpdater.getAndSet(this, CLOSED_CONN_STATE).connections; + cancelHealthCheck(); return connections.length == 0 ? completed() : from(connections).flatMapCompletableDelayError(conn -> closeFunction.apply((C) conn)); }); } + void registerHealthCheck(Executor executor, ConnectionFactory connectionFactory, Predicate selector) { + @SuppressWarnings("unchecked") + HealthCheck previousCheck = (HealthCheck) this.healthCheck.get(); + // If a previous check was in place, it can't have been cancelled so it was still scheduled at CAS. + if (previousCheck == null) { + // We only schedule a new check. If CAS fails, there is already a check active. + HealthCheck newCheck = new HealthCheck<>(executor, selector, connectionFactory, this); + if (this.healthCheck.compareAndSet(null, newCheck)) { + LOGGER.debug("Scheduled health check"); + newCheck.schedule(); + markHealthy(false); + } + } + + } + + private void cancelHealthCheck() { + // To prevent races, first free the reference – if another connection fails, it will schedule a new check. + final HealthCheck healthCheck = this.healthCheck.getAndSet(null); + if (healthCheck != null) { + healthCheck.cancel(); + } + } + @Override public String toString() { final ConnState connState = this.connState; @@ -615,6 +692,58 @@ public String toString() { } } + private static class HealthCheck + implements Runnable, Cancellable { + private Executor executor; + private Predicate selector; + private ConnectionFactory connectionFactory; + private Host host; + private volatile boolean cancelled; + + public HealthCheck(final Executor executor, final Predicate selector, + final ConnectionFactory connectionFactory, + final Host host) { + this.executor = executor; + this.selector = selector; + this.connectionFactory = connectionFactory; + this.host = host; + } + + public void schedule() { + executor.schedule(this, 1, TimeUnit.SECONDS); + } + + @Override + public void run() { + if (cancelled) { + return; + } + connectionFactory.newConnection(host.address, null) + .flatMapCompletable(newCnx -> { + if (!selector.test(newCnx)) { + return newCnx.closeAsync().concat(Completable.failed(new RuntimeException())); + } + if (host.addConnection(newCnx)) { + host.markHealthy(true); + } else { + return newCnx.closeAsync().concat(Completable.failed(new RuntimeException())); + } + return completed(); + }) + .afterOnError(e -> { + if (!cancelled) { + schedule(); + } + }) + .subscribe(); + } + + @Override + public void cancel() { + this.cancelled = true; + } + } + private static final class StacklessNoAvailableHostException extends NoAvailableHostException { private static final long serialVersionUID = 5942960040738091793L; diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java index 797507eef4..7e50f5eb46 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java @@ -20,9 +20,13 @@ import io.servicetalk.client.api.LoadBalancer; import io.servicetalk.client.api.LoadBalancerFactory; import io.servicetalk.client.api.ServiceDiscovererEvent; +import io.servicetalk.concurrent.api.DefaultThreadFactory; +import io.servicetalk.concurrent.api.Executor; +import io.servicetalk.concurrent.api.Executors; import io.servicetalk.concurrent.api.Publisher; import java.util.function.Predicate; +import javax.annotation.Nullable; /** * {@link LoadBalancerFactory} that creates {@link LoadBalancer} instances which use a round robin strategy @@ -50,16 +54,19 @@ public final class RoundRobinLoadBalancerFactory LoadBalancer newLoadBalancer( final Publisher> eventPublisher, final ConnectionFactory connectionFactory) { - return new RoundRobinLoadBalancer<>(eventPublisher, connectionFactory, eagerConnectionShutdown); + return new RoundRobinLoadBalancer<>( + eventPublisher, connectionFactory, eagerConnectionShutdown, backgroundExecutor); } /** @@ -69,7 +76,12 @@ public LoadBalancer newLoadBalancer( * @param The type of connection. */ public static final class Builder { + private static final String BACKGROUND_PROCESSING_EXECUTOR_NAME = "round-robin-load-balancer-executor"; + private static final Executor SHARED_EXECUTOR = Executors.newFixedSizeExecutor(1, + new DefaultThreadFactory(BACKGROUND_PROCESSING_EXECUTOR_NAME)); private boolean eagerConnectionShutdown = EAGER_CONNECTION_SHUTDOWN_ENABLED; + @Nullable + private Executor backgroundExecutor; /** * Creates a new instance with default settings. @@ -95,13 +107,30 @@ public RoundRobinLoadBalancerFactory.Builder eagerConnection return this; } + /** + * This {@link LoadBalancer} monitors hosts to which connection establishment has failed + * using health checks that run in the background. The health check tries to establish a new connection + * and if it succeeds, the host is returned to the load balancing pool. As long as the connection + * establishment fails, the host is not considered for opening new connections for processed requests. + * + * @param backgroundExecutor {@link Executor} on which to schedule health checking. + * @return @{code this}. + */ + public RoundRobinLoadBalancerFactory.Builder backgroundExecutor( + Executor backgroundExecutor) { + this.backgroundExecutor = backgroundExecutor; + return this; + } + /** * Builds the {@link RoundRobinLoadBalancerFactory} configured by this builder. * * @return a new instance of {@link RoundRobinLoadBalancerFactory} with settings from this builder. */ public RoundRobinLoadBalancerFactory build() { - return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown); + final Executor backgroundExecutor = this.backgroundExecutor != null ? + this.backgroundExecutor : SHARED_EXECUTOR; + return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown, backgroundExecutor); } } } diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerRoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerRoundRobinLoadBalancerTest.java index c21e3dc629..ee51ebd05c 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerRoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerRoundRobinLoadBalancerTest.java @@ -106,13 +106,7 @@ private void validateConnectionClosedGracefully(final TestLoadBalancedConnection } @Override - protected RoundRobinLoadBalancer defaultLb() { - return newTestLoadBalancer(true); - } - - @Override - protected RoundRobinLoadBalancer defaultLb( - RoundRobinLoadBalancerTest.DelegatingConnectionFactory connectionFactory) { - return newTestLoadBalancer(serviceDiscoveryPublisher, connectionFactory, true); + protected boolean eagerConnectionShutdown() { + return true; } } diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringRoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringRoundRobinLoadBalancerTest.java index 7770370e23..5c79fb7b02 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringRoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringRoundRobinLoadBalancerTest.java @@ -309,13 +309,7 @@ public void handleDiscoveryEventsForNotConnectedHosts() { } @Override - protected RoundRobinLoadBalancer defaultLb() { - return newTestLoadBalancer(false); - } - - @Override - protected RoundRobinLoadBalancer defaultLb( - RoundRobinLoadBalancerTest.DelegatingConnectionFactory connectionFactory) { - return newTestLoadBalancer(serviceDiscoveryPublisher, connectionFactory, false); + protected boolean eagerConnectionShutdown() { + return false; } } diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java index 07fbca0d08..6089259134 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java @@ -25,10 +25,13 @@ import io.servicetalk.concurrent.PublisherSource.Subscriber; import io.servicetalk.concurrent.PublisherSource.Subscription; import io.servicetalk.concurrent.api.Completable; +import io.servicetalk.concurrent.api.Executor; +import io.servicetalk.concurrent.api.ExecutorRule; import io.servicetalk.concurrent.api.LegacyTestSingle; import io.servicetalk.concurrent.api.ListenableAsyncCloseable; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; +import io.servicetalk.concurrent.api.TestExecutor; import io.servicetalk.concurrent.api.TestPublisher; import io.servicetalk.concurrent.api.TestSubscription; import io.servicetalk.concurrent.internal.DeliberateException; @@ -65,6 +68,7 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; +import javax.annotation.Nullable; import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable; import static io.servicetalk.concurrent.api.BlockingTestUtils.awaitIndefinitely; @@ -90,6 +94,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -105,6 +110,9 @@ abstract class RoundRobinLoadBalancerTest { @Rule public final ExpectedException thrown = ExpectedException.none(); + @Rule + public final ExecutorRule executor = ExecutorRule.withTestExecutor(); + protected final TestSingleSubscriber selectConnectionListener = new TestSingleSubscriber<>(); protected final List connectionsCreated = new CopyOnWriteArrayList<>(); @@ -116,6 +124,8 @@ abstract class RoundRobinLoadBalancerTest { protected RoundRobinLoadBalancer lb; + protected TestExecutor testExecutor; + protected static Predicate any() { return __ -> true; } @@ -124,14 +134,21 @@ protected Predicate alwaysNewConnectionFilter() { return cnx -> lb.usedAddresses().stream().noneMatch(addr -> addr.getValue().stream().anyMatch(cnx::equals)); } - protected abstract RoundRobinLoadBalancer defaultLb(); + protected RoundRobinLoadBalancer defaultLb() { + return newTestLoadBalancer(eagerConnectionShutdown()); + } - protected abstract RoundRobinLoadBalancer defaultLb( - DelegatingConnectionFactory connectionFactory); + protected RoundRobinLoadBalancer defaultLb( + DelegatingConnectionFactory connectionFactory) { + return newTestLoadBalancer(serviceDiscoveryPublisher, connectionFactory, eagerConnectionShutdown()); + } + + protected abstract boolean eagerConnectionShutdown(); @Before public void initialize() { lb = defaultLb(); + testExecutor = executor.executor(); connectionsCreated.clear(); connectionRealizers.clear(); } @@ -381,6 +398,93 @@ public void newConnectionIsClosedWhenSelectorRejects() throws Exception { awaitIndefinitely(connection.onClose()); } + @Test + public void hostUnhealthyIsHealthChecked() throws Exception { + serviceDiscoveryPublisher.onComplete(); + final Single properConnection = newRealizedConnectionSingle("address-1"); + final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = + new UnhealthyHostConnectionFactory(properConnection); + final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); + + lb = defaultLb(connectionFactory); + + sendServiceDiscoveryEvents(upEvent("address-1")); + + Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); + assertThat(e.getCause(), instanceOf(DELIBERATE_EXCEPTION.getClass())); + + unhealthyHostConnectionFactory.momentInTime.incrementAndGet(); + testExecutor.advanceTimeBy(1, SECONDS); + unhealthyHostConnectionFactory.momentInTime.incrementAndGet(); + testExecutor.advanceTimeBy(1, SECONDS); + unhealthyHostConnectionFactory.momentInTime.incrementAndGet(); + testExecutor.advanceTimeBy(1, SECONDS); + + final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); + assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); + + // 1 failed attempt triggers health check, 2 health check attempts fail, 3rd health check attempt + // uses the proper connection, final selection reuses that connection. 4 total creation attempts. + int expectedConnectionAttempts = 4; + assertThat(unhealthyHostConnectionFactory.requests.get(), equalTo(expectedConnectionAttempts)); + } + + @Test + public void hostUnhealthyDoesntRaceToRunHealthCheck() throws Exception { + serviceDiscoveryPublisher.onComplete(); + + final Single properConnection = newRealizedConnectionSingle("address-1"); + final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = + new UnhealthyHostConnectionFactory(properConnection); + final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); + + lb = defaultLb(connectionFactory); + sendServiceDiscoveryEvents(upEvent("address-1")); + + // Imitate concurrency by running multiple threads attempting to establish connections. + ExecutorService executor = Executors.newFixedThreadPool(3); + final Runnable runnable = () -> + assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); + + for (int i = 0; i < 1000; i++) { + executor.submit(runnable); + } + + // From test main thread, wait until the host becomes UNHEALTHY, which is apparent from NoHostAvailableException + // being thrown from selection. + try { + awaitIndefinitely(lb.selectConnection(any()).retry((i, t) -> t instanceof DeliberateException)); + } catch (Exception e) { + assertThat(e.getCause(), instanceOf(NoAvailableHostException.class)); + } + + // At this point, either the above selection caused the host to be marked as UNHEALTHY, + // or any background thread. We can assume from now on that only the health check can attempt establishing + // new connections. Therefore we can count those attempts. If our assumption doesn't hold, the UNHEALTHY + // state is either not properly set or multiple health checks run at the same time. + int requestsBefore = unhealthyHostConnectionFactory.requests.get(); + unhealthyHostConnectionFactory.momentInTime.incrementAndGet(); + testExecutor.advanceTimeBy(1, SECONDS); + + assertThat(unhealthyHostConnectionFactory.requests.get(), equalTo(requestsBefore + 1)); + + unhealthyHostConnectionFactory.momentInTime.incrementAndGet(); + testExecutor.advanceTimeBy(1, SECONDS); + assertThat(unhealthyHostConnectionFactory.requests.get(), equalTo(requestsBefore + 2)); + + executor.shutdownNow(); + executor.awaitTermination(10, SECONDS); + + unhealthyHostConnectionFactory.momentInTime.incrementAndGet(); + testExecutor.advanceTimeBy(1, SECONDS); + assertThat(unhealthyHostConnectionFactory.requests.get(), equalTo(requestsBefore + 3)); + + // After 3 increments a proper established connection is returned and the host should be eligible for selection. + final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); + assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); + assertThat(unhealthyHostConnectionFactory.requests.get(), equalTo(requestsBefore + 3)); + } + @SuppressWarnings("unchecked") protected void sendServiceDiscoveryEvents(final ServiceDiscovererEvent... events) { sendServiceDiscoveryEvents(serviceDiscoveryPublisher, events); @@ -408,7 +512,22 @@ protected RoundRobinLoadBalancer newTestLoad protected RoundRobinLoadBalancer newTestLoadBalancer( final TestPublisher> serviceDiscoveryPublisher, final DelegatingConnectionFactory connectionFactory, final boolean eagerConnectionShutdown) { - return new RoundRobinLoadBalancer<>(serviceDiscoveryPublisher, connectionFactory, eagerConnectionShutdown); + return new RoundRobinLoadBalancer<>(serviceDiscoveryPublisher, connectionFactory, + eagerConnectionShutdown, testExecutor); + } + + protected RoundRobinLoadBalancer newTestLoadBalancer( + final TestPublisher> serviceDiscoveryPublisher, + final DelegatingConnectionFactory connectionFactory, final boolean eagerConnectionShutdown, + @Nullable final Executor executor) { + RoundRobinLoadBalancerFactory.Builder builder = + new RoundRobinLoadBalancerFactory.Builder() + .eagerConnectionShutdown(eagerConnectionShutdown); + if (executor != null) { + builder.backgroundExecutor(executor); + } + return (RoundRobinLoadBalancer) builder.build() + .newLoadBalancer(serviceDiscoveryPublisher, connectionFactory); } @SafeVarargs @@ -510,4 +629,38 @@ boolean isClosed() { return closed.get(); } } + + protected static class UnhealthyHostConnectionFactory { + final AtomicInteger momentInTime = new AtomicInteger(); + final AtomicInteger requests = new AtomicInteger(); + final Single properConnection; + + Function> factory + = new Function>() { + + final List> connections = + Arrays.asList( + failed(DELIBERATE_EXCEPTION), + failed(DELIBERATE_EXCEPTION), + failed(DELIBERATE_EXCEPTION) + ); + + @Override + public Single apply(final String s) { + requests.incrementAndGet(); + if (momentInTime.get() >= connections.size()) { + return properConnection; + } + return connections.get(momentInTime.get()); + } + }; + + UnhealthyHostConnectionFactory(Single properConnection) { + this.properConnection = properConnection; + } + + DelegatingConnectionFactory createFactory() { + return new DelegatingConnectionFactory(this.factory); + } + } } From 2faf2322aaba43df75e7b2053a147cce2d2b3998 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Wed, 28 Jul 2021 11:50:45 +0200 Subject: [PATCH 02/12] Single Host state including HealthCheck --- .../loadbalancer/RoundRobinLoadBalancer.java | 254 +++++++++--------- 1 file changed, 130 insertions(+), 124 deletions(-) diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index f4a115cc9f..dd75f6f97d 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -49,7 +49,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; import java.util.function.Predicate; @@ -257,9 +256,9 @@ private List> addHostToList( // duplicates are not allowed for (Host host : oldHostsTyped) { if (host.address.equals(addr)) { - if (handleExpired && !host.tryToMarkActive()) { - // If the new state is not ACTIVE, the host is already in CLOSED state, we should create - // a new entry. For duplicate ACTIVE events or for repeated activation due to failed CAS + if (handleExpired && !host.markActiveIfNotClosed()) { + // If the host is already in CLOSED state, we should create a new entry. + // For duplicate ACTIVE events or for repeated activation due to failed CAS // of replacing the usedHosts array the marking succeeds so we will not add a new entry. break; } @@ -460,25 +459,74 @@ List>> usedAddresses() { } private static final class Host implements ListenableAsyncCloseable { - private static final ConnState EMPTY_CONN_STATE = new ConnState(EMPTY_ARRAY, State.ACTIVE); - private static final ConnState CLOSED_CONN_STATE = new ConnState(EMPTY_ARRAY, State.CLOSED); + private static final Object STATE_ACTIVE = new Object(); + private static final Object STATE_EXPIRED = new Object(); + private static final Object STATE_CLOSED = new Object(); + + private static final ConnState EMPTY_CONN_STATE = new ConnState(EMPTY_ARRAY, STATE_ACTIVE); + private static final ConnState CLOSED_CONN_STATE = new ConnState(EMPTY_ARRAY, STATE_CLOSED); @SuppressWarnings("rawtypes") private static final AtomicReferenceFieldUpdater connStateUpdater = AtomicReferenceFieldUpdater.newUpdater(Host.class, ConnState.class, "connState"); - private enum State { - ACTIVE, - UNHEALTHY, // only active hosts can be unhealthy, expired hosts should not attempt opening new connections - EXPIRED, - CLOSED + private static class HealthCheck + implements Runnable, Cancellable { + private Executor executor; + private Predicate selector; + private ConnectionFactory connectionFactory; + private Host host; + private volatile boolean cancelled; + + public HealthCheck(final Executor executor, final Predicate selector, + final ConnectionFactory connectionFactory, + final Host host) { + this.executor = executor; + this.selector = selector; + this.connectionFactory = connectionFactory; + this.host = host; + } + + public void schedule() { + executor.schedule(this, 1, TimeUnit.SECONDS); + } + + @Override + public void run() { + if (cancelled) { + return; + } + connectionFactory.newConnection(host.address, null) + .flatMapCompletable(newCnx -> { + if (!selector.test(newCnx)) { + return newCnx.closeAsync().concat(Completable.failed(new RuntimeException())); + } + if (host.addConnection(newCnx)) { + host.markHealthy(); + } else { + return newCnx.closeAsync().concat(Completable.failed(new RuntimeException())); + } + return completed(); + }) + .afterOnError(e -> { + if (!cancelled) { + schedule(); + } + }) + .subscribe(); + } + + @Override + public void cancel() { + this.cancelled = true; + } } private static final class ConnState { final Object[] connections; - final State state; + final Object state; - ConnState(final Object[] connections, final State state) { + ConnState(final Object[] connections, final Object state) { this.connections = connections; this.state = state; } @@ -486,33 +534,31 @@ private static final class ConnState { final Addr address; private final ListenableAsyncCloseable closeable; - private volatile ConnState connState = EMPTY_CONN_STATE; - private final AtomicReference healthCheck = new AtomicReference<>(); - Host(Addr address) { this.address = requireNonNull(address); this.closeable = toAsyncCloseable(graceful -> graceful ? doClose(AsyncCloseable::closeAsyncGracefully) : doClose(AsyncCloseable::closeAsync)); } - boolean tryToMarkActive() { - final State newState = connStateUpdater.updateAndGet(this, oldConnState -> { - if (oldConnState.state == State.EXPIRED) { - return new ConnState(oldConnState.connections, State.ACTIVE); + boolean markActiveIfNotClosed() { + final Object oldState = connStateUpdater.getAndUpdate(this, oldConnState -> { + if (oldConnState.state == STATE_EXPIRED) { + return new ConnState(oldConnState.connections, STATE_ACTIVE); } // If oldConnState.state == State.ACTIVE this could mean either a duplicate event, // or a repeated CAS operation. We could issue a warning, but as we don't know, we don't log anything. // UNHEALTHY state is treated similarly to ACTIVE return oldConnState; }).state; - return newState == State.ACTIVE || newState == State.UNHEALTHY; + return oldState != STATE_CLOSED; } void markClosed() { - final Object[] toRemove = connStateUpdater.getAndSet(this, CLOSED_CONN_STATE).connections; - cancelHealthCheck(); + final ConnState oldState = connStateUpdater.getAndSet(this, CLOSED_CONN_STATE); + final Object[] toRemove = oldState.connections; + cancelIfHealthCheck(oldState.state); LOGGER.debug("Closing {} connection(s) gracefully to closed address: {}", toRemove.length, address); for (Object conn : toRemove) { @SuppressWarnings("unchecked") @@ -522,45 +568,37 @@ void markClosed() { } void markExpired() { - final ConnState newState = connStateUpdater.updateAndGet(this, - oldConnState -> oldConnState.connections.length == 0 ? - CLOSED_CONN_STATE : new ConnState(oldConnState.connections, State.EXPIRED)); - if (newState.state == State.EXPIRED) { - cancelHealthCheck(); - } - if (newState == CLOSED_CONN_STATE) { - // Trigger the callback to remove the host from usedHosts array. - this.closeAsync().subscribe(); + for (;;) { + ConnState oldState = connStateUpdater.get(this); + if (oldState.state == STATE_EXPIRED || oldState.state == STATE_CLOSED) { + break; + } + Object nextState = oldState.connections.length == 0 ? STATE_CLOSED : STATE_EXPIRED; + + if (connStateUpdater.compareAndSet(this, oldState, + new ConnState(oldState.connections, nextState))) { + cancelIfHealthCheck(oldState.state); + if (nextState == STATE_CLOSED) { + // Trigger the callback to remove the host from usedHosts array. + this.closeAsync().subscribe(); + } + break; + } } } - void markHealthy(boolean healthy) { - connStateUpdater.updateAndGet(this, oldConnState -> { - final State oldState = oldConnState.state; - - // Prevent EXPIRED and CLOSED state changes. Only ACTIVE hosts can become UNHEALTHY and vice versa. - if (oldState == State.EXPIRED || oldState == State.CLOSED) { - return oldConnState; + void markHealthy() { + Object oldState = connStateUpdater.getAndUpdate(this, previous -> { + if (previous.state.getClass().equals(HealthCheck.class)) { + return new ConnState(previous.connections, STATE_ACTIVE); } - - return new ConnState(oldConnState.connections, healthy ? State.ACTIVE : State.UNHEALTHY); - }); - - if (healthy) { - // TODO(dj): fix a race condition: - // t1: schedule healthcheck, mark unhealthy - // t1: connection opened, update state to ACTIVE - // t2: failed connection, try to schedule health check - // -> fail CAS (check not null yet) - // -> doesn't mark unhealthy - // t1: cancel health check, ignoring signal from t2 - // next fail should schedule a proper check anyway - perhaps we can ignore this race? - cancelHealthCheck(); - } + return previous; + }).state; + cancelIfHealthCheck(oldState); } boolean isActiveAndHealthy() { - return connState.state == State.ACTIVE; + return connState.state == STATE_ACTIVE; } boolean addConnection(C connection) { @@ -595,11 +633,11 @@ currentConnState, new ConnState(newList, currentConnState.state))) { if (i == connections.length) { break; } else if (connections.length == 1) { - if (currentConnState.state == State.ACTIVE) { + if (currentConnState.state == STATE_ACTIVE) { if (connStateUpdater.compareAndSet(this, currentConnState, EMPTY_CONN_STATE)) { break; } - } else if (currentConnState.state == State.EXPIRED + } else if (currentConnState.state == STATE_EXPIRED // We're closing the last connection, close the Host. // Closing the host will trigger the Host's onClose method, which will remove the host // from used hosts list. If a race condition appears and a new connection was added @@ -650,33 +688,41 @@ public Completable onClose() { @SuppressWarnings("unchecked") private Completable doClose(final Function closeFunction) { return Completable.defer(() -> { - final Object[] connections = connStateUpdater.getAndSet(this, CLOSED_CONN_STATE).connections; - cancelHealthCheck(); + final ConnState oldState = connStateUpdater.getAndSet(this, CLOSED_CONN_STATE); + cancelIfHealthCheck(oldState.state); + final Object[] connections = oldState.connections; return connections.length == 0 ? completed() : from(connections).flatMapCompletableDelayError(conn -> closeFunction.apply((C) conn)); }); } void registerHealthCheck(Executor executor, ConnectionFactory connectionFactory, Predicate selector) { - @SuppressWarnings("unchecked") - HealthCheck previousCheck = (HealthCheck) this.healthCheck.get(); - // If a previous check was in place, it can't have been cancelled so it was still scheduled at CAS. - if (previousCheck == null) { - // We only schedule a new check. If CAS fails, there is already a check active. - HealthCheck newCheck = new HealthCheck<>(executor, selector, connectionFactory, this); - if (this.healthCheck.compareAndSet(null, newCheck)) { - LOGGER.debug("Scheduled health check"); - newCheck.schedule(); - markHealthy(false); + for (;;) { + ConnState previous = connStateUpdater.get(this); + if (previous.state != STATE_ACTIVE) { + break; } - } + // TODO(dj): if we mark the host as unhealthy for any connection failure, transient failures will + // potentially block clients from performing any requests for the health check interval. + // There needs to be more logic for determining when to take the host out of the pool, considering + // the actual pool and the state of other hosts. + + final ConnState nextState = new ConnState(previous.connections, + new HealthCheck(executor, selector, connectionFactory, this)); + if (connStateUpdater.compareAndSet(this, previous, nextState)) { + @SuppressWarnings("unchecked") + final HealthCheck state = (HealthCheck) nextState.state; + state.schedule(); + break; + } + } } - private void cancelHealthCheck() { - // To prevent races, first free the reference – if another connection fails, it will schedule a new check. - final HealthCheck healthCheck = this.healthCheck.getAndSet(null); - if (healthCheck != null) { + private void cancelIfHealthCheck(Object o) { + if (o.getClass().equals(HealthCheck.class)) { + @SuppressWarnings("unchecked") + HealthCheck healthCheck = (HealthCheck) o; healthCheck.cancel(); } } @@ -686,61 +732,21 @@ public String toString() { final ConnState connState = this.connState; return "Host{" + "address=" + address + - ", state=" + connState.state + + ", state=" + describeState(connState.state) + ", #connections=" + connState.connections.length + '}'; } - } - - private static class HealthCheck - implements Runnable, Cancellable { - private Executor executor; - private Predicate selector; - private ConnectionFactory connectionFactory; - private Host host; - private volatile boolean cancelled; - - public HealthCheck(final Executor executor, final Predicate selector, - final ConnectionFactory connectionFactory, - final Host host) { - this.executor = executor; - this.selector = selector; - this.connectionFactory = connectionFactory; - this.host = host; - } - public void schedule() { - executor.schedule(this, 1, TimeUnit.SECONDS); - } - - @Override - public void run() { - if (cancelled) { - return; + private String describeState(Object state) { + if (state == STATE_ACTIVE) { + return "active"; + } else if (state == STATE_EXPIRED) { + return "expired"; + } else if (state == STATE_CLOSED) { + return "closed"; + } else { + return "unhealthy"; } - connectionFactory.newConnection(host.address, null) - .flatMapCompletable(newCnx -> { - if (!selector.test(newCnx)) { - return newCnx.closeAsync().concat(Completable.failed(new RuntimeException())); - } - if (host.addConnection(newCnx)) { - host.markHealthy(true); - } else { - return newCnx.closeAsync().concat(Completable.failed(new RuntimeException())); - } - return completed(); - }) - .afterOnError(e -> { - if (!cancelled) { - schedule(); - } - }) - .subscribe(); - } - - @Override - public void cancel() { - this.cancelled = true; } } From c637add80fea24886814cdd5a87fd16274284dae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Thu, 29 Jul 2021 20:01:50 +0200 Subject: [PATCH 03/12] Health Checking after threshold --- .../loadbalancer/RoundRobinLoadBalancer.java | 72 +++++++++------ .../RoundRobinLoadBalancerFactory.java | 58 +++++++++--- .../RoundRobinLoadBalancerTest.java | 89 +++++++++---------- 3 files changed, 132 insertions(+), 87 deletions(-) diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index dd75f6f97d..babf8dd017 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -41,13 +41,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map.Entry; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; @@ -89,6 +89,10 @@ public final class RoundRobinLoadBalancer CLOSED_LIST = new ArrayList<>(0); @@ -125,7 +129,6 @@ public final class RoundRobinLoadBalancer eventStream; private final SequentialCancellable discoveryCancellable = new SequentialCancellable(); private final ConnectionFactory connectionFactory; - private final Executor healthCheckExecutor; private final ListenableAsyncCloseable asyncCloseable; /** @@ -139,7 +142,7 @@ public final class RoundRobinLoadBalancer> eventPublisher, final ConnectionFactory connectionFactory) { - this(eventPublisher, connectionFactory, EAGER_CONNECTION_SHUTDOWN_ENABLED, SHARED_EXECUTOR); + this(eventPublisher, connectionFactory, EAGER_CONNECTION_SHUTDOWN_ENABLED, SHARED_HEALTH_CHECK_CONFIG); } /** @@ -155,11 +158,10 @@ public RoundRobinLoadBalancer(final Publisher> eventPublisher, final ConnectionFactory connectionFactory, final boolean eagerConnectionShutdown, - final Executor healthCheckExecutor) { + final HealthCheckConfig healthCheckConfig) { Processor eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32); this.eventStream = fromSource(eventStreamProcessor); this.connectionFactory = requireNonNull(connectionFactory); - this.healthCheckExecutor = healthCheckExecutor; toSource(eventPublisher).subscribe(new Subscriber>() { @@ -233,7 +235,7 @@ private List> markHostAsExpired( } private Host createHost(ResolvedAddress addr) { - Host host = new Host<>(addr); + Host host = new Host<>(addr, healthCheckConfig); if (!eagerConnectionShutdown) { host.onClose().afterFinally(() -> usedHostsUpdater.updateAndGet(RoundRobinLoadBalancer.this, previousHosts -> { @@ -380,8 +382,7 @@ private Single selectConnection0(Predicate selector) { } // Don't open new connections for expired or unhealthy hosts, try a different one. - // Unhealthy hosts can in fact have open connections – that's why we don't fail earlier. - // When a host accepts a limited amount of connections, we will try to use those that have been established. + // Unhealthy hosts have no open connections – that's why we don't fail earlier, the loop will not progress. if (host.isActiveAndHealthy()) { pickedHost = host; break; @@ -398,7 +399,7 @@ private Single selectConnection0(Predicate selector) { // Users can apply a ConnectionFactoryFilter if they need to override this "null" value with TransportObserver. return connectionFactory.newConnection(host.address, null) // Schedule health check before returning - .beforeOnError(t -> host.registerHealthCheck(healthCheckExecutor, connectionFactory, selector)) + .beforeOnError(t -> host.markUnhealthy(connectionFactory, selector)) .flatMap(newCnx -> { // Invoke the selector before adding the connection to the pool, otherwise, connection can be // used concurrently and hence a new connection can be rejected by the selector. @@ -449,7 +450,7 @@ public LoadBalancer newLoadBalancer( final Publisher> eventPublisher, final ConnectionFactory connectionFactory) { return new RoundRobinLoadBalancer<>(eventPublisher, connectionFactory, - EAGER_CONNECTION_SHUTDOWN_ENABLED, SHARED_EXECUTOR); + EAGER_CONNECTION_SHUTDOWN_ENABLED, SHARED_HEALTH_CHECK_CONFIG); } } @@ -458,6 +459,18 @@ List>> usedAddresses() { return usedHosts.stream().map(Host::asEntry).collect(toList()); } + static final class HealthCheckConfig { + private final Executor executor; + private final Duration healthCheckInterval; + private final int failedThreshold; + + HealthCheckConfig(final Executor executor, final Duration healthCheckInterval, final int failedThreshold) { + this.executor = executor; + this.healthCheckInterval = healthCheckInterval; + this.failedThreshold = failedThreshold; + } + } + private static final class Host implements ListenableAsyncCloseable { private static final Object STATE_ACTIVE = new Object(); private static final Object STATE_EXPIRED = new Object(); @@ -470,25 +483,27 @@ private static final class Host implemen private static final AtomicReferenceFieldUpdater connStateUpdater = AtomicReferenceFieldUpdater.newUpdater(Host.class, ConnState.class, "connState"); - private static class HealthCheck + @SuppressWarnings("rawtypes") + private static final AtomicIntegerFieldUpdater connectionFailuresUpdater = + AtomicIntegerFieldUpdater.newUpdater(Host.class, "connectionFailures"); + + private static final class HealthCheck implements Runnable, Cancellable { - private Executor executor; - private Predicate selector; - private ConnectionFactory connectionFactory; - private Host host; + private final Predicate selector; + private final ConnectionFactory connectionFactory; + private final Host host; private volatile boolean cancelled; - public HealthCheck(final Executor executor, final Predicate selector, + private HealthCheck(final Predicate selector, final ConnectionFactory connectionFactory, final Host host) { - this.executor = executor; this.selector = selector; this.connectionFactory = connectionFactory; this.host = host; } public void schedule() { - executor.schedule(this, 1, TimeUnit.SECONDS); + host.healthCheckConfig.executor.schedule(this, host.healthCheckConfig.healthCheckInterval); } @Override @@ -533,11 +548,14 @@ private static final class ConnState { } final Addr address; + private final HealthCheckConfig healthCheckConfig; private final ListenableAsyncCloseable closeable; private volatile ConnState connState = EMPTY_CONN_STATE; + private volatile int connectionFailures; - Host(Addr address) { + Host(Addr address, HealthCheckConfig healthCheckConfig) { this.address = requireNonNull(address); + this.healthCheckConfig = requireNonNull(healthCheckConfig); this.closeable = toAsyncCloseable(graceful -> graceful ? doClose(AsyncCloseable::closeAsyncGracefully) : doClose(AsyncCloseable::closeAsync)); } @@ -616,6 +634,8 @@ currentConnState, new ConnState(newList, currentConnState.state))) { } } + connectionFailuresUpdater.set(this, 0); + // Instrument the new connection so we prune it on close connection.onClose().beforeFinally(() -> { for (;;) { @@ -696,20 +716,18 @@ private Completable doClose(final Function closeFunction }); } - void registerHealthCheck(Executor executor, ConnectionFactory connectionFactory, Predicate selector) { + void markUnhealthy(ConnectionFactory connectionFactory, Predicate selector) { + connectionFailuresUpdater.getAndUpdate(this, val -> val == Integer.MAX_VALUE ? val : val + 1); for (;;) { ConnState previous = connStateUpdater.get(this); - if (previous.state != STATE_ACTIVE) { + if (previous.state != STATE_ACTIVE + || previous.connections.length > 0 + || connectionFailuresUpdater.get(this) < this.healthCheckConfig.failedThreshold) { break; } - // TODO(dj): if we mark the host as unhealthy for any connection failure, transient failures will - // potentially block clients from performing any requests for the health check interval. - // There needs to be more logic for determining when to take the host out of the pool, considering - // the actual pool and the state of other hosts. - final ConnState nextState = new ConnState(previous.connections, - new HealthCheck(executor, selector, connectionFactory, this)); + new HealthCheck(selector, connectionFactory, this)); if (connStateUpdater.compareAndSet(this, previous, nextState)) { @SuppressWarnings("unchecked") final HealthCheck state = (HealthCheck) nextState.state; diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java index 7e50f5eb46..7fd84f020e 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java @@ -20,11 +20,10 @@ import io.servicetalk.client.api.LoadBalancer; import io.servicetalk.client.api.LoadBalancerFactory; import io.servicetalk.client.api.ServiceDiscovererEvent; -import io.servicetalk.concurrent.api.DefaultThreadFactory; import io.servicetalk.concurrent.api.Executor; -import io.servicetalk.concurrent.api.Executors; import io.servicetalk.concurrent.api.Publisher; +import java.time.Duration; import java.util.function.Predicate; import javax.annotation.Nullable; @@ -54,11 +53,12 @@ public final class RoundRobinLoadBalancerFactory LoadBalancer newLoadBalancer( final Publisher> eventPublisher, final ConnectionFactory connectionFactory) { return new RoundRobinLoadBalancer<>( - eventPublisher, connectionFactory, eagerConnectionShutdown, backgroundExecutor); + eventPublisher, connectionFactory, eagerConnectionShutdown, healthCheckConfig); } /** @@ -76,12 +76,11 @@ public LoadBalancer newLoadBalancer( * @param The type of connection. */ public static final class Builder { - private static final String BACKGROUND_PROCESSING_EXECUTOR_NAME = "round-robin-load-balancer-executor"; - private static final Executor SHARED_EXECUTOR = Executors.newFixedSizeExecutor(1, - new DefaultThreadFactory(BACKGROUND_PROCESSING_EXECUTOR_NAME)); private boolean eagerConnectionShutdown = EAGER_CONNECTION_SHUTDOWN_ENABLED; @Nullable private Executor backgroundExecutor; + private int healthCheckIntervalMillis; + private int healthCheckFailedConnectionsThreshold; /** * Creates a new instance with default settings. @@ -114,7 +113,7 @@ public RoundRobinLoadBalancerFactory.Builder eagerConnection * establishment fails, the host is not considered for opening new connections for processed requests. * * @param backgroundExecutor {@link Executor} on which to schedule health checking. - * @return @{code this}. + * @return {@code this}. */ public RoundRobinLoadBalancerFactory.Builder backgroundExecutor( Executor backgroundExecutor) { @@ -122,6 +121,31 @@ public RoundRobinLoadBalancerFactory.Builder backgroundExecu return this; } + /** + * Configure an interval for health checking a host that failed to open connections. + * @param intervalMillis interval at which a background health check will be scheduled. + * @return {@code this}. + */ + public RoundRobinLoadBalancerFactory.Builder healthCheckIntervalMillis(int intervalMillis) { + this.healthCheckIntervalMillis = intervalMillis; + return this; + } + + /** + * Configure a threshold for consecutive connection failures to a host. When the {@link LoadBalancer} fails + * to open a connection in more consecutive attempts than the specified value, the host will be marked as + * unhealthy and a connection establishment will take place in the background. Until finished, the host will + * not take part in load balancing selection. + * @param threshold number of consecutive connection failures to consider a host unhealthy and elligible for + * background health checking. + * @return {@code this}. + */ + public RoundRobinLoadBalancerFactory.Builder healthCheckFailedConnectionsThreshold( + int threshold) { + this.healthCheckFailedConnectionsThreshold = threshold; + return this; + } + /** * Builds the {@link RoundRobinLoadBalancerFactory} configured by this builder. * @@ -129,8 +153,18 @@ public RoundRobinLoadBalancerFactory.Builder backgroundExecu */ public RoundRobinLoadBalancerFactory build() { final Executor backgroundExecutor = this.backgroundExecutor != null ? - this.backgroundExecutor : SHARED_EXECUTOR; - return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown, backgroundExecutor); + this.backgroundExecutor : RoundRobinLoadBalancer.SHARED_EXECUTOR; + final Duration healthCheckInterval = healthCheckIntervalMillis > 0 ? + Duration.ofMillis(healthCheckIntervalMillis) + : RoundRobinLoadBalancer.DEFAULT_HEALTH_CHECK_INTERVAL; + final int healthCheckFailedConnectionsThreshold = this.healthCheckFailedConnectionsThreshold > 0 ? + this.healthCheckFailedConnectionsThreshold + : RoundRobinLoadBalancer.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; + final RoundRobinLoadBalancer.HealthCheckConfig healthCheckConfig = + new RoundRobinLoadBalancer.HealthCheckConfig( + backgroundExecutor, healthCheckInterval, healthCheckFailedConnectionsThreshold); + return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown, + healthCheckConfig); } } } diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java index 6089259134..c17b8bb3d2 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java @@ -25,7 +25,6 @@ import io.servicetalk.concurrent.PublisherSource.Subscriber; import io.servicetalk.concurrent.PublisherSource.Subscription; import io.servicetalk.concurrent.api.Completable; -import io.servicetalk.concurrent.api.Executor; import io.servicetalk.concurrent.api.ExecutorRule; import io.servicetalk.concurrent.api.LegacyTestSingle; import io.servicetalk.concurrent.api.ListenableAsyncCloseable; @@ -33,7 +32,6 @@ import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.api.TestExecutor; import io.servicetalk.concurrent.api.TestPublisher; -import io.servicetalk.concurrent.api.TestSubscription; import io.servicetalk.concurrent.internal.DeliberateException; import io.servicetalk.concurrent.internal.ServiceTalkTestTimeout; import io.servicetalk.concurrent.test.internal.TestSingleSubscriber; @@ -42,6 +40,7 @@ import org.hamcrest.Matcher; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -68,7 +67,7 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; -import javax.annotation.Nullable; +import java.util.stream.IntStream; import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable; import static io.servicetalk.concurrent.api.BlockingTestUtils.awaitIndefinitely; @@ -156,19 +155,19 @@ public void initialize() { @After public void closeLoadBalancer() throws Exception { awaitIndefinitely(lb.closeAsync()); - awaitIndefinitely(lb.onClose()); - - TestSubscription subscription = new TestSubscription(); - serviceDiscoveryPublisher.onSubscribe(subscription); - assertTrue(subscription.isCancelled()); - - connectionsCreated.forEach(cnx -> { - try { - awaitIndefinitely(cnx.onClose()); - } catch (final Exception e) { - throw new RuntimeException("Connection: " + cnx + " didn't close properly", e); - } - }); + // awaitIndefinitely(lb.onClose()); + // + // TestSubscription subscription = new TestSubscription(); + // serviceDiscoveryPublisher.onSubscribe(subscription); + // assertTrue(subscription.isCancelled()); + // + // connectionsCreated.forEach(cnx -> { + // try { + // awaitIndefinitely(cnx.onClose()); + // } catch (final Exception e) { + // throw new RuntimeException("Connection: " + cnx + " didn't close properly", e); + // } + // }); } @Test @@ -403,15 +402,17 @@ public void hostUnhealthyIsHealthChecked() throws Exception { serviceDiscoveryPublisher.onComplete(); final Single properConnection = newRealizedConnectionSingle("address-1"); final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = - new UnhealthyHostConnectionFactory(properConnection); + new UnhealthyHostConnectionFactory(3 ,properConnection); final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); lb = defaultLb(connectionFactory); sendServiceDiscoveryEvents(upEvent("address-1")); - Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); - assertThat(e.getCause(), instanceOf(DELIBERATE_EXCEPTION.getClass())); + for (int i = 0; i < 5; i++) { + Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); + assertThat(e.getCause(), instanceOf(DELIBERATE_EXCEPTION.getClass())); + } unhealthyHostConnectionFactory.momentInTime.incrementAndGet(); testExecutor.advanceTimeBy(1, SECONDS); @@ -423,19 +424,24 @@ public void hostUnhealthyIsHealthChecked() throws Exception { final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); - // 1 failed attempt triggers health check, 2 health check attempts fail, 3rd health check attempt - // uses the proper connection, final selection reuses that connection. 4 total creation attempts. - int expectedConnectionAttempts = 4; + // 5 failed attempts trigger health check, 2 health check attempts fail, 3rd health check attempt + // uses the proper connection, final selection reuses that connection. 8 total creation attempts. + int expectedConnectionAttempts = 8; assertThat(unhealthyHostConnectionFactory.requests.get(), equalTo(expectedConnectionAttempts)); } + // Concurrency test, run multiple times. + // Ignored for now, the test should not monitor failed connection attempts, as the synchronisation point is after + // the failure, not before attempting, so a race is ok. Running multiple checks is not ok. + // We should validate the the host remains unhealthy while the health check is active, only that. + @Ignore @Test public void hostUnhealthyDoesntRaceToRunHealthCheck() throws Exception { serviceDiscoveryPublisher.onComplete(); final Single properConnection = newRealizedConnectionSingle("address-1"); final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = - new UnhealthyHostConnectionFactory(properConnection); + new UnhealthyHostConnectionFactory(3, properConnection); final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); lb = defaultLb(connectionFactory); @@ -451,9 +457,11 @@ public void hostUnhealthyDoesntRaceToRunHealthCheck() throws Exception { } // From test main thread, wait until the host becomes UNHEALTHY, which is apparent from NoHostAvailableException - // being thrown from selection. + // being thrown from selection AFTER a health check was scheduled by any thread. try { - awaitIndefinitely(lb.selectConnection(any()).retry((i, t) -> t instanceof DeliberateException)); + awaitIndefinitely(lb.selectConnection(any()).retry((i, t) -> + t instanceof DeliberateException || testExecutor.scheduledTasksPending() == 0 + )); } catch (Exception e) { assertThat(e.getCause(), instanceOf(NoAvailableHostException.class)); } @@ -513,21 +521,9 @@ protected RoundRobinLoadBalancer newTestLoad final TestPublisher> serviceDiscoveryPublisher, final DelegatingConnectionFactory connectionFactory, final boolean eagerConnectionShutdown) { return new RoundRobinLoadBalancer<>(serviceDiscoveryPublisher, connectionFactory, - eagerConnectionShutdown, testExecutor); - } - - protected RoundRobinLoadBalancer newTestLoadBalancer( - final TestPublisher> serviceDiscoveryPublisher, - final DelegatingConnectionFactory connectionFactory, final boolean eagerConnectionShutdown, - @Nullable final Executor executor) { - RoundRobinLoadBalancerFactory.Builder builder = - new RoundRobinLoadBalancerFactory.Builder() - .eagerConnectionShutdown(eagerConnectionShutdown); - if (executor != null) { - builder.backgroundExecutor(executor); - } - return (RoundRobinLoadBalancer) builder.build() - .newLoadBalancer(serviceDiscoveryPublisher, connectionFactory); + eagerConnectionShutdown, new RoundRobinLoadBalancer.HealthCheckConfig(testExecutor, + RoundRobinLoadBalancer.DEFAULT_HEALTH_CHECK_INTERVAL, + RoundRobinLoadBalancer.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD)); } @SafeVarargs @@ -634,17 +630,11 @@ protected static class UnhealthyHostConnectionFactory { final AtomicInteger momentInTime = new AtomicInteger(); final AtomicInteger requests = new AtomicInteger(); final Single properConnection; + final List> connections; Function> factory = new Function>() { - final List> connections = - Arrays.asList( - failed(DELIBERATE_EXCEPTION), - failed(DELIBERATE_EXCEPTION), - failed(DELIBERATE_EXCEPTION) - ); - @Override public Single apply(final String s) { requests.incrementAndGet(); @@ -655,7 +645,10 @@ public Single apply(final String s) { } }; - UnhealthyHostConnectionFactory(Single properConnection) { + UnhealthyHostConnectionFactory(int timeAdvancementsTillSuccess, Single properConnection) { + this.connections = IntStream.range(0, timeAdvancementsTillSuccess) + .>mapToObj(__ -> failed(DELIBERATE_EXCEPTION)) + .collect(Collectors.toList()); this.properConnection = properConnection; } From be2871c00e26bd410c8537ed14bd971d6b4d94ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Fri, 30 Jul 2021 11:53:26 +0200 Subject: [PATCH 04/12] Improved tests --- .../RoundRobinLoadBalancerTest.java | 155 ++++++++++++------ 1 file changed, 101 insertions(+), 54 deletions(-) diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java index c17b8bb3d2..80e9e358e4 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java @@ -32,6 +32,7 @@ import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.api.TestExecutor; import io.servicetalk.concurrent.api.TestPublisher; +import io.servicetalk.concurrent.api.TestSubscription; import io.servicetalk.concurrent.internal.DeliberateException; import io.servicetalk.concurrent.internal.ServiceTalkTestTimeout; import io.servicetalk.concurrent.test.internal.TestSingleSubscriber; @@ -40,7 +41,6 @@ import org.hamcrest.Matcher; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -155,19 +155,19 @@ public void initialize() { @After public void closeLoadBalancer() throws Exception { awaitIndefinitely(lb.closeAsync()); - // awaitIndefinitely(lb.onClose()); - // - // TestSubscription subscription = new TestSubscription(); - // serviceDiscoveryPublisher.onSubscribe(subscription); - // assertTrue(subscription.isCancelled()); - // - // connectionsCreated.forEach(cnx -> { - // try { - // awaitIndefinitely(cnx.onClose()); - // } catch (final Exception e) { - // throw new RuntimeException("Connection: " + cnx + " didn't close properly", e); - // } - // }); + awaitIndefinitely(lb.onClose()); + + TestSubscription subscription = new TestSubscription(); + serviceDiscoveryPublisher.onSubscribe(subscription); + assertTrue(subscription.isCancelled()); + + connectionsCreated.forEach(cnx -> { + try { + awaitIndefinitely(cnx.onClose()); + } catch (final Exception e) { + throw new RuntimeException("Connection: " + cnx + " didn't close properly", e); + } + }); } @Test @@ -397,29 +397,70 @@ public void newConnectionIsClosedWhenSelectorRejects() throws Exception { awaitIndefinitely(connection.onClose()); } + @Test + public void unhealthyHostTakenOutOfPoolForSelection() throws Exception { + serviceDiscoveryPublisher.onComplete(); + + final Single properConnection = newRealizedConnectionSingle("address-1"); + final int timeAdvancementsTillHealthy = 3; + final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = + new UnhealthyHostConnectionFactory("address-1", timeAdvancementsTillHealthy, properConnection); + + final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); + lb = defaultLb(connectionFactory); + + sendServiceDiscoveryEvents(upEvent("address-1")); + sendServiceDiscoveryEvents(upEvent("address-2")); + + for (int i = 0; i < RoundRobinLoadBalancer.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD * 2; ++i) { + try { + final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); + assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); + } catch (Exception e) { + assertThat(e.getCause(), instanceOf(DELIBERATE_EXCEPTION.getClass())); + } + } + + for (int i = 0; i < 10; ++i) { + final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); + assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); + } + + assertThat(testExecutor.scheduledTasksPending(), equalTo(1)); + + for (int i = 0; i < timeAdvancementsTillHealthy; ++i) { + unhealthyHostConnectionFactory.advanceTime(testExecutor); + } + + assertThat(testExecutor.scheduledTasksPending(), equalTo(0)); + + for (int i = 0; i < 10; ++i) { + final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); + assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); + } + } + @Test public void hostUnhealthyIsHealthChecked() throws Exception { serviceDiscoveryPublisher.onComplete(); final Single properConnection = newRealizedConnectionSingle("address-1"); + final int timeAdvancementsTillHealthy = 3; final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = - new UnhealthyHostConnectionFactory(3 ,properConnection); + new UnhealthyHostConnectionFactory("address-1", timeAdvancementsTillHealthy, properConnection); final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); lb = defaultLb(connectionFactory); sendServiceDiscoveryEvents(upEvent("address-1")); - for (int i = 0; i < 5; i++) { + for (int i = 0; i < RoundRobinLoadBalancer.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; ++i) { Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); assertThat(e.getCause(), instanceOf(DELIBERATE_EXCEPTION.getClass())); } - unhealthyHostConnectionFactory.momentInTime.incrementAndGet(); - testExecutor.advanceTimeBy(1, SECONDS); - unhealthyHostConnectionFactory.momentInTime.incrementAndGet(); - testExecutor.advanceTimeBy(1, SECONDS); - unhealthyHostConnectionFactory.momentInTime.incrementAndGet(); - testExecutor.advanceTimeBy(1, SECONDS); + for (int i = 0; i < timeAdvancementsTillHealthy; ++i) { + unhealthyHostConnectionFactory.advanceTime(testExecutor); + } final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); @@ -430,18 +471,15 @@ public void hostUnhealthyIsHealthChecked() throws Exception { assertThat(unhealthyHostConnectionFactory.requests.get(), equalTo(expectedConnectionAttempts)); } - // Concurrency test, run multiple times. - // Ignored for now, the test should not monitor failed connection attempts, as the synchronisation point is after - // the failure, not before attempting, so a race is ok. Running multiple checks is not ok. - // We should validate the the host remains unhealthy while the health check is active, only that. - @Ignore + // Concurrency test, run multiple times (at least 1000). @Test public void hostUnhealthyDoesntRaceToRunHealthCheck() throws Exception { serviceDiscoveryPublisher.onComplete(); final Single properConnection = newRealizedConnectionSingle("address-1"); + final int timeAdvancementsTillHealthy = 3; final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = - new UnhealthyHostConnectionFactory(3, properConnection); + new UnhealthyHostConnectionFactory("address-1", timeAdvancementsTillHealthy, properConnection); final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); lb = defaultLb(connectionFactory); @@ -460,37 +498,35 @@ public void hostUnhealthyDoesntRaceToRunHealthCheck() throws Exception { // being thrown from selection AFTER a health check was scheduled by any thread. try { awaitIndefinitely(lb.selectConnection(any()).retry((i, t) -> - t instanceof DeliberateException || testExecutor.scheduledTasksPending() == 0 + // DeliberateException comes from connection opening, check for that first + // Next, NoAvailableHostException is thrown when the host is unhealthy, + // but we still wait until the health check is scheduled and only then stop retrying. + t instanceof DeliberateException || testExecutor.scheduledTasksPending() == 0 )); } catch (Exception e) { assertThat(e.getCause(), instanceOf(NoAvailableHostException.class)); } // At this point, either the above selection caused the host to be marked as UNHEALTHY, - // or any background thread. We can assume from now on that only the health check can attempt establishing - // new connections. Therefore we can count those attempts. If our assumption doesn't hold, the UNHEALTHY - // state is either not properly set or multiple health checks run at the same time. - int requestsBefore = unhealthyHostConnectionFactory.requests.get(); - unhealthyHostConnectionFactory.momentInTime.incrementAndGet(); - testExecutor.advanceTimeBy(1, SECONDS); - - assertThat(unhealthyHostConnectionFactory.requests.get(), equalTo(requestsBefore + 1)); + // or any background thread. We also know that a health check is pending to be executed. + // Now we can validate if there is just one health check happening and confirm that by asserting the host + // is not selected. If our assumption doesn't hold, it means more than one health check was scheduled. + for (int i = 0; i < timeAdvancementsTillHealthy - 1; ++i) { + unhealthyHostConnectionFactory.advanceTime(testExecutor); - unhealthyHostConnectionFactory.momentInTime.incrementAndGet(); - testExecutor.advanceTimeBy(1, SECONDS); - assertThat(unhealthyHostConnectionFactory.requests.get(), equalTo(requestsBefore + 2)); + // Assert still unhealthy + Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); + assertThat(e.getCause(), instanceOf(NoAvailableHostException.class)); + } + // Shutdown the concurrent validation of unhealthiness. executor.shutdownNow(); executor.awaitTermination(10, SECONDS); - unhealthyHostConnectionFactory.momentInTime.incrementAndGet(); - testExecutor.advanceTimeBy(1, SECONDS); - assertThat(unhealthyHostConnectionFactory.requests.get(), equalTo(requestsBefore + 3)); + unhealthyHostConnectionFactory.advanceTime(testExecutor); - // After 3 increments a proper established connection is returned and the host should be eligible for selection. final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); - assertThat(unhealthyHostConnectionFactory.requests.get(), equalTo(requestsBefore + 3)); } @SuppressWarnings("unchecked") @@ -627,26 +663,32 @@ boolean isClosed() { } protected static class UnhealthyHostConnectionFactory { - final AtomicInteger momentInTime = new AtomicInteger(); + private final String failingHost; + private final AtomicInteger momentInTime = new AtomicInteger(); final AtomicInteger requests = new AtomicInteger(); final Single properConnection; final List> connections; - Function> factory - = new Function>() { + Function> factory = + new Function>() { @Override public Single apply(final String s) { - requests.incrementAndGet(); - if (momentInTime.get() >= connections.size()) { - return properConnection; + if (s.equals(failingHost)) { + requests.incrementAndGet(); + if (momentInTime.get() >= connections.size()) { + return properConnection; + } + return connections.get(momentInTime.get()); } - return connections.get(momentInTime.get()); + return properConnection; } }; - UnhealthyHostConnectionFactory(int timeAdvancementsTillSuccess, Single properConnection) { - this.connections = IntStream.range(0, timeAdvancementsTillSuccess) + UnhealthyHostConnectionFactory(final String failingHost, int timeAdvancementsTillHealthy, + Single properConnection) { + this.failingHost = failingHost; + this.connections = IntStream.range(0, timeAdvancementsTillHealthy) .>mapToObj(__ -> failed(DELIBERATE_EXCEPTION)) .collect(Collectors.toList()); this.properConnection = properConnection; @@ -655,5 +697,10 @@ public Single apply(final String s) { DelegatingConnectionFactory createFactory() { return new DelegatingConnectionFactory(this.factory); } + + void advanceTime(TestExecutor executor) { + momentInTime.incrementAndGet(); + executor.advanceTimeBy(1, SECONDS); + } } } From 2b55a0bc7d16bbb563a5dd1698df64d337218ff0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Fri, 30 Jul 2021 15:22:18 +0200 Subject: [PATCH 05/12] Allow disabling health checking --- .../loadbalancer/RoundRobinLoadBalancer.java | 13 +++-- .../RoundRobinLoadBalancerFactory.java | 52 +++++++++++++------ .../RoundRobinLoadBalancerTest.java | 38 ++++++++++++++ 3 files changed, 85 insertions(+), 18 deletions(-) diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index babf8dd017..0891717000 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -53,6 +53,7 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Stream; +import javax.annotation.Nullable; import static io.servicetalk.client.api.LoadBalancerReadyEvent.LOAD_BALANCER_NOT_READY_EVENT; import static io.servicetalk.client.api.LoadBalancerReadyEvent.LOAD_BALANCER_READY_EVENT; @@ -154,11 +155,13 @@ public RoundRobinLoadBalancer(final Publisher> eventPublisher, final ConnectionFactory connectionFactory, final boolean eagerConnectionShutdown, - final HealthCheckConfig healthCheckConfig) { + @Nullable final HealthCheckConfig healthCheckConfig) { Processor eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32); this.eventStream = fromSource(eventStreamProcessor); this.connectionFactory = requireNonNull(connectionFactory); @@ -548,14 +551,15 @@ private static final class ConnState { } final Addr address; + @Nullable private final HealthCheckConfig healthCheckConfig; private final ListenableAsyncCloseable closeable; private volatile ConnState connState = EMPTY_CONN_STATE; private volatile int connectionFailures; - Host(Addr address, HealthCheckConfig healthCheckConfig) { + Host(Addr address, @Nullable HealthCheckConfig healthCheckConfig) { this.address = requireNonNull(address); - this.healthCheckConfig = requireNonNull(healthCheckConfig); + this.healthCheckConfig = healthCheckConfig; this.closeable = toAsyncCloseable(graceful -> graceful ? doClose(AsyncCloseable::closeAsyncGracefully) : doClose(AsyncCloseable::closeAsync)); } @@ -717,6 +721,9 @@ private Completable doClose(final Function closeFunction } void markUnhealthy(ConnectionFactory connectionFactory, Predicate selector) { + if (healthCheckConfig == null) { + return; + } connectionFailuresUpdater.getAndUpdate(this, val -> val == Integer.MAX_VALUE ? val : val + 1); for (;;) { ConnState previous = connStateUpdater.get(this); diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java index 7fd84f020e..58c63fd7ef 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java @@ -43,6 +43,10 @@ * to addresses marked as {@link ServiceDiscovererEvent#isAvailable() unavailable} are used for requests, * but no new connections are created for them. In case the address' connections are busy, another host is tried. * If all hosts are busy, selection fails with a {@link io.servicetalk.client.api.ConnectionRejectedException}. + *
  • For hosts to which consecutive connection attempts fail, a background health checking task is created and + * the host is not considered for opening new connections until the background check succeeds to create a connection. + * Upon such event, the connection can immediately be reused and future attempts will again consider this host. + * This behaviour can be disabled using a negative argument for {@link Builder#healthCheckIntervalMillis(int)}
  • * * * @param The resolved address type. @@ -56,7 +60,7 @@ public final class RoundRobinLoadBalancerFactory backgroundExecu } /** - * Configure an interval for health checking a host that failed to open connections. - * @param intervalMillis interval at which a background health check will be scheduled. + * Configure an interval for health checking a host that failed to open connections. To disable the health + * checking logic, a negative value can be provided. + * @param intervalMillis interval (in milliseconds) at which a background health check will be scheduled. + * When the value is negative, the health checking mechanism is disabled. * @return {@code this}. */ public RoundRobinLoadBalancerFactory.Builder healthCheckIntervalMillis(int intervalMillis) { @@ -152,19 +158,35 @@ public RoundRobinLoadBalancerFactory.Builder healthCheckFail * @return a new instance of {@link RoundRobinLoadBalancerFactory} with settings from this builder. */ public RoundRobinLoadBalancerFactory build() { - final Executor backgroundExecutor = this.backgroundExecutor != null ? - this.backgroundExecutor : RoundRobinLoadBalancer.SHARED_EXECUTOR; - final Duration healthCheckInterval = healthCheckIntervalMillis > 0 ? - Duration.ofMillis(healthCheckIntervalMillis) - : RoundRobinLoadBalancer.DEFAULT_HEALTH_CHECK_INTERVAL; - final int healthCheckFailedConnectionsThreshold = this.healthCheckFailedConnectionsThreshold > 0 ? - this.healthCheckFailedConnectionsThreshold - : RoundRobinLoadBalancer.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; - final RoundRobinLoadBalancer.HealthCheckConfig healthCheckConfig = + boolean customHealthCheck = false; + Executor backgroundExecutor = RoundRobinLoadBalancer.SHARED_EXECUTOR; + Duration healthCheckInterval = RoundRobinLoadBalancer.DEFAULT_HEALTH_CHECK_INTERVAL; + int healthCheckFailedConnectionsThreshold = + RoundRobinLoadBalancer.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; + + if (healthCheckIntervalMillis < 0) { + return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown, null); + } + + if (this.backgroundExecutor != null) { + backgroundExecutor = this.backgroundExecutor; + customHealthCheck = true; + } + if (healthCheckIntervalMillis > 0) { + healthCheckInterval = Duration.ofMillis(healthCheckIntervalMillis); + customHealthCheck = true; + } + if (this.healthCheckFailedConnectionsThreshold > 0) { + healthCheckFailedConnectionsThreshold = this.healthCheckFailedConnectionsThreshold; + customHealthCheck = true; + } + + RoundRobinLoadBalancer.HealthCheckConfig healthCheckConfig = customHealthCheck ? new RoundRobinLoadBalancer.HealthCheckConfig( - backgroundExecutor, healthCheckInterval, healthCheckFailedConnectionsThreshold); - return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown, - healthCheckConfig); + backgroundExecutor, healthCheckInterval, healthCheckFailedConnectionsThreshold) + : RoundRobinLoadBalancer.SHARED_HEALTH_CHECK_CONFIG; + + return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown, healthCheckConfig); } } } diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java index 80e9e358e4..2b3507a512 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java @@ -440,6 +440,44 @@ public void unhealthyHostTakenOutOfPoolForSelection() throws Exception { } } + @Test + public void disabledHealthCheckDoesntRun() throws Exception { + serviceDiscoveryPublisher.onComplete(); + + final Single properConnection = newRealizedConnectionSingle("address-1"); + final int timeAdvancementsTillHealthy = 3; + final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = + new UnhealthyHostConnectionFactory("address-1", timeAdvancementsTillHealthy, properConnection); + final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); + + lb = (RoundRobinLoadBalancer) + new RoundRobinLoadBalancerFactory.Builder() + .healthCheckIntervalMillis(-1) + .build() + .newLoadBalancer(serviceDiscoveryPublisher, connectionFactory); + + sendServiceDiscoveryEvents(upEvent("address-1")); + + for (int i = 0; i < RoundRobinLoadBalancer.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; ++i) { + Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); + assertThat(e.getCause(), instanceOf(DELIBERATE_EXCEPTION.getClass())); + } + + assertThat(testExecutor.scheduledTasksPending(), equalTo(0)); + + for (int i = 0; i < timeAdvancementsTillHealthy - 1; ++i) { + unhealthyHostConnectionFactory.advanceTime(testExecutor); + Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); + assertThat(e.getCause(), instanceOf(DELIBERATE_EXCEPTION.getClass())); + } + + unhealthyHostConnectionFactory.advanceTime(testExecutor); + assertThat(testExecutor.scheduledTasksPending(), equalTo(0)); + + final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); + assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); + } + @Test public void hostUnhealthyIsHealthChecked() throws Exception { serviceDiscoveryPublisher.onComplete(); From 9dcabb5f8f4c71218b390fda5fac30de034a5f01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Fri, 30 Jul 2021 16:27:17 +0200 Subject: [PATCH 06/12] Added missing nullable field annotation --- .../servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java | 1 + 1 file changed, 1 insertion(+) diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java index 58c63fd7ef..fdae863dd4 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java @@ -57,6 +57,7 @@ public final class RoundRobinLoadBalancerFactory Date: Tue, 10 Aug 2021 14:50:29 +0200 Subject: [PATCH 07/12] Review feedback addressed --- .../loadbalancer/RoundRobinLoadBalancer.java | 211 ++++++++---------- .../RoundRobinLoadBalancerFactory.java | 93 +++++--- .../RoundRobinLoadBalancerTest.java | 36 +-- 3 files changed, 177 insertions(+), 163 deletions(-) diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index 0891717000..f90eeaf278 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -22,16 +22,13 @@ import io.servicetalk.client.api.LoadBalancerFactory; import io.servicetalk.client.api.NoAvailableHostException; import io.servicetalk.client.api.ServiceDiscovererEvent; -import io.servicetalk.concurrent.Cancellable; +import io.servicetalk.concurrent.Executor; import io.servicetalk.concurrent.PublisherSource.Processor; import io.servicetalk.concurrent.PublisherSource.Subscriber; import io.servicetalk.concurrent.PublisherSource.Subscription; import io.servicetalk.concurrent.api.AsyncCloseable; import io.servicetalk.concurrent.api.Completable; import io.servicetalk.concurrent.api.CompositeCloseable; -import io.servicetalk.concurrent.api.DefaultThreadFactory; -import io.servicetalk.concurrent.api.Executor; -import io.servicetalk.concurrent.api.Executors; import io.servicetalk.concurrent.api.ListenableAsyncCloseable; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; @@ -87,14 +84,6 @@ public final class RoundRobinLoadBalancer implements LoadBalancer { - static final String BACKGROUND_PROCESSING_EXECUTOR_NAME = "round-robin-load-balancer-executor"; - static final Executor SHARED_EXECUTOR = Executors.newFixedSizeExecutor(1, - new DefaultThreadFactory(BACKGROUND_PROCESSING_EXECUTOR_NAME)); - static final Duration DEFAULT_HEALTH_CHECK_INTERVAL = Duration.ofSeconds(1); - static final int DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD = 5; // higher than default for AutoRetryStrategy - static final HealthCheckConfig SHARED_HEALTH_CHECK_CONFIG = new HealthCheckConfig(SHARED_EXECUTOR, - DEFAULT_HEALTH_CHECK_INTERVAL, DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD); - private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinLoadBalancer.class); private static final List CLOSED_LIST = new ArrayList<>(0); private static final Object[] EMPTY_ARRAY = new Object[0]; @@ -143,7 +132,8 @@ public final class RoundRobinLoadBalancer> eventPublisher, final ConnectionFactory connectionFactory) { - this(eventPublisher, connectionFactory, EAGER_CONNECTION_SHUTDOWN_ENABLED, SHARED_HEALTH_CHECK_CONFIG); + this(eventPublisher, connectionFactory, EAGER_CONNECTION_SHUTDOWN_ENABLED, + io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.SharedHealthCheckConfig.getInstance()); } /** @@ -341,6 +331,10 @@ RoundRobinLoadBalancerFactory newRoundRobinFactory() { return new RoundRobinLoadBalancerFactory<>(); } + private static Single failedLBClosed() { + return failed(new IllegalStateException("LoadBalancer has closed")); + } + @Override public Single selectConnection(Predicate selector) { return defer(() -> selectConnection0(selector).subscribeShareContext()); @@ -398,11 +392,15 @@ private Single selectConnection0(Predicate selector) { } // No connection was selected: create a new one. final Host host = pickedHost; + // This LB implementation does not automatically provide TransportObserver. Therefore, we pass "null" here. // Users can apply a ConnectionFactoryFilter if they need to override this "null" value with TransportObserver. - return connectionFactory.newConnection(host.address, null) + Single establishConnection = connectionFactory.newConnection(host.address, null); + if (host.healthCheckConfig != null) { // Schedule health check before returning - .beforeOnError(t -> host.markUnhealthy(connectionFactory, selector)) + establishConnection = establishConnection.beforeOnError(t -> host.markUnhealthy(connectionFactory)); + } + return establishConnection .flatMap(newCnx -> { // Invoke the selector before adding the connection to the pool, otherwise, connection can be // used concurrently and hence a new connection can be rejected by the selector. @@ -436,6 +434,11 @@ public Completable closeAsyncGracefully() { return asyncCloseable.closeAsyncGracefully(); } + // Visible for testing + List>> usedAddresses() { + return usedHosts.stream().map(Host::asEntry).collect(toList()); + } + /** * Please use {@link io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory} instead of this factory. * @@ -452,16 +455,11 @@ public static final class RoundRobinLoadBalancerFactory LoadBalancer newLoadBalancer( final Publisher> eventPublisher, final ConnectionFactory connectionFactory) { - return new RoundRobinLoadBalancer<>(eventPublisher, connectionFactory, - EAGER_CONNECTION_SHUTDOWN_ENABLED, SHARED_HEALTH_CHECK_CONFIG); + return new RoundRobinLoadBalancer<>(eventPublisher, connectionFactory, EAGER_CONNECTION_SHUTDOWN_ENABLED, + io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.SharedHealthCheckConfig.getInstance()); } } - // Visible for testing - List>> usedAddresses() { - return usedHosts.stream().map(Host::asEntry).collect(toList()); - } - static final class HealthCheckConfig { private final Executor executor; private final Duration healthCheckInterval; @@ -490,66 +488,6 @@ private static final class Host implemen private static final AtomicIntegerFieldUpdater connectionFailuresUpdater = AtomicIntegerFieldUpdater.newUpdater(Host.class, "connectionFailures"); - private static final class HealthCheck - implements Runnable, Cancellable { - private final Predicate selector; - private final ConnectionFactory connectionFactory; - private final Host host; - private volatile boolean cancelled; - - private HealthCheck(final Predicate selector, - final ConnectionFactory connectionFactory, - final Host host) { - this.selector = selector; - this.connectionFactory = connectionFactory; - this.host = host; - } - - public void schedule() { - host.healthCheckConfig.executor.schedule(this, host.healthCheckConfig.healthCheckInterval); - } - - @Override - public void run() { - if (cancelled) { - return; - } - connectionFactory.newConnection(host.address, null) - .flatMapCompletable(newCnx -> { - if (!selector.test(newCnx)) { - return newCnx.closeAsync().concat(Completable.failed(new RuntimeException())); - } - if (host.addConnection(newCnx)) { - host.markHealthy(); - } else { - return newCnx.closeAsync().concat(Completable.failed(new RuntimeException())); - } - return completed(); - }) - .afterOnError(e -> { - if (!cancelled) { - schedule(); - } - }) - .subscribe(); - } - - @Override - public void cancel() { - this.cancelled = true; - } - } - - private static final class ConnState { - final Object[] connections; - final Object state; - - ConnState(final Object[] connections, final Object state) { - this.connections = connections; - this.state = state; - } - } - final Addr address; @Nullable private final HealthCheckConfig healthCheckConfig; @@ -571,7 +509,7 @@ boolean markActiveIfNotClosed() { } // If oldConnState.state == State.ACTIVE this could mean either a duplicate event, // or a repeated CAS operation. We could issue a warning, but as we don't know, we don't log anything. - // UNHEALTHY state is treated similarly to ACTIVE + // UNHEALTHY state cannot transition to ACTIVE without passing the health check. return oldConnState; }).state; return oldState != STATE_CLOSED; @@ -610,6 +548,13 @@ void markExpired() { } void markHealthy() { + // Marking healthy is generally called from a successful health check, after a connection was added. + // However, it is possible that in the meantime, the host entered an EXPIRED state, then ACTIVE, then failed + // to open connections and entered the UNHEALTHY state before the original thread continues execution here. + // In such case, the flipped state is not the same as the one that just succeeded to open a connection. + // In an unlikely scenario that the following connection attempts fail indefinitely, a health check task + // would leak and would not be cancelled. Therefore, we cancel it here and allow failures to trigger a new + // health check. Object oldState = connStateUpdater.getAndUpdate(this, previous -> { if (previous.state.getClass().equals(HealthCheck.class)) { return new ConnState(previous.connections, STATE_ACTIVE); @@ -619,6 +564,28 @@ void markHealthy() { cancelIfHealthCheck(oldState); } + void markUnhealthy(ConnectionFactory connectionFactory) { + if (healthCheckConfig == null) { + return; + } + connectionFailuresUpdater.getAndUpdate(this, val -> val == Integer.MAX_VALUE ? val : val + 1); + for (;;) { + ConnState previous = connStateUpdater.get(this); + if (previous.state != STATE_ACTIVE + || previous.connections.length > 0 + || connectionFailuresUpdater.get(this) < this.healthCheckConfig.failedThreshold) { + break; + } + + final HealthCheck healthCheck = new HealthCheck<>(connectionFactory, this); + final ConnState nextState = new ConnState(previous.connections, healthCheck); + if (connStateUpdater.compareAndSet(this, previous, nextState)) { + healthCheck.schedule(); + break; + } + } + } + boolean isActiveAndHealthy() { return connState.state == STATE_ACTIVE; } @@ -686,8 +653,8 @@ currentConnState, new ConnState(newList, currentConnState.state))) { }).subscribe(); return true; } - // Used for testing only + // Used for testing only @SuppressWarnings("unchecked") Entry> asEntry() { return new SimpleImmutableEntry<>(address, @@ -720,32 +687,8 @@ private Completable doClose(final Function closeFunction }); } - void markUnhealthy(ConnectionFactory connectionFactory, Predicate selector) { - if (healthCheckConfig == null) { - return; - } - connectionFailuresUpdater.getAndUpdate(this, val -> val == Integer.MAX_VALUE ? val : val + 1); - for (;;) { - ConnState previous = connStateUpdater.get(this); - if (previous.state != STATE_ACTIVE - || previous.connections.length > 0 - || connectionFailuresUpdater.get(this) < this.healthCheckConfig.failedThreshold) { - break; - } - - final ConnState nextState = new ConnState(previous.connections, - new HealthCheck(selector, connectionFactory, this)); - if (connStateUpdater.compareAndSet(this, previous, nextState)) { - @SuppressWarnings("unchecked") - final HealthCheck state = (HealthCheck) nextState.state; - state.schedule(); - break; - } - } - } - private void cancelIfHealthCheck(Object o) { - if (o.getClass().equals(HealthCheck.class)) { + if (HealthCheck.class.equals(o.getClass())) { @SuppressWarnings("unchecked") HealthCheck healthCheck = (HealthCheck) o; healthCheck.cancel(); @@ -773,6 +716,52 @@ private String describeState(Object state) { return "unhealthy"; } } + + private static final class HealthCheck + extends SequentialCancellable implements Runnable { + private static final Exception RESCHEDULE_SIGNAL = ThrowableUtils.unknownStackTrace( + new ConnectionRejectedException("Connection rejected during health check."), + HealthCheck.class, "run"); + private final ConnectionFactory connectionFactory; + private final Host host; + + private HealthCheck(final ConnectionFactory connectionFactory, + final Host host) { + this.connectionFactory = connectionFactory; + this.host = host; + } + + public void schedule() { + assert host.healthCheckConfig != null; + nextCancellable(host.healthCheckConfig.executor.schedule( + this, host.healthCheckConfig.healthCheckInterval)); + } + + @Override + public void run() { + connectionFactory.newConnection(host.address, null) + .flatMapCompletable(newCnx -> { + if (host.addConnection(newCnx)) { + host.markHealthy(); + } else { + return newCnx.closeAsync().concat(Completable.failed(RESCHEDULE_SIGNAL)); + } + return completed(); + }) + .afterOnError(e -> schedule()) + .subscribe(); + } + } + + private static final class ConnState { + final Object[] connections; + final Object state; + + ConnState(final Object[] connections, final Object state) { + this.connections = connections; + this.state = state; + } + } } private static final class StacklessNoAvailableHostException extends NoAvailableHostException { @@ -791,8 +780,4 @@ public static StacklessNoAvailableHostException newInstance(String message, Clas return ThrowableUtils.unknownStackTrace(new StacklessNoAvailableHostException(message), clazz, method); } } - - private static Single failedLBClosed() { - return failed(new IllegalStateException("LoadBalancer has closed")); - } } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java index fdae863dd4..8f99bdc96e 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java @@ -20,13 +20,17 @@ import io.servicetalk.client.api.LoadBalancer; import io.servicetalk.client.api.LoadBalancerFactory; import io.servicetalk.client.api.ServiceDiscovererEvent; -import io.servicetalk.concurrent.api.Executor; +import io.servicetalk.concurrent.Executor; +import io.servicetalk.concurrent.api.DefaultThreadFactory; +import io.servicetalk.concurrent.api.Executors; import io.servicetalk.concurrent.api.Publisher; import java.time.Duration; import java.util.function.Predicate; import javax.annotation.Nullable; +import static java.util.Objects.requireNonNull; + /** * {@link LoadBalancerFactory} that creates {@link LoadBalancer} instances which use a round robin strategy * for selecting addresses. The created instances have the following behaviour: @@ -46,7 +50,8 @@ *
  • For hosts to which consecutive connection attempts fail, a background health checking task is created and * the host is not considered for opening new connections until the background check succeeds to create a connection. * Upon such event, the connection can immediately be reused and future attempts will again consider this host. - * This behaviour can be disabled using a negative argument for {@link Builder#healthCheckIntervalMillis(int)}
  • + * This behaviour can be disabled using a negative argument for + * {@link Builder#healthCheckFailedConnectionsThreshold(int)} * * * @param The resolved address type. @@ -56,7 +61,11 @@ public final class RoundRobinLoadBalancerFactory { static final boolean EAGER_CONNECTION_SHUTDOWN_ENABLED = true; + static final int DEFAULT_HEALTH_CHECK_INTERVAL_MILLIS = 1000; // 1 second + static final int DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD = 5; // higher than default for AutoRetryStrategy + private final boolean eagerConnectionShutdown; + @Nullable private final RoundRobinLoadBalancer.HealthCheckConfig healthCheckConfig; @@ -84,7 +93,7 @@ public static final class Builder eagerConnection */ public RoundRobinLoadBalancerFactory.Builder backgroundExecutor( Executor backgroundExecutor) { - this.backgroundExecutor = backgroundExecutor; + this.backgroundExecutor = requireNonNull(backgroundExecutor); return this; } /** - * Configure an interval for health checking a host that failed to open connections. To disable the health - * checking logic, a negative value can be provided. - * @param intervalMillis interval (in milliseconds) at which a background health check will be scheduled. - * When the value is negative, the health checking mechanism is disabled. + * Configure an interval for health checking a host that failed to open connections. + * @param interval interval at which a background health check will be scheduled. * @return {@code this}. */ - public RoundRobinLoadBalancerFactory.Builder healthCheckIntervalMillis(int intervalMillis) { - this.healthCheckIntervalMillis = intervalMillis; + public RoundRobinLoadBalancerFactory.Builder healthCheckInterval(Duration interval) { + this.healthCheckInterval = requireNonNull(interval); + if (interval.isNegative()) { + throw new IllegalArgumentException("Health check interval can't be negative"); + } return this; } @@ -143,8 +153,9 @@ public RoundRobinLoadBalancerFactory.Builder healthCheckInte * to open a connection in more consecutive attempts than the specified value, the host will be marked as * unhealthy and a connection establishment will take place in the background. Until finished, the host will * not take part in load balancing selection. - * @param threshold number of consecutive connection failures to consider a host unhealthy and elligible for - * background health checking. + * Use a negative value of the argument to disable health checking. + * @param threshold number of consecutive connection failures to consider a host unhealthy and eligible for + * background health checking. Use negative value to disable the health checking mechanism. * @return {@code this}. */ public RoundRobinLoadBalancerFactory.Builder healthCheckFailedConnectionsThreshold( @@ -159,35 +170,49 @@ public RoundRobinLoadBalancerFactory.Builder healthCheckFail * @return a new instance of {@link RoundRobinLoadBalancerFactory} with settings from this builder. */ public RoundRobinLoadBalancerFactory build() { - boolean customHealthCheck = false; - Executor backgroundExecutor = RoundRobinLoadBalancer.SHARED_EXECUTOR; - Duration healthCheckInterval = RoundRobinLoadBalancer.DEFAULT_HEALTH_CHECK_INTERVAL; - int healthCheckFailedConnectionsThreshold = - RoundRobinLoadBalancer.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; - - if (healthCheckIntervalMillis < 0) { + if (this.healthCheckFailedConnectionsThreshold < 0) { return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown, null); } + assert !healthCheckInterval.isNegative(); - if (this.backgroundExecutor != null) { - backgroundExecutor = this.backgroundExecutor; - customHealthCheck = true; - } - if (healthCheckIntervalMillis > 0) { - healthCheckInterval = Duration.ofMillis(healthCheckIntervalMillis); - customHealthCheck = true; - } - if (this.healthCheckFailedConnectionsThreshold > 0) { - healthCheckFailedConnectionsThreshold = this.healthCheckFailedConnectionsThreshold; - customHealthCheck = true; + if (this.backgroundExecutor == null && this.healthCheckInterval.isZero() + && this.healthCheckFailedConnectionsThreshold == 0) { + return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown, + SharedHealthCheckConfig.getInstance()); } - RoundRobinLoadBalancer.HealthCheckConfig healthCheckConfig = customHealthCheck ? - new RoundRobinLoadBalancer.HealthCheckConfig( - backgroundExecutor, healthCheckInterval, healthCheckFailedConnectionsThreshold) - : RoundRobinLoadBalancer.SHARED_HEALTH_CHECK_CONFIG; + Executor backgroundExecutor = this.backgroundExecutor == null ? + SharedExecutor.getInstance() : this.backgroundExecutor; + Duration healthCheckInterval = this.healthCheckInterval.isZero() ? + Duration.ofMillis(DEFAULT_HEALTH_CHECK_INTERVAL_MILLIS) : this.healthCheckInterval; + int healthCheckFailedConnectionsThreshold = this.healthCheckFailedConnectionsThreshold == 0 ? + DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD : this.healthCheckFailedConnectionsThreshold; + + RoundRobinLoadBalancer.HealthCheckConfig healthCheckConfig = new RoundRobinLoadBalancer.HealthCheckConfig( + backgroundExecutor, healthCheckInterval, healthCheckFailedConnectionsThreshold); return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown, healthCheckConfig); } } + + private static final class SharedExecutor { + private static final String BACKGROUND_PROCESSING_EXECUTOR_NAME = "round-robin-load-balancer-executor"; + private static final Executor INSTANCE = Executors.newFixedSizeExecutor(1, + new DefaultThreadFactory(BACKGROUND_PROCESSING_EXECUTOR_NAME)); + + static Executor getInstance() { + return INSTANCE; + } + } + + static final class SharedHealthCheckConfig { + private static final RoundRobinLoadBalancer.HealthCheckConfig INSTANCE = + new RoundRobinLoadBalancer.HealthCheckConfig(SharedExecutor.getInstance(), + Duration.ofMillis(DEFAULT_HEALTH_CHECK_INTERVAL_MILLIS), + DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD); + + static RoundRobinLoadBalancer.HealthCheckConfig getInstance() { + return INSTANCE; + } + } } diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java index 2b3507a512..7f0aebaf2c 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java @@ -76,6 +76,8 @@ import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; import static io.servicetalk.concurrent.internal.TestTimeoutConstants.DEFAULT_TIMEOUT_SECONDS; +import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; +import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.DEFAULT_HEALTH_CHECK_INTERVAL_MILLIS; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toSet; @@ -403,8 +405,8 @@ public void unhealthyHostTakenOutOfPoolForSelection() throws Exception { final Single properConnection = newRealizedConnectionSingle("address-1"); final int timeAdvancementsTillHealthy = 3; - final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = - new UnhealthyHostConnectionFactory("address-1", timeAdvancementsTillHealthy, properConnection); + final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = new UnhealthyHostConnectionFactory( + "address-1", timeAdvancementsTillHealthy, properConnection); final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); lb = defaultLb(connectionFactory); @@ -412,7 +414,7 @@ public void unhealthyHostTakenOutOfPoolForSelection() throws Exception { sendServiceDiscoveryEvents(upEvent("address-1")); sendServiceDiscoveryEvents(upEvent("address-2")); - for (int i = 0; i < RoundRobinLoadBalancer.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD * 2; ++i) { + for (int i = 0; i < DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD * 2; ++i) { try { final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); @@ -446,19 +448,19 @@ public void disabledHealthCheckDoesntRun() throws Exception { final Single properConnection = newRealizedConnectionSingle("address-1"); final int timeAdvancementsTillHealthy = 3; - final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = - new UnhealthyHostConnectionFactory("address-1", timeAdvancementsTillHealthy, properConnection); + final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = new UnhealthyHostConnectionFactory( + "address-1", timeAdvancementsTillHealthy, properConnection); final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); lb = (RoundRobinLoadBalancer) new RoundRobinLoadBalancerFactory.Builder() - .healthCheckIntervalMillis(-1) + .healthCheckFailedConnectionsThreshold(-1) .build() .newLoadBalancer(serviceDiscoveryPublisher, connectionFactory); sendServiceDiscoveryEvents(upEvent("address-1")); - for (int i = 0; i < RoundRobinLoadBalancer.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; ++i) { + for (int i = 0; i < DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; ++i) { Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); assertThat(e.getCause(), instanceOf(DELIBERATE_EXCEPTION.getClass())); } @@ -483,15 +485,15 @@ public void hostUnhealthyIsHealthChecked() throws Exception { serviceDiscoveryPublisher.onComplete(); final Single properConnection = newRealizedConnectionSingle("address-1"); final int timeAdvancementsTillHealthy = 3; - final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = - new UnhealthyHostConnectionFactory("address-1", timeAdvancementsTillHealthy, properConnection); + final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = new UnhealthyHostConnectionFactory( + "address-1", timeAdvancementsTillHealthy, properConnection); final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); lb = defaultLb(connectionFactory); sendServiceDiscoveryEvents(upEvent("address-1")); - for (int i = 0; i < RoundRobinLoadBalancer.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; ++i) { + for (int i = 0; i < DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; ++i) { Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); assertThat(e.getCause(), instanceOf(DELIBERATE_EXCEPTION.getClass())); } @@ -516,8 +518,8 @@ public void hostUnhealthyDoesntRaceToRunHealthCheck() throws Exception { final Single properConnection = newRealizedConnectionSingle("address-1"); final int timeAdvancementsTillHealthy = 3; - final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = - new UnhealthyHostConnectionFactory("address-1", timeAdvancementsTillHealthy, properConnection); + final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = new UnhealthyHostConnectionFactory( + "address-1", timeAdvancementsTillHealthy, properConnection); final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); lb = defaultLb(connectionFactory); @@ -594,10 +596,12 @@ protected RoundRobinLoadBalancer newTestLoad protected RoundRobinLoadBalancer newTestLoadBalancer( final TestPublisher> serviceDiscoveryPublisher, final DelegatingConnectionFactory connectionFactory, final boolean eagerConnectionShutdown) { - return new RoundRobinLoadBalancer<>(serviceDiscoveryPublisher, connectionFactory, - eagerConnectionShutdown, new RoundRobinLoadBalancer.HealthCheckConfig(testExecutor, - RoundRobinLoadBalancer.DEFAULT_HEALTH_CHECK_INTERVAL, - RoundRobinLoadBalancer.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD)); + return (RoundRobinLoadBalancer) + new RoundRobinLoadBalancerFactory.Builder() + .eagerConnectionShutdown(eagerConnectionShutdown) + .backgroundExecutor(testExecutor) + .build() + .newLoadBalancer(serviceDiscoveryPublisher, connectionFactory); } @SafeVarargs From ee0aaf0f1d0193f218058a9bee7182bfab595a5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Wed, 11 Aug 2021 16:17:29 +0200 Subject: [PATCH 08/12] Counting failed connections atomically within the state --- .../loadbalancer/RoundRobinLoadBalancer.java | 122 ++++++++++++------ .../RoundRobinLoadBalancerFactory.java | 2 +- .../RoundRobinLoadBalancerTest.java | 12 +- 3 files changed, 91 insertions(+), 45 deletions(-) diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index f90eeaf278..8e6acdef68 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -22,16 +22,18 @@ import io.servicetalk.client.api.LoadBalancerFactory; import io.servicetalk.client.api.NoAvailableHostException; import io.servicetalk.client.api.ServiceDiscovererEvent; -import io.servicetalk.concurrent.Executor; import io.servicetalk.concurrent.PublisherSource.Processor; import io.servicetalk.concurrent.PublisherSource.Subscriber; import io.servicetalk.concurrent.PublisherSource.Subscription; import io.servicetalk.concurrent.api.AsyncCloseable; import io.servicetalk.concurrent.api.Completable; import io.servicetalk.concurrent.api.CompositeCloseable; +import io.servicetalk.concurrent.api.Executor; import io.servicetalk.concurrent.api.ListenableAsyncCloseable; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; +import io.servicetalk.concurrent.internal.DelayedCancellable; +import io.servicetalk.concurrent.internal.FlowControlUtils; import io.servicetalk.concurrent.internal.SequentialCancellable; import io.servicetalk.concurrent.internal.ThrowableUtils; @@ -59,6 +61,7 @@ import static io.servicetalk.concurrent.api.Completable.completed; import static io.servicetalk.concurrent.api.Processors.newPublisherProcessorDropHeadOnOverflow; import static io.servicetalk.concurrent.api.Publisher.from; +import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffFullJitter; import static io.servicetalk.concurrent.api.Single.defer; import static io.servicetalk.concurrent.api.Single.failed; import static io.servicetalk.concurrent.api.Single.succeeded; @@ -473,27 +476,22 @@ static final class HealthCheckConfig { } private static final class Host implements ListenableAsyncCloseable { - private static final Object STATE_ACTIVE = new Object(); private static final Object STATE_EXPIRED = new Object(); private static final Object STATE_CLOSED = new Object(); + private static final ActiveState STATE_ACTIVE_NO_FAILURES = new ActiveState(); - private static final ConnState EMPTY_CONN_STATE = new ConnState(EMPTY_ARRAY, STATE_ACTIVE); + private static final ConnState ACTIVE_EMPTY_CONN_STATE = new ConnState(EMPTY_ARRAY, STATE_ACTIVE_NO_FAILURES); private static final ConnState CLOSED_CONN_STATE = new ConnState(EMPTY_ARRAY, STATE_CLOSED); @SuppressWarnings("rawtypes") private static final AtomicReferenceFieldUpdater connStateUpdater = AtomicReferenceFieldUpdater.newUpdater(Host.class, ConnState.class, "connState"); - @SuppressWarnings("rawtypes") - private static final AtomicIntegerFieldUpdater connectionFailuresUpdater = - AtomicIntegerFieldUpdater.newUpdater(Host.class, "connectionFailures"); - final Addr address; @Nullable private final HealthCheckConfig healthCheckConfig; private final ListenableAsyncCloseable closeable; - private volatile ConnState connState = EMPTY_CONN_STATE; - private volatile int connectionFailures; + private volatile ConnState connState = ACTIVE_EMPTY_CONN_STATE; Host(Addr address, @Nullable HealthCheckConfig healthCheckConfig) { this.address = requireNonNull(address); @@ -505,7 +503,7 @@ private static final class Host implemen boolean markActiveIfNotClosed() { final Object oldState = connStateUpdater.getAndUpdate(this, oldConnState -> { if (oldConnState.state == STATE_EXPIRED) { - return new ConnState(oldConnState.connections, STATE_ACTIVE); + return new ConnState(oldConnState.connections, STATE_ACTIVE_NO_FAILURES); } // If oldConnState.state == State.ACTIVE this could mean either a duplicate event, // or a repeated CAS operation. We could issue a warning, but as we don't know, we don't log anything. @@ -557,7 +555,7 @@ void markHealthy() { // health check. Object oldState = connStateUpdater.getAndUpdate(this, previous -> { if (previous.state.getClass().equals(HealthCheck.class)) { - return new ConnState(previous.connections, STATE_ACTIVE); + return new ConnState(previous.connections, STATE_ACTIVE_NO_FAILURES); } return previous; }).state; @@ -565,21 +563,32 @@ void markHealthy() { } void markUnhealthy(ConnectionFactory connectionFactory) { - if (healthCheckConfig == null) { - return; - } - connectionFailuresUpdater.getAndUpdate(this, val -> val == Integer.MAX_VALUE ? val : val + 1); + assert healthCheckConfig != null; for (;;) { ConnState previous = connStateUpdater.get(this); - if (previous.state != STATE_ACTIVE - || previous.connections.length > 0 - || connectionFailuresUpdater.get(this) < this.healthCheckConfig.failedThreshold) { + + if (!ActiveState.class.equals(previous.state.getClass()) || previous.connections.length > 0) { break; } + ActiveState previousState = (ActiveState) previous.state; + if (previousState.failedConnections + 1 < this.healthCheckConfig.failedThreshold) { + final ActiveState nextState = previousState.forNextFailedConnection(); + if (connStateUpdater.compareAndSet(this, previous, + new ConnState(previous.connections, nextState))) { + LOGGER.debug("Active host for address {} failed to open {} connections" + + " ({} consecutive failures trigger health check).", + address, nextState.failedConnections, healthCheckConfig.failedThreshold); + break; + } + // another thread won the race, try again + continue; + } + final HealthCheck healthCheck = new HealthCheck<>(connectionFactory, this); final ConnState nextState = new ConnState(previous.connections, healthCheck); if (connStateUpdater.compareAndSet(this, previous, nextState)) { + LOGGER.debug("Triggering health check for address {}", address); healthCheck.schedule(); break; } @@ -587,26 +596,29 @@ void markUnhealthy(ConnectionFactory connectionFactory) { } boolean isActiveAndHealthy() { - return connState.state == STATE_ACTIVE; + return ActiveState.class.equals(connState.state.getClass()); } boolean addConnection(C connection) { for (;;) { - final ConnState currentConnState = this.connState; - if (currentConnState == CLOSED_CONN_STATE) { + final ConnState previous = connStateUpdater.get(this); + if (previous == CLOSED_CONN_STATE) { return false; } - final Object[] existing = currentConnState.connections; + + final Object[] existing = previous.connections; Object[] newList = Arrays.copyOf(existing, existing.length + 1); newList[existing.length] = connection; + + Object newState = ActiveState.class.equals(previous.state.getClass()) ? + STATE_ACTIVE_NO_FAILURES : previous.state; + if (connStateUpdater.compareAndSet(this, - currentConnState, new ConnState(newList, currentConnState.state))) { + previous, new ConnState(newList, newState))) { break; } } - connectionFailuresUpdater.set(this, 0); - // Instrument the new connection so we prune it on close connection.onClose().beforeFinally(() -> { for (;;) { @@ -624,8 +636,8 @@ currentConnState, new ConnState(newList, currentConnState.state))) { if (i == connections.length) { break; } else if (connections.length == 1) { - if (currentConnState.state == STATE_ACTIVE) { - if (connStateUpdater.compareAndSet(this, currentConnState, EMPTY_CONN_STATE)) { + if (ActiveState.class.equals(currentConnState.state.getClass())) { + if (connStateUpdater.compareAndSet(this, currentConnState, ACTIVE_EMPTY_CONN_STATE)) { break; } } else if (currentConnState.state == STATE_EXPIRED @@ -691,6 +703,7 @@ private void cancelIfHealthCheck(Object o) { if (HealthCheck.class.equals(o.getClass())) { @SuppressWarnings("unchecked") HealthCheck healthCheck = (HealthCheck) o; + LOGGER.debug("Health check for address {} cancelled.", healthCheck.host.address); healthCheck.cancel(); } } @@ -706,22 +719,40 @@ public String toString() { } private String describeState(Object state) { - if (state == STATE_ACTIVE) { + if (ActiveState.class.equals(state.getClass())) { return "active"; + } else if (HealthCheck.class.equals(state.getClass())) { + return "unhealthy"; } else if (state == STATE_EXPIRED) { return "expired"; } else if (state == STATE_CLOSED) { return "closed"; } else { - return "unhealthy"; + return "unknown"; + } + } + + private static final class ActiveState { + private final int failedConnections; + + ActiveState() { + this(0); + } + + private ActiveState(int failedConnections) { + this.failedConnections = failedConnections; + } + + ActiveState forNextFailedConnection() { + return new ActiveState(FlowControlUtils.addWithOverflowProtection(this.failedConnections, 1)); } } private static final class HealthCheck - extends SequentialCancellable implements Runnable { + extends DelayedCancellable { private static final Exception RESCHEDULE_SIGNAL = ThrowableUtils.unknownStackTrace( new ConnectionRejectedException("Connection rejected during health check."), - HealthCheck.class, "run"); + HealthCheck.class, "run()"); private final ConnectionFactory connectionFactory; private final Host host; @@ -733,23 +764,36 @@ private HealthCheck(final ConnectionFactory connec public void schedule() { assert host.healthCheckConfig != null; - nextCancellable(host.healthCheckConfig.executor.schedule( - this, host.healthCheckConfig.healthCheckInterval)); + delayedCancellable( + retryWithConstantBackoffFullJitter(cause -> true, + host.healthCheckConfig.healthCheckInterval, + host.healthCheckConfig.executor) + .apply(0, RESCHEDULE_SIGNAL) + .concat(reconnect() + .retryWhen(retryWithConstantBackoffFullJitter( + cause -> cause == RESCHEDULE_SIGNAL, + host.healthCheckConfig.healthCheckInterval, + host.healthCheckConfig.executor))) + .subscribe()); } - @Override - public void run() { - connectionFactory.newConnection(host.address, null) + public Completable reconnect() { + return defer(() -> connectionFactory.newConnection(host.address, null)) + .onErrorMap(cause -> { + LOGGER.debug("Health check failed for address {}. Cause: {}", host.address, cause); + return RESCHEDULE_SIGNAL; + }) .flatMapCompletable(newCnx -> { if (host.addConnection(newCnx)) { + LOGGER.debug("Health check passed for address {}.", host.address); host.markHealthy(); } else { + LOGGER.debug("Health check failed for address {}. Host rejected connection.", + host.address); return newCnx.closeAsync().concat(Completable.failed(RESCHEDULE_SIGNAL)); } return completed(); - }) - .afterOnError(e -> schedule()) - .subscribe(); + }); } } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java index 8f99bdc96e..7131da37c5 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java @@ -20,8 +20,8 @@ import io.servicetalk.client.api.LoadBalancer; import io.servicetalk.client.api.LoadBalancerFactory; import io.servicetalk.client.api.ServiceDiscovererEvent; -import io.servicetalk.concurrent.Executor; import io.servicetalk.concurrent.api.DefaultThreadFactory; +import io.servicetalk.concurrent.api.Executor; import io.servicetalk.concurrent.api.Executors; import io.servicetalk.concurrent.api.Publisher; diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java index 7f0aebaf2c..cb102a13eb 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java @@ -46,6 +46,7 @@ import org.junit.rules.ExpectedException; import org.junit.rules.Timeout; +import java.time.Duration; import java.util.AbstractMap; import java.util.Arrays; import java.util.List; @@ -71,13 +72,13 @@ import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable; import static io.servicetalk.concurrent.api.BlockingTestUtils.awaitIndefinitely; +import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffFullJitter; import static io.servicetalk.concurrent.api.Single.failed; import static io.servicetalk.concurrent.api.Single.succeeded; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; import static io.servicetalk.concurrent.internal.TestTimeoutConstants.DEFAULT_TIMEOUT_SECONDS; import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; -import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.DEFAULT_HEALTH_CHECK_INTERVAL_MILLIS; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toSet; @@ -148,8 +149,8 @@ protected RoundRobinLoadBalancer defaultLb( @Before public void initialize() { - lb = defaultLb(); testExecutor = executor.executor(); + lb = defaultLb(); connectionsCreated.clear(); connectionRealizers.clear(); } @@ -537,12 +538,13 @@ public void hostUnhealthyDoesntRaceToRunHealthCheck() throws Exception { // From test main thread, wait until the host becomes UNHEALTHY, which is apparent from NoHostAvailableException // being thrown from selection AFTER a health check was scheduled by any thread. try { - awaitIndefinitely(lb.selectConnection(any()).retry((i, t) -> + awaitIndefinitely(lb.selectConnection(any()).retryWhen(retryWithConstantBackoffFullJitter((t) -> // DeliberateException comes from connection opening, check for that first // Next, NoAvailableHostException is thrown when the host is unhealthy, // but we still wait until the health check is scheduled and only then stop retrying. - t instanceof DeliberateException || testExecutor.scheduledTasksPending() == 0 - )); + t instanceof DeliberateException || testExecutor.scheduledTasksPending() == 0, + // try to prevent stack overflow + Duration.ofMillis(30), io.servicetalk.concurrent.api.Executors.newFixedSizeExecutor(1)))); } catch (Exception e) { assertThat(e.getCause(), instanceOf(NoAvailableHostException.class)); } From 97fad11305c9f78110f256f5b33635b120776713 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Fri, 13 Aug 2021 16:26:26 +0200 Subject: [PATCH 09/12] Review feedback --- .../loadbalancer/RoundRobinLoadBalancer.java | 72 +++++++++++-------- .../RoundRobinLoadBalancerFactory.java | 34 +++------ 2 files changed, 52 insertions(+), 54 deletions(-) diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index 8e6acdef68..58af9839f8 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -33,9 +33,9 @@ import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.internal.DelayedCancellable; -import io.servicetalk.concurrent.internal.FlowControlUtils; import io.servicetalk.concurrent.internal.SequentialCancellable; import io.servicetalk.concurrent.internal.ThrowableUtils; +import io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.SharedExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +67,9 @@ import static io.servicetalk.concurrent.api.Single.succeeded; import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; +import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection; +import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; +import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.DEFAULT_HEALTH_CHECK_INTERVAL; import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.EAGER_CONNECTION_SHUTDOWN_ENABLED; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -136,7 +139,8 @@ public final class RoundRobinLoadBalancer> eventPublisher, final ConnectionFactory connectionFactory) { this(eventPublisher, connectionFactory, EAGER_CONNECTION_SHUTDOWN_ENABLED, - io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.SharedHealthCheckConfig.getInstance()); + new HealthCheckConfig(SharedExecutor.getInstance(), + DEFAULT_HEALTH_CHECK_INTERVAL, DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD)); } /** @@ -459,7 +463,8 @@ public LoadBalancer newLoadBalancer( final Publisher> eventPublisher, final ConnectionFactory connectionFactory) { return new RoundRobinLoadBalancer<>(eventPublisher, connectionFactory, EAGER_CONNECTION_SHUTDOWN_ENABLED, - io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.SharedHealthCheckConfig.getInstance()); + new HealthCheckConfig(SharedExecutor.getInstance(), + DEFAULT_HEALTH_CHECK_INTERVAL, DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD)); } } @@ -476,12 +481,19 @@ static final class HealthCheckConfig { } private static final class Host implements ListenableAsyncCloseable { - private static final Object STATE_EXPIRED = new Object(); - private static final Object STATE_CLOSED = new Object(); - private static final ActiveState STATE_ACTIVE_NO_FAILURES = new ActiveState(); + private enum State { + // The enum is not exhaustive, as other states have dynamic properties. + // For clarity, the other state classes are listed as comments: + // ACTIVE - see ActiveState + // UNHEALTHY - see HealthCheck + EXPIRED, + CLOSED + } + + private static final ActiveState STATE_ACTIVE_NO_FAILURES = new ActiveState(); private static final ConnState ACTIVE_EMPTY_CONN_STATE = new ConnState(EMPTY_ARRAY, STATE_ACTIVE_NO_FAILURES); - private static final ConnState CLOSED_CONN_STATE = new ConnState(EMPTY_ARRAY, STATE_CLOSED); + private static final ConnState CLOSED_CONN_STATE = new ConnState(EMPTY_ARRAY, State.CLOSED); @SuppressWarnings("rawtypes") private static final AtomicReferenceFieldUpdater connStateUpdater = @@ -502,7 +514,7 @@ private static final class Host implemen boolean markActiveIfNotClosed() { final Object oldState = connStateUpdater.getAndUpdate(this, oldConnState -> { - if (oldConnState.state == STATE_EXPIRED) { + if (oldConnState.state == State.EXPIRED) { return new ConnState(oldConnState.connections, STATE_ACTIVE_NO_FAILURES); } // If oldConnState.state == State.ACTIVE this could mean either a duplicate event, @@ -510,7 +522,7 @@ boolean markActiveIfNotClosed() { // UNHEALTHY state cannot transition to ACTIVE without passing the health check. return oldConnState; }).state; - return oldState != STATE_CLOSED; + return oldState != State.CLOSED; } void markClosed() { @@ -528,15 +540,15 @@ void markClosed() { void markExpired() { for (;;) { ConnState oldState = connStateUpdater.get(this); - if (oldState.state == STATE_EXPIRED || oldState.state == STATE_CLOSED) { + if (oldState.state == State.EXPIRED || oldState.state == State.CLOSED) { break; } - Object nextState = oldState.connections.length == 0 ? STATE_CLOSED : STATE_EXPIRED; + Object nextState = oldState.connections.length == 0 ? State.CLOSED : State.EXPIRED; if (connStateUpdater.compareAndSet(this, oldState, new ConnState(oldState.connections, nextState))) { cancelIfHealthCheck(oldState.state); - if (nextState == STATE_CLOSED) { + if (nextState == State.CLOSED) { // Trigger the callback to remove the host from usedHosts array. this.closeAsync().subscribe(); } @@ -554,7 +566,7 @@ void markHealthy() { // would leak and would not be cancelled. Therefore, we cancel it here and allow failures to trigger a new // health check. Object oldState = connStateUpdater.getAndUpdate(this, previous -> { - if (previous.state.getClass().equals(HealthCheck.class)) { + if (HealthCheck.class.equals(previous.state.getClass())) { return new ConnState(previous.connections, STATE_ACTIVE_NO_FAILURES); } return previous; @@ -588,7 +600,8 @@ void markUnhealthy(ConnectionFactory connectionFactory) { final HealthCheck healthCheck = new HealthCheck<>(connectionFactory, this); final ConnState nextState = new ConnState(previous.connections, healthCheck); if (connStateUpdater.compareAndSet(this, previous, nextState)) { - LOGGER.debug("Triggering health check for address {}", address); + LOGGER.debug("Triggering health check for address {} after {} failed attempts" + + " to open a new connection", address, previousState.failedConnections); healthCheck.schedule(); break; } @@ -637,10 +650,11 @@ previous, new ConnState(newList, newState))) { break; } else if (connections.length == 1) { if (ActiveState.class.equals(currentConnState.state.getClass())) { - if (connStateUpdater.compareAndSet(this, currentConnState, ACTIVE_EMPTY_CONN_STATE)) { + if (connStateUpdater.compareAndSet(this, currentConnState, + new ConnState(EMPTY_ARRAY, currentConnState.state))) { break; } - } else if (currentConnState.state == STATE_EXPIRED + } else if (currentConnState.state == State.EXPIRED // We're closing the last connection, close the Host. // Closing the host will trigger the Host's onClose method, which will remove the host // from used hosts list. If a race condition appears and a new connection was added @@ -718,18 +732,11 @@ public String toString() { '}'; } - private String describeState(Object state) { - if (ActiveState.class.equals(state.getClass())) { - return "active"; - } else if (HealthCheck.class.equals(state.getClass())) { + private static String describeState(Object state) { + if (HealthCheck.class.equals(state.getClass())) { return "unhealthy"; - } else if (state == STATE_EXPIRED) { - return "expired"; - } else if (state == STATE_CLOSED) { - return "closed"; - } else { - return "unknown"; } + return state.toString(); } private static final class ActiveState { @@ -744,7 +751,12 @@ private ActiveState(int failedConnections) { } ActiveState forNextFailedConnection() { - return new ActiveState(FlowControlUtils.addWithOverflowProtection(this.failedConnections, 1)); + return new ActiveState(addWithOverflowProtection(this.failedConnections, 1)); + } + + @Override + public String toString() { + return "active{failedConnections=" + failedConnections + ')'; } } @@ -780,7 +792,7 @@ public void schedule() { public Completable reconnect() { return defer(() -> connectionFactory.newConnection(host.address, null)) .onErrorMap(cause -> { - LOGGER.debug("Health check failed for address {}. Cause: {}", host.address, cause); + LOGGER.debug("Health check failed for address {}.", host.address, cause); return RESCHEDULE_SIGNAL; }) .flatMapCompletable(newCnx -> { @@ -788,9 +800,9 @@ public Completable reconnect() { LOGGER.debug("Health check passed for address {}.", host.address); host.markHealthy(); } else { - LOGGER.debug("Health check failed for address {}. Host rejected connection.", + LOGGER.debug("Health check finished for address {}. Host rejected connection.", host.address); - return newCnx.closeAsync().concat(Completable.failed(RESCHEDULE_SIGNAL)); + return newCnx.closeAsync().concat(completed()); } return completed(); }); diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java index 7131da37c5..ab5ec18825 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java @@ -61,7 +61,7 @@ public final class RoundRobinLoadBalancerFactory { static final boolean EAGER_CONNECTION_SHUTDOWN_ENABLED = true; - static final int DEFAULT_HEALTH_CHECK_INTERVAL_MILLIS = 1000; // 1 second + static final Duration DEFAULT_HEALTH_CHECK_INTERVAL = Duration.ofSeconds(1); static final int DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD = 5; // higher than default for AutoRetryStrategy private final boolean eagerConnectionShutdown; @@ -121,10 +121,12 @@ public RoundRobinLoadBalancerFactory.Builder eagerConnection } /** - * This {@link LoadBalancer} monitors hosts to which connection establishment has failed + * This {@link LoadBalancer} may monitor hosts to which connection establishment has failed * using health checks that run in the background. The health check tries to establish a new connection * and if it succeeds, the host is returned to the load balancing pool. As long as the connection * establishment fails, the host is not considered for opening new connections for processed requests. + * {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable this mechanism and always + * consider all hosts for establishing new connections. * * @param backgroundExecutor {@link Executor} on which to schedule health checking. * @return {@code this}. @@ -137,14 +139,16 @@ public RoundRobinLoadBalancerFactory.Builder backgroundExecu /** * Configure an interval for health checking a host that failed to open connections. + * {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism + * and always consider all hosts for establishing new connections. * @param interval interval at which a background health check will be scheduled. * @return {@code this}. */ public RoundRobinLoadBalancerFactory.Builder healthCheckInterval(Duration interval) { - this.healthCheckInterval = requireNonNull(interval); - if (interval.isNegative()) { + if (requireNonNull(interval).isNegative()) { throw new IllegalArgumentException("Health check interval can't be negative"); } + this.healthCheckInterval = interval; return this; } @@ -173,18 +177,11 @@ public RoundRobinLoadBalancerFactory build() { if (this.healthCheckFailedConnectionsThreshold < 0) { return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown, null); } - assert !healthCheckInterval.isNegative(); - - if (this.backgroundExecutor == null && this.healthCheckInterval.isZero() - && this.healthCheckFailedConnectionsThreshold == 0) { - return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown, - SharedHealthCheckConfig.getInstance()); - } Executor backgroundExecutor = this.backgroundExecutor == null ? SharedExecutor.getInstance() : this.backgroundExecutor; Duration healthCheckInterval = this.healthCheckInterval.isZero() ? - Duration.ofMillis(DEFAULT_HEALTH_CHECK_INTERVAL_MILLIS) : this.healthCheckInterval; + DEFAULT_HEALTH_CHECK_INTERVAL : this.healthCheckInterval; int healthCheckFailedConnectionsThreshold = this.healthCheckFailedConnectionsThreshold == 0 ? DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD : this.healthCheckFailedConnectionsThreshold; @@ -195,7 +192,7 @@ public RoundRobinLoadBalancerFactory build() { } } - private static final class SharedExecutor { + static final class SharedExecutor { private static final String BACKGROUND_PROCESSING_EXECUTOR_NAME = "round-robin-load-balancer-executor"; private static final Executor INSTANCE = Executors.newFixedSizeExecutor(1, new DefaultThreadFactory(BACKGROUND_PROCESSING_EXECUTOR_NAME)); @@ -204,15 +201,4 @@ static Executor getInstance() { return INSTANCE; } } - - static final class SharedHealthCheckConfig { - private static final RoundRobinLoadBalancer.HealthCheckConfig INSTANCE = - new RoundRobinLoadBalancer.HealthCheckConfig(SharedExecutor.getInstance(), - Duration.ofMillis(DEFAULT_HEALTH_CHECK_INTERVAL_MILLIS), - DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD); - - static RoundRobinLoadBalancer.HealthCheckConfig getInstance() { - return INSTANCE; - } - } } From 1513ca548728fd0fc106201d64323fe37171d123 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Mon, 16 Aug 2021 15:31:54 +0200 Subject: [PATCH 10/12] Review feedback --- .../loadbalancer/RoundRobinLoadBalancer.java | 20 ++-- .../RoundRobinLoadBalancerFactory.java | 35 ++++--- .../RoundRobinLoadBalancerTest.java | 93 ++++++++++--------- 3 files changed, 80 insertions(+), 68 deletions(-) diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index 58af9839f8..d712159de4 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -727,18 +727,11 @@ public String toString() { final ConnState connState = this.connState; return "Host{" + "address=" + address + - ", state=" + describeState(connState.state) + + ", state=" + connState.state + ", #connections=" + connState.connections.length + '}'; } - private static String describeState(Object state) { - if (HealthCheck.class.equals(state.getClass())) { - return "unhealthy"; - } - return state.toString(); - } - private static final class ActiveState { private final int failedConnections; @@ -756,7 +749,7 @@ ActiveState forNextFailedConnection() { @Override public String toString() { - return "active{failedConnections=" + failedConnections + ')'; + return "ACTIVE(failedConnections=" + failedConnections + ')'; } } @@ -790,7 +783,7 @@ public void schedule() { } public Completable reconnect() { - return defer(() -> connectionFactory.newConnection(host.address, null)) + return connectionFactory.newConnection(host.address, null) .onErrorMap(cause -> { LOGGER.debug("Health check failed for address {}.", host.address, cause); return RESCHEDULE_SIGNAL; @@ -802,11 +795,16 @@ public Completable reconnect() { } else { LOGGER.debug("Health check finished for address {}. Host rejected connection.", host.address); - return newCnx.closeAsync().concat(completed()); + return newCnx.closeAsync(); } return completed(); }); } + + @Override + public String toString() { + return "UNHEALTHY"; + } } private static final class ConnState { diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java index ab5ec18825..debd0bb384 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java @@ -24,6 +24,7 @@ import io.servicetalk.concurrent.api.Executor; import io.servicetalk.concurrent.api.Executors; import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.loadbalancer.RoundRobinLoadBalancer.HealthCheckConfig; import java.time.Duration; import java.util.function.Predicate; @@ -67,10 +68,10 @@ public final class RoundRobinLoadBalancerFactory eagerConnection * using health checks that run in the background. The health check tries to establish a new connection * and if it succeeds, the host is returned to the load balancing pool. As long as the connection * establishment fails, the host is not considered for opening new connections for processed requests. + *

    * {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable this mechanism and always * consider all hosts for establishing new connections. + *

    * * @param backgroundExecutor {@link Executor} on which to schedule health checking. * @return {@code this}. + * @see #healthCheckFailedConnectionsThreshold(int) */ public RoundRobinLoadBalancerFactory.Builder backgroundExecutor( Executor backgroundExecutor) { @@ -139,10 +143,13 @@ public RoundRobinLoadBalancerFactory.Builder backgroundExecu /** * Configure an interval for health checking a host that failed to open connections. + *

    * {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism * and always consider all hosts for establishing new connections. + *

    * @param interval interval at which a background health check will be scheduled. * @return {@code this}. + * @see #healthCheckFailedConnectionsThreshold(int) */ public RoundRobinLoadBalancerFactory.Builder healthCheckInterval(Duration interval) { if (requireNonNull(interval).isNegative()) { @@ -161,6 +168,8 @@ public RoundRobinLoadBalancerFactory.Builder healthCheckInte * @param threshold number of consecutive connection failures to consider a host unhealthy and eligible for * background health checking. Use negative value to disable the health checking mechanism. * @return {@code this}. + * @see #backgroundExecutor(Executor) + * @see #healthCheckFailedConnectionsThreshold(int) */ public RoundRobinLoadBalancerFactory.Builder healthCheckFailedConnectionsThreshold( int threshold) { @@ -178,24 +187,20 @@ public RoundRobinLoadBalancerFactory build() { return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown, null); } - Executor backgroundExecutor = this.backgroundExecutor == null ? - SharedExecutor.getInstance() : this.backgroundExecutor; - Duration healthCheckInterval = this.healthCheckInterval.isZero() ? - DEFAULT_HEALTH_CHECK_INTERVAL : this.healthCheckInterval; - int healthCheckFailedConnectionsThreshold = this.healthCheckFailedConnectionsThreshold == 0 ? - DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD : this.healthCheckFailedConnectionsThreshold; - - RoundRobinLoadBalancer.HealthCheckConfig healthCheckConfig = new RoundRobinLoadBalancer.HealthCheckConfig( - backgroundExecutor, healthCheckInterval, healthCheckFailedConnectionsThreshold); + HealthCheckConfig healthCheckConfig = new HealthCheckConfig( + this.backgroundExecutor == null ? SharedExecutor.getInstance() : this.backgroundExecutor, + healthCheckInterval, healthCheckFailedConnectionsThreshold); return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown, healthCheckConfig); } } static final class SharedExecutor { - private static final String BACKGROUND_PROCESSING_EXECUTOR_NAME = "round-robin-load-balancer-executor"; private static final Executor INSTANCE = Executors.newFixedSizeExecutor(1, - new DefaultThreadFactory(BACKGROUND_PROCESSING_EXECUTOR_NAME)); + new DefaultThreadFactory("round-robin-load-balancer-executor")); + + private SharedExecutor() { + } static Executor getInstance() { return INSTANCE; diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java index cb102a13eb..eda50e7991 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java @@ -25,6 +25,7 @@ import io.servicetalk.concurrent.PublisherSource.Subscriber; import io.servicetalk.concurrent.PublisherSource.Subscription; import io.servicetalk.concurrent.api.Completable; +import io.servicetalk.concurrent.api.Executor; import io.servicetalk.concurrent.api.ExecutorRule; import io.servicetalk.concurrent.api.LegacyTestSingle; import io.servicetalk.concurrent.api.ListenableAsyncCloseable; @@ -73,6 +74,7 @@ import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable; import static io.servicetalk.concurrent.api.BlockingTestUtils.awaitIndefinitely; import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffFullJitter; +import static io.servicetalk.concurrent.api.Single.defer; import static io.servicetalk.concurrent.api.Single.failed; import static io.servicetalk.concurrent.api.Single.succeeded; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; @@ -420,7 +422,7 @@ public void unhealthyHostTakenOutOfPoolForSelection() throws Exception { final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); } catch (Exception e) { - assertThat(e.getCause(), instanceOf(DELIBERATE_EXCEPTION.getClass())); + assertThat(e.getCause(), is(DELIBERATE_EXCEPTION)); } } @@ -463,7 +465,7 @@ public void disabledHealthCheckDoesntRun() throws Exception { for (int i = 0; i < DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; ++i) { Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); - assertThat(e.getCause(), instanceOf(DELIBERATE_EXCEPTION.getClass())); + assertThat(e.getCause(), is(DELIBERATE_EXCEPTION)); } assertThat(testExecutor.scheduledTasksPending(), equalTo(0)); @@ -471,7 +473,7 @@ public void disabledHealthCheckDoesntRun() throws Exception { for (int i = 0; i < timeAdvancementsTillHealthy - 1; ++i) { unhealthyHostConnectionFactory.advanceTime(testExecutor); Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); - assertThat(e.getCause(), instanceOf(DELIBERATE_EXCEPTION.getClass())); + assertThat(e.getCause(), is(DELIBERATE_EXCEPTION)); } unhealthyHostConnectionFactory.advanceTime(testExecutor); @@ -496,7 +498,7 @@ public void hostUnhealthyIsHealthChecked() throws Exception { for (int i = 0; i < DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; ++i) { Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); - assertThat(e.getCause(), instanceOf(DELIBERATE_EXCEPTION.getClass())); + assertThat(e.getCause(), is(DELIBERATE_EXCEPTION)); } for (int i = 0; i < timeAdvancementsTillHealthy; ++i) { @@ -528,43 +530,48 @@ public void hostUnhealthyDoesntRaceToRunHealthCheck() throws Exception { // Imitate concurrency by running multiple threads attempting to establish connections. ExecutorService executor = Executors.newFixedThreadPool(3); - final Runnable runnable = () -> - assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); + try { + final Runnable runnable = () -> + assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); - for (int i = 0; i < 1000; i++) { - executor.submit(runnable); - } + for (int i = 0; i < 1000; i++) { + executor.submit(runnable); + } - // From test main thread, wait until the host becomes UNHEALTHY, which is apparent from NoHostAvailableException - // being thrown from selection AFTER a health check was scheduled by any thread. - try { - awaitIndefinitely(lb.selectConnection(any()).retryWhen(retryWithConstantBackoffFullJitter((t) -> - // DeliberateException comes from connection opening, check for that first - // Next, NoAvailableHostException is thrown when the host is unhealthy, - // but we still wait until the health check is scheduled and only then stop retrying. - t instanceof DeliberateException || testExecutor.scheduledTasksPending() == 0, - // try to prevent stack overflow - Duration.ofMillis(30), io.servicetalk.concurrent.api.Executors.newFixedSizeExecutor(1)))); - } catch (Exception e) { - assertThat(e.getCause(), instanceOf(NoAvailableHostException.class)); - } + // From test main thread, wait until the host becomes UNHEALTHY, which is apparent from + // NoHostAvailableException being thrown from selection AFTER a health check was scheduled by any thread. + final Executor executorForRetries = io.servicetalk.concurrent.api.Executors.newFixedSizeExecutor(1); + try { + awaitIndefinitely(lb.selectConnection(any()).retryWhen(retryWithConstantBackoffFullJitter((t) -> + // DeliberateException comes from connection opening, check for that first + // Next, NoAvailableHostException is thrown when the host is unhealthy, + // but we still wait until the health check is scheduled and only then stop retrying. + t instanceof DeliberateException || testExecutor.scheduledTasksPending() == 0, + // try to prevent stack overflow + Duration.ofMillis(30), executorForRetries))); + } catch (Exception e) { + assertThat(e.getCause(), instanceOf(NoAvailableHostException.class)); + } finally { + executorForRetries.closeAsync().toFuture().get(); + } - // At this point, either the above selection caused the host to be marked as UNHEALTHY, - // or any background thread. We also know that a health check is pending to be executed. - // Now we can validate if there is just one health check happening and confirm that by asserting the host - // is not selected. If our assumption doesn't hold, it means more than one health check was scheduled. - for (int i = 0; i < timeAdvancementsTillHealthy - 1; ++i) { - unhealthyHostConnectionFactory.advanceTime(testExecutor); + // At this point, either the above selection caused the host to be marked as UNHEALTHY, + // or any background thread. We also know that a health check is pending to be executed. + // Now we can validate if there is just one health check happening and confirm that by asserting the host + // is not selected. If our assumption doesn't hold, it means more than one health check was scheduled. + for (int i = 0; i < timeAdvancementsTillHealthy - 1; ++i) { + unhealthyHostConnectionFactory.advanceTime(testExecutor); - // Assert still unhealthy - Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); - assertThat(e.getCause(), instanceOf(NoAvailableHostException.class)); + // Assert still unhealthy + Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); + assertThat(e.getCause(), instanceOf(NoAvailableHostException.class)); + } + } finally { + // Shutdown the concurrent validation of unhealthiness. + executor.shutdownNow(); + executor.awaitTermination(10, SECONDS); } - // Shutdown the concurrent validation of unhealthiness. - executor.shutdownNow(); - executor.awaitTermination(10, SECONDS); - unhealthyHostConnectionFactory.advanceTime(testExecutor); final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); @@ -718,14 +725,16 @@ protected static class UnhealthyHostConnectionFactory { @Override public Single apply(final String s) { - if (s.equals(failingHost)) { - requests.incrementAndGet(); - if (momentInTime.get() >= connections.size()) { - return properConnection; + return defer(() -> { + if (s.equals(failingHost)) { + requests.incrementAndGet(); + if (momentInTime.get() >= connections.size()) { + return properConnection; + } + return connections.get(momentInTime.get()); } - return connections.get(momentInTime.get()); - } - return properConnection; + return properConnection; + }); } }; From 90000491cf8caf34c683ab16ef91f58365a5cee8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Mon, 16 Aug 2021 18:55:38 +0200 Subject: [PATCH 11/12] Improved javadoc --- .../io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index d712159de4..a4d2ef9066 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -153,7 +153,7 @@ public RoundRobinLoadBalancer(final Publisher> eventPublisher, final ConnectionFactory connectionFactory, From 837e293feb0a43efc96784152d6d452eafeee7eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Tue, 17 Aug 2021 11:15:02 +0200 Subject: [PATCH 12/12] Javadoc improvements --- .../loadbalancer/RoundRobinLoadBalancer.java | 4 ++- .../RoundRobinLoadBalancerFactory.java | 29 ++++++++++++------- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index a4d2ef9066..2481ef0547 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -153,7 +153,9 @@ public RoundRobinLoadBalancer(final Publisher> eventPublisher, final ConnectionFactory connectionFactory, diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java index debd0bb384..5c2513b66f 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java @@ -52,7 +52,8 @@ * the host is not considered for opening new connections until the background check succeeds to create a connection. * Upon such event, the connection can immediately be reused and future attempts will again consider this host. * This behaviour can be disabled using a negative argument for - * {@link Builder#healthCheckFailedConnectionsThreshold(int)} + * {@link Builder#healthCheckFailedConnectionsThreshold(int)} and the failing host will take part in the regular + * round robin cycle for trying to establish a connection on the request path. * * * @param The resolved address type. @@ -126,10 +127,11 @@ public RoundRobinLoadBalancerFactory.Builder eagerConnection * using health checks that run in the background. The health check tries to establish a new connection * and if it succeeds, the host is returned to the load balancing pool. As long as the connection * establishment fails, the host is not considered for opening new connections for processed requests. + * If an {@link Executor} is not provided using this method, a default shared instance is used + * for all {@link LoadBalancer LoadBalancers} created by this factory. *

    * {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable this mechanism and always * consider all hosts for establishing new connections. - *

    * * @param backgroundExecutor {@link Executor} on which to schedule health checking. * @return {@code this}. @@ -142,37 +144,42 @@ public RoundRobinLoadBalancerFactory.Builder backgroundExecu } /** - * Configure an interval for health checking a host that failed to open connections. + * Configure an interval for health checking a host that failed to open connections. If no interval is provided + * using this method, a default value will be used. *

    * {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism * and always consider all hosts for establishing new connections. - *

    * @param interval interval at which a background health check will be scheduled. * @return {@code this}. * @see #healthCheckFailedConnectionsThreshold(int) */ public RoundRobinLoadBalancerFactory.Builder healthCheckInterval(Duration interval) { - if (requireNonNull(interval).isNegative()) { - throw new IllegalArgumentException("Health check interval can't be negative"); + if (interval.isNegative() || interval.isZero()) { + throw new IllegalArgumentException("Health check interval should be greater than 0"); } this.healthCheckInterval = interval; return this; } /** - * Configure a threshold for consecutive connection failures to a host. When the {@link LoadBalancer} fails - * to open a connection in more consecutive attempts than the specified value, the host will be marked as - * unhealthy and a connection establishment will take place in the background. Until finished, the host will - * not take part in load balancing selection. + * Configure a threshold for consecutive connection failures to a host. When the {@link LoadBalancer} + * consecutively fails to open connections in the amount greater or equal to the specified value, + * the host will be marked as unhealthy and connection establishment will take place in the background + * repeatedly until a connection is established. During that time, the host will not take part in + * load balancing selection. + *

    * Use a negative value of the argument to disable health checking. * @param threshold number of consecutive connection failures to consider a host unhealthy and eligible for * background health checking. Use negative value to disable the health checking mechanism. * @return {@code this}. * @see #backgroundExecutor(Executor) - * @see #healthCheckFailedConnectionsThreshold(int) + * @see #healthCheckInterval(Duration) */ public RoundRobinLoadBalancerFactory.Builder healthCheckFailedConnectionsThreshold( int threshold) { + if (threshold == 0) { + throw new IllegalArgumentException("Health check failed connections threshold should not be 0"); + } this.healthCheckFailedConnectionsThreshold = threshold; return this; }