diff --git a/Cargo.lock b/Cargo.lock index 4e97b18..798123e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,6 +24,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "anyhow" +version = "1.0.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b602bfe940d21c130f3895acd65221e8a61270debe89d628b9cb4e3ccb8569b" + [[package]] name = "async-mpmc" version = "0.1.0" @@ -1012,6 +1018,7 @@ checksum = "cabe4fa914dec5870285fa7f71f602645da47c486e68486d2b4ceb4a343e90ac" name = "rpc" version = "0.1.0" dependencies = [ + "anyhow", "futures-util", "hyper", "reqwest", diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 5d2d519..1197462 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -14,6 +14,7 @@ tracing = "0.1.19" futures-util = "0.3.5" reqwest = { version = "0.10.7", features = ["stream"] } tower-util = "0.3.1" +anyhow = "1.0.32" [dev-dependencies] -tokio = { version = "0.2.22", features = ["macros"] } \ No newline at end of file +tokio = { version = "0.2.22", features = ["macros"] } diff --git a/rpc/src/client.rs b/rpc/src/client.rs index cc0eeb8..e90b4d5 100644 --- a/rpc/src/client.rs +++ b/rpc/src/client.rs @@ -1,11 +1,12 @@ +pub(crate) mod engines; + use crate::{Receive, Transmit}; -use std::convert::TryInto; use tower_util::ServiceExt; /// RPC Client. `Engine` is something that can actually send requests to the /// RPC server, e.g. hyper::Client or reqwest::Client. #[derive(Clone)] -pub struct Client { +pub struct Client { engine: Engine, base: String, } @@ -107,61 +108,3 @@ where Ok((tx, rx)) } } - -/// Engine based on reqwest. -#[derive(Clone)] -pub struct ReqwestEngine(reqwest::Client); - -impl ReqwestEngine { - pub fn wrap_client(cl: reqwest::Client) -> ReqwestEngine { - ReqwestEngine(cl) - } - - pub fn new() -> ReqwestEngine { - ReqwestEngine(reqwest::Client::new()) - } -} - -impl Default for ReqwestEngine { - fn default() -> Self { - ReqwestEngine::new() - } -} - -/// Possible errors when using Reqwest-based Engine -#[derive(Debug, thiserror::Error)] -pub enum ReqwestError { - #[error("transport error")] - Reqwest(#[from] reqwest::Error), - #[error("error from http crate")] - Http(#[from] hyper::http::Error), -} - -impl hyper::service::Service> for ReqwestEngine { - type Response = hyper::Response; - type Error = ReqwestError; - type Future = futures_util::future::BoxFuture<'static, Result>; - - fn poll_ready( - &mut self, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - std::task::Poll::Ready(Ok(())) - } - - fn call(&mut self, hyper_req: hyper::Request) -> Self::Future { - let client = self.0.clone(); - Box::pin(async move { - let reqwest_req: reqwest::Request = - hyper_req.map(reqwest::Body::wrap_stream).try_into()?; - let response = client.execute(reqwest_req).await?; - let mut builder = hyper::Response::builder().status(response.status()); - for (k, v) in response.headers() { - builder = builder.header(k, v); - } - builder - .body(hyper::Body::wrap_stream(response.bytes_stream())) - .map_err(Into::into) - }) - } -} diff --git a/rpc/src/client/engines.rs b/rpc/src/client/engines.rs new file mode 100644 index 0000000..949148a --- /dev/null +++ b/rpc/src/client/engines.rs @@ -0,0 +1,130 @@ +use hyper::service::Service; +use std::{ + convert::TryInto, + fmt::{self, Debug, Display, Formatter}, +}; + +type HttpRequest = hyper::Request; +type HttpResponse = hyper::Response; + +/// Engine based on reqwest. +#[derive(Clone)] +pub struct ReqwestEngine(reqwest::Client); + +impl ReqwestEngine { + pub fn wrap_client(cl: reqwest::Client) -> ReqwestEngine { + ReqwestEngine(cl) + } + + pub fn new() -> ReqwestEngine { + ReqwestEngine(reqwest::Client::new()) + } +} + +impl Default for ReqwestEngine { + fn default() -> Self { + ReqwestEngine::new() + } +} + +/// Possible errors when using Reqwest-based Engine +#[derive(Debug, thiserror::Error)] +pub enum ReqwestError { + #[error("transport error")] + Reqwest(#[from] reqwest::Error), + #[error("error from http crate")] + Http(#[from] hyper::http::Error), +} + +impl Service for ReqwestEngine { + type Response = HttpResponse; + type Error = ReqwestError; + type Future = futures_util::future::BoxFuture<'static, Result>; + + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, hyper_req: HttpRequest) -> Self::Future { + let client = self.0.clone(); + Box::pin(async move { + let reqwest_req: reqwest::Request = + hyper_req.map(reqwest::Body::wrap_stream).try_into()?; + let response = client.execute(reqwest_req).await?; + let mut builder = hyper::Response::builder().status(response.status()); + for (k, v) in response.headers() { + builder = builder.header(k, v); + } + builder + .body(hyper::Body::wrap_stream(response.bytes_stream())) + .map_err(Into::into) + }) + } +} + +/// Type-erased engine +pub type BoxEngine = tower_util::BoxService; + +pub struct BoxEngineError(anyhow::Error); + +impl BoxEngineError { + fn new(e: E) -> Self { + Self(anyhow::Error::new(e)) + } +} + +impl Debug for BoxEngineError { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + Debug::fmt(&self.0, f) + } +} + +impl Display for BoxEngineError { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + Display::fmt(&self.0, f) + } +} + +impl std::error::Error for BoxEngineError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.0.source() + } +} + +/// Type-erases provided engine +pub fn box_engine(engine: E) -> BoxEngine +where + E: Service + Send + 'static, + E::Error: std::error::Error + Send + Sync + 'static, + E::Future: Send + 'static, +{ + struct S(E); + + impl Service for S + where + E: Service, + E::Error: std::error::Error + Send + Sync + 'static, + E::Future: Send + 'static, + { + type Response = HttpResponse; + type Error = BoxEngineError; + type Future = futures_util::future::BoxFuture<'static, Result>; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.0.poll_ready(cx).map_err(BoxEngineError::new) + } + + fn call(&mut self, req: HttpRequest) -> Self::Future { + let inner_fut = self.0.call(req); + Box::pin(async move { inner_fut.await.map_err(BoxEngineError::new) }) + } + } + + BoxEngine::new(S(engine)) +} diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 27b5a46..a63cf8b 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -10,7 +10,10 @@ mod server; mod streaming; mod unary; -pub use client::{Client, ReqwestEngine, ReqwestError}; +pub use client::{ + engines::{box_engine, BoxEngine, BoxEngineError, ReqwestEngine, ReqwestError}, + Client, +}; pub use server::{Handler, MakeRouter, Router, RouterBuilder}; pub use streaming::{ RecvError as StreamingRecvError, SendError as StreamingSendError, Streaming, StreamingRx,