From ffa206b25e9ba12c6f58377ec6722f1fb7fe07a1 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 10 Oct 2025 17:09:44 +0300 Subject: [PATCH 01/20] Subgraph Timeout Configuration --- docs/README.md | 66 ++++++++++++- lib/executor/src/execution/plan.rs | 1 + lib/executor/src/executors/common.rs | 3 + lib/executor/src/executors/error.rs | 4 + lib/executor/src/executors/http.rs | 53 +++++++++-- lib/executor/src/executors/map.rs | 115 ++++++++++++++++++----- lib/executor/src/executors/mod.rs | 1 + lib/executor/src/executors/timeout.rs | 111 ++++++++++++++++++++++ lib/router-config/src/lib.rs | 14 ++- lib/router-config/src/traffic_shaping.rs | 31 ++++++ 10 files changed, 363 insertions(+), 36 deletions(-) create mode 100644 lib/executor/src/executors/timeout.rs diff --git a/docs/README.md b/docs/README.md index abf6278df..17baa8376 100644 --- a/docs/README.md +++ b/docs/README.md @@ -11,7 +11,7 @@ |[**log**](#log)|`object`|The router logger configuration.
Default: `{"filter":null,"format":"json","level":"info"}`
|| |[**query\_planner**](#query_planner)|`object`|Query planning configuration.
Default: `{"allow_expose":false,"timeout":"10s"}`
|| |[**supergraph**](#supergraph)|`object`|Configuration for the Federation supergraph source. By default, the router will use a local file-based supergraph source (`./supergraph.graphql`).
Default: `{"path":"supergraph.graphql","source":"file"}`
|| -|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}`
|| +|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"all":{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}}`
|| **Additional Properties:** not allowed **Example** @@ -64,9 +64,10 @@ supergraph: path: supergraph.graphql source: file traffic_shaping: - dedupe_enabled: true - max_connections_per_host: 100 - pool_idle_timeout_seconds: 50 + all: + dedupe_enabled: true + max_connections_per_host: 100 + pool_idle_timeout_seconds: 50 ``` @@ -1362,6 +1363,62 @@ source: file Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs. +**Properties** + +|Name|Type|Description|Required| +|----|----|-----------|--------| +|[**all**](#traffic_shapingall)|`object`|The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration.
Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}`
|| +|[**subgraphs**](#traffic_shapingsubgraphs)|`object`|Optional per-subgraph configurations that will override the default configuration for specific subgraphs.
|| + +**Example** + +```yaml +all: + dedupe_enabled: true + max_connections_per_host: 100 + pool_idle_timeout_seconds: 50 + +``` + + +### traffic\_shaping\.all: object + +The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration. + + +**Properties** + +|Name|Type|Description|Required| +|----|----|-----------|--------| +|**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| +|**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.
Default: `100`
Format: `"uint"`
Minimum: `0`
|| +|**pool\_idle\_timeout\_seconds**|`integer`|Timeout for idle sockets being kept-alive.
Default: `50`
Format: `"uint64"`
Minimum: `0`
|| +|**timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: \|
if (.operation.kind == "mutation") {
10s
} else {
5s
}
```
|| + +**Example** + +```yaml +dedupe_enabled: true +max_connections_per_host: 100 +pool_idle_timeout_seconds: 50 + +``` + + +### traffic\_shaping\.subgraphs: object + +Optional per-subgraph configurations that will override the default configuration for specific subgraphs. + + +**Additional Properties** + +|Name|Type|Description|Required| +|----|----|-----------|--------| +|[**Additional Properties**](#traffic_shapingsubgraphsadditionalproperties)|`object`||| + + +#### traffic\_shaping\.subgraphs\.additionalProperties: object + **Properties** |Name|Type|Description|Required| @@ -1369,6 +1426,7 @@ Configuration for the traffic-shaper executor. Use these configurations to contr |**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| |**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.
Default: `100`
Format: `"uint"`
Minimum: `0`
|| |**pool\_idle\_timeout\_seconds**|`integer`|Timeout for idle sockets being kept-alive.
Default: `50`
Format: `"uint64"`
Minimum: `0`
|| +|**timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: \|
if (.operation.kind == "mutation") {
10s
} else {
5s
}
```
|| **Example** diff --git a/lib/executor/src/execution/plan.rs b/lib/executor/src/execution/plan.rs index cb1edde28..3e2e434c1 100644 --- a/lib/executor/src/execution/plan.rs +++ b/lib/executor/src/execution/plan.rs @@ -726,6 +726,7 @@ impl<'exec> Executor<'exec> { variables: variable_refs, representations, headers: headers_map, + client_request: self.client_request, }, ) .await diff --git a/lib/executor/src/executors/common.rs b/lib/executor/src/executors/common.rs index 6a053bdd5..f85fc49d4 100644 --- a/lib/executor/src/executors/common.rs +++ b/lib/executor/src/executors/common.rs @@ -4,6 +4,8 @@ use async_trait::async_trait; use bytes::Bytes; use http::HeaderMap; +use crate::execution::plan::ClientRequestDetails; + #[async_trait] pub trait SubgraphExecutor { async fn execute<'a>( @@ -30,6 +32,7 @@ pub struct HttpExecutionRequest<'a> { pub variables: Option>, pub headers: HeaderMap, pub representations: Option>, + pub client_request: &'a ClientRequestDetails<'a>, } pub struct HttpExecutionResponse { diff --git a/lib/executor/src/executors/error.rs b/lib/executor/src/executors/error.rs index 501855220..2fb67e851 100644 --- a/lib/executor/src/executors/error.rs +++ b/lib/executor/src/executors/error.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + #[derive(thiserror::Error, Debug, Clone)] pub enum SubgraphExecutorError { #[error("Failed to parse endpoint \"{0}\" as URI: {1}")] @@ -8,4 +10,6 @@ pub enum SubgraphExecutorError { RequestFailure(String, String), #[error("Failed to serialize variable \"{0}\": {1}")] VariablesSerializationFailure(String, String), + #[error("Request to subgraph \"{0}\" timed out after {1:?}")] + RequestTimeout(String, Duration), } diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index e1a285430..641d6f5c6 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -1,8 +1,10 @@ use std::sync::Arc; +use std::time::Duration; -use crate::executors::common::HttpExecutionResponse; use crate::executors::dedupe::{request_fingerprint, ABuildHasher, SharedResponse}; +use crate::executors::timeout::{ExpressionContext, HTTPTimeout}; use dashmap::DashMap; +use futures::TryFutureExt; use hive_router_config::traffic_shaping::TrafficShapingExecutorConfig; use tokio::sync::OnceCell; @@ -17,8 +19,9 @@ use hyper::Version; use hyper_tls::HttpsConnector; use hyper_util::client::legacy::{connect::HttpConnector, Client}; use tokio::sync::Semaphore; +use tracing::warn; -use crate::executors::common::HttpExecutionRequest; +use crate::executors::common::{HttpExecutionRequest, HttpExecutionResponse}; use crate::executors::error::SubgraphExecutorError; use crate::response::graphql_error::GraphQLError; use crate::utils::consts::CLOSE_BRACE; @@ -35,6 +38,7 @@ pub struct HTTPSubgraphExecutor { pub semaphore: Arc, pub config: Arc, pub in_flight_requests: Arc>, ABuildHasher>>, + pub timeout: Option, } const FIRST_VARIABLE_STR: &[u8] = b",\"variables\":{"; @@ -58,6 +62,21 @@ impl HTTPSubgraphExecutor { HeaderValue::from_static("keep-alive"), ); + let timeout = if let Some(timeout_config) = &config.timeout { + match HTTPTimeout::try_from(timeout_config) { + Ok(timeout) => Some(timeout), + Err(diagnostic) => { + warn!( + "Failed to parse timeout expression for subgraph {}: {:#?}", + endpoint, diagnostic + ); + None + } + } + } else { + None + }; + Self { endpoint, http_client, @@ -65,6 +84,7 @@ impl HTTPSubgraphExecutor { semaphore, config, in_flight_requests, + timeout, } } @@ -120,6 +140,7 @@ impl HTTPSubgraphExecutor { &self, body: Vec, headers: HeaderMap, + timeout: Option, ) -> Result { let mut req = hyper::Request::builder() .method(http::Method::POST) @@ -132,11 +153,21 @@ impl HTTPSubgraphExecutor { *req.headers_mut() = headers; - let res = self.http_client.request(req).await.map_err(|e| { + let request_op = self.http_client.request(req).map_err(|e| { SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string()) - })?; + }); - let (parts, body) = res.into_parts(); + let res = if let Some(timeout) = timeout { + tokio::time::timeout(timeout, request_op) + .await + .map_err(|_| { + SubgraphExecutorError::RequestTimeout(self.endpoint.to_string(), timeout) + })? + } else { + request_op.await + }; + + let (parts, body) = res?.into_parts(); Ok(SharedResponse { status: parts.status, @@ -193,7 +224,11 @@ impl SubgraphExecutor for HTTPSubgraphExecutor { // This unwrap is safe because the semaphore is never closed during the application's lifecycle. // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`. let _permit = self.semaphore.acquire().await.unwrap(); - return match self._send_request(body, headers).await { + let ctx = ExpressionContext { + client_request: execution_request.client_request, + }; + let timeout = self.get_timeout_duration(&ctx); + return match self._send_request(body, headers, timeout).await { Ok(shared_response) => HttpExecutionResponse { body: shared_response.body, headers: shared_response.headers, @@ -218,11 +253,15 @@ impl SubgraphExecutor for HTTPSubgraphExecutor { let response_result = cell .get_or_try_init(|| async { + let ctx = ExpressionContext { + client_request: execution_request.client_request, + }; + let timeout = self.get_timeout_duration(&ctx); let res = { // This unwrap is safe because the semaphore is never closed during the application's lifecycle. // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`. let _permit = self.semaphore.acquire().await.unwrap(); - self._send_request(body, headers).await + self._send_request(body, headers, timeout).await }; // It's important to remove the entry from the map before returning the result. // This ensures that once the OnceCell is set, no future requests can join it. diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index ff18d118c..e31c7cfbe 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -1,12 +1,13 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; -use bytes::{BufMut, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; use dashmap::DashMap; -use hive_router_config::traffic_shaping::TrafficShapingExecutorConfig; +use hive_router_config::{traffic_shaping::TrafficShapingExecutorConfig, TrafficShapingConfig}; use http::Uri; +use http_body_util::Full; use hyper_tls::HttpsConnector; use hyper_util::{ - client::legacy::Client, + client::legacy::{connect::HttpConnector, Client}, rt::{TokioExecutor, TokioTimer}, }; use tokio::sync::{OnceCell, Semaphore}; @@ -74,21 +75,14 @@ impl SubgraphExecutorMap { pub fn from_http_endpoint_map( subgraph_endpoint_map: HashMap, - config: TrafficShapingExecutorConfig, + config: TrafficShapingConfig, ) -> Result { - let https = HttpsConnector::new(); - let client = Client::builder(TokioExecutor::new()) - .pool_timer(TokioTimer::new()) - .pool_idle_timeout(Duration::from_secs(config.pool_idle_timeout_seconds)) - .pool_max_idle_per_host(config.max_connections_per_host) - .build(https); - - let client_arc = Arc::new(client); - let semaphores_by_origin: DashMap> = DashMap::new(); - let max_connections_per_host = config.max_connections_per_host; - let config_arc = Arc::new(config); - let in_flight_requests: Arc>, ABuildHasher>> = - Arc::new(DashMap::with_hasher(ABuildHasher::default())); + let global_client_arc = from_traffic_shaping_config_to_client(&config.all); + let global_semaphores_by_origin: DashMap> = DashMap::new(); + let global_config_arc = Arc::new(config.all); + let global_in_flight_requests: Arc< + DashMap>, ABuildHasher>, + > = Arc::new(DashMap::with_hasher(ABuildHasher::default())); let executor_map = subgraph_endpoint_map .into_iter() @@ -110,17 +104,38 @@ impl SubgraphExecutorMap { }) ); - let semaphore = semaphores_by_origin - .entry(origin) - .or_insert_with(|| Arc::new(Semaphore::new(max_connections_per_host))) - .clone(); + let subgraph_config = config.subgraphs.get(&subgraph_name); + + let semaphore = get_semaphore_for_subgraph( + &origin, + &global_semaphores_by_origin, + subgraph_config + .map(|cfg| cfg.max_connections_per_host) + .unwrap_or(global_config_arc.max_connections_per_host), + global_config_arc.max_connections_per_host, + ); + + let http_client = get_http_client_for_subgraph( + subgraph_config, + &global_config_arc, + &global_client_arc, + ); + + // TODO: Maybe reuse the in-flight requests map if dedupe is the same + let inflight_requests = subgraph_config + .map(|_| Arc::new(DashMap::with_hasher(ABuildHasher::default()))) + .unwrap_or_else(|| global_in_flight_requests.clone()); + + let config_arc = subgraph_config + .map(|cfg| Arc::new(cfg.clone())) + .unwrap_or_else(|| global_config_arc.clone()); let executor = HTTPSubgraphExecutor::new( endpoint_uri, - client_arc.clone(), + http_client, semaphore, - config_arc.clone(), - in_flight_requests.clone(), + config_arc, + inflight_requests, ); Ok((subgraph_name, executor.to_boxed_arc())) @@ -132,3 +147,55 @@ impl SubgraphExecutorMap { }) } } + +// Create a new hyper client based on the traffic shaping config +fn from_traffic_shaping_config_to_client( + config: &TrafficShapingExecutorConfig, +) -> Arc, Full>> { + Arc::new( + Client::builder(TokioExecutor::new()) + .pool_timer(TokioTimer::new()) + .pool_idle_timeout(Duration::from_secs(config.pool_idle_timeout_seconds)) + .pool_max_idle_per_host(config.max_connections_per_host) + .build(HttpsConnector::new()), + ) +} + +// Reuse the global client if the subgraph config is the same as the global config +// Otherwise, create a new client based on the subgraph config +fn get_http_client_for_subgraph( + subgraph_config: Option<&TrafficShapingExecutorConfig>, + global_config: &TrafficShapingExecutorConfig, + global_client: &Arc, Full>>, +) -> Arc, Full>> { + match subgraph_config { + Some(cfg) => { + if global_config.max_connections_per_host == cfg.max_connections_per_host + && global_config.pool_idle_timeout_seconds == cfg.pool_idle_timeout_seconds + { + global_client.clone() + } else { + from_traffic_shaping_config_to_client(cfg) + } + } + None => global_client.clone(), + } +} + +// If the subgraph has a specific max_connections_per_host, create a new semaphore for it. +// Otherwise, reuse the global semaphore for that origin. +fn get_semaphore_for_subgraph( + origin: &str, + semaphores_by_origin: &DashMap>, + max_connections_per_host: usize, + global_max_connections_per_host: usize, +) -> Arc { + if max_connections_per_host == global_max_connections_per_host { + semaphores_by_origin + .entry(origin.to_string()) + .or_insert_with(|| Arc::new(Semaphore::new(global_max_connections_per_host))) + .clone() + } else { + Arc::new(Semaphore::new(max_connections_per_host)) + } +} diff --git a/lib/executor/src/executors/mod.rs b/lib/executor/src/executors/mod.rs index 520ff5f94..4b64bda51 100644 --- a/lib/executor/src/executors/mod.rs +++ b/lib/executor/src/executors/mod.rs @@ -3,3 +3,4 @@ pub mod dedupe; pub mod error; pub mod http; pub mod map; +pub mod timeout; diff --git a/lib/executor/src/executors/timeout.rs b/lib/executor/src/executors/timeout.rs new file mode 100644 index 000000000..b4ef9cc28 --- /dev/null +++ b/lib/executor/src/executors/timeout.rs @@ -0,0 +1,111 @@ +use std::collections::BTreeMap; +use std::time::Duration; + +use hive_router_config::traffic_shaping::HTTPTimeoutConfig; +use tracing::warn; +use vrl::compiler::Program as VrlProgram; +use vrl::diagnostic::DiagnosticList; + +use crate::execution::plan::ClientRequestDetails; +use crate::executors::http::HTTPSubgraphExecutor; +use vrl::{ + compiler::TargetValue as VrlTargetValue, + core::Value as VrlValue, + prelude::{state::RuntimeState as VrlState, Context as VrlContext, TimeZone as VrlTimeZone}, + value::Secrets as VrlSecrets, +}; + +use vrl::{compiler::compile as vrl_compile, stdlib::all as vrl_build_functions}; + +#[derive(Debug)] +pub enum HTTPTimeout { + Expression(Box), + Duration(Duration), +} + +impl TryFrom<&HTTPTimeoutConfig> for HTTPTimeout { + type Error = DiagnosticList; + fn try_from(timeout_config: &HTTPTimeoutConfig) -> Result { + match timeout_config { + HTTPTimeoutConfig::Duration(dur) => Ok(HTTPTimeout::Duration(*dur)), + HTTPTimeoutConfig::Expression(expr) => { + // Compile the VRL expression into a Program + let functions = vrl_build_functions(); + let compilation_result = vrl_compile(expr, &functions)?; + Ok(HTTPTimeout::Expression(Box::new( + compilation_result.program, + ))) + } + } + } +} + +pub struct ExpressionContext<'a> { + pub client_request: &'a ClientRequestDetails<'a>, +} + +impl From<&ExpressionContext<'_>> for VrlValue { + fn from(ctx: &ExpressionContext) -> Self { + // .request + let request_value: Self = ctx.client_request.into(); + + Self::Object(BTreeMap::from([("request".into(), request_value)])) + } +} + +fn warn_unsupported_conversion_option(type_name: &str) -> Option { + warn!( + "Cannot convert VRL {} value to a Duration value. Please convert it to a number first.", + type_name + ); + + None +} + +fn vrl_value_to_duration(value: VrlValue) -> Option { + match value { + VrlValue::Integer(i) => Some(Duration::from_millis(u64::from_ne_bytes(i.to_ne_bytes()))), + VrlValue::Bytes(_) => warn_unsupported_conversion_option("Bytes"), + VrlValue::Float(_) => warn_unsupported_conversion_option("Float"), + VrlValue::Boolean(_) => warn_unsupported_conversion_option("Boolean"), + VrlValue::Array(_) => warn_unsupported_conversion_option("Array"), + VrlValue::Regex(_) => warn_unsupported_conversion_option("Regex"), + VrlValue::Timestamp(_) => warn_unsupported_conversion_option("Timestamp"), + VrlValue::Object(_) => warn_unsupported_conversion_option("Object"), + VrlValue::Null => { + warn!("Cannot convert VRL Null value to a url value."); + None + } + } +} + +impl HTTPSubgraphExecutor { + pub fn get_timeout_duration<'a>( + &self, + expression_context: &ExpressionContext<'a>, + ) -> Option { + self.timeout.as_ref().and_then(|timeout| { + match timeout { + HTTPTimeout::Duration(dur) => Some(*dur), + HTTPTimeout::Expression(program) => { + let mut target = VrlTargetValue { + value: VrlValue::from(expression_context), + metadata: VrlValue::Object(BTreeMap::new()), + secrets: VrlSecrets::default(), + }; + + let mut state = VrlState::default(); + let timezone = VrlTimeZone::default(); + let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); + match program.resolve(&mut ctx) { + Ok(resolved) => vrl_value_to_duration(resolved), + Err(err) => { + warn!("Failed to evaluate timeout expression: {:#?}, falling back to no timeout.", err); + None + } + } + }, + } + }) + } +} diff --git a/lib/router-config/src/lib.rs b/lib/router-config/src/lib.rs index 38eefdfdc..08af2bacd 100644 --- a/lib/router-config/src/lib.rs +++ b/lib/router-config/src/lib.rs @@ -8,6 +8,8 @@ pub mod query_planner; pub mod supergraph; pub mod traffic_shaping; +use std::collections::HashMap; + use config::{Config, Environment, File, FileFormat, FileSourceFile}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -42,7 +44,7 @@ pub struct HiveRouterConfig { /// Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs. #[serde(default)] - pub traffic_shaping: TrafficShapingExecutorConfig, + pub traffic_shaping: TrafficShapingConfig, /// Configuration for the headers. #[serde(default)] @@ -56,6 +58,16 @@ pub struct HiveRouterConfig { pub cors: cors::CORSConfig, } +#[derive(Clone, Deserialize, Serialize, JsonSchema, Default)] +pub struct TrafficShapingConfig { + /// The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration. + #[serde(default)] + pub all: TrafficShapingExecutorConfig, + /// Optional per-subgraph configurations that will override the default configuration for specific subgraphs. + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + pub subgraphs: HashMap, +} + #[derive(Debug, thiserror::Error)] pub enum RouterConfigError { #[error("Failed to load configuration: {0}")] diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs index 595112e37..e8b4dbfd4 100644 --- a/lib/router-config/src/traffic_shaping.rs +++ b/lib/router-config/src/traffic_shaping.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -17,6 +19,34 @@ pub struct TrafficShapingExecutorConfig { /// be deduplicated by sharing the response of other in-flight requests. #[serde(default = "default_dedupe_enabled")] pub dedupe_enabled: bool, + + /// Optional timeout configuration for requests to subgraphs. + /// + /// Example with a fixed duration: + /// ```yaml + /// timeout: + /// duration: 5s + /// ``` + /// + /// Or with a VRL expression that can return a duration based on the operation kind: + /// ```yaml + /// timeout: + /// expression: | + /// if (.operation.kind == "mutation") { + /// 10s + /// } else { + /// 5s + /// } + /// ``` + #[serde(default, skip_serializing_if = "Option::is_none")] + pub timeout: Option, +} + +#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] +#[serde(rename_all = "camelCase")] +pub enum HTTPTimeoutConfig { + Expression(String), + Duration(Duration), } impl Default for TrafficShapingExecutorConfig { @@ -25,6 +55,7 @@ impl Default for TrafficShapingExecutorConfig { max_connections_per_host: default_max_connections_per_host(), pool_idle_timeout_seconds: default_pool_idle_timeout_seconds(), dedupe_enabled: default_dedupe_enabled(), + timeout: None, } } } From 9a82b2a634b69e83cb5b090153bae308f030a213 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 10 Oct 2025 17:42:44 +0300 Subject: [PATCH 02/20] Better impl --- lib/executor/src/executors/http.rs | 12 +- lib/executor/src/executors/timeout.rs | 133 +++++++++++++++++++---- lib/router-config/src/traffic_shaping.rs | 2 +- 3 files changed, 114 insertions(+), 33 deletions(-) diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index 641d6f5c6..228015800 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use std::time::Duration; use crate::executors::dedupe::{request_fingerprint, ABuildHasher, SharedResponse}; -use crate::executors::timeout::{ExpressionContext, HTTPTimeout}; +use crate::executors::timeout::HTTPTimeout; use dashmap::DashMap; use futures::TryFutureExt; use hive_router_config::traffic_shaping::TrafficShapingExecutorConfig; @@ -224,10 +224,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor { // This unwrap is safe because the semaphore is never closed during the application's lifecycle. // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`. let _permit = self.semaphore.acquire().await.unwrap(); - let ctx = ExpressionContext { - client_request: execution_request.client_request, - }; - let timeout = self.get_timeout_duration(&ctx); + let timeout = self.get_timeout_duration(execution_request.client_request); return match self._send_request(body, headers, timeout).await { Ok(shared_response) => HttpExecutionResponse { body: shared_response.body, @@ -253,10 +250,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor { let response_result = cell .get_or_try_init(|| async { - let ctx = ExpressionContext { - client_request: execution_request.client_request, - }; - let timeout = self.get_timeout_duration(&ctx); + let timeout = self.get_timeout_duration(execution_request.client_request); let res = { // This unwrap is safe because the semaphore is never closed during the application's lifecycle. // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`. diff --git a/lib/executor/src/executors/timeout.rs b/lib/executor/src/executors/timeout.rs index b4ef9cc28..dd0ff3514 100644 --- a/lib/executor/src/executors/timeout.rs +++ b/lib/executor/src/executors/timeout.rs @@ -79,33 +79,120 @@ fn vrl_value_to_duration(value: VrlValue) -> Option { } } +fn get_timeout_duration<'a>( + timeout: &Option, + expression_context: &ExpressionContext<'a>, +) -> Option { + timeout.as_ref().and_then(|timeout| match timeout { + HTTPTimeout::Duration(dur) => Some(*dur), + HTTPTimeout::Expression(program) => { + let mut target = VrlTargetValue { + value: VrlValue::from(expression_context), + metadata: VrlValue::Object(BTreeMap::new()), + secrets: VrlSecrets::default(), + }; + + let mut state = VrlState::default(); + let timezone = VrlTimeZone::default(); + let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); + match program.resolve(&mut ctx) { + Ok(resolved) => vrl_value_to_duration(resolved), + Err(err) => { + warn!( + "Failed to evaluate timeout expression: {:#?}, falling back to no timeout.", + err + ); + None + } + } + } + }) +} + impl HTTPSubgraphExecutor { pub fn get_timeout_duration<'a>( &self, - expression_context: &ExpressionContext<'a>, + client_request: &'a ClientRequestDetails<'a>, ) -> Option { - self.timeout.as_ref().and_then(|timeout| { - match timeout { - HTTPTimeout::Duration(dur) => Some(*dur), - HTTPTimeout::Expression(program) => { - let mut target = VrlTargetValue { - value: VrlValue::from(expression_context), - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); - match program.resolve(&mut ctx) { - Ok(resolved) => vrl_value_to_duration(resolved), - Err(err) => { - warn!("Failed to evaluate timeout expression: {:#?}, falling back to no timeout.", err); - None - } - } - }, + let expression_context = ExpressionContext { client_request }; + get_timeout_duration(&self.timeout, &expression_context) + } +} + +#[cfg(test)] +mod tests { + use http::Method; + use ntex_http::HeaderMap; + + use crate::{ + execution::plan::{ClientRequestDetails, OperationDetails}, + executors::timeout::get_timeout_duration, + }; + + #[test] + fn get_timeout_duration_from_expression() { + use std::time::Duration; + + use hive_router_config::traffic_shaping::HTTPTimeoutConfig; + + use crate::executors::timeout::HTTPTimeout; + + let timeout_config = HTTPTimeoutConfig::Expression( + r#" + if .request.operation.type == "mutation" { + 10000 + } else { + 5000 } - }) + "# + .to_string(), + ); + + let timeout = HTTPTimeout::try_from(&timeout_config).expect("Failed to create timeout"); + let headers = HeaderMap::new(); + + let client_request_mutation = crate::execution::plan::ClientRequestDetails { + operation: OperationDetails { + name: Some("TestMutation".to_string()), + kind: "mutation", + query: "mutation TestMutation { doSomething }".into(), + }, + url: "http://example.com/graphql".parse().unwrap(), + headers: &headers, + method: Method::POST, + }; + + let timeout = Some(timeout); + + let client_request_query = ClientRequestDetails { + operation: OperationDetails { + name: Some("TestQuery".to_string()), + kind: "query", + query: "query TestQuery { field }".into(), + }, + url: "http://example.com/graphql".parse().unwrap(), + headers: &headers, + method: Method::POST, + }; + + let query_ctx = crate::executors::timeout::ExpressionContext { + client_request: &client_request_query, + }; + let duration_query = get_timeout_duration(&timeout, &query_ctx); + assert_eq!( + duration_query, + Some(Duration::from_millis(5000)), + "Expected 5000ms for query" + ); + + let mutation_ctx = crate::executors::timeout::ExpressionContext { + client_request: &client_request_mutation, + }; + let duration_mutation = get_timeout_duration(&timeout, &mutation_ctx); + assert_eq!( + duration_mutation, + Some(Duration::from_millis(10000)), + "Expected 10000ms for mutation" + ); } } diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs index e8b4dbfd4..89adb0a06 100644 --- a/lib/router-config/src/traffic_shaping.rs +++ b/lib/router-config/src/traffic_shaping.rs @@ -32,7 +32,7 @@ pub struct TrafficShapingExecutorConfig { /// ```yaml /// timeout: /// expression: | - /// if (.operation.kind == "mutation") { + /// if (.operation.type == "mutation") { /// 10s /// } else { /// 5s From 7f4ae2099689257ca4bb599f61e2422f6f57ff8c Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 10 Oct 2025 17:43:09 +0300 Subject: [PATCH 03/20] Update docs --- docs/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/README.md b/docs/README.md index 17baa8376..32e17f79d 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1393,7 +1393,7 @@ The default configuration that will be applied to all subgraphs, unless overridd |**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| |**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.
Default: `100`
Format: `"uint"`
Minimum: `0`
|| |**pool\_idle\_timeout\_seconds**|`integer`|Timeout for idle sockets being kept-alive.
Default: `50`
Format: `"uint64"`
Minimum: `0`
|| -|**timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: \|
if (.operation.kind == "mutation") {
10s
} else {
5s
}
```
|| +|**timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: \|
if (.operation.type == "mutation") {
10s
} else {
5s
}
```
|| **Example** @@ -1426,7 +1426,7 @@ Optional per-subgraph configurations that will override the default configuratio |**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| |**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.
Default: `100`
Format: `"uint"`
Minimum: `0`
|| |**pool\_idle\_timeout\_seconds**|`integer`|Timeout for idle sockets being kept-alive.
Default: `50`
Format: `"uint64"`
Minimum: `0`
|| -|**timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: \|
if (.operation.kind == "mutation") {
10s
} else {
5s
}
```
|| +|**timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: \|
if (.operation.type == "mutation") {
10s
} else {
5s
}
```
|| **Example** From d8b5fd4d8a6c1f9e0bcb5b5587c2c54e6ba72ecb Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 10 Oct 2025 18:06:23 +0300 Subject: [PATCH 04/20] More readable --- lib/executor/src/executors/http.rs | 29 ++++++++++++++++----------- lib/executor/src/executors/timeout.rs | 19 +++++++++++++++++- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index 228015800..ed2d7c5a1 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -6,13 +6,14 @@ use crate::executors::timeout::HTTPTimeout; use dashmap::DashMap; use futures::TryFutureExt; use hive_router_config::traffic_shaping::TrafficShapingExecutorConfig; +use hyper::body::Incoming; use tokio::sync::OnceCell; use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; -use http::HeaderMap; use http::HeaderValue; +use http::{HeaderMap, Request, Response}; use http_body_util::BodyExt; use http_body_util::Full; use hyper::Version; @@ -136,6 +137,18 @@ impl HTTPSubgraphExecutor { Ok(body) } + pub async fn send_request_to_client( + &self, + req: Request>, + ) -> Result, SubgraphExecutorError> { + self.http_client + .request(req) + .map_err(|e| { + SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string()) + }) + .await + } + async fn _send_request( &self, body: Vec, @@ -153,21 +166,13 @@ impl HTTPSubgraphExecutor { *req.headers_mut() = headers; - let request_op = self.http_client.request(req).map_err(|e| { - SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string()) - }); - let res = if let Some(timeout) = timeout { - tokio::time::timeout(timeout, request_op) - .await - .map_err(|_| { - SubgraphExecutorError::RequestTimeout(self.endpoint.to_string(), timeout) - })? + self.send_request_with_timeout(req, timeout).await? } else { - request_op.await + self.send_request_to_client(req).await? }; - let (parts, body) = res?.into_parts(); + let (parts, body) = res.into_parts(); Ok(SharedResponse { status: parts.status, diff --git a/lib/executor/src/executors/timeout.rs b/lib/executor/src/executors/timeout.rs index dd0ff3514..0bbc1e049 100644 --- a/lib/executor/src/executors/timeout.rs +++ b/lib/executor/src/executors/timeout.rs @@ -1,13 +1,18 @@ use std::collections::BTreeMap; use std::time::Duration; +use bytes::Bytes; +use futures::TryFutureExt; use hive_router_config::traffic_shaping::HTTPTimeoutConfig; +use http::{Request, Response}; +use http_body_util::Full; +use hyper::body::Incoming; use tracing::warn; use vrl::compiler::Program as VrlProgram; use vrl::diagnostic::DiagnosticList; -use crate::execution::plan::ClientRequestDetails; use crate::executors::http::HTTPSubgraphExecutor; +use crate::{execution::plan::ClientRequestDetails, executors::error::SubgraphExecutorError}; use vrl::{ compiler::TargetValue as VrlTargetValue, core::Value as VrlValue, @@ -117,6 +122,18 @@ impl HTTPSubgraphExecutor { let expression_context = ExpressionContext { client_request }; get_timeout_duration(&self.timeout, &expression_context) } + + pub async fn send_request_with_timeout( + &self, + req: Request>, + timeout: Duration, + ) -> Result, SubgraphExecutorError> { + let request_op = self.send_request_to_client(req); + + tokio::time::timeout(timeout, request_op) + .map_err(|_| SubgraphExecutorError::RequestTimeout(self.endpoint.to_string(), timeout)) + .await? + } } #[cfg(test)] From d7b4839ecaff50d9f91270483b4da7c90d13d022 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 10 Oct 2025 18:15:37 +0300 Subject: [PATCH 05/20] Update lib/executor/src/executors/timeout.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- lib/executor/src/executors/timeout.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/lib/executor/src/executors/timeout.rs b/lib/executor/src/executors/timeout.rs index 0bbc1e049..4d577e0c3 100644 --- a/lib/executor/src/executors/timeout.rs +++ b/lib/executor/src/executors/timeout.rs @@ -69,7 +69,14 @@ fn warn_unsupported_conversion_option(type_name: &str) -> Option { fn vrl_value_to_duration(value: VrlValue) -> Option { match value { - VrlValue::Integer(i) => Some(Duration::from_millis(u64::from_ne_bytes(i.to_ne_bytes()))), + VrlValue::Integer(i) => { + if i < 0 { + warn!("Cannot convert negative integer ({}) to Duration.", i); + None + } else { + Some(Duration::from_millis(i as u64)) + } + }, VrlValue::Bytes(_) => warn_unsupported_conversion_option("Bytes"), VrlValue::Float(_) => warn_unsupported_conversion_option("Float"), VrlValue::Boolean(_) => warn_unsupported_conversion_option("Boolean"), From eaaca403dbfe670928d55c166d1a16b76ce9a542 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 10 Oct 2025 18:15:43 +0300 Subject: [PATCH 06/20] Update lib/router-config/src/traffic_shaping.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- lib/router-config/src/traffic_shaping.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs index 89adb0a06..64e2639c8 100644 --- a/lib/router-config/src/traffic_shaping.rs +++ b/lib/router-config/src/traffic_shaping.rs @@ -32,7 +32,7 @@ pub struct TrafficShapingExecutorConfig { /// ```yaml /// timeout: /// expression: | - /// if (.operation.type == "mutation") { + /// if (.request.operation.type == "mutation") { /// 10s /// } else { /// 5s From de668727e9347d90dc104a14cd2b075bbe991c45 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 10 Oct 2025 18:15:50 +0300 Subject: [PATCH 07/20] Update lib/executor/src/executors/timeout.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- lib/executor/src/executors/timeout.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/executor/src/executors/timeout.rs b/lib/executor/src/executors/timeout.rs index 4d577e0c3..081ca307d 100644 --- a/lib/executor/src/executors/timeout.rs +++ b/lib/executor/src/executors/timeout.rs @@ -85,7 +85,7 @@ fn vrl_value_to_duration(value: VrlValue) -> Option { VrlValue::Timestamp(_) => warn_unsupported_conversion_option("Timestamp"), VrlValue::Object(_) => warn_unsupported_conversion_option("Object"), VrlValue::Null => { - warn!("Cannot convert VRL Null value to a url value."); + warn!("Cannot convert VRL Null value to a Duration value."); None } } From 9e7f2131ad3f88d0504a914c34f80c0369bc3020 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 10 Oct 2025 18:16:09 +0300 Subject: [PATCH 08/20] Update docs/README.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- docs/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/README.md b/docs/README.md index 32e17f79d..4f9e6ee20 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1393,7 +1393,7 @@ The default configuration that will be applied to all subgraphs, unless overridd |**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| |**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.
Default: `100`
Format: `"uint"`
Minimum: `0`
|| |**pool\_idle\_timeout\_seconds**|`integer`|Timeout for idle sockets being kept-alive.
Default: `50`
Format: `"uint64"`
Minimum: `0`
|| -|**timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: \|
if (.operation.type == "mutation") {
10s
} else {
5s
}
```
|| +|**timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: |
if (.request.operation.type == "mutation") {
10000
} else {
5000
}
```
|| **Example** From aee4595df190cf0cc75cb9eb11c240b77f77885d Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 10 Oct 2025 18:16:16 +0300 Subject: [PATCH 09/20] Update docs/README.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- docs/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/README.md b/docs/README.md index 4f9e6ee20..bdfa7bcad 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1426,7 +1426,7 @@ Optional per-subgraph configurations that will override the default configuratio |**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| |**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.
Default: `100`
Format: `"uint"`
Minimum: `0`
|| |**pool\_idle\_timeout\_seconds**|`integer`|Timeout for idle sockets being kept-alive.
Default: `50`
Format: `"uint64"`
Minimum: `0`
|| -|**timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: \|
if (.operation.type == "mutation") {
10s
} else {
5s
}
```
|| +|**timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: |
if (.request.operation.type == "mutation") {
10000
} else {
5000
}
```
|| **Example** From 62e897526ea640782e53ecef712af30fed99b06b Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 10 Oct 2025 18:16:26 +0300 Subject: [PATCH 10/20] Update lib/router-config/src/traffic_shaping.rs Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- lib/router-config/src/traffic_shaping.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs index 64e2639c8..a8cbffbda 100644 --- a/lib/router-config/src/traffic_shaping.rs +++ b/lib/router-config/src/traffic_shaping.rs @@ -33,9 +33,9 @@ pub struct TrafficShapingExecutorConfig { /// timeout: /// expression: | /// if (.request.operation.type == "mutation") { - /// 10s + /// 10000 /// } else { - /// 5s + /// 5000 /// } /// ``` #[serde(default, skip_serializing_if = "Option::is_none")] From bba2c57d238d7c61d9ede3a44ad82a251f54cdc2 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 10 Oct 2025 18:18:10 +0300 Subject: [PATCH 11/20] Fix todo --- lib/executor/src/executors/map.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index e31c7cfbe..fe50fb725 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -121,7 +121,7 @@ impl SubgraphExecutorMap { &global_client_arc, ); - // TODO: Maybe reuse the in-flight requests map if dedupe is the same + // TODO: Maybe reuse the in-flight requests map in some cases ??? let inflight_requests = subgraph_config .map(|_| Arc::new(DashMap::with_hasher(ABuildHasher::default()))) .unwrap_or_else(|| global_in_flight_requests.clone()); From 2e63a591623427aa4a7e60a9c764623358def41d Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 10 Oct 2025 18:19:20 +0300 Subject: [PATCH 12/20] Fix format --- lib/executor/src/executors/timeout.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/executor/src/executors/timeout.rs b/lib/executor/src/executors/timeout.rs index 081ca307d..96413a85e 100644 --- a/lib/executor/src/executors/timeout.rs +++ b/lib/executor/src/executors/timeout.rs @@ -76,7 +76,7 @@ fn vrl_value_to_duration(value: VrlValue) -> Option { } else { Some(Duration::from_millis(i as u64)) } - }, + } VrlValue::Bytes(_) => warn_unsupported_conversion_option("Bytes"), VrlValue::Float(_) => warn_unsupported_conversion_option("Float"), VrlValue::Boolean(_) => warn_unsupported_conversion_option("Boolean"), From 921d71ec2e9115ced1b4645befa7e07a8ab7f1f7 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 10 Oct 2025 18:20:03 +0300 Subject: [PATCH 13/20] Fix docs --- docs/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/README.md b/docs/README.md index bdfa7bcad..408195e89 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1393,7 +1393,7 @@ The default configuration that will be applied to all subgraphs, unless overridd |**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| |**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.
Default: `100`
Format: `"uint"`
Minimum: `0`
|| |**pool\_idle\_timeout\_seconds**|`integer`|Timeout for idle sockets being kept-alive.
Default: `50`
Format: `"uint64"`
Minimum: `0`
|| -|**timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: |
if (.request.operation.type == "mutation") {
10000
} else {
5000
}
```
|| +|**timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: \|
if (.request.operation.type == "mutation") {
10000
} else {
5000
}
```
|| **Example** @@ -1426,7 +1426,7 @@ Optional per-subgraph configurations that will override the default configuratio |**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| |**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.
Default: `100`
Format: `"uint"`
Minimum: `0`
|| |**pool\_idle\_timeout\_seconds**|`integer`|Timeout for idle sockets being kept-alive.
Default: `50`
Format: `"uint64"`
Minimum: `0`
|| -|**timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: |
if (.request.operation.type == "mutation") {
10000
} else {
5000
}
```
|| +|**timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: \|
if (.request.operation.type == "mutation") {
10000
} else {
5000
}
```
|| **Example** From 28182e90fa5b6aadaae03d171756c4688b0b6f8d Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Sun, 12 Oct 2025 10:57:27 +0300 Subject: [PATCH 14/20] Timeout Executor approach --- lib/executor/src/executors/error.rs | 22 ++- lib/executor/src/executors/http.rs | 81 ++------- lib/executor/src/executors/map.rs | 19 +- lib/executor/src/executors/timeout.rs | 221 +++++++++++++---------- lib/router-config/src/traffic_shaping.rs | 4 +- 5 files changed, 181 insertions(+), 166 deletions(-) diff --git a/lib/executor/src/executors/error.rs b/lib/executor/src/executors/error.rs index 2fb67e851..68a534346 100644 --- a/lib/executor/src/executors/error.rs +++ b/lib/executor/src/executors/error.rs @@ -1,5 +1,9 @@ use std::time::Duration; +use bytes::{BufMut, Bytes, BytesMut}; + +use crate::response::graphql_error::GraphQLError; + #[derive(thiserror::Error, Debug, Clone)] pub enum SubgraphExecutorError { #[error("Failed to parse endpoint \"{0}\" as URI: {1}")] @@ -10,6 +14,20 @@ pub enum SubgraphExecutorError { RequestFailure(String, String), #[error("Failed to serialize variable \"{0}\": {1}")] VariablesSerializationFailure(String, String), - #[error("Request to subgraph \"{0}\" timed out after {1:?}")] - RequestTimeout(String, Duration), + #[error("Failed to parse timeout duration from expression: {0}")] + TimeoutExpressionParseFailure(String), + #[error("Request timed out after {0:?}")] + RequestTimeout(Duration), +} +pub fn error_to_graphql_bytes(endpoint: &http::Uri, e: SubgraphExecutorError) -> Bytes { + let graphql_error: GraphQLError = + format!("Failed to execute request to subgraph {}: {}", endpoint, e).into(); + let errors = vec![graphql_error]; + // This unwrap is safe as GraphQLError serialization shouldn't fail. + let errors_bytes = sonic_rs::to_vec(&errors).unwrap(); + let mut buffer = BytesMut::new(); + buffer.put_slice(b"{\"errors\":"); + buffer.put_slice(&errors_bytes); + buffer.put_slice(b"}"); + buffer.freeze() } diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index ed2d7c5a1..b1297a9b8 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -1,30 +1,25 @@ use std::sync::Arc; -use std::time::Duration; use crate::executors::dedupe::{request_fingerprint, ABuildHasher, SharedResponse}; -use crate::executors::timeout::HTTPTimeout; use dashmap::DashMap; use futures::TryFutureExt; use hive_router_config::traffic_shaping::TrafficShapingExecutorConfig; -use hyper::body::Incoming; use tokio::sync::OnceCell; use async_trait::async_trait; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::{BufMut, Bytes}; +use http::HeaderMap; use http::HeaderValue; -use http::{HeaderMap, Request, Response}; use http_body_util::BodyExt; use http_body_util::Full; use hyper::Version; use hyper_tls::HttpsConnector; use hyper_util::client::legacy::{connect::HttpConnector, Client}; use tokio::sync::Semaphore; -use tracing::warn; use crate::executors::common::{HttpExecutionRequest, HttpExecutionResponse}; -use crate::executors::error::SubgraphExecutorError; -use crate::response::graphql_error::GraphQLError; +use crate::executors::error::{error_to_graphql_bytes, SubgraphExecutorError}; use crate::utils::consts::CLOSE_BRACE; use crate::utils::consts::COLON; use crate::utils::consts::COMMA; @@ -39,7 +34,6 @@ pub struct HTTPSubgraphExecutor { pub semaphore: Arc, pub config: Arc, pub in_flight_requests: Arc>, ABuildHasher>>, - pub timeout: Option, } const FIRST_VARIABLE_STR: &[u8] = b",\"variables\":{"; @@ -63,21 +57,6 @@ impl HTTPSubgraphExecutor { HeaderValue::from_static("keep-alive"), ); - let timeout = if let Some(timeout_config) = &config.timeout { - match HTTPTimeout::try_from(timeout_config) { - Ok(timeout) => Some(timeout), - Err(diagnostic) => { - warn!( - "Failed to parse timeout expression for subgraph {}: {:#?}", - endpoint, diagnostic - ); - None - } - } - } else { - None - }; - Self { endpoint, http_client, @@ -85,7 +64,6 @@ impl HTTPSubgraphExecutor { semaphore, config, in_flight_requests, - timeout, } } @@ -137,23 +115,10 @@ impl HTTPSubgraphExecutor { Ok(body) } - pub async fn send_request_to_client( - &self, - req: Request>, - ) -> Result, SubgraphExecutorError> { - self.http_client - .request(req) - .map_err(|e| { - SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string()) - }) - .await - } - async fn _send_request( &self, body: Vec, headers: HeaderMap, - timeout: Option, ) -> Result { let mut req = hyper::Request::builder() .method(http::Method::POST) @@ -166,11 +131,13 @@ impl HTTPSubgraphExecutor { *req.headers_mut() = headers; - let res = if let Some(timeout) = timeout { - self.send_request_with_timeout(req, timeout).await? - } else { - self.send_request_to_client(req).await? - }; + let res = self + .http_client + .request(req) + .map_err(|e| { + SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string()) + }) + .await?; let (parts, body) = res.into_parts(); @@ -186,22 +153,6 @@ impl HTTPSubgraphExecutor { headers: parts.headers, }) } - - fn error_to_graphql_bytes(&self, e: SubgraphExecutorError) -> Bytes { - let graphql_error: GraphQLError = format!( - "Failed to execute request to subgraph {}: {}", - self.endpoint, e - ) - .into(); - let errors = vec![graphql_error]; - // This unwrap is safe as GraphQLError serialization shouldn't fail. - let errors_bytes = sonic_rs::to_vec(&errors).unwrap(); - let mut buffer = BytesMut::new(); - buffer.put_slice(b"{\"errors\":"); - buffer.put_slice(&errors_bytes); - buffer.put_slice(b"}"); - buffer.freeze() - } } #[async_trait] @@ -214,7 +165,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor { Ok(body) => body, Err(e) => { return HttpExecutionResponse { - body: self.error_to_graphql_bytes(e), + body: error_to_graphql_bytes(&self.endpoint, e), headers: Default::default(), } } @@ -229,14 +180,13 @@ impl SubgraphExecutor for HTTPSubgraphExecutor { // This unwrap is safe because the semaphore is never closed during the application's lifecycle. // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`. let _permit = self.semaphore.acquire().await.unwrap(); - let timeout = self.get_timeout_duration(execution_request.client_request); - return match self._send_request(body, headers, timeout).await { + return match self._send_request(body, headers).await { Ok(shared_response) => HttpExecutionResponse { body: shared_response.body, headers: shared_response.headers, }, Err(e) => HttpExecutionResponse { - body: self.error_to_graphql_bytes(e), + body: error_to_graphql_bytes(&self.endpoint, e), headers: Default::default(), }, }; @@ -255,12 +205,11 @@ impl SubgraphExecutor for HTTPSubgraphExecutor { let response_result = cell .get_or_try_init(|| async { - let timeout = self.get_timeout_duration(execution_request.client_request); let res = { // This unwrap is safe because the semaphore is never closed during the application's lifecycle. // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`. let _permit = self.semaphore.acquire().await.unwrap(); - self._send_request(body, headers, timeout).await + self._send_request(body, headers).await }; // It's important to remove the entry from the map before returning the result. // This ensures that once the OnceCell is set, no future requests can join it. @@ -276,7 +225,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor { headers: shared_response.headers.clone(), }, Err(e) => HttpExecutionResponse { - body: self.error_to_graphql_bytes(e.clone()), + body: error_to_graphql_bytes(&self.endpoint, e.clone()), headers: Default::default(), }, } diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index fe50fb725..aea549989 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -20,6 +20,7 @@ use crate::{ dedupe::{ABuildHasher, SharedResponse}, error::SubgraphExecutorError, http::HTTPSubgraphExecutor, + timeout::TimeoutExecutor, }, response::graphql_error::GraphQLError, }; @@ -130,15 +131,23 @@ impl SubgraphExecutorMap { .map(|cfg| Arc::new(cfg.clone())) .unwrap_or_else(|| global_config_arc.clone()); - let executor = HTTPSubgraphExecutor::new( - endpoint_uri, + let timeout_config = subgraph_config.and_then(|cfg| cfg.timeout.as_ref()); + + let mut executor = HTTPSubgraphExecutor::new( + endpoint_uri.clone(), http_client, semaphore, - config_arc, + config_arc.clone(), inflight_requests, - ); + ) + .to_boxed_arc(); + + if let Some(timeout_config) = timeout_config { + executor = TimeoutExecutor::try_new(endpoint_uri, timeout_config, executor)? + .to_boxed_arc(); + } - Ok((subgraph_name, executor.to_boxed_arc())) + Ok((subgraph_name, executor)) }) .collect::, SubgraphExecutorError>>()?; diff --git a/lib/executor/src/executors/timeout.rs b/lib/executor/src/executors/timeout.rs index 96413a85e..7c3308161 100644 --- a/lib/executor/src/executors/timeout.rs +++ b/lib/executor/src/executors/timeout.rs @@ -1,17 +1,15 @@ use std::collections::BTreeMap; use std::time::Duration; -use bytes::Bytes; -use futures::TryFutureExt; -use hive_router_config::traffic_shaping::HTTPTimeoutConfig; -use http::{Request, Response}; -use http_body_util::Full; -use hyper::body::Incoming; +use async_trait::async_trait; +use hive_router_config::traffic_shaping::SubgraphTimeoutConfig; use tracing::warn; use vrl::compiler::Program as VrlProgram; -use vrl::diagnostic::DiagnosticList; -use crate::executors::http::HTTPSubgraphExecutor; +use crate::executors::common::{ + HttpExecutionRequest, HttpExecutionResponse, SubgraphExecutor, SubgraphExecutorBoxedArc, +}; +use crate::executors::error::error_to_graphql_bytes; use crate::{execution::plan::ClientRequestDetails, executors::error::SubgraphExecutorError}; use vrl::{ compiler::TargetValue as VrlTargetValue, @@ -23,28 +21,11 @@ use vrl::{ use vrl::{compiler::compile as vrl_compile, stdlib::all as vrl_build_functions}; #[derive(Debug)] -pub enum HTTPTimeout { +pub enum TimeoutSource { Expression(Box), Duration(Duration), } -impl TryFrom<&HTTPTimeoutConfig> for HTTPTimeout { - type Error = DiagnosticList; - fn try_from(timeout_config: &HTTPTimeoutConfig) -> Result { - match timeout_config { - HTTPTimeoutConfig::Duration(dur) => Ok(HTTPTimeout::Duration(*dur)), - HTTPTimeoutConfig::Expression(expr) => { - // Compile the VRL expression into a Program - let functions = vrl_build_functions(); - let compilation_result = vrl_compile(expr, &functions)?; - Ok(HTTPTimeout::Expression(Box::new( - compilation_result.program, - ))) - } - } - } -} - pub struct ExpressionContext<'a> { pub client_request: &'a ClientRequestDetails<'a>, } @@ -91,77 +72,136 @@ fn vrl_value_to_duration(value: VrlValue) -> Option { } } -fn get_timeout_duration<'a>( - timeout: &Option, - expression_context: &ExpressionContext<'a>, -) -> Option { - timeout.as_ref().and_then(|timeout| match timeout { - HTTPTimeout::Duration(dur) => Some(*dur), - HTTPTimeout::Expression(program) => { - let mut target = VrlTargetValue { - value: VrlValue::from(expression_context), - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); - match program.resolve(&mut ctx) { - Ok(resolved) => vrl_value_to_duration(resolved), - Err(err) => { - warn!( - "Failed to evaluate timeout expression: {:#?}, falling back to no timeout.", - err - ); - None - } - } - } - }) +pub struct TimeoutExecutor { + pub endpoint: http::Uri, + pub timeout: TimeoutSource, + pub executor: SubgraphExecutorBoxedArc, } -impl HTTPSubgraphExecutor { +impl TimeoutExecutor { + pub fn try_new( + endpoint: http::Uri, + timeout_config: &SubgraphTimeoutConfig, + executor: SubgraphExecutorBoxedArc, + ) -> Result { + let timeout = match timeout_config { + SubgraphTimeoutConfig::Duration(dur) => TimeoutSource::Duration(*dur), + SubgraphTimeoutConfig::Expression(expr) => { + // Compile the VRL expression into a Program + let functions = vrl_build_functions(); + let compilation_result = vrl_compile(expr, &functions).map_err(|diagnostics| { + SubgraphExecutorError::TimeoutExpressionParseFailure( + diagnostics + .errors() + .into_iter() + .map(|d| d.code.to_string() + ": " + &d.message) + .collect::>() + .join(", "), + ) + })?; + TimeoutSource::Expression(Box::new(compilation_result.program)) + } + }; + Ok(Self { + endpoint, + timeout, + executor, + }) + } pub fn get_timeout_duration<'a>( &self, client_request: &'a ClientRequestDetails<'a>, ) -> Option { let expression_context = ExpressionContext { client_request }; - get_timeout_duration(&self.timeout, &expression_context) + + match &self.timeout { + TimeoutSource::Duration(dur) => Some(*dur), + TimeoutSource::Expression(program) => { + let mut target = VrlTargetValue { + value: VrlValue::from(&expression_context), + metadata: VrlValue::Object(BTreeMap::new()), + secrets: VrlSecrets::default(), + }; + + let mut state = VrlState::default(); + let timezone = VrlTimeZone::default(); + let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); + match program.resolve(&mut ctx) { + Ok(resolved) => vrl_value_to_duration(resolved), + Err(err) => { + warn!( + "Failed to evaluate timeout expression: {:#?}, falling back to no timeout.", + err + ); + None + } + } + } + } } +} - pub async fn send_request_with_timeout( +#[async_trait] +impl SubgraphExecutor for TimeoutExecutor { + async fn execute<'a>( &self, - req: Request>, - timeout: Duration, - ) -> Result, SubgraphExecutorError> { - let request_op = self.send_request_to_client(req); - - tokio::time::timeout(timeout, request_op) - .map_err(|_| SubgraphExecutorError::RequestTimeout(self.endpoint.to_string(), timeout)) - .await? + execution_request: HttpExecutionRequest<'a>, + ) -> HttpExecutionResponse { + let timeout = self.get_timeout_duration(execution_request.client_request); + let execution = self.executor.execute(execution_request); + if let Some(timeout) = timeout { + match tokio::time::timeout(timeout, execution).await { + Ok(response) => response, + Err(_) => HttpExecutionResponse { + body: error_to_graphql_bytes( + &self.endpoint, + SubgraphExecutorError::RequestTimeout(timeout), + ), + headers: Default::default(), + }, + } + } else { + execution.await + } } } #[cfg(test)] mod tests { + use async_trait::async_trait; use http::Method; use ntex_http::HeaderMap; use crate::{ execution::plan::{ClientRequestDetails, OperationDetails}, - executors::timeout::get_timeout_duration, + executors::{ + common::{HttpExecutionRequest, HttpExecutionResponse, SubgraphExecutor}, + timeout::TimeoutExecutor, + }, }; + struct MockExecutor {} + + #[async_trait] + impl SubgraphExecutor for MockExecutor { + async fn execute<'a>( + &self, + _execution_request: HttpExecutionRequest<'a>, + ) -> HttpExecutionResponse { + HttpExecutionResponse { + body: Default::default(), + headers: Default::default(), + } + } + } + #[test] fn get_timeout_duration_from_expression() { use std::time::Duration; - use hive_router_config::traffic_shaping::HTTPTimeoutConfig; - - use crate::executors::timeout::HTTPTimeout; + use hive_router_config::traffic_shaping::SubgraphTimeoutConfig; - let timeout_config = HTTPTimeoutConfig::Expression( + let timeout_config = SubgraphTimeoutConfig::Expression( r#" if .request.operation.type == "mutation" { 10000 @@ -172,21 +212,16 @@ mod tests { .to_string(), ); - let timeout = HTTPTimeout::try_from(&timeout_config).expect("Failed to create timeout"); - let headers = HeaderMap::new(); + let mock_executor = MockExecutor {}.to_boxed_arc(); - let client_request_mutation = crate::execution::plan::ClientRequestDetails { - operation: OperationDetails { - name: Some("TestMutation".to_string()), - kind: "mutation", - query: "mutation TestMutation { doSomething }".into(), - }, - url: "http://example.com/graphql".parse().unwrap(), - headers: &headers, - method: Method::POST, - }; + let timeout_executor = TimeoutExecutor::try_new( + "http://example.com/graphql".parse().unwrap(), + &timeout_config, + mock_executor, + ) + .unwrap(); - let timeout = Some(timeout); + let headers = HeaderMap::new(); let client_request_query = ClientRequestDetails { operation: OperationDetails { @@ -198,21 +233,25 @@ mod tests { headers: &headers, method: Method::POST, }; - - let query_ctx = crate::executors::timeout::ExpressionContext { - client_request: &client_request_query, - }; - let duration_query = get_timeout_duration(&timeout, &query_ctx); + let duration_query = timeout_executor.get_timeout_duration(&client_request_query); assert_eq!( duration_query, Some(Duration::from_millis(5000)), "Expected 5000ms for query" ); - let mutation_ctx = crate::executors::timeout::ExpressionContext { - client_request: &client_request_mutation, + let client_request_mutation = crate::execution::plan::ClientRequestDetails { + operation: OperationDetails { + name: Some("TestMutation".to_string()), + kind: "mutation", + query: "mutation TestMutation { doSomething }".into(), + }, + url: "http://example.com/graphql".parse().unwrap(), + headers: &headers, + method: Method::POST, }; - let duration_mutation = get_timeout_duration(&timeout, &mutation_ctx); + + let duration_mutation = timeout_executor.get_timeout_duration(&client_request_mutation); assert_eq!( duration_mutation, Some(Duration::from_millis(10000)), diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs index a8cbffbda..46e3fa714 100644 --- a/lib/router-config/src/traffic_shaping.rs +++ b/lib/router-config/src/traffic_shaping.rs @@ -39,12 +39,12 @@ pub struct TrafficShapingExecutorConfig { /// } /// ``` #[serde(default, skip_serializing_if = "Option::is_none")] - pub timeout: Option, + pub timeout: Option, } #[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] #[serde(rename_all = "camelCase")] -pub enum HTTPTimeoutConfig { +pub enum SubgraphTimeoutConfig { Expression(String), Duration(Duration), } From 1be63ae836cde7375cf48e4d030a9e859ed2628e Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Sun, 12 Oct 2025 11:54:33 +0300 Subject: [PATCH 15/20] F --- lib/executor/src/executors/map.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index aea549989..1fc47d8a2 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -131,8 +131,6 @@ impl SubgraphExecutorMap { .map(|cfg| Arc::new(cfg.clone())) .unwrap_or_else(|| global_config_arc.clone()); - let timeout_config = subgraph_config.and_then(|cfg| cfg.timeout.as_ref()); - let mut executor = HTTPSubgraphExecutor::new( endpoint_uri.clone(), http_client, @@ -142,7 +140,7 @@ impl SubgraphExecutorMap { ) .to_boxed_arc(); - if let Some(timeout_config) = timeout_config { + if let Some(timeout_config) = &config_arc.timeout { executor = TimeoutExecutor::try_new(endpoint_uri, timeout_config, executor)? .to_boxed_arc(); } From 898827745cb02c07b68b95ed48ec237945887566 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Sun, 12 Oct 2025 12:05:41 +0300 Subject: [PATCH 16/20] Update docs/README.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- docs/README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/README.md b/docs/README.md index 408195e89..843c4dd5e 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1377,6 +1377,12 @@ all: dedupe_enabled: true max_connections_per_host: 100 pool_idle_timeout_seconds: 50 +subgraphs: + products: + timeout: + duration: 10s + reviews: + max_connections_per_host: 50 ``` From c86414fd8b447be71c89a62c6e2d25cd7be1b828 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Sun, 12 Oct 2025 12:16:55 +0300 Subject: [PATCH 17/20] .. --- lib/executor/src/executors/timeout.rs | 33 ++++++++++++++++++++++++ lib/router-config/src/traffic_shaping.rs | 8 ++++++ 2 files changed, 41 insertions(+) diff --git a/lib/executor/src/executors/timeout.rs b/lib/executor/src/executors/timeout.rs index 7c3308161..3b8c55403 100644 --- a/lib/executor/src/executors/timeout.rs +++ b/lib/executor/src/executors/timeout.rs @@ -169,6 +169,7 @@ impl SubgraphExecutor for TimeoutExecutor { #[cfg(test)] mod tests { use async_trait::async_trait; + use hive_router_config::parse_yaml_config; use http::Method; use ntex_http::HeaderMap; @@ -258,4 +259,36 @@ mod tests { "Expected 10000ms for mutation" ); } + + #[test] + fn get_timeout_duration_from_fixed_duration() { + let yaml_str = r#" + traffic_shaping: + all: + timeout: + duration: 7s + "#; + let config = parse_yaml_config(yaml_str.to_string()).unwrap(); + let mock_executor = MockExecutor {}.to_boxed_arc(); + let timeout_executor = TimeoutExecutor::try_new( + "http://example.com/graphql".parse().unwrap(), + &config.traffic_shaping.all.timeout.unwrap(), + mock_executor, + ) + .unwrap(); + + let headers = HeaderMap::new(); + let client_request = ClientRequestDetails { + operation: OperationDetails { + name: Some("TestQuery".to_string()), + kind: "query", + query: "query TestQuery { field }".into(), + }, + url: "http://example.com/graphql".parse().unwrap(), + headers: &headers, + method: Method::POST, + }; + let duration = timeout_executor.get_timeout_duration(&client_request); + assert_eq!(duration, Some(std::time::Duration::from_millis(7000))); + } } diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs index 46e3fa714..fd709ecde 100644 --- a/lib/router-config/src/traffic_shaping.rs +++ b/lib/router-config/src/traffic_shaping.rs @@ -46,9 +46,17 @@ pub struct TrafficShapingExecutorConfig { #[serde(rename_all = "camelCase")] pub enum SubgraphTimeoutConfig { Expression(String), + #[serde(deserialize_with = "humantime_serde")] Duration(Duration), } +fn humantime_serde<'de, D>(deserializer: D) -> Result +where + D: serde::Deserializer<'de>, +{ + humantime_serde::deserialize(deserializer) +} + impl Default for TrafficShapingExecutorConfig { fn default() -> Self { Self { From 8ba373bee7f3fb43bdffa970f1bfb70ba4dac57c Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Sun, 12 Oct 2025 12:21:59 +0300 Subject: [PATCH 18/20] F --- docs/README.md | 6 ------ 1 file changed, 6 deletions(-) diff --git a/docs/README.md b/docs/README.md index 843c4dd5e..408195e89 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1377,12 +1377,6 @@ all: dedupe_enabled: true max_connections_per_host: 100 pool_idle_timeout_seconds: 50 -subgraphs: - products: - timeout: - duration: 10s - reviews: - max_connections_per_host: 50 ``` From fd447be03022e18af2b72d6ac2c811c3c8e8b7e8 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Sun, 12 Oct 2025 18:33:36 +0300 Subject: [PATCH 19/20] Test timeout on server --- Cargo.lock | 13 +++ lib/executor/Cargo.toml | 1 + lib/executor/src/executors/map.rs | 2 +- lib/executor/src/executors/timeout.rs | 134 ++++++++++++++++++++++++++ 4 files changed, 149 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index fb2ab35a1..8fc6df4de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -357,6 +357,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a18ed336352031311f4e0b4dd2ff392d4fbb370777c9d18d7fc9d7359f73871" dependencies = [ "axum-core", + "axum-macros", "base64 0.22.1", "bytes", "form_urlencoded", @@ -405,6 +406,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-macros" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "backtrace" version = "0.3.76" @@ -1852,6 +1864,7 @@ version = "3.0.0" dependencies = [ "ahash", "async-trait", + "axum", "bumpalo", "bytes", "criterion", diff --git a/lib/executor/Cargo.toml b/lib/executor/Cargo.toml index 16d3ed339..03f41bbdf 100644 --- a/lib/executor/Cargo.toml +++ b/lib/executor/Cargo.toml @@ -53,6 +53,7 @@ subgraphs = { path = "../../bench/subgraphs" } criterion = { workspace = true } tokio = { workspace = true } insta = { workspace = true } +axum = { version = "0.8.6", features = ["macros", "tokio", "json"]} [[bench]] name = "executor_benches" diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index 1fc47d8a2..a2044ebdc 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -156,7 +156,7 @@ impl SubgraphExecutorMap { } // Create a new hyper client based on the traffic shaping config -fn from_traffic_shaping_config_to_client( +pub fn from_traffic_shaping_config_to_client( config: &TrafficShapingExecutorConfig, ) -> Arc, Full>> { Arc::new( diff --git a/lib/executor/src/executors/timeout.rs b/lib/executor/src/executors/timeout.rs index 3b8c55403..99a6ffbc3 100644 --- a/lib/executor/src/executors/timeout.rs +++ b/lib/executor/src/executors/timeout.rs @@ -168,7 +168,10 @@ impl SubgraphExecutor for TimeoutExecutor { #[cfg(test)] mod tests { + use std::time::Duration; + use async_trait::async_trait; + use axum::{extract::State, http::Response, Router}; use hive_router_config::parse_yaml_config; use http::Method; use ntex_http::HeaderMap; @@ -177,6 +180,7 @@ mod tests { execution::plan::{ClientRequestDetails, OperationDetails}, executors::{ common::{HttpExecutionRequest, HttpExecutionResponse, SubgraphExecutor}, + map::from_traffic_shaping_config_to_client, timeout::TimeoutExecutor, }, }; @@ -291,4 +295,134 @@ mod tests { let duration = timeout_executor.get_timeout_duration(&client_request); assert_eq!(duration, Some(std::time::Duration::from_millis(7000))); } + + #[tokio::test] + async fn cancels_http_request_when_timeout_expires() { + /** + * We will test here that when the timeout expires, the request is cancelled on the server-end as well. + * For that, we will create a server that sets a flag when the request is dropped/cancelled. + */ + use std::sync::Arc; + + use http::Method; + + let (tx, mut rx) = tokio::sync::broadcast::channel(16); + + struct AppState { + tx: Arc>, + } + + let app_state = AppState { tx: Arc::new(tx) }; + + let app_state_arc = Arc::new(app_state); + + struct CancelOnDrop { + start: std::time::Instant, + tx: Arc>, + } + + impl Drop for CancelOnDrop { + fn drop(&mut self) { + self.tx.send(self.start.elapsed()).unwrap(); + } + } + + #[axum::debug_handler] + async fn handler(State(state): State>) -> Response { + let _cancel_on_drop = CancelOnDrop { + start: std::time::Instant::now(), + tx: state.tx.clone(), + }; + // Never resolve the request, just wait until it's cancelled + let fut = futures::future::pending::>(); + fut.await + } + + println!("Starting server..."); + let app = Router::new() + .fallback(handler) + .with_state(app_state_arc.clone()); + println!("Router created, binding to port..."); + let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap(); + println!("Listener bound, starting server..."); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + if let Err(e) = axum::serve(listener, app).await { + eprintln!("Server error: {}", e); + } + }); + println!("Server started on {}", addr); + let graphql_path = "graphql"; + let endpoint: http::Uri = format!("http://{}/{}", addr, graphql_path).parse().unwrap(); + println!("Endpoint: {}", endpoint); + + let config = r#" + traffic_shaping: + all: + timeout: + duration: 5s + "#; + + let config = hive_router_config::parse_yaml_config(config.to_string()).unwrap(); + let http_client = from_traffic_shaping_config_to_client(&config.traffic_shaping.all); + let http_executor = crate::executors::http::HTTPSubgraphExecutor::new( + endpoint.clone(), + http_client, + Arc::new(tokio::sync::Semaphore::new(10)), + Arc::new(config.traffic_shaping.all.clone()), + Default::default(), + ); + let timeout_executor = TimeoutExecutor::try_new( + endpoint, + &config.traffic_shaping.all.timeout.unwrap(), + http_executor.to_boxed_arc(), + ) + .unwrap(); + + let headers = HeaderMap::new(); + let client_request = ClientRequestDetails { + operation: OperationDetails { + name: Some("TestQuery".to_string()), + kind: "query", + query: "query TestQuery { field }".into(), + }, + url: "http://example.com/graphql".parse().unwrap(), + headers: &headers, + method: Method::POST, + }; + + let execution_request = HttpExecutionRequest { + operation_name: Some("TestQuery"), + query: r#"{ field }"#, + variables: None, + representations: None, + headers: http::HeaderMap::new(), + client_request: &client_request, + dedupe: true, + }; + + println!("Sending request to executor with 5s timeout..."); + let response = timeout_executor.execute(execution_request).await; + + println!("Received response from executor."); + assert!( + response + .body + .starts_with(b"{\"errors\":[{\"message\":\"Failed to execute request to subgraph"), + "Expected error response due to timeout" + ); + + println!("Waiting to see if server was notified of cancellation..."); + + // Wait for the server to be notified that the request was cancelled + let elapsed = rx.recv().await.unwrap(); + println!("Server was notified of cancellation after {:?}", elapsed); + assert!( + elapsed >= Duration::from_secs_f32(4.9), + "Expected server to be notified of cancellation after at least 5s, but was {:?}", + elapsed + ); + + println!("Test completed."); + } } From a33b2f31475d0845f30da100c07fb5d3a37755a6 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 13 Oct 2025 17:20:50 +0300 Subject: [PATCH 20/20] Go --- docs/README.md | 13 +++--- lib/executor/src/executors/map.rs | 53 ++++++++---------------- lib/executor/src/executors/timeout.rs | 2 +- lib/router-config/src/lib.rs | 14 +------ lib/router-config/src/traffic_shaping.rs | 26 ++++++++++-- 5 files changed, 48 insertions(+), 60 deletions(-) diff --git a/docs/README.md b/docs/README.md index 408195e89..ab535892c 100644 --- a/docs/README.md +++ b/docs/README.md @@ -11,7 +11,7 @@ |[**log**](#log)|`object`|The router logger configuration.
Default: `{"filter":null,"format":"json","level":"info"}`
|| |[**query\_planner**](#query_planner)|`object`|Query planning configuration.
Default: `{"allow_expose":false,"timeout":"10s"}`
|| |[**supergraph**](#supergraph)|`object`|Configuration for the Federation supergraph source. By default, the router will use a local file-based supergraph source (`./supergraph.graphql`).
Default: `{"path":"supergraph.graphql","source":"file"}`
|| -|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"all":{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}}`
|| +|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"all":{"dedupe_enabled":true,"pool_idle_timeout_seconds":50},"max_connections_per_host":100}`
|| **Additional Properties:** not allowed **Example** @@ -66,8 +66,8 @@ supergraph: traffic_shaping: all: dedupe_enabled: true - max_connections_per_host: 100 pool_idle_timeout_seconds: 50 + max_connections_per_host: 100 ``` @@ -1367,7 +1367,8 @@ Configuration for the traffic-shaper executor. Use these configurations to contr |Name|Type|Description|Required| |----|----|-----------|--------| -|[**all**](#traffic_shapingall)|`object`|The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration.
Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}`
|| +|[**all**](#traffic_shapingall)|`object`|The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration.
Default: `{"dedupe_enabled":true,"pool_idle_timeout_seconds":50}`
|| +|**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.
Default: `100`
Format: `"uint"`
Minimum: `0`
|| |[**subgraphs**](#traffic_shapingsubgraphs)|`object`|Optional per-subgraph configurations that will override the default configuration for specific subgraphs.
|| **Example** @@ -1375,8 +1376,8 @@ Configuration for the traffic-shaper executor. Use these configurations to contr ```yaml all: dedupe_enabled: true - max_connections_per_host: 100 pool_idle_timeout_seconds: 50 +max_connections_per_host: 100 ``` @@ -1391,7 +1392,6 @@ The default configuration that will be applied to all subgraphs, unless overridd |Name|Type|Description|Required| |----|----|-----------|--------| |**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| -|**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.
Default: `100`
Format: `"uint"`
Minimum: `0`
|| |**pool\_idle\_timeout\_seconds**|`integer`|Timeout for idle sockets being kept-alive.
Default: `50`
Format: `"uint64"`
Minimum: `0`
|| |**timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: \|
if (.request.operation.type == "mutation") {
10000
} else {
5000
}
```
|| @@ -1399,7 +1399,6 @@ The default configuration that will be applied to all subgraphs, unless overridd ```yaml dedupe_enabled: true -max_connections_per_host: 100 pool_idle_timeout_seconds: 50 ``` @@ -1424,7 +1423,6 @@ Optional per-subgraph configurations that will override the default configuratio |Name|Type|Description|Required| |----|----|-----------|--------| |**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| -|**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.
Default: `100`
Format: `"uint"`
Minimum: `0`
|| |**pool\_idle\_timeout\_seconds**|`integer`|Timeout for idle sockets being kept-alive.
Default: `50`
Format: `"uint64"`
Minimum: `0`
|| |**timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: \|
if (.request.operation.type == "mutation") {
10000
} else {
5000
}
```
|| @@ -1432,7 +1430,6 @@ Optional per-subgraph configurations that will override the default configuratio ```yaml dedupe_enabled: true -max_connections_per_host: 100 pool_idle_timeout_seconds: 50 ``` diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index a2044ebdc..555fa4bae 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -2,7 +2,9 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use bytes::{BufMut, Bytes, BytesMut}; use dashmap::DashMap; -use hive_router_config::{traffic_shaping::TrafficShapingExecutorConfig, TrafficShapingConfig}; +use hive_router_config::{ + traffic_shaping::TrafficShapingConfig, traffic_shaping::TrafficShapingExecutorConfig, +}; use http::Uri; use http_body_util::Full; use hyper_tls::HttpsConnector; @@ -78,8 +80,10 @@ impl SubgraphExecutorMap { subgraph_endpoint_map: HashMap, config: TrafficShapingConfig, ) -> Result { - let global_client_arc = from_traffic_shaping_config_to_client(&config.all); - let global_semaphores_by_origin: DashMap> = DashMap::new(); + let max_connections_per_host = config.max_connections_per_host; + let global_client_arc = + from_traffic_shaping_config_to_client(&config.all, max_connections_per_host); + let semaphores_by_origin: DashMap> = DashMap::new(); let global_config_arc = Arc::new(config.all); let global_in_flight_requests: Arc< DashMap>, ABuildHasher>, @@ -107,19 +111,11 @@ impl SubgraphExecutorMap { let subgraph_config = config.subgraphs.get(&subgraph_name); - let semaphore = get_semaphore_for_subgraph( - &origin, - &global_semaphores_by_origin, - subgraph_config - .map(|cfg| cfg.max_connections_per_host) - .unwrap_or(global_config_arc.max_connections_per_host), - global_config_arc.max_connections_per_host, - ); - let http_client = get_http_client_for_subgraph( subgraph_config, &global_config_arc, &global_client_arc, + max_connections_per_host, ); // TODO: Maybe reuse the in-flight requests map in some cases ??? @@ -131,6 +127,11 @@ impl SubgraphExecutorMap { .map(|cfg| Arc::new(cfg.clone())) .unwrap_or_else(|| global_config_arc.clone()); + let semaphore = semaphores_by_origin + .entry(origin.to_string()) + .or_insert_with(|| Arc::new(Semaphore::new(max_connections_per_host))) + .clone(); + let mut executor = HTTPSubgraphExecutor::new( endpoint_uri.clone(), http_client, @@ -158,12 +159,13 @@ impl SubgraphExecutorMap { // Create a new hyper client based on the traffic shaping config pub fn from_traffic_shaping_config_to_client( config: &TrafficShapingExecutorConfig, + max_connections_per_host: usize, ) -> Arc, Full>> { Arc::new( Client::builder(TokioExecutor::new()) .pool_timer(TokioTimer::new()) .pool_idle_timeout(Duration::from_secs(config.pool_idle_timeout_seconds)) - .pool_max_idle_per_host(config.max_connections_per_host) + .pool_max_idle_per_host(max_connections_per_host) .build(HttpsConnector::new()), ) } @@ -174,35 +176,16 @@ fn get_http_client_for_subgraph( subgraph_config: Option<&TrafficShapingExecutorConfig>, global_config: &TrafficShapingExecutorConfig, global_client: &Arc, Full>>, + max_connections_per_host: usize, ) -> Arc, Full>> { match subgraph_config { Some(cfg) => { - if global_config.max_connections_per_host == cfg.max_connections_per_host - && global_config.pool_idle_timeout_seconds == cfg.pool_idle_timeout_seconds - { + if global_config.pool_idle_timeout_seconds == cfg.pool_idle_timeout_seconds { global_client.clone() } else { - from_traffic_shaping_config_to_client(cfg) + from_traffic_shaping_config_to_client(cfg, max_connections_per_host) } } None => global_client.clone(), } } - -// If the subgraph has a specific max_connections_per_host, create a new semaphore for it. -// Otherwise, reuse the global semaphore for that origin. -fn get_semaphore_for_subgraph( - origin: &str, - semaphores_by_origin: &DashMap>, - max_connections_per_host: usize, - global_max_connections_per_host: usize, -) -> Arc { - if max_connections_per_host == global_max_connections_per_host { - semaphores_by_origin - .entry(origin.to_string()) - .or_insert_with(|| Arc::new(Semaphore::new(global_max_connections_per_host))) - .clone() - } else { - Arc::new(Semaphore::new(max_connections_per_host)) - } -} diff --git a/lib/executor/src/executors/timeout.rs b/lib/executor/src/executors/timeout.rs index 99a6ffbc3..1f0d276b6 100644 --- a/lib/executor/src/executors/timeout.rs +++ b/lib/executor/src/executors/timeout.rs @@ -364,7 +364,7 @@ mod tests { "#; let config = hive_router_config::parse_yaml_config(config.to_string()).unwrap(); - let http_client = from_traffic_shaping_config_to_client(&config.traffic_shaping.all); + let http_client = from_traffic_shaping_config_to_client(&config.traffic_shaping.all, 10); let http_executor = crate::executors::http::HTTPSubgraphExecutor::new( endpoint.clone(), http_client, diff --git a/lib/router-config/src/lib.rs b/lib/router-config/src/lib.rs index 08af2bacd..1a738bbc8 100644 --- a/lib/router-config/src/lib.rs +++ b/lib/router-config/src/lib.rs @@ -8,15 +8,13 @@ pub mod query_planner; pub mod supergraph; pub mod traffic_shaping; -use std::collections::HashMap; - use config::{Config, Environment, File, FileFormat, FileSourceFile}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use crate::{ http_server::HttpServerConfig, log::LoggingConfig, query_planner::QueryPlannerConfig, - supergraph::SupergraphSource, traffic_shaping::TrafficShapingExecutorConfig, + supergraph::SupergraphSource, traffic_shaping::TrafficShapingConfig, }; #[derive(Deserialize, Serialize, JsonSchema)] @@ -58,16 +56,6 @@ pub struct HiveRouterConfig { pub cors: cors::CORSConfig, } -#[derive(Clone, Deserialize, Serialize, JsonSchema, Default)] -pub struct TrafficShapingConfig { - /// The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration. - #[serde(default)] - pub all: TrafficShapingExecutorConfig, - /// Optional per-subgraph configurations that will override the default configuration for specific subgraphs. - #[serde(default, skip_serializing_if = "HashMap::is_empty")] - pub subgraphs: HashMap, -} - #[derive(Debug, thiserror::Error)] pub enum RouterConfigError { #[error("Failed to load configuration: {0}")] diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs index fd709ecde..d634cf83c 100644 --- a/lib/router-config/src/traffic_shaping.rs +++ b/lib/router-config/src/traffic_shaping.rs @@ -3,12 +3,23 @@ use std::time::Duration; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] -pub struct TrafficShapingExecutorConfig { +use std::collections::HashMap; + +#[derive(Clone, Deserialize, Serialize, JsonSchema)] +pub struct TrafficShapingConfig { + /// The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration. + #[serde(default)] + pub all: TrafficShapingExecutorConfig, + /// Optional per-subgraph configurations that will override the default configuration for specific subgraphs. + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + pub subgraphs: HashMap, /// Limits the concurrent amount of requests/connections per host/subgraph. #[serde(default = "default_max_connections_per_host")] pub max_connections_per_host: usize, +} +#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] +pub struct TrafficShapingExecutorConfig { /// Timeout for idle sockets being kept-alive. #[serde(default = "default_pool_idle_timeout_seconds")] pub pool_idle_timeout_seconds: u64, @@ -60,7 +71,6 @@ where impl Default for TrafficShapingExecutorConfig { fn default() -> Self { Self { - max_connections_per_host: default_max_connections_per_host(), pool_idle_timeout_seconds: default_pool_idle_timeout_seconds(), dedupe_enabled: default_dedupe_enabled(), timeout: None, @@ -68,6 +78,16 @@ impl Default for TrafficShapingExecutorConfig { } } +impl Default for TrafficShapingConfig { + fn default() -> Self { + Self { + all: TrafficShapingExecutorConfig::default(), + subgraphs: HashMap::new(), + max_connections_per_host: default_max_connections_per_host(), + } + } +} + fn default_max_connections_per_host() -> usize { 100 }