Skip to content

Commit 01c03b3

Browse files
committed
feat(executor): support https schema for subgraph calls
feat(router, config): add support for loading supergraph from hive feat(config): improve file-loading for supergraph
1 parent 2abfb59 commit 01c03b3

File tree

13 files changed

+820
-39
lines changed

13 files changed

+820
-39
lines changed

Cargo.lock

Lines changed: 625 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,4 @@ thiserror = "2.0.14"
4949
xxhash-rust = { version = "0.8.15", features = ["xxh3"] }
5050
tokio = { version = "1.47.1", features = ["full"] }
5151
rand = "0.9.2"
52+
reqwest = "0.12.23"

bin/router/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ futures = { workspace = true }
2525
graphql-parser = { workspace = true }
2626
graphql-tools = { workspace = true }
2727
serde = { workspace = true }
28+
reqwest = { workspace = true }
2829
sonic-rs = { workspace = true }
2930
tracing = { workspace = true }
3031
tracing-subscriber = { workspace = true }

bin/router/src/lib.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ mod http_utils;
22
mod logger;
33
mod pipeline;
44
mod shared_state;
5+
mod supergraph;
56

67
use std::sync::Arc;
78

@@ -18,8 +19,6 @@ use ntex::{
1819
web::{self, HttpRequest},
1920
};
2021

21-
use hive_router_query_planner::utils::parsing::parse_schema;
22-
2322
async fn graphql_endpoint_handler(
2423
mut request: HttpRequest,
2524
body_bytes: Bytes,
@@ -33,10 +32,8 @@ pub async fn router_entrypoint() -> Result<(), Box<dyn std::error::Error>> {
3332
let router_config = load_config(config_path)?;
3433
configure_logging(&router_config.log);
3534

36-
let supergraph_sdl = router_config.supergraph.load().await?;
37-
let parsed_schema = parse_schema(&supergraph_sdl);
3835
let addr = router_config.http.address();
39-
let shared_state = RouterSharedState::new(parsed_schema, router_config);
36+
let shared_state = RouterSharedState::new(router_config).await?;
4037

4138
web::HttpServer::new(move || {
4239
web::App::new()

bin/router/src/shared_state.rs

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,23 @@
11
use std::sync::Arc;
22

3-
use graphql_parser::schema::Document;
43
use graphql_tools::validation::{utils::ValidationError, validate::ValidationPlan};
54
use hive_router_config::HiveRouterConfig;
65
use hive_router_plan_executor::{
6+
executors::error::SubgraphExecutorError,
77
introspection::schema::{SchemaMetadata, SchemaWithMetadata},
88
SubgraphExecutorMap,
99
};
1010
use hive_router_query_planner::{
11-
planner::{plan_nodes::QueryPlan, Planner},
11+
planner::{plan_nodes::QueryPlan, Planner, PlannerError},
1212
state::supergraph_state::SupergraphState,
13+
utils::parsing::parse_schema,
1314
};
1415
use moka::future::Cache;
1516

16-
use crate::pipeline::normalize::GraphQLNormalizationPayload;
17+
use crate::{
18+
pipeline::normalize::GraphQLNormalizationPayload,
19+
supergraph::{base::LoadSupergraphError, resolve_from_config},
20+
};
1721

1822
pub struct RouterSharedState {
1923
pub schema_metadata: SchemaMetadata,
@@ -27,23 +31,32 @@ pub struct RouterSharedState {
2731
pub router_config: HiveRouterConfig,
2832
}
2933

34+
#[derive(Debug, thiserror::Error)]
35+
pub enum RouterSharedStateError {
36+
#[error("Failed to load supergraph: {0}")]
37+
SupergraphInitError(#[from] LoadSupergraphError),
38+
#[error("Failed to init planner: {0}")]
39+
PlannerInitError(#[from] PlannerError),
40+
#[error("Failed to init executor: {0}")]
41+
ExecutorInitError(#[from] SubgraphExecutorError),
42+
}
43+
3044
impl RouterSharedState {
31-
pub fn new(
32-
parsed_supergraph_sdl: Document<'static, String>,
33-
router_config: HiveRouterConfig,
34-
) -> Arc<Self> {
45+
pub async fn new(router_config: HiveRouterConfig) -> Result<Arc<Self>, RouterSharedStateError> {
46+
let mut supergraph_source_loader = resolve_from_config(&router_config.supergraph).await?;
47+
supergraph_source_loader.reload().await?;
48+
let supergraph_sdl = supergraph_source_loader.current().unwrap();
49+
let parsed_supergraph_sdl = parse_schema(supergraph_sdl);
3550
let supergraph_state = SupergraphState::new(&parsed_supergraph_sdl);
36-
let planner =
37-
Planner::new_from_supergraph(&parsed_supergraph_sdl).expect("failed to create planner");
51+
let planner = Planner::new_from_supergraph(&parsed_supergraph_sdl)?;
3852
let schema_metadata = planner.consumer_schema.schema_metadata();
3953

4054
let subgraph_executor_map = SubgraphExecutorMap::from_http_endpoint_map(
4155
supergraph_state.subgraph_endpoint_map,
4256
router_config.traffic_shaping.clone(),
43-
)
44-
.expect("Failed to create subgraph executor map");
57+
)?;
4558

46-
Arc::new(Self {
59+
Ok(Arc::new(Self {
4760
schema_metadata,
4861
planner,
4962
validation_plan: graphql_tools::validation::rules::default_rules_validation_plan(),
@@ -53,6 +66,6 @@ impl RouterSharedState {
5366
parse_cache: moka::future::Cache::new(1000),
5467
normalize_cache: moka::future::Cache::new(1000),
5568
router_config,
56-
})
69+
}))
5770
}
5871
}

bin/router/src/supergraph/base.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
use async_trait::async_trait;
2+
3+
#[derive(Debug, thiserror::Error)]
4+
pub enum LoadSupergraphError {
5+
#[error("Failed to read supergraph file: {0}")]
6+
ReadFileError(#[from] std::io::Error),
7+
#[error("Failed to read supergraph from network: {0}")]
8+
NetworkError(#[from] reqwest::Error),
9+
}
10+
11+
#[async_trait]
12+
pub trait SupergraphLoader {
13+
async fn reload(&mut self) -> Result<(), LoadSupergraphError>;
14+
fn current(&self) -> Option<&str>;
15+
}

bin/router/src/supergraph/file.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use async_trait::async_trait;
2+
use hive_router_config::primitives::file_path::FilePath;
3+
use tracing::{debug, trace};
4+
5+
use crate::supergraph::base::{LoadSupergraphError, SupergraphLoader};
6+
7+
pub struct SupergraphFileLoader {
8+
file_path: FilePath,
9+
current: Option<String>,
10+
}
11+
12+
#[async_trait]
13+
impl SupergraphLoader for SupergraphFileLoader {
14+
async fn reload(&mut self) -> Result<(), LoadSupergraphError> {
15+
debug!(
16+
"Reloading supergraph from file path: '{}'",
17+
self.file_path.0
18+
);
19+
let content = tokio::fs::read_to_string(&self.file_path.0).await?;
20+
trace!(
21+
"Supergraph loaded from file path: '{}', content: {}",
22+
self.file_path.0,
23+
content
24+
);
25+
26+
self.current = Some(content);
27+
Ok(())
28+
}
29+
30+
fn current(&self) -> Option<&str> {
31+
self.current.as_deref()
32+
}
33+
}
34+
35+
impl SupergraphFileLoader {
36+
pub async fn new(file_path: &FilePath) -> Result<Box<Self>, LoadSupergraphError> {
37+
debug!(
38+
"Creating supergraph source from file path: '{}'",
39+
file_path.0
40+
);
41+
42+
Ok(Box::new(Self {
43+
file_path: file_path.clone(),
44+
current: None,
45+
}))
46+
}
47+
}

bin/router/src/supergraph/hive.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use async_trait::async_trait;
2+
use http::header::USER_AGENT;
3+
use tracing::debug;
4+
5+
use crate::supergraph::base::{LoadSupergraphError, SupergraphLoader};
6+
7+
static USER_AGENT_VALUE: &str = "hive-router";
8+
static TIMEOUT: u64 = 10;
9+
static AUTH_HEADER_NAME: &str = "X-Hive-CDN-Key";
10+
11+
pub struct SupergraphHiveConsoleLoader {
12+
endpoint: String,
13+
key: String,
14+
current: Option<String>,
15+
http_client: reqwest::Client,
16+
}
17+
18+
#[async_trait]
19+
impl SupergraphLoader for SupergraphHiveConsoleLoader {
20+
async fn reload(&mut self) -> Result<(), LoadSupergraphError> {
21+
debug!(
22+
"Fetching supergraph from Hive Console CDN: '{}'",
23+
self.endpoint
24+
);
25+
26+
let response = self
27+
.http_client
28+
.get(&self.endpoint)
29+
.header(AUTH_HEADER_NAME, &self.key)
30+
.header(USER_AGENT, USER_AGENT_VALUE)
31+
.timeout(std::time::Duration::from_secs(TIMEOUT))
32+
.send()
33+
.await
34+
.map_err(LoadSupergraphError::NetworkError)?;
35+
36+
let content = response
37+
.text()
38+
.await
39+
.map_err(LoadSupergraphError::NetworkError)?;
40+
41+
self.current = Some(content);
42+
Ok(())
43+
}
44+
45+
fn current(&self) -> Option<&str> {
46+
self.current.as_deref()
47+
}
48+
}
49+
50+
impl SupergraphHiveConsoleLoader {
51+
pub async fn new(endpoint: &str, key: &str) -> Result<Box<Self>, LoadSupergraphError> {
52+
debug!(
53+
"Creating supergraph source from Hive Console CDN: '{}'",
54+
endpoint
55+
);
56+
57+
Ok(Box::new(Self {
58+
endpoint: endpoint.to_string(),
59+
key: key.to_string(),
60+
current: None,
61+
http_client: reqwest::Client::new(),
62+
}))
63+
}
64+
}

bin/router/src/supergraph/mod.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
use hive_router_config::supergraph::SupergraphSource;
2+
3+
use crate::supergraph::{
4+
base::{LoadSupergraphError, SupergraphLoader},
5+
file::SupergraphFileLoader,
6+
hive::SupergraphHiveConsoleLoader,
7+
};
8+
use tracing::debug;
9+
10+
pub mod base;
11+
pub mod file;
12+
pub mod hive;
13+
14+
pub async fn resolve_from_config(
15+
config: &SupergraphSource,
16+
) -> Result<Box<dyn SupergraphLoader>, LoadSupergraphError> {
17+
debug!("Resolving supergraph from config: {:?}", config);
18+
19+
match config {
20+
SupergraphSource::File { path } => Ok(SupergraphFileLoader::new(path).await?),
21+
SupergraphSource::HiveConsole { endpoint, key } => {
22+
Ok(SupergraphHiveConsoleLoader::new(endpoint, key).await?)
23+
}
24+
}
25+
}

lib/executor/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ tracing = { workspace = true }
2828
thiserror = { workspace = true }
2929
xxhash-rust = { workspace = true }
3030
tokio = { workspace = true, features = ["sync"] }
31+
3132
dashmap = "6.0.0"
3233
ahash = "0.8.12"
33-
34+
hyper-tls = "0.6.0"
3435
hyper-util = { version = "0.1.16", features = [
3536
"client",
3637
"client-legacy",

0 commit comments

Comments
 (0)