diff --git a/gateway-ha/pom.xml b/gateway-ha/pom.xml
index be43afa2d..73eab3631 100644
--- a/gateway-ha/pom.xml
+++ b/gateway-ha/pom.xml
@@ -62,11 +62,6 @@
jackson-dataformat-yaml
-
- com.google.errorprone
- error_prone_annotations
-
-
com.google.guava
guava
diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthCheckObserver.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthCheckObserver.java
index 5ce124570..5b0a158ba 100644
--- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthCheckObserver.java
+++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthCheckObserver.java
@@ -28,6 +28,6 @@ public HealthCheckObserver(RoutingManager routingManager)
@Override
public void observe(java.util.List clustersStats)
{
- routingManager.updateBackEndStats(clustersStats);
+ routingManager.updateClusterStats(clustersStats);
}
}
diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/BaseRoutingManager.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/BaseRoutingManager.java
new file mode 100644
index 000000000..b030ecc1c
--- /dev/null
+++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/BaseRoutingManager.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.gateway.ha.router;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Strings;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import io.airlift.log.Logger;
+import io.trino.gateway.ha.clustermonitor.ClusterStats;
+import io.trino.gateway.ha.clustermonitor.TrinoStatus;
+import io.trino.gateway.ha.config.ProxyBackendConfiguration;
+import io.trino.gateway.ha.config.RoutingConfiguration;
+import jakarta.annotation.Nullable;
+import jakarta.ws.rs.HttpMethod;
+
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This class performs health check, stats counts for each backend and provides a backend given
+ * request object. Default implementation comes here.
+ */
+public abstract class BaseRoutingManager
+ implements RoutingManager
+{
+ private static final Logger log = Logger.get(BaseRoutingManager.class);
+ private final ExecutorService executorService = Executors.newFixedThreadPool(5);
+ private final GatewayBackendManager gatewayBackendManager;
+ private final ConcurrentHashMap backendToStatus;
+ private final String defaultRoutingGroup;
+ private final QueryHistoryManager queryHistoryManager;
+ private final LoadingCache queryIdBackendCache;
+ private final LoadingCache queryIdRoutingGroupCache;
+ private final LoadingCache queryIdExternalUrlCache;
+
+    /**
+     * Wires the manager to its collaborators and creates the per-query lookup caches.
+     *
+     * <p>Each cache is backed by a loader method of this class that is invoked on a cache miss.
+     *
+     * @param gatewayBackendManager source of the configured backends
+     * @param queryHistoryManager source of previously recorded query-to-backend information
+     * @param routingConfiguration supplies the default routing group
+     */
+    public BaseRoutingManager(GatewayBackendManager gatewayBackendManager, QueryHistoryManager queryHistoryManager, RoutingConfiguration routingConfiguration)
+    {
+        // Collaborators and plain state first ...
+        this.gatewayBackendManager = gatewayBackendManager;
+        this.queryHistoryManager = queryHistoryManager;
+        this.defaultRoutingGroup = routingConfiguration.getDefaultRoutingGroup();
+        this.backendToStatus = new ConcurrentHashMap<>();
+
+        // ... then the caches, whose loaders are only invoked on later look-ups.
+        this.queryIdBackendCache = buildCache(this::findBackendForUnknownQueryId);
+        this.queryIdRoutingGroupCache = buildCache(this::findRoutingGroupForUnknownQueryId);
+        this.queryIdExternalUrlCache = buildCache(this::findExternalUrlForUnknownQueryId);
+    }
+
+ /**
+ * Provide a strategy to select a backend out of all available backends.
+ *
+ * <p>Called by {@link #provideDefaultBackendConfiguration} and {@link #provideBackendConfiguration}
+ * with the active backends that are currently marked healthy; the list may be empty.
+ *
+ * @param backends candidate backends to choose from
+ * @param user the user the selection is made for
+ * @return the chosen backend, or empty if none can be chosen
+ */
+ protected abstract Optional selectBackend(List backends, String user);
+
+ /**
+ * Stores the backend for the given query id in the query-id-to-backend cache, replacing any
+ * entry already cached for that id.
+ */
+ @Override
+ public void setBackendForQueryId(String queryId, String backend)
+ {
+ queryIdBackendCache.put(queryId, backend);
+ }
+
+ /**
+ * Stores the routing group for the given query id in the query-id-to-routing-group cache,
+ * replacing any entry already cached for that id.
+ */
+ @Override
+ public void setRoutingGroupForQueryId(String queryId, String routingGroup)
+ {
+ queryIdRoutingGroupCache.put(queryId, routingGroup);
+ }
+
+    /**
+     * Routes to a backend of the default routing group.
+     *
+     * <p>Only active default backends whose recorded status is healthy are offered to
+     * {@link #selectBackend}.
+     *
+     * @param user the user the backend is selected for
+     * @return the selected backend
+     * @throws IllegalStateException if no backend could be selected
+     */
+    public ProxyBackendConfiguration provideDefaultBackendConfiguration(String user)
+    {
+        List<ProxyBackendConfiguration> healthyDefaults = gatewayBackendManager.getActiveDefaultBackends().stream()
+                .filter(candidate -> isBackendHealthy(candidate.getName()))
+                .toList();
+        Optional<ProxyBackendConfiguration> chosen = selectBackend(healthyDefaults, user);
+        if (chosen.isEmpty()) {
+            throw new IllegalStateException("Number of active backends found zero");
+        }
+        return chosen.get();
+    }
+
+    /**
+     * Routes to a backend of the given routing group, falling back to the default routing group
+     * when no backend of the requested group can be selected.
+     *
+     * @param routingGroup the routing group to pick a backend from
+     * @param user the user the backend is selected for
+     * @return the selected backend
+     */
+    @Override
+    public ProxyBackendConfiguration provideBackendConfiguration(String routingGroup, String user)
+    {
+        List<ProxyBackendConfiguration> healthyInGroup = gatewayBackendManager.getActiveBackends(routingGroup).stream()
+                .filter(candidate -> isBackendHealthy(candidate.getName()))
+                .toList();
+        Optional<ProxyBackendConfiguration> chosen = selectBackend(healthyInGroup, user);
+        if (chosen.isPresent()) {
+            return chosen.get();
+        }
+        // Nothing selectable in the requested group: use the default group instead.
+        return provideDefaultBackendConfiguration(user);
+    }
+
+    /**
+     * Looks up the backend for a query id in the cache. On a cache miss the cache loader consults
+     * the query history and, failing that, probes the backends.
+     *
+     * @param queryId id of the query to look up
+     * @return the backend for the query, or {@code null} if loading failed with a checked exception
+     */
+    @Nullable
+    @Override
+    public String findBackendForQueryId(String queryId)
+    {
+        try {
+            return queryIdBackendCache.get(queryId);
+        }
+        catch (ExecutionException e) {
+            log.warn("Exception while loading queryId from cache %s", e.getLocalizedMessage());
+            return null;
+        }
+    }
+
+    /**
+     * Looks up the external URL for a query id in the cache; on a cache miss the cache loader
+     * consults the query history.
+     *
+     * @param queryId id of the query to look up
+     * @return the external URL, or {@code null} if loading failed with a checked exception
+     */
+    @Nullable
+    @Override
+    public String findExternalUrlForQueryId(String queryId)
+    {
+        try {
+            return queryIdExternalUrlCache.get(queryId);
+        }
+        catch (ExecutionException e) {
+            log.warn("Exception while loading queryId from cache %s", e.getLocalizedMessage());
+            return null;
+        }
+    }
+
+    /**
+     * Looks up the routing group for a query id in the cache; on a cache miss the cache loader
+     * consults the query history.
+     *
+     * @param queryId id of the query to look up
+     * @return the routing group, or {@code null} if loading failed with a checked exception
+     */
+    @Nullable
+    @Override
+    public String findRoutingGroupForQueryId(String queryId)
+    {
+        try {
+            return queryIdRoutingGroupCache.get(queryId);
+        }
+        catch (ExecutionException e) {
+            log.warn("Exception while loading queryId from routing group cache %s", e.getLocalizedMessage());
+            return null;
+        }
+    }
+
+ /**
+ * Records the latest status reported for a backend. Only backends whose recorded status is
+ * {@code HEALTHY} pass the health filter applied before a backend is selected.
+ *
+ * @param backendId identifier the status is stored under
+ * @param value the status to record
+ */
+ @Override
+ public void updateBackEndHealth(String backendId, TrinoStatus value)
+ {
+ log.info("backend %s isHealthy %s", backendId, value);
+ backendToStatus.put(backendId, value);
+ }
+
+    /**
+     * Records the status carried by each of the given cluster stats via
+     * {@link #updateBackEndHealth}, keyed by cluster id.
+     *
+     * @param stats the latest stats, one element per cluster
+     */
+    @Override
+    public void updateClusterStats(List<ClusterStats> stats)
+    {
+        stats.forEach(clusterStat -> updateBackEndHealth(clusterStat.clusterId(), clusterStat.trinoStatus()));
+    }
+
+ /**
+ * Stores the external URL for the given query id in the query-id-to-external-URL cache,
+ * replacing any entry already cached for that id.
+ */
+ @VisibleForTesting
+ void setExternalUrlForQueryId(String queryId, String externalUrl)
+ {
+ queryIdExternalUrlCache.put(queryId, externalUrl);
+ }
+
+    /**
+     * Cache loader for the query-id-to-backend cache: prefers the backend recorded in the query
+     * history and only probes the backends when the history has no (non-empty) entry.
+     *
+     * @param queryId id of the query to resolve
+     * @return the backend recorded in the history, or the result of probing the backends
+     */
+    @VisibleForTesting
+    String findBackendForUnknownQueryId(String queryId)
+    {
+        String recorded = queryHistoryManager.getBackendForQueryId(queryId);
+        if (!Strings.isNullOrEmpty(recorded)) {
+            return recorded;
+        }
+        log.debug("Unable to find backend mapping for [%s]. Searching for suitable backend", queryId);
+        return searchAllBackendForQuery(queryId);
+    }
+
+    /**
+     * Probes every configured backend for the given query id and returns the address of the one
+     * that knows it.
+     *
+     * <p>A {@code HEAD <proxyTo>/v1/query/<queryId>} request is submitted for each backend. The
+     * results are then awaited, bounded by a shared deadline, rather than being inspected once
+     * with {@code isDone()}: a probe that has only just been submitted is practically never done,
+     * so a non-blocking check would skip real hits. A failing or slow probe is logged and skipped
+     * so that it cannot hide a hit on another backend. The first backend answering {@code 200} is
+     * recorded in the query-id cache and returned.
+     *
+     * <p>If no backend reports the query, the first active backend of the default routing group is
+     * returned; its health status is not consulted.
+     *
+     * @param queryId id of the query to locate
+     * @return the {@code proxyTo} address of the backend that knows the query, or of the fallback backend
+     * @throws IllegalStateException if the query was not found and the default routing group has no active backend
+     */
+    private String searchAllBackendForQuery(String queryId)
+    {
+        int probeTimeoutMillis = (int) TimeUnit.SECONDS.toMillis(5);
+        // Overall budget for collecting results: connect timeout plus read timeout of one probe.
+        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(2L * probeTimeoutMillis);
+
+        // Submit all probes first so that they run concurrently on the shared pool.
+        Map<String, Future<Integer>> responseCodes = new HashMap<>();
+        for (ProxyBackendConfiguration backend : gatewayBackendManager.getAllBackends()) {
+            String target = backend.getProxyTo() + "/v1/query/" + queryId;
+            try {
+                Future<Integer> call = executorService.submit(() -> {
+                    URL url = new URL(target);
+                    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+                    try {
+                        conn.setConnectTimeout(probeTimeoutMillis);
+                        conn.setReadTimeout(probeTimeoutMillis);
+                        conn.setRequestMethod(HttpMethod.HEAD);
+                        return conn.getResponseCode();
+                    }
+                    finally {
+                        conn.disconnect();
+                    }
+                });
+                responseCodes.put(backend.getProxyTo(), call);
+            }
+            catch (RuntimeException e) {
+                // Best effort: a probe that cannot be scheduled must not prevent the others.
+                log.warn("Unable to probe backend [%s] for query id [%s]: %s", backend.getProxyTo(), queryId, e);
+            }
+        }
+
+        String owner = null;
+        for (Map.Entry<String, Future<Integer>> entry : responseCodes.entrySet()) {
+            Future<Integer> call = entry.getValue();
+            if (owner != null || Thread.currentThread().isInterrupted()) {
+                // The result is no longer needed; release the pool thread.
+                call.cancel(true);
+                continue;
+            }
+            try {
+                long remainingNanos = Math.max(0, deadlineNanos - System.nanoTime());
+                if (call.get(remainingNanos, TimeUnit.NANOSECONDS) == HttpURLConnection.HTTP_OK) {
+                    owner = entry.getKey();
+                }
+            }
+            catch (InterruptedException e) {
+                // Preserve the interrupt for the caller and stop waiting for further probes.
+                Thread.currentThread().interrupt();
+                call.cancel(true);
+            }
+            catch (ExecutionException | TimeoutException e) {
+                call.cancel(true);
+                log.warn("Query id [%s] not found on backend [%s]: %s", queryId, entry.getKey(), e);
+            }
+        }
+
+        if (owner != null) {
+            log.info("Found query [%s] on backend [%s]", queryId, owner);
+            setBackendForQueryId(queryId, owner);
+            return owner;
+        }
+
+        // Fallback on first active backend if queryId mapping not found.
+        List<ProxyBackendConfiguration> fallbacks = gatewayBackendManager.getActiveBackends(defaultRoutingGroup);
+        if (fallbacks.isEmpty()) {
+            throw new IllegalStateException("Query id [" + queryId + "] not found on any backend and routing group ["
+                    + defaultRoutingGroup + "] has no active backend to fall back on");
+        }
+        return fallbacks.get(0).getProxyTo();
+    }
+
+ /**
+ * Attempts to look up the routing group associated with the query id from query history table.
+ *
+ * <p>Used as the loader of the query-id-to-routing-group cache; the value is also written to
+ * that cache explicitly before being returned.
+ *
+ * @param queryId id of the query to resolve
+ * @return the routing group reported by the query history
+ */
+ private String findRoutingGroupForUnknownQueryId(String queryId)
+ {
+ String routingGroup = queryHistoryManager.getRoutingGroupForQueryId(queryId);
+ // NOTE(review): if the history manager can return null for an unknown query id, this put
+ // receives a null value, which Guava caches reject - confirm and handle if so.
+ setRoutingGroupForQueryId(queryId, routingGroup);
+ return routingGroup;
+ }
+
+ /**
+ * Attempts to look up the external url associated with the query id from query history table.
+ *
+ * <p>Used as the loader of the query-id-to-external-URL cache; the value is also written to
+ * that cache explicitly before being returned.
+ *
+ * @param queryId id of the query to resolve
+ * @return the external URL reported by the query history
+ */
+ private String findExternalUrlForUnknownQueryId(String queryId)
+ {
+ String externalUrl = queryHistoryManager.getExternalUrlForQueryId(queryId);
+ // NOTE(review): if the history manager can return null for an unknown query id, this put
+ // receives a null value, which Guava caches reject - confirm and handle if so.
+ setExternalUrlForQueryId(queryId, externalUrl);
+ return externalUrl;
+ }
+
+    /**
+     * Creates a query-id keyed cache holding at most 10000 entries, each expiring 30 minutes after
+     * its last access, whose misses are resolved by the given loader function.
+     *
+     * @param loader computes the value for a query id that is not cached
+     * @return the new, empty cache
+     */
+    private static LoadingCache<String, String> buildCache(Function<String, String> loader)
+    {
+        CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder()
+                .maximumSize(10000)
+                .expireAfterAccess(30, TimeUnit.MINUTES);
+        // CacheLoader.from adapts the function into a loader that simply applies it to the key.
+        return builder.build(CacheLoader.from(loader));
+    }
+
+    /**
+     * Tells whether the last status recorded for the backend is {@code HEALTHY}. A backend with no
+     * recorded status is treated as {@code UNKNOWN} and therefore as not healthy.
+     */
+    private boolean isBackendHealthy(String backendId)
+    {
+        return backendToStatus.getOrDefault(backendId, TrinoStatus.UNKNOWN) == TrinoStatus.HEALTHY;
+    }
+}
diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/QueryCountBasedRouter.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/QueryCountBasedRouter.java
index 022556018..0aba3eb8e 100644
--- a/gateway-ha/src/main/java/io/trino/gateway/ha/router/QueryCountBasedRouter.java
+++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/QueryCountBasedRouter.java
@@ -15,35 +15,28 @@
package io.trino.gateway.ha.router;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.errorprone.annotations.concurrent.GuardedBy;
+import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
-import io.airlift.log.Logger;
import io.trino.gateway.ha.clustermonitor.ClusterStats;
import io.trino.gateway.ha.clustermonitor.TrinoStatus;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
import io.trino.gateway.ha.config.RoutingConfiguration;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
+import java.util.concurrent.ConcurrentHashMap;
public class QueryCountBasedRouter
- extends StochasticRoutingManager
+ extends BaseRoutingManager
{
- private static final Logger log = Logger.get(QueryCountBasedRouter.class);
- @GuardedBy("this")
- private List clusterStats;
- private final String defaultRoutingGroup;
+ private ConcurrentHashMap clusterStats;
@VisibleForTesting
- synchronized List clusterStats()
+ synchronized Map clusterStats()
{
- return ImmutableList.copyOf(clusterStats);
+ return ImmutableMap.copyOf(clusterStats);
}
static class LocalStats
@@ -167,8 +160,7 @@ public QueryCountBasedRouter(
RoutingConfiguration routingConfiguration)
{
super(gatewayBackendManager, queryHistoryManager, routingConfiguration);
- this.defaultRoutingGroup = routingConfiguration.getDefaultRoutingGroup();
- clusterStats = new ArrayList<>();
+ clusterStats = new ConcurrentHashMap<>();
}
private int compareStats(LocalStats lhs, LocalStats rhs, String user)
@@ -192,41 +184,6 @@ private int compareStats(LocalStats lhs, LocalStats rhs, String user)
return Integer.compare(lhs.runningQueryCount(), rhs.runningQueryCount());
}
- // We sort and find the backend based on the individual user's count of the queued queries
- // first, in case user doesn't have any queries queued we use the cluster wide stats
- //
- // First filter the list of clusters for a particular routing group
- //
- // If a user has queued queries, then find a cluster with the least number of QUEUED queries
- // for that user.
- //
- // If a user's QUEUED query count is the same on every cluster then go with a cluster with
- // the cluster wide stats.
- //
- // Find a cluster with the least number of QUEUED queries, if there are the same number of
- // queries queued, then compare the number of running queries.
- //
- // After a query is routed, we need to update the stats for that cluster until we received the
- // updated stats for all the clusters.
- // if a user has queries queued then we assume that the routed query will be also queued or
- // else we assume it would be scheduled immediately and we increment the stats for the running
- // queries
-
- private synchronized Optional getClusterToRoute(String user, String routingGroup)
- {
- log.debug("sorting cluster stats for %s %s", user, routingGroup);
- List filteredList = clusterStats.stream()
- .filter(stats -> stats.trinoStatus() == TrinoStatus.HEALTHY)
- .filter(stats -> routingGroup.equals(stats.routingGroup()))
- .collect(Collectors.toList());
-
- if (filteredList.isEmpty()) {
- return Optional.empty();
- }
-
- return Optional.of(Collections.min(filteredList, (lhs, rhs) -> compareStats(lhs, rhs, user)));
- }
-
private void updateLocalStats(LocalStats stats, String user)
{
// The live stats refresh every few seconds, so we update the stats immediately
@@ -243,24 +200,39 @@ private void updateLocalStats(LocalStats stats, String user)
stats.runningQueryCount(stats.runningQueryCount() + 1);
}
- public synchronized Optional getBackendConfigurationForRoutingGroup(String routingGroup, String user)
- {
- Optional localStats = getClusterToRoute(user, routingGroup);
- localStats.ifPresent(stats -> updateLocalStats(stats, user));
- return localStats.map(LocalStats::backendConfiguration);
- }
-
@Override
- public ProxyBackendConfiguration provideBackendConfiguration(String routingGroup, String user)
+ public synchronized void updateClusterStats(List stats)
{
- return getBackendConfigurationForRoutingGroup(routingGroup, user)
- .orElseGet(() -> getBackendConfigurationForRoutingGroup(defaultRoutingGroup, user)
- .orElseThrow(() -> new RouterException("did not find any cluster for the default routing group: " + defaultRoutingGroup)));
+ super.updateClusterStats(stats);
+ for (ClusterStats stat : stats) {
+ clusterStats.put(stat.clusterId(), new LocalStats(stat));
+ }
}
+ // We sort and find the backend based on the individual user's count of the queued queries
+ // first, in case user doesn't have any queries queued we use the cluster wide stats
+ //
+ // First filter the list of clusters for a particular routing group
+ //
+ // If a user has queued queries, then find a cluster with the least number of QUEUED queries
+ // for that user.
+ //
+ // If a user's QUEUED query count is the same on every cluster then go with a cluster with
+ // the cluster wide stats.
+ //
+ // Find a cluster with the least number of QUEUED queries, if there are the same number of
+ // queries queued, then compare the number of running queries.
+ //
+ // After a query is routed, we need to update the stats for that cluster until we received the
+ // updated stats for all the clusters.
+ // if a user has queries queued then we assume that the routed query will be also queued or
+ // else we assume it would be scheduled immediately and we increment the stats for the running
+ // queries
@Override
- public synchronized void updateBackEndStats(List stats)
+ protected synchronized Optional<ProxyBackendConfiguration> selectBackend(List<ProxyBackendConfiguration> backends, String user)
{
- clusterStats = stats.stream().map(a -> new LocalStats(a)).collect(Collectors.toList());
+ // A backend can be marked healthy before any stats were recorded for it (health and stats
+ // are updated through separate entry points). Such a backend has no LocalStats to compare
+ // or update, so it is left out instead of passing null to compareStats/updateLocalStats.
+ Optional<ProxyBackendConfiguration> cluster = backends.stream()
+ .filter(backend -> clusterStats.containsKey(backend.getName()))
+ .min((a, b) -> compareStats(clusterStats.get(a.getName()), clusterStats.get(b.getName()), user));
+ // Account for the query being routed now, until fresh stats arrive.
+ cluster.ifPresent(c -> updateLocalStats(clusterStats.get(c.getName()), user));
+ return cluster;
}
}
diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingManager.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingManager.java
index c95134780..2f112f6dd 100644
--- a/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingManager.java
+++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingManager.java
@@ -13,280 +13,75 @@
*/
package io.trino.gateway.ha.router;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import io.airlift.log.Logger;
import io.trino.gateway.ha.clustermonitor.ClusterStats;
import io.trino.gateway.ha.clustermonitor.TrinoStatus;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
-import io.trino.gateway.ha.config.RoutingConfiguration;
-import jakarta.annotation.Nullable;
-import jakarta.ws.rs.HttpMethod;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-/**
- * This class performs health check, stats counts for each backend and provides a backend given
- * request object. Default implementation comes here.
- */
-public abstract class RoutingManager
+public interface RoutingManager
{
- private static final Random RANDOM = new Random();
- private static final Logger log = Logger.get(RoutingManager.class);
- private final LoadingCache queryIdBackendCache;
- private final ExecutorService executorService = Executors.newFixedThreadPool(5);
- private final GatewayBackendManager gatewayBackendManager;
- private final ConcurrentHashMap backendToStatus;
- private final String defaultRoutingGroup;
- private final LoadingCache queryIdRoutingGroupCache;
- private final LoadingCache queryIdExternalUrlCache;
- private final QueryHistoryManager queryHistoryManager;
-
- public RoutingManager(GatewayBackendManager gatewayBackendManager, QueryHistoryManager queryHistoryManager, RoutingConfiguration routingConfiguration)
- {
- this.gatewayBackendManager = gatewayBackendManager;
- this.defaultRoutingGroup = routingConfiguration.getDefaultRoutingGroup();
- this.queryHistoryManager = queryHistoryManager;
- queryIdBackendCache =
- CacheBuilder.newBuilder()
- .maximumSize(10000)
- .expireAfterAccess(30, TimeUnit.MINUTES)
- .build(
- new CacheLoader<>()
- {
- @Override
- public String load(String queryId)
- {
- return findBackendForUnknownQueryId(queryId);
- }
- });
- queryIdRoutingGroupCache =
- CacheBuilder.newBuilder()
- .maximumSize(10000)
- .expireAfterAccess(30, TimeUnit.MINUTES)
- .build(
- new CacheLoader<>()
- {
- @Override
- public String load(String queryId)
- {
- return findRoutingGroupForUnknownQueryId(queryId);
- }
- });
- queryIdExternalUrlCache =
- CacheBuilder.newBuilder()
- .maximumSize(10000)
- .expireAfterAccess(30, TimeUnit.MINUTES)
- .build(
- new CacheLoader<>()
- {
- @Override
- public String load(String queryId)
- {
- return findExternalUrlForUnknownQueryId(queryId);
- }
- });
-
- this.backendToStatus = new ConcurrentHashMap<>();
- }
-
- protected GatewayBackendManager getGatewayBackendManager()
- {
- return gatewayBackendManager;
- }
-
- public void setBackendForQueryId(String queryId, String backend)
- {
- queryIdBackendCache.put(queryId, backend);
- }
-
- public void setExternalUrlForQueryId(String queryId, String externalUrl)
- {
- queryIdExternalUrlCache.put(queryId, externalUrl);
- }
-
- public void setRoutingGroupForQueryId(String queryId, String routingGroup)
- {
- queryIdRoutingGroupCache.put(queryId, routingGroup);
- }
-
/**
- * Performs routing to a default backend.
+ * Updates the health status of a backend cluster.
+ *
+ * @param backendId the unique identifier of the backend cluster
+ * @param value the health status of the backend (e.g. HEALTHY, UNHEALTHY, UNKNOWN)
*/
- private ProxyBackendConfiguration provideDefaultBackendConfiguration()
- {
- List backends = gatewayBackendManager.getActiveDefaultBackends();
- backends.removeIf(backend -> isBackendNotHealthy(backend.getName()));
- if (backends.isEmpty()) {
- throw new IllegalStateException("Number of active backends found zero");
- }
- int backendId = Math.abs(RANDOM.nextInt()) % backends.size();
- return backends.get(backendId);
- }
+ void updateBackEndHealth(String backendId, TrinoStatus value);
/**
- * Performs routing to a given cluster group. This falls back to a default backend, if no scheduled
- * backend is found.
+ * Updates the statistics for all backend clusters.
+ *
+ * @param stats a list of ClusterStats objects representing the current state of each backend cluster
*/
- public ProxyBackendConfiguration provideBackendConfiguration(String routingGroup, String user)
- {
- List backends =
- gatewayBackendManager.getActiveBackends(routingGroup);
- backends.removeIf(backend -> isBackendNotHealthy(backend.getName()));
- if (backends.isEmpty()) {
- return provideDefaultBackendConfiguration();
- }
- int backendId = Math.abs(RANDOM.nextInt()) % backends.size();
- return backends.get(backendId);
- }
+ void updateClusterStats(List stats);
/**
- * Performs cache look up, if a backend not found, it checks with all backends and tries to find
- * out which backend has info about given query id.
+ * Associates a backend cluster with a specific query ID for sticky routing.
+ *
+ * @param queryId the unique identifier of the query
+ * @param backend the backend cluster to associate with the query
*/
- @Nullable
- public String findBackendForQueryId(String queryId)
- {
- String backendAddress = null;
- try {
- backendAddress = queryIdBackendCache.get(queryId);
- }
- catch (ExecutionException e) {
- log.warn("Exception while loading queryId from cache %s", e.getLocalizedMessage());
- }
- return backendAddress;
- }
-
- @Nullable
- public String findExternalUrlForQueryId(String queryId)
- {
- String externalUrl = null;
- try {
- externalUrl = queryIdExternalUrlCache.get(queryId);
- }
- catch (ExecutionException e) {
- log.warn("Exception while loading queryId from cache %s", e.getLocalizedMessage());
- }
- return externalUrl;
- }
+ void setBackendForQueryId(String queryId, String backend);
/**
- * Looks up the routing group associated with the queryId in the cache.
- * If it's not in the cache, look up in query history
+ * Associates a routing group with a specific query ID.
+ *
+ * @param queryId the unique identifier of the query
+ * @param routingGroup the routing group to associate with the query
*/
- @Nullable
- public String findRoutingGroupForQueryId(String queryId)
- {
- String routingGroup = null;
- try {
- routingGroup = queryIdRoutingGroupCache.get(queryId);
- }
- catch (ExecutionException e) {
- log.warn("Exception while loading queryId from routing group cache %s", e.getLocalizedMessage());
- }
- return routingGroup;
- }
-
- public void updateBackEndHealth(String backendId, TrinoStatus value)
- {
- log.info("backend %s isHealthy %s", backendId, value);
- backendToStatus.put(backendId, value);
- }
-
- public void updateBackEndStats(List stats)
- {
- for (ClusterStats clusterStats : stats) {
- updateBackEndHealth(clusterStats.clusterId(), clusterStats.trinoStatus());
- }
- }
+ void setRoutingGroupForQueryId(String queryId, String routingGroup);
/**
- * This tries to find out which backend may have info about given query id. If not found returns
- * the first healthy backend.
+ * Finds the backend cluster associated with a given query ID.
+ *
+ * @param queryId the unique identifier of the query
+ * @return the address of the backend cluster handling the query, or null if the lookup fails
*/
- protected String findBackendForUnknownQueryId(String queryId)
- {
- List backends = gatewayBackendManager.getAllBackends();
-
- Map> responseCodes = new HashMap<>();
- try {
- for (ProxyBackendConfiguration backend : backends) {
- String target = backend.getProxyTo() + "/v1/query/" + queryId;
-
- Future call =
- executorService.submit(
- () -> {
- URL url = new URL(target);
- HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- conn.setConnectTimeout((int) TimeUnit.SECONDS.toMillis(5));
- conn.setReadTimeout((int) TimeUnit.SECONDS.toMillis(5));
- conn.setRequestMethod(HttpMethod.HEAD);
- return conn.getResponseCode();
- });
- responseCodes.put(backend.getProxyTo(), call);
- }
- for (Map.Entry> entry : responseCodes.entrySet()) {
- if (entry.getValue().isDone()) {
- int responseCode = entry.getValue().get();
- if (responseCode == 200) {
- log.info("Found query [%s] on backend [%s]", queryId, entry.getKey());
- setBackendForQueryId(queryId, entry.getKey());
- return entry.getKey();
- }
- }
- }
- }
- catch (Exception e) {
- log.warn("Query id [%s] not found", queryId);
- }
- // Fallback on first active backend if queryId mapping not found.
- return gatewayBackendManager.getActiveBackends(defaultRoutingGroup).get(0).getProxyTo();
- }
+ String findBackendForQueryId(String queryId);
/**
- * Attempts to look up the routing group associated with the query id from query history table
+ * Finds the external URL for the backend cluster associated with a given query ID.
+ *
+ * @param queryId the unique identifier of the query
+ * @return the external URL, or null if not found
*/
- protected String findRoutingGroupForUnknownQueryId(String queryId)
- {
- String routingGroup = queryHistoryManager.getRoutingGroupForQueryId(queryId);
- setRoutingGroupForQueryId(queryId, routingGroup);
- return routingGroup;
- }
+ String findExternalUrlForQueryId(String queryId);
/**
- * Attempts to look up the external url associated with the query id from query history table
+ * Finds the routing group associated with a given query ID.
+ *
+ * @param queryId the unique identifier of the query
+ * @return the routing group, or null if not found
*/
- protected String findExternalUrlForUnknownQueryId(String queryId)
- {
- String externalUrl = queryHistoryManager.getExternalUrlForQueryId(queryId);
- setExternalUrlForQueryId(queryId, externalUrl);
- return externalUrl;
- }
+ String findRoutingGroupForQueryId(String queryId);
- // Predicate helper function to remove the backends from the list
- // We are returning the unhealthy (not healthy)
- private boolean isBackendNotHealthy(String backendId)
- {
- if (backendToStatus.isEmpty()) {
- log.error("backends can not be empty");
- return true;
- }
- TrinoStatus status = backendToStatus.get(backendId);
- if (status == null) {
- return true;
- }
- return status != TrinoStatus.HEALTHY;
- }
+ /**
+ * Provides the backend configuration for a given routing group and user.
+ *
+ * @param routingGroup the routing group to use for backend selection
+ * @param user the user requesting the backend
+ * @return the backend configuration for the selected cluster
+ */
+ ProxyBackendConfiguration provideBackendConfiguration(String routingGroup, String user);
}
diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/StochasticRoutingManager.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/StochasticRoutingManager.java
index 1df1705e7..e82401015 100644
--- a/gateway-ha/src/main/java/io/trino/gateway/ha/router/StochasticRoutingManager.java
+++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/StochasticRoutingManager.java
@@ -13,16 +13,18 @@
*/
package io.trino.gateway.ha.router;
-import com.google.common.base.Strings;
import com.google.inject.Inject;
-import io.airlift.log.Logger;
+import io.trino.gateway.ha.config.ProxyBackendConfiguration;
import io.trino.gateway.ha.config.RoutingConfiguration;
+import java.util.List;
+import java.util.Optional;
+import java.util.Random;
+
public class StochasticRoutingManager
- extends RoutingManager
+ extends BaseRoutingManager
{
- private static final Logger log = Logger.get(StochasticRoutingManager.class);
- private final QueryHistoryManager queryHistoryManager;
+ private static final Random RANDOM = new Random();
@Inject
public StochasticRoutingManager(
@@ -31,18 +33,15 @@ public StochasticRoutingManager(
RoutingConfiguration routingConfiguration)
{
super(gatewayBackendManager, queryHistoryManager, routingConfiguration);
- this.queryHistoryManager = queryHistoryManager;
}
@Override
- protected String findBackendForUnknownQueryId(String queryId)
+ protected Optional<ProxyBackendConfiguration> selectBackend(List<ProxyBackendConfiguration> backends, String user)
{
- String backend;
- backend = queryHistoryManager.getBackendForQueryId(queryId);
- if (Strings.isNullOrEmpty(backend)) {
- log.debug("Unable to find backend mapping for [%s]. Searching for suitable backend", queryId);
- backend = super.findBackendForUnknownQueryId(queryId);
+ if (backends.isEmpty()) {
+ return Optional.empty();
}
- return backend;
+ // Pick uniformly at random. nextInt(bound) is always within [0, size), whereas
+ // Math.abs(nextInt()) % size goes negative when nextInt() returns Integer.MIN_VALUE.
+ int backendId = RANDOM.nextInt(backends.size());
+ return Optional.of(backends.get(backendId));
}
}
diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestQueryCountBasedRouter.java b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestQueryCountBasedRouter.java
index 8317a5988..04c3f8b6e 100644
--- a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestQueryCountBasedRouter.java
+++ b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestQueryCountBasedRouter.java
@@ -18,6 +18,7 @@
import io.trino.gateway.ha.clustermonitor.TrinoStatus;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
import io.trino.gateway.ha.config.RoutingConfiguration;
+import io.trino.gateway.ha.persistence.JdbcConnectionManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -25,6 +26,7 @@
import java.util.List;
import java.util.Map;
+import static io.trino.gateway.ha.TestingJdbcConnectionManager.createTestingJdbcConnectionManager;
import static org.assertj.core.api.Assertions.assertThat;
final class TestQueryCountBasedRouter
@@ -38,6 +40,8 @@ final class TestQueryCountBasedRouter
static final int LEAST_QUEUED_COUNT = 1;
static final int SAME_QUERY_COUNT = 5;
+ GatewayBackendManager backendManager;
+ QueryHistoryManager historyManager;
QueryCountBasedRouter queryCountBasedRouter;
ImmutableList clusters;
RoutingConfiguration routingConfiguration = new RoutingConfiguration();
@@ -54,7 +58,7 @@ private static List getClusterStatsList(String routingGroup)
ImmutableList.Builder clustersBuilder = new ImmutableList.Builder();
// Set Cluster1 stats
{
- ClusterStats.Builder cluster = ClusterStats.builder("c1");
+ ClusterStats.Builder cluster = ClusterStats.builder("c1-" + routingGroup);
cluster.proxyTo(BACKEND_URL_1);
cluster.trinoStatus(TrinoStatus.HEALTHY);
cluster.routingGroup(routingGroup);
@@ -65,7 +69,7 @@ private static List getClusterStatsList(String routingGroup)
}
// Set Cluster2 stats
{
- ClusterStats.Builder cluster = ClusterStats.builder("c2");
+ ClusterStats.Builder cluster = ClusterStats.builder("c2-" + routingGroup);
cluster.proxyTo(BACKEND_URL_2);
cluster.trinoStatus(TrinoStatus.HEALTHY);
cluster.routingGroup(routingGroup);
@@ -80,7 +84,7 @@ private static List getClusterStatsList(String routingGroup)
}
// Set Cluster3 stats with the least no of queries running
{
- ClusterStats.Builder cluster = ClusterStats.builder("c3");
+ ClusterStats.Builder cluster = ClusterStats.builder("c3-" + routingGroup);
cluster.proxyTo(BACKEND_URL_3);
cluster.trinoStatus(TrinoStatus.HEALTHY);
cluster.routingGroup(routingGroup);
@@ -95,7 +99,7 @@ private static List getClusterStatsList(String routingGroup)
}
// cluster - unhealthy one
{
- ClusterStats.Builder cluster = ClusterStats.builder("c-unhealthy");
+ ClusterStats.Builder cluster = ClusterStats.builder("c-unhealthy-" + routingGroup);
cluster.proxyTo("http://c-unhealthy");
cluster.trinoStatus(TrinoStatus.UNHEALTHY); //This cluster should never show up to route
cluster.routingGroup(routingGroup);
@@ -107,7 +111,7 @@ private static List getClusterStatsList(String routingGroup)
}
// cluster - unhealthy one, no stats
{
- ClusterStats.Builder cluster = ClusterStats.builder("c-unhealthy2");
+ ClusterStats.Builder cluster = ClusterStats.builder("c-unhealthy2-" + routingGroup);
cluster.proxyTo("http://c-unhealthy2");
cluster.trinoStatus(TrinoStatus.UNHEALTHY); //This cluster should never show up to route
@@ -115,7 +119,7 @@ private static List getClusterStatsList(String routingGroup)
}
// cluster - it's messed up - healthy but no stats
{
- ClusterStats.Builder cluster = ClusterStats.builder("c-unhealthy2");
+ ClusterStats.Builder cluster = ClusterStats.builder("c-unhealthy3-" + routingGroup);
cluster.proxyTo("http://c-messed-up");
//This is a scenrio when, something is really wrong
//We just get the cluster state as health but no stats
@@ -126,7 +130,7 @@ private static List getClusterStatsList(String routingGroup)
return clustersBuilder.build();
}
- static ClusterStats getClusterWithNoUserQueueAndMinQueueCount()
+ private ClusterStats getClusterWithNoUserQueueAndMinQueueCount()
{
ClusterStats.Builder cluster = ClusterStats.builder("c-Minimal-Queue");
cluster.proxyTo(BACKEND_URL_4);
@@ -134,10 +138,13 @@ static ClusterStats getClusterWithNoUserQueueAndMinQueueCount()
cluster.routingGroup("adhoc");
cluster.runningQueryCount(5);
cluster.queuedQueryCount(LEAST_QUEUED_COUNT);
- return cluster.build();
+ ClusterStats clusterStats = cluster.build();
+
+ backendManager.addBackend(createProxyBackendConfiguration(clusterStats));
+ return clusterStats;
}
- static ClusterStats getClusterWithMinRunningQueries()
+ private ClusterStats getClusterWithMinRunningQueries()
{
ClusterStats.Builder cluster = ClusterStats.builder("c-Minimal-Running");
cluster.proxyTo(BACKEND_URL_5);
@@ -145,21 +152,43 @@ static ClusterStats getClusterWithMinRunningQueries()
cluster.routingGroup("adhoc");
cluster.runningQueryCount(1);
cluster.queuedQueryCount(LEAST_QUEUED_COUNT);
- cluster.userQueuedCount(new HashMap());
- return cluster.build();
+ cluster.userQueuedCount(new HashMap<>());
+ ClusterStats clusterStats = cluster.build();
+
+ backendManager.addBackend(createProxyBackendConfiguration(clusterStats));
+ return clusterStats;
+ }
+
+ static ProxyBackendConfiguration createProxyBackendConfiguration(ClusterStats clusterStats) // Builds a backend configuration from the given cluster stats so the cluster can be registered via backendManager.addBackend(...).
+ {
+ ProxyBackendConfiguration proxyBackend = new ProxyBackendConfiguration();
+ proxyBackend.setActive(true); // set unconditionally; the stats' TrinoStatus is not consulted here
+ proxyBackend.setRoutingGroup(clusterStats.routingGroup());
+ proxyBackend.setName(clusterStats.clusterId()); // backend name is the stats' cluster id
+ proxyBackend.setProxyTo(clusterStats.proxyTo());
+ proxyBackend.setExternalUrl(clusterStats.externalUrl());
+ return proxyBackend;
}
@BeforeEach
public void init()
+ { // Recreates the backend manager, history manager, router and cluster fixtures before each test.
+ JdbcConnectionManager connectionManager = createTestingJdbcConnectionManager();
+ backendManager = new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration); // both managers are built from the same testing connection manager
+ historyManager = new HaQueryHistoryManager(connectionManager.getJdbi(), false);
+ queryCountBasedRouter = new QueryCountBasedRouter(backendManager, historyManager, routingConfiguration);
+ populateData(); // assigns `clusters` and registers each cluster with backendManager
+ queryCountBasedRouter.updateClusterStats(clusters); // runs after populateData(), which is what assigns `clusters`
+ }
+
+ private void populateData() // Builds the "adhoc" and "etl" cluster stats lists into `clusters` and registers every entry as a backend.
{
//Have a adoc and an etl routing groups - 2 sets of clusters
clusters = new ImmutableList.Builder()
.addAll(getClusterStatsList("adhoc"))
.addAll(getClusterStatsList("etl"))
.build();
-
- queryCountBasedRouter = new QueryCountBasedRouter(null, null, routingConfiguration);
- queryCountBasedRouter.updateBackEndStats(clusters);
+ clusters.forEach(c -> backendManager.addBackend(createProxyBackendConfiguration(c))); // one backend entry per cluster stats object, named after its cluster id
}
@Test
@@ -176,8 +205,8 @@ void testUserWithSameNoOfQueuedQueries()
//After the above code is run, c3 cluster has 6 queued queries
//c1, c2 cluster will be with same original number of queued queries i.e. 5 each
//The next query should go to the c1 cluster, as it would have less number of cluster wide
- QueryCountBasedRouter.LocalStats c3Stats = queryCountBasedRouter.clusterStats().stream()
- .filter(c -> c.clusterId().equals("c3") &&
+ QueryCountBasedRouter.LocalStats c3Stats = queryCountBasedRouter.clusterStats().values().stream()
+ .filter(c -> c.clusterId().equals("c3-etl") &&
c.routingGroup().equals("etl"))
.findAny().orElseThrow();
assertThat(c3Stats.userQueuedCount().getOrDefault("u1", 0))
@@ -240,7 +269,7 @@ void testClusterWithLeastQueueCount()
.addAll(clusters)
.add(getClusterWithNoUserQueueAndMinQueueCount())
.build();
- queryCountBasedRouter.updateBackEndStats(clusters);
+ queryCountBasedRouter.updateClusterStats(clusters);
ProxyBackendConfiguration proxyConfig = queryCountBasedRouter.provideBackendConfiguration("NonExisting", "u1");
String proxyTo = proxyConfig.getProxyTo();
@@ -259,7 +288,7 @@ void testClusterWithLeastRunningCount()
.add(getClusterWithMinRunningQueries())
.build();
- queryCountBasedRouter.updateBackEndStats(clusters);
+ queryCountBasedRouter.updateClusterStats(clusters);
ProxyBackendConfiguration proxyConfig = queryCountBasedRouter.provideBackendConfiguration("NonExisting", "u1");
String proxyTo = proxyConfig.getProxyTo();
diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingManagerExternalUrlCache.java b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingManagerExternalUrlCache.java
index a4aa6bfd5..86779e884 100644
--- a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingManagerExternalUrlCache.java
+++ b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingManagerExternalUrlCache.java
@@ -26,7 +26,7 @@
@TestInstance(Lifecycle.PER_CLASS)
final class TestRoutingManagerExternalUrlCache
{
- private RoutingManager routingManager;
+ private StochasticRoutingManager routingManager;
private GatewayBackendManager backendManager;
private QueryHistoryManager queryHistoryManager;
private QueryHistoryManager mockQueryHistoryManager;
@@ -116,8 +116,7 @@ void testEmptyStringExternalUrl()
@Test
void testCacheWithMockQueryHistoryManager()
{
- RoutingManager mockRoutingManager = new TestRoutingManager(backendManager, mockQueryHistoryManager,
- routingConfiguration);
+ RoutingManager mockRoutingManager = new TestRoutingManager(backendManager, mockQueryHistoryManager, routingConfiguration);
String queryId = "mock-test-query";
String expectedUrl = "https://mock-gateway.example.com";
@@ -130,7 +129,7 @@ void testCacheWithMockQueryHistoryManager()
}
private static class TestRoutingManager
- extends RoutingManager
+ extends StochasticRoutingManager
{
public TestRoutingManager(GatewayBackendManager gatewayBackendManager, QueryHistoryManager queryHistoryManager,
RoutingConfiguration routingConfiguration)