diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b82a9d..e81ef40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +- The Stackable scaler now ensures that a `TrinoCluster` has changed to `ready` more than 5 seconds + ago before marking it as `ready` ([#68]). + +[#68]: https://github.com/stackabletech/trino-lb/pull/68 + ## [0.5.0] - 2025-03-14 ### Added diff --git a/trino-lb/src/main.rs b/trino-lb/src/main.rs index 7be249d..8231452 100644 --- a/trino-lb/src/main.rs +++ b/trino-lb/src/main.rs @@ -82,7 +82,7 @@ fn main() -> Result<(), MainError> { assert!(n > 0, "{ENV_WORKER_THREADS:?} cannot be set to 0"); n } - // We default to at least 2 workers + // We default to at least 3 workers Err(std::env::VarError::NotPresent) => usize::max(3, num_cpus::get()), Err(std::env::VarError::NotUnicode(e)) => { panic!("{ENV_WORKER_THREADS:?} must be valid unicode, error: {e:?}") diff --git a/trino-lb/src/scaling/stackable.rs b/trino-lb/src/scaling/stackable.rs index f2ef8c7..44d44e5 100644 --- a/trino-lb/src/scaling/stackable.rs +++ b/trino-lb/src/scaling/stackable.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; +use chrono::{DateTime, Utc}; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time; use kube::{ Api, Client, Discovery, api::{Patch, PatchParams}, @@ -16,6 +18,7 @@ use trino_lb_core::{ use super::ScalerTrait; const K8S_FIELD_MANAGER: &str = "trino-lb"; +const MIN_READY_SECONDS_SINCE_LAST_TRANSITION: i64 = 5; #[derive(Snafu, Debug)] pub enum Error { @@ -123,6 +126,16 @@ pub enum Error { cluster: TrinoClusterName, namespace: String, }, + + #[snafu(display( + "Could not parse the lastTransitionTime {last_transition_time:?} for the Trino cluster {cluster:?} in namespace {namespace:?}" + ))] + ParseLastTransitionTime { + source: serde_json::Error, + last_transition_time: Value, + cluster: TrinoClusterName, + namespace: String, + }, } pub struct StackableScaler { @@ -160,7 
+173,7 @@ impl StackableScaler { }; let (trino_resource, _) = discovery .resolve_gvk(&trino_gvk) - .context(ResolveGvkSnafu { gvk: trino_gvk })?; + .with_context(|| ResolveGvkSnafu { gvk: trino_gvk })?; for cluster in trino_cluster_groups .values() @@ -185,13 +198,13 @@ impl StackableScaler { let api: Api<DynamicObject> = Api::namespaced_with(client.clone(), &cluster.namespace, &trino_resource); - let trino = api - .get_opt(&cluster.name) - .await - .context(ReadTrinoClusterSnafu { - cluster: &cluster.name, - namespace: &cluster.namespace, - })?; + let trino = + api.get_opt(&cluster.name) + .await + .with_context(|_| ReadTrinoClusterSnafu { + cluster: &cluster.name, + namespace: &cluster.namespace, + })?; if trino.is_none() { TrinoClusterNotFoundSnafu { @@ -219,7 +232,7 @@ impl StackableScaler { let cluster = self .clusters .get(cluster) - .context(ClusterNotFoundSnafu { cluster })?; + .with_context(|| ClusterNotFoundSnafu { cluster })?; let patch = serde_json::json!({ "apiVersion": "trino.stackable.tech/v1alpha1", @@ -241,7 +254,7 @@ impl StackableScaler { .patch(&cluster.name, &params, &patch) .instrument(debug_span!("Patching Trino cluster")) .await - .context(PatchTrinoClusterSnafu { + .with_context(|_| PatchTrinoClusterSnafu { cluster: &cluster.name, namespace: &cluster.namespace, })?; @@ -266,14 +279,14 @@ impl ScalerTrait for StackableScaler { let cluster = self .clusters .get(cluster) - .context(ClusterNotFoundSnafu { cluster })?; + .with_context(|| ClusterNotFoundSnafu { cluster })?; let status = cluster .api .get_status(&cluster.name) .instrument(debug_span!("Get Trino cluster status")) .await - .context(GetTrinoClusterStatusSnafu { + .with_context(|_| GetTrinoClusterStatusSnafu { cluster: &cluster.name, namespace: &cluster.namespace, })?; @@ -286,17 +299,17 @@ impl ScalerTrait for StackableScaler { let conditions = status .data .get("status") - .context(StatusFieldMissingInTrinoClusterSnafu { + .with_context(|| StatusFieldMissingInTrinoClusterSnafu { cluster:
&cluster.name, namespace: &cluster.namespace, })? .get("conditions") - .context(StatusConditionsFieldMissingInTrinoClusterSnafu { + .with_context(|| StatusConditionsFieldMissingInTrinoClusterSnafu { cluster: &cluster.name, namespace: &cluster.namespace, })? .as_array() - .context(StatusConditionsFieldIsNotArraySnafu { + .with_context(|| StatusConditionsFieldIsNotArraySnafu { cluster: &cluster.name, namespace: &cluster.namespace, })?; @@ -304,21 +317,21 @@ impl ScalerTrait for StackableScaler { let available = conditions .iter() .find(|c| c.get("type") == Some(&Value::String("Available".to_string()))) - .context(NoAvailableEntryInStatusConditionsListSnafu { + .with_context(|| NoAvailableEntryInStatusConditionsListSnafu { cluster: &cluster.name, namespace: &cluster.namespace, })?; - let available = available.get("status").context( + let status = available.get("status").with_context(|| { NoStatusInAvailableEntryInStatusConditionsListSnafu { cluster: &cluster.name, namespace: &cluster.namespace, - }, - )?; + } + })?; - let available = match available { - Value::String(available) if available == "True" => true, - Value::String(available) if available == "False" => false, + let is_available = match status { + Value::String(status) if status == "True" => true, + Value::String(status) if status == "False" => false, _ => StatusNotParsableInAvailableEntryInStatusConditionsListSnafu { cluster: &cluster.name, namespace: &cluster.namespace, @@ -326,7 +339,50 @@ impl ScalerTrait for StackableScaler { .fail()?, }; - Ok(available) + // Return early if the cluster is not available + if !is_available { + return Ok(false); + } + + // Careful investigation has shown that trino-lb can quickly react to TrinoClusters coming + // available. 
When trying to immediately send queries to Trino we encountered errors such as: + // + // WARN trino_lb::http_server::v1::statement: Error while processing request + // error=SendQueryToTrino { source: ContactTrinoPostQuery { source: reqwest::Error { kind: Request, + // url: "https://trino-m-1-coordinator-default.default.svc.cluster.local:8443/v1/statement", + // source: hyper_util::client::legacy::Error(Connect, ConnectError("dns error", Custom { kind: Uncategorized, + // error: "failed to lookup address information: Name or service not known" })) } } } + // + // This is because the coordinator is ready but it might take some additional time for the + // DNS record of the Service to propagate. + // To prevent that we only consider TrinoClusters healthy if a minimum amount of seconds + // passed after it was marked as healthy. + + // It's valid for the lastTransitionTime to be not set, we assume the cluster is old in this + // case + if let Some(last_transition_time) = available.get("lastTransitionTime") { + let last_transition_time: Time = serde_json::from_value(last_transition_time.clone()) + .with_context(|_| ParseLastTransitionTimeSnafu { + last_transition_time: last_transition_time.clone(), + cluster: &cluster.name, + namespace: &cluster.namespace, + })?; + + let seconds_since_last_transition = elapsed_seconds_since(last_transition_time.0); + if seconds_since_last_transition < MIN_READY_SECONDS_SINCE_LAST_TRANSITION { + tracing::debug!( + seconds_since_last_transition, + min_ready_seconds_since_last_transition = + MIN_READY_SECONDS_SINCE_LAST_TRANSITION, + "The trino cluster recently turned ready, not marking as ready yet" + ); + + return Ok(false); + } + } + + // All checks succeeded, TrinoCluster is ready + Ok(true) } #[instrument(name = "StackableScaler::is_activated", skip(self))] @@ -358,3 +414,8 @@ impl ScalerTrait for StackableScaler { })?) 
} } + +fn elapsed_seconds_since(datetime: DateTime<Utc>) -> i64 { + let now = Utc::now(); + (now - datetime).num_seconds() +}