From 0dfb70a60f28d7e29fe12c1749cef48162340bbd Mon Sep 17 00:00:00 2001 From: Jesse Luehrs <doy@materialize.com> Date: Thu, 10 Apr 2025 14:34:59 -0400 Subject: [PATCH] allow orchestratord to force a rollout to promote before it's ready this allows us to choose how to resolve a rollout which is stuck or taking too long - we can either cancel the rollout (by resetting request_rollout to the last_completed_rollout_request value from the status) or now force it to complete by setting force_promote to the request_rollout value. --- src/cloud-resources/src/crd/materialize.rs | 14 +++++++++++++ .../src/controller/materialize.rs | 8 ++++++- .../controller/materialize/environmentd.rs | 21 ++++++++++++++++++- 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/src/cloud-resources/src/crd/materialize.rs b/src/cloud-resources/src/crd/materialize.rs index 685576a98391c..02ec1d6aaaf0d 100644 --- a/src/cloud-resources/src/crd/materialize.rs +++ b/src/cloud-resources/src/crd/materialize.rs @@ -108,6 +108,12 @@ pub mod v1alpha1 { // generation rollout is automatically triggered. #[serde(default = "Uuid::new_v4")] pub request_rollout: Uuid, + // If force_promote is set to the same value as request_rollout, the + // current rollout will skip waiting for clusters in the new + // generation to rehydrate before promoting the new environmentd to + // leader. 
+ #[serde(default)] + pub force_promote: Uuid, // This value will be written to an annotation in the generated // environmentd statefulset, in order to force the controller to // detect the generated resources as changed even if no other changes @@ -320,6 +326,14 @@ pub mod v1alpha1 { .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request) } + pub fn set_force_promote(&mut self) { + self.spec.force_promote = self.spec.request_rollout; + } + + pub fn should_force_promote(&self) -> bool { + self.spec.force_promote == self.spec.request_rollout + } + pub fn conditions_need_update(&self) -> bool { let Some(status) = self.status.as_ref() else { return true; diff --git a/src/orchestratord/src/controller/materialize.rs b/src/orchestratord/src/controller/materialize.rs index 4d36f9b0a0025..c493d3fd3f44f 100644 --- a/src/orchestratord/src/controller/materialize.rs +++ b/src/orchestratord/src/controller/materialize.rs @@ -392,7 +392,13 @@ impl k8s_controller::Context for Context { trace!("applying environment resources"); match resources - .apply(&client, &self.config, increment_generation, &mz.namespace()) + .apply( + &client, + &self.config, + increment_generation, + mz.should_force_promote(), + &mz.namespace(), + ) .await { Ok(Some(action)) => { diff --git a/src/orchestratord/src/controller/materialize/environmentd.rs b/src/orchestratord/src/controller/materialize/environmentd.rs index c135a84512946..e24374b8b11ce 100644 --- a/src/orchestratord/src/controller/materialize/environmentd.rs +++ b/src/orchestratord/src/controller/materialize/environmentd.rs @@ -31,6 +31,7 @@ use k8s_openapi::{ use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action}; use maplit::btreemap; use rand::{Rng, thread_rng}; +use reqwest::StatusCode; use semver::{BuildMetadata, Prerelease, Version}; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; @@ -127,6 +128,7 @@ impl Resources { client: &Client, args: &super::MaterializeControllerArgs, 
increment_generation: bool, + force_promote: bool, namespace: &str, ) -> Result<Option<Action>, anyhow::Error> { let environmentd_network_policy_api: Api<NetworkPolicy> = @@ -215,7 +217,19 @@ impl Resources { match http_client.get(status_url.clone()).send().await { Ok(response) => { let response: BTreeMap<String, DeploymentStatus> = response.json().await?; - if response["status"] == DeploymentStatus::Initializing { + if force_promote { + trace!("skipping cluster catchup"); + let skip_catchup_url = reqwest::Url::parse(&format!( + "http://{}/api/leader/skip-catchup", + environmentd_url + )) + .unwrap(); + let response = http_client.post(skip_catchup_url).send().await?; + if response.status() == StatusCode::BAD_REQUEST { + let err: SkipCatchupError = response.json().await?; + bail!("failed to skip catchup: {}", err.message); + } + } else if response["status"] == DeploymentStatus::Initializing { trace!("environmentd is still initializing, retrying..."); return Ok(Some(retry_action)); } else { @@ -1392,6 +1406,11 @@ enum BecomeLeaderResult { Failure { message: String }, } +#[derive(Debug, Deserialize, PartialEq, Eq)] +struct SkipCatchupError { + message: String, +} + fn environmentd_internal_http_address( args: &super::MaterializeControllerArgs, namespace: &str,