From 63286e611053fa54f25b66b8fb8aa6263db1599a Mon Sep 17 00:00:00 2001 From: DiscreteTom Date: Tue, 31 Dec 2024 02:20:35 +0000 Subject: [PATCH 1/5] feat: configurable client idle timeout --- README.md | 3 +++ src/lib.rs | 7 ++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index c85d279f..557b98a0 100644 --- a/README.md +++ b/README.md @@ -106,6 +106,7 @@ The readiness check port/path and traffic port can be configured using environme | AWS_LWA_PASS_THROUGH_PATH | the path for receiving event payloads that are passed through from non-http triggers | "/events" | | AWS_LWA_AUTHORIZATION_SOURCE | a header name to be replaced to `Authorization` | None | | AWS_LWA_ERROR_STATUS_CODES | comma-separated list of HTTP status codes that will cause Lambda invocations to fail (e.g. "500,502-504,422") | None | +| AWS_LWA_CLIENT_IDLE_TIMEOUT_MS | HTTP client idle timeout in milliseconds | "4000" | > **Note:** > We use "AWS_LWA_" prefix to namespacing all environment variables used by Lambda Web Adapter. The original ones will be supported until we reach version 1.0. @@ -140,6 +141,8 @@ Please check out [FastAPI with Response Streaming](examples/fastapi-response-str **AWS_LWA_ERROR_STATUS_CODES** - A comma-separated list of HTTP status codes that will cause Lambda invocations to fail. Supports individual codes and ranges (e.g. "500,502-504,422"). When the web application returns any of these status codes, the Lambda invocation will fail and trigger error handling behaviors like retries or DLQ processing. This is useful for treating certain HTTP errors as Lambda execution failures. This feature is disabled by default. +**AWS_LWA_CLIENT_IDLE_TIMEOUT_MS** - HTTP client idle timeout in milliseconds. The default is 4000 milliseconds. + ## Request Context **Request Context** is metadata API Gateway sends to Lambda for a request. It usually contains requestId, requestTime, apiId, identity, and authorizer. Identity and authorizer are useful to get client identity for authorization. API Gateway Developer Guide contains more details [here](https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html#api-gateway-simple-proxy-for-lambda-input-format). diff --git a/src/lib.rs b/src/lib.rs index 733c9703..3029311e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,6 +79,7 @@ pub struct AdapterOptions { pub invoke_mode: LambdaInvokeMode, pub authorization_source: Option, pub error_status_codes: Option>, + pub client_idle_timeout_ms: u64, } impl Default for AdapterOptions { @@ -120,6 +121,10 @@ impl Default for AdapterOptions { error_status_codes: env::var("AWS_LWA_ERROR_STATUS_CODES") .ok() .map(|codes| parse_status_codes(&codes)), + client_idle_timeout_ms: env::var("AWS_LWA_CLIENT_IDLE_TIMEOUT_MS") + .ok() + .map(|s| s.parse().unwrap()) + .unwrap_or(4000), } } } @@ -176,7 +181,7 @@ impl Adapter { /// to talk with the web server. pub fn new(options: &AdapterOptions) -> Adapter { let client = Client::builder(hyper_util::rt::TokioExecutor::new()) - .pool_idle_timeout(Duration::from_secs(4)) + .pool_idle_timeout(Duration::from_millis(options.client_idle_timeout_ms)) .build(HttpConnector::new()); let schema = "http"; From 1ffac04b4e257f9703d936da1eaa68c5da926b9c Mon Sep 17 00:00:00 2001 From: DiscreteTom Date: Tue, 31 Dec 2024 02:41:20 +0000 Subject: [PATCH 2/5] fix: prevent connection pool from using a to-be-disconnected connection after restoring from Lambda SnapStart --- src/lib.rs | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3029311e..6cb2b469 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,7 +13,6 @@ use lambda_http::request::RequestContext; use lambda_http::Body; pub use lambda_http::Error; use lambda_http::{Request, RequestExt, Response}; -use std::fmt::Debug; use std::{ env, future::Future, @@ -24,6 +23,7 @@ use std::{ }, time::Duration, }; +use std::{fmt::Debug, time::SystemTime}; use tokio::net::TcpStream; use tokio::time::timeout; use tokio_retry::{strategy::FixedInterval, Retry}; @@ -173,17 +173,21 @@ pub struct Adapter { invoke_mode: LambdaInvokeMode, authorization_source: Option, error_status_codes: Option>, + client_idle_timeout_ms: u64, + // be sure to use `SystemTime` (CLOCK_REALTIME) instead of `Duration` (CLOCK_MONOTONIC) + // to avoid issues when restored from Lambda SnapStart + last_invoke: SystemTime, } impl Adapter { + fn new_client() -> Arc> { + Arc::new(Client::builder(hyper_util::rt::TokioExecutor::new()).build(HttpConnector::new())) + } + /// Create a new HTTP Adapter instance. /// This function initializes a new HTTP client /// to talk with the web server. pub fn new(options: &AdapterOptions) -> Adapter { - let client = Client::builder(hyper_util::rt::TokioExecutor::new()) - .pool_idle_timeout(Duration::from_millis(options.client_idle_timeout_ms)) - .build(HttpConnector::new()); - let schema = "http"; let healthcheck_url = format!( @@ -198,7 +202,7 @@ impl Adapter { .unwrap(); Adapter { - client: Arc::new(client), + client: Self::new_client(), healthcheck_url, healthcheck_protocol: options.readiness_check_protocol, healthcheck_min_unhealthy_status: options.readiness_check_min_unhealthy_status, @@ -211,6 +215,9 @@ impl Adapter { invoke_mode: options.invoke_mode, authorization_source: options.authorization_source.clone(), error_status_codes: options.error_status_codes.clone(), + client_idle_timeout_ms: options.client_idle_timeout_ms, + // it's ok to use `now` here since there is no connections in the connection pool yet + last_invoke: SystemTime::now(), } } } @@ -414,7 +421,21 @@ impl Service for Adapter { } fn call(&mut self, event: Request) -> Self::Future { + // validate client timeout + if self + .last_invoke + .elapsed() + .map(|d| d.as_millis() > self.client_idle_timeout_ms.into()) + // if the last_invoke is in the future, it's ok to re-use the client + .unwrap_or(false) + { + // client timeout, create a new client with a new connection pool. + // this is to prevent the pool from using a to-be-disconnected connection after restoring from Lambda SnapStart + self.client = Self::new_client(); + } + let adapter = self.clone(); + self.last_invoke = SystemTime::now(); Box::pin(async move { adapter.fetch_response(event).await }) } } From d3a4103776131e3d7e852c6e769d41b7c813dcfa Mon Sep 17 00:00:00 2001 From: DiscreteTom Date: Tue, 31 Dec 2024 02:43:06 +0000 Subject: [PATCH 3/5] chore: add debug tracing when client timeout --- src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib.rs b/src/lib.rs index 6cb2b469..02cf343e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -431,6 +431,7 @@ impl Service for Adapter { { // client timeout, create a new client with a new connection pool. // this is to prevent the pool from using a to-be-disconnected connection after restoring from Lambda SnapStart + tracing::debug!("Client timeout, creating a new client"); self.client = Self::new_client(); } From 87d27ed14f2011f894ff7c96db40b36eae265df5 Mon Sep 17 00:00:00 2001 From: DiscreteTom Date: Tue, 31 Dec 2024 03:08:28 +0000 Subject: [PATCH 4/5] fix: apply client timeout to hyper client to clean expired connections in normal cases --- src/lib.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 02cf343e..73c741bb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -180,8 +180,12 @@ pub struct Adapter { } impl Adapter { - fn new_client() -> Arc> { - Arc::new(Client::builder(hyper_util::rt::TokioExecutor::new()).build(HttpConnector::new())) + fn new_client(timeout_ms: u64) -> Arc> { + Arc::new( + Client::builder(hyper_util::rt::TokioExecutor::new()) + .pool_idle_timeout(Duration::from_millis(timeout_ms)) + .build(HttpConnector::new()), + ) } /// Create a new HTTP Adapter instance. @@ -202,7 +206,7 @@ impl Adapter { .unwrap(); Adapter { - client: Self::new_client(), + client: Self::new_client(options.client_idle_timeout_ms), healthcheck_url, healthcheck_protocol: options.readiness_check_protocol, healthcheck_min_unhealthy_status: options.readiness_check_min_unhealthy_status, @@ -432,7 +436,7 @@ impl Service for Adapter { // client timeout, create a new client with a new connection pool. // this is to prevent the pool from using a to-be-disconnected connection after restoring from Lambda SnapStart tracing::debug!("Client timeout, creating a new client"); - self.client = Self::new_client(); + self.client = Self::new_client(self.client_idle_timeout_ms); } let adapter = self.clone(); From b9f4d33e361c3af19932bd0520919de99b77ea3c Mon Sep 17 00:00:00 2001 From: DiscreteTom Date: Fri, 3 Jan 2025 03:06:15 +0000 Subject: [PATCH 5/5] tests: add unit tests for client idle timeout --- src/lib.rs | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 75ec1d54..9e3799aa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -420,6 +420,15 @@ impl Adapter { Ok(app_response) } + + /// Return whether the client has been idle for longer than the [`Self::client_idle_timeout_ms`]. + fn client_timeout_has_expired(&self) -> bool { + self.last_invoke + .elapsed() + .map(|d| d.as_millis() > self.client_idle_timeout_ms.into()) + // if the last_invoke is in the future, it's ok to re-use the client + .unwrap_or(false) + } } /// Implement a `Tower.Service` that sends the requests @@ -434,14 +443,7 @@ impl Service for Adapter { } fn call(&mut self, event: Request) -> Self::Future { - // validate client timeout - if self - .last_invoke - .elapsed() - .map(|d| d.as_millis() > self.client_idle_timeout_ms.into()) - // if the last_invoke is in the future, it's ok to re-use the client - .unwrap_or(false) - { + if self.client_timeout_has_expired() { // client timeout, create a new client with a new connection pool. // this is to prevent the pool from using a to-be-disconnected connection after restoring from Lambda SnapStart tracing::debug!("Client timeout, creating a new client"); @@ -569,4 +571,16 @@ mod tests { // Assert app server's healthcheck endpoint got called healthcheck.assert(); } + + #[test] + fn test_client_idle_timeout() { + let mut adapter = Adapter::new(&AdapterOptions::default()); + assert!(!adapter.client_timeout_has_expired()); + + adapter.last_invoke = SystemTime::now() - Duration::from_millis(5000); + assert!(adapter.client_timeout_has_expired()); + + adapter.last_invoke = SystemTime::now() + Duration::from_millis(5000); + assert!(!adapter.client_timeout_has_expired()); + } }