Skip to content

Commit 4fd0463

Browse files
committed
feat: Add API to activate/deactivate clusters
1 parent 525c8db commit 4fd0463

File tree

11 files changed

+571
-331
lines changed

11 files changed

+571
-331
lines changed

Cargo.lock

Lines changed: 327 additions & 324 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ repository = "https://github.com/stackabletech/trino-lb"
1717
[workspace.dependencies]
1818
askama = "0.14"
1919
axum = { version = "0.8", features = ["tracing"] }
20+
axum-extra = { version = "0.10", features = ["typed-header"] }
2021
# If we use the feature "tls-rustls" it will pull in the "aws-lc-rs" crate, which as of 2024-08-16 I did not get to build in the "make run-dev" workflow :/
2122
axum-server = { version = "0.7", features = ["tls-rustls-no-provider"] }
2223
bincode = { version = "2.0", features = ["serde"] }

deploy/helm/trino-lb/configs/trino-lb-config.yaml

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ trinoLb:
44
enabled: true
55
certPemFile: /certificates/cert.pem
66
keyPemFile: /certificates/key.pem
7+
adminAuthentication:
8+
basicAuth:
9+
username: admin
10+
password: admin
711
persistence:
812
redis:
913
clusterMode: true
@@ -35,7 +39,7 @@ trinoClusterGroups:
3539
min: 0
3640
trinoClusters:
3741
- name: trino-s-1
38-
endpoint: https://trino-s-1-coordinator-default.default.svc.cluster.local:8443
42+
endpoint: https://trino-s-1-coordinator.default.svc.cluster.local:8443
3943
credentials: &common-credentials
4044
username: admin
4145
password: adminadmin
@@ -51,10 +55,10 @@ trinoClusterGroups:
5155
min: 1
5256
trinoClusters:
5357
- name: trino-m-1
54-
endpoint: https://trino-m-1-coordinator-default.default.svc.cluster.local:8443
58+
endpoint: https://trino-m-1-coordinator.default.svc.cluster.local:8443
5559
credentials: *common-credentials
5660
- name: trino-m-2
57-
endpoint: https://trino-m-2-coordinator-default.default.svc.cluster.local:8443
61+
endpoint: https://trino-m-2-coordinator.default.svc.cluster.local:8443
5862
credentials: *common-credentials
5963
# oidc:
6064
# maxRunningQueries: 3

deploy/helm/trino-lb/templates/trinos.yaml

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ metadata:
88
namespace: default
99
spec:
1010
image:
11-
productVersion: "455"
11+
productVersion: "476"
1212
clusterConfig:
1313
catalogLabelSelector:
1414
matchLabels:
@@ -18,6 +18,10 @@ spec:
1818
coordinators:
1919
roleConfig:
2020
listenerClass: external-stable
21+
config:
22+
resources:
23+
cpu:
24+
min: 50m
2125
configOverrides: &configOverrides
2226
config.properties:
2327
tracing.enabled: "true"
@@ -27,6 +31,10 @@ spec:
2731
replicas: 1
2832
workers:
2933
configOverrides: *configOverrides
34+
config:
35+
resources:
36+
cpu:
37+
min: 50m
3038
roleGroups:
3139
default:
3240
replicas: 1
@@ -41,7 +49,7 @@ metadata:
4149
namespace: default
4250
spec:
4351
image:
44-
productVersion: "455"
52+
productVersion: "476"
4553
clusterConfig:
4654
catalogLabelSelector:
4755
matchLabels:
@@ -51,6 +59,10 @@ spec:
5159
coordinators:
5260
roleConfig:
5361
listenerClass: external-stable
62+
config:
63+
resources:
64+
cpu:
65+
min: 50m
5466
configOverrides: &configOverrides
5567
config.properties:
5668
tracing.enabled: "true"
@@ -60,6 +72,10 @@ spec:
6072
replicas: 1
6173
workers:
6274
configOverrides: *configOverrides
75+
config:
76+
resources:
77+
cpu:
78+
min: 50m
6379
roleGroups:
6480
default:
6581
replicas: 1

trino-lb-core/src/config.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ pub struct TrinoLbConfig {
5353

5454
pub persistence: PersistenceConfig,
5555

56+
pub admin_authentication: Option<TrinoLbAdminAuthenticationConfig>,
57+
5658
#[serde(default)]
5759
pub tls: TrinoLbTlsConfig,
5860

@@ -170,6 +172,12 @@ fn default_max_connections() -> u32 {
170172
10
171173
}
172174

175+
#[derive(Clone, Debug, Deserialize)]
176+
#[serde(deny_unknown_fields, rename_all = "camelCase")]
177+
pub enum TrinoLbAdminAuthenticationConfig {
178+
BasicAuth { username: String, password: String },
179+
}
180+
173181
#[derive(Clone, Debug, Deserialize)]
174182
#[serde(deny_unknown_fields, rename_all = "camelCase")]
175183
pub struct TrinoClusterGroupConfig {
@@ -349,3 +357,12 @@ impl Config {
349357
.context(ParseConfigFileSnafu { config_file })
350358
}
351359
}
360+
361+
impl Config {
362+
pub fn cluster_in_config(&self, cluster_name: &TrinoClusterName) -> bool {
363+
self.trino_cluster_groups
364+
.values()
365+
.flat_map(|cluster_group| &cluster_group.trino_clusters)
366+
.any(|cluster| &cluster.name == cluster_name)
367+
}
368+
}

trino-lb-core/src/trino_cluster.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,13 @@ impl ClusterState {
5353
}
5454
}
5555

56+
pub fn turn_ready_if_not_deactivated(&self) -> Self {
57+
match self {
58+
ClusterState::Deactivated => Self::Deactivated,
59+
_ => ClusterState::Ready,
60+
}
61+
}
62+
5663
pub fn can_be_started(&self) -> bool {
5764
match self {
5865
ClusterState::Stopped | ClusterState::Draining { .. } => true,

trino-lb/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ trino-lb-core = { path = "../trino-lb-core" }
1616
trino-lb-persistence = { path = "../trino-lb-persistence" }
1717

1818
askama.workspace = true
19+
axum-extra.workspace = true
1920
axum-server.workspace = true
2021
axum.workspace = true
2122
chrono.workspace = true

trino-lb/src/cluster_group_manager.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use axum::{Json, body::Body, response::IntoResponse};
88
use futures::future::try_join_all;
99
use http::{HeaderMap, StatusCode};
1010
use reqwest::Client;
11+
use serde::{Deserialize, Serialize};
1112
use snafu::{OptionExt, ResultExt, Snafu};
1213
use tracing::{Instrument, debug, info_span, instrument};
1314
use tracing_opentelemetry::OpenTelemetrySpanExt;
@@ -87,7 +88,7 @@ pub struct TrinoCluster {
8788
pub endpoint: Url,
8889
}
8990

90-
#[derive(Clone, Debug)]
91+
#[derive(Clone, Debug, Serialize, Deserialize)]
9192
pub struct ClusterStats {
9293
pub state: ClusterState,
9394
pub query_counter: u64,
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
use std::{collections::BTreeMap, sync::Arc};
2+
3+
use axum::{
4+
Json,
5+
extract::{Path, State},
6+
response::{IntoResponse, Response},
7+
};
8+
use axum_extra::{
9+
TypedHeader,
10+
headers::{Authorization, authorization::Basic},
11+
};
12+
use http::StatusCode;
13+
use opentelemetry::KeyValue;
14+
use serde::{Deserialize, Serialize};
15+
use snafu::{ResultExt, Snafu, ensure};
16+
use tracing::{info, instrument, warn};
17+
use trino_lb_core::{
18+
TrinoClusterName, config::TrinoLbAdminAuthenticationConfig, trino_cluster::ClusterState,
19+
};
20+
use trino_lb_persistence::Persistence;
21+
22+
use crate::{
23+
cluster_group_manager::{self, ClusterStats},
24+
http_server::AppState,
25+
};
26+
27+
#[derive(Snafu, Debug)]
28+
pub enum Error {
29+
#[snafu(display("No admin authentication method defined"))]
30+
NoAdminAuthenticationMethodDefined,
31+
32+
#[snafu(display("Invalid admin credentials"))]
33+
InvalidAdminCredentials,
34+
35+
#[snafu(display("Unknown Trino cluster {cluster:?}"))]
36+
UnknownCluster { cluster: TrinoClusterName },
37+
38+
#[snafu(display("Failed to set cluster state for cluster {cluster:?} in persistence"))]
39+
SetClusterStateInPersistence {
40+
source: trino_lb_persistence::Error,
41+
cluster: TrinoClusterName,
42+
},
43+
44+
#[snafu(display("Failed to get all cluster states"))]
45+
GetAllClusterStates {
46+
source: cluster_group_manager::Error,
47+
},
48+
}
49+
50+
impl IntoResponse for Error {
51+
fn into_response(self) -> Response {
52+
warn!(error = ?self, "Error while processing admin request");
53+
let status_code = match self {
54+
Error::NoAdminAuthenticationMethodDefined => StatusCode::UNAUTHORIZED,
55+
Error::InvalidAdminCredentials => StatusCode::UNAUTHORIZED,
56+
Error::UnknownCluster { .. } => StatusCode::NOT_FOUND,
57+
Error::SetClusterStateInPersistence { .. } => StatusCode::INTERNAL_SERVER_ERROR,
58+
Error::GetAllClusterStates { .. } => StatusCode::INTERNAL_SERVER_ERROR,
59+
};
60+
(status_code, format!("{self}")).into_response()
61+
}
62+
}
63+
64+
/// (Re)-Activates a Trino Cluster, so that it receives new queries.
65+
///
66+
/// This is useful for maintenance actions (in combination with deactivation).
67+
#[instrument(name = "POST /admin/activate-cluster/{cluster_name}", skip(state))]
68+
pub async fn post_activate_cluster(
69+
TypedHeader(Authorization(basic_auth)): TypedHeader<Authorization<Basic>>,
70+
State(state): State<Arc<AppState>>,
71+
Path(cluster_name): Path<TrinoClusterName>,
72+
) -> Result<Json<ClusterActivationResponse>, Error> {
73+
state
74+
.metrics
75+
.http_counter
76+
.add(1, &[KeyValue::new("resource", "post_activate_cluster")]);
77+
78+
set_cluster_activation(state, basic_auth, &cluster_name, true).await
79+
}
80+
81+
/// Deactivate a Trino Cluster, so that it doesn't receive any new queries.
82+
///
83+
/// This is useful for maintenance actions (in combination with activation).
84+
#[instrument(name = "POST /admin/deactivate-cluster/{cluster_name}", skip(state))]
85+
pub async fn post_deactivate_cluster(
86+
TypedHeader(Authorization(basic_auth)): TypedHeader<Authorization<Basic>>,
87+
State(state): State<Arc<AppState>>,
88+
Path(cluster_name): Path<TrinoClusterName>,
89+
) -> Result<Json<ClusterActivationResponse>, Error> {
90+
state
91+
.metrics
92+
.http_counter
93+
.add(1, &[KeyValue::new("resource", "post_deactivate_cluster")]);
94+
95+
set_cluster_activation(state, basic_auth, &cluster_name, false).await
96+
}
97+
98+
/// Get the status of the Trino clusters
99+
#[instrument(name = "GET /admin/cluster-status", skip(state))]
100+
pub async fn get_cluster_status(
101+
State(state): State<Arc<AppState>>,
102+
) -> Result<Json<BTreeMap<TrinoClusterName, ClusterStats>>, Error> {
103+
state
104+
.metrics
105+
.http_counter
106+
.add(1, &[KeyValue::new("resource", "get_cluster_status")]);
107+
108+
let cluster_stats = state
109+
.cluster_group_manager
110+
.get_all_cluster_stats()
111+
.await
112+
.context(GetAllClusterStatesSnafu)?;
113+
114+
Ok(Json(
115+
cluster_stats
116+
.into_iter()
117+
.map(|(cluster, stats)| (cluster.name.clone(), stats))
118+
.collect(),
119+
))
120+
}
121+
122+
#[derive(Debug, Deserialize, Serialize)]
123+
pub struct ClusterActivationResponse {
124+
state: ClusterState,
125+
}
126+
127+
#[instrument(skip(state))]
128+
async fn set_cluster_activation(
129+
state: Arc<AppState>,
130+
basic_auth: Basic,
131+
cluster_name: &TrinoClusterName,
132+
activation: bool,
133+
) -> Result<Json<ClusterActivationResponse>, Error> {
134+
match &state.config.trino_lb.admin_authentication {
135+
Some(TrinoLbAdminAuthenticationConfig::BasicAuth { username, password }) => {
136+
ensure!(
137+
basic_auth.username() == username && basic_auth.password() == password,
138+
InvalidAdminCredentialsSnafu {}
139+
);
140+
}
141+
None => return Err(Error::NoAdminAuthenticationMethodDefined),
142+
}
143+
144+
ensure!(
145+
state.config.cluster_in_config(cluster_name),
146+
UnknownClusterSnafu {
147+
cluster: cluster_name
148+
}
149+
);
150+
151+
let desired_state = if activation {
152+
info!(cluster = cluster_name, "Re-activating Trino cluster");
153+
ClusterState::Unknown
154+
} else {
155+
info!(cluster = cluster_name, "Deactivating Trino cluster");
156+
ClusterState::Deactivated
157+
};
158+
159+
state
160+
.persistence
161+
.set_cluster_state(cluster_name, desired_state.clone())
162+
.await
163+
.context(SetClusterStateInPersistenceSnafu {
164+
cluster: cluster_name,
165+
})?;
166+
167+
Ok(Json(ClusterActivationResponse {
168+
state: desired_state,
169+
}))
170+
}

trino-lb/src/http_server/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::{
2323
cluster_group_manager::ClusterGroupManager, config::Config, metrics::Metrics, routing,
2424
};
2525

26+
mod admin;
2627
mod metrics;
2728
mod ui;
2829
mod v1;
@@ -121,6 +122,15 @@ pub async fn start_http_server(
121122
"/v1/statement/executing/{query_id}/{slug}/{token}",
122123
delete(v1::statement::delete_trino_executing_statement),
123124
)
125+
.route(
126+
"/admin/activate-cluster/{cluster_name}",
127+
post(admin::post_activate_cluster),
128+
)
129+
.route(
130+
"/admin/deactivate-cluster/{cluster_name}",
131+
post(admin::post_deactivate_cluster),
132+
)
133+
.route("/admin/cluster-status", get(admin::get_cluster_status))
124134
.route("/ui/index.html", get(ui::index::get_ui_index))
125135
.route("/ui/query.html", get(ui::query::get_ui_query))
126136
.layer(TraceLayer::new_for_http())

0 commit comments

Comments
 (0)