Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ tracing = "0.1.19"
futures-util = "0.3.5"
reqwest = { version = "0.10.7", features = ["stream"] }
tower-util = "0.3.1"
anyhow = "1.0.32"

[dev-dependencies]
tokio = { version = "0.2.22", features = ["macros"] }
tokio = { version = "0.2.22", features = ["macros"] }
63 changes: 3 additions & 60 deletions rpc/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
pub(crate) mod engines;

use crate::{Receive, Transmit};
use std::convert::TryInto;
use tower_util::ServiceExt;

/// RPC Client. `Engine` is something that can actually send requests to the
/// RPC server, e.g. hyper::Client or reqwest::Client.
#[derive(Clone)]
pub struct Client<Engine = ReqwestEngine> {
pub struct Client<Engine = engines::ReqwestEngine> {
engine: Engine,
base: String,
}
Expand Down Expand Up @@ -107,61 +108,3 @@ where
Ok((tx, rx))
}
}

/// Engine based on reqwest.
#[derive(Clone)]
pub struct ReqwestEngine(reqwest::Client);

impl ReqwestEngine {
pub fn wrap_client(cl: reqwest::Client) -> ReqwestEngine {
ReqwestEngine(cl)
}

pub fn new() -> ReqwestEngine {
ReqwestEngine(reqwest::Client::new())
}
}

impl Default for ReqwestEngine {
fn default() -> Self {
ReqwestEngine::new()
}
}

/// Possible errors when using Reqwest-based Engine
#[derive(Debug, thiserror::Error)]
pub enum ReqwestError {
#[error("transport error")]
Reqwest(#[from] reqwest::Error),
#[error("error from http crate")]
Http(#[from] hyper::http::Error),
}

impl hyper::service::Service<hyper::Request<hyper::Body>> for ReqwestEngine {
type Response = hyper::Response<hyper::Body>;
type Error = ReqwestError;
type Future = futures_util::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(
&mut self,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}

fn call(&mut self, hyper_req: hyper::Request<hyper::Body>) -> Self::Future {
let client = self.0.clone();
Box::pin(async move {
let reqwest_req: reqwest::Request =
hyper_req.map(reqwest::Body::wrap_stream).try_into()?;
let response = client.execute(reqwest_req).await?;
let mut builder = hyper::Response::builder().status(response.status());
for (k, v) in response.headers() {
builder = builder.header(k, v);
}
builder
.body(hyper::Body::wrap_stream(response.bytes_stream()))
.map_err(Into::into)
})
}
}
130 changes: 130 additions & 0 deletions rpc/src/client/engines.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use hyper::service::Service;
use std::{
convert::TryInto,
fmt::{self, Debug, Display, Formatter},
};

type HttpRequest = hyper::Request<hyper::Body>;
type HttpResponse = hyper::Response<hyper::Body>;

/// Engine based on reqwest.
#[derive(Clone)]
pub struct ReqwestEngine(reqwest::Client);

impl ReqwestEngine {
pub fn wrap_client(cl: reqwest::Client) -> ReqwestEngine {
ReqwestEngine(cl)
}

pub fn new() -> ReqwestEngine {
ReqwestEngine(reqwest::Client::new())
}
}

impl Default for ReqwestEngine {
fn default() -> Self {
ReqwestEngine::new()
}
}

/// Possible errors when using Reqwest-based Engine
#[derive(Debug, thiserror::Error)]
pub enum ReqwestError {
#[error("transport error")]
Reqwest(#[from] reqwest::Error),
#[error("error from http crate")]
Http(#[from] hyper::http::Error),
}

impl Service<HttpRequest> for ReqwestEngine {
type Response = HttpResponse;
type Error = ReqwestError;
type Future = futures_util::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(
&mut self,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}

fn call(&mut self, hyper_req: HttpRequest) -> Self::Future {
let client = self.0.clone();
Box::pin(async move {
let reqwest_req: reqwest::Request =
hyper_req.map(reqwest::Body::wrap_stream).try_into()?;
let response = client.execute(reqwest_req).await?;
let mut builder = hyper::Response::builder().status(response.status());
for (k, v) in response.headers() {
builder = builder.header(k, v);
}
builder
.body(hyper::Body::wrap_stream(response.bytes_stream()))
.map_err(Into::into)
})
}
}

/// Type-erased engine
pub type BoxEngine = tower_util::BoxService<HttpRequest, HttpResponse, BoxEngineError>;

pub struct BoxEngineError(anyhow::Error);

impl BoxEngineError {
fn new<E: std::error::Error + Send + Sync + 'static>(e: E) -> Self {
Self(anyhow::Error::new(e))
}
}

impl Debug for BoxEngineError {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
Debug::fmt(&self.0, f)
}
}

impl Display for BoxEngineError {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
Display::fmt(&self.0, f)
}
}

impl std::error::Error for BoxEngineError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.0.source()
}
}

/// Type-erases provided engine
pub fn box_engine<E>(engine: E) -> BoxEngine
where
E: Service<HttpRequest, Response = HttpResponse> + Send + 'static,
E::Error: std::error::Error + Send + Sync + 'static,
E::Future: Send + 'static,
{
struct S<E>(E);

impl<E> Service<HttpRequest> for S<E>
where
E: Service<HttpRequest, Response = HttpResponse>,
E::Error: std::error::Error + Send + Sync + 'static,
E::Future: Send + 'static,
{
type Response = HttpResponse;
type Error = BoxEngineError;
type Future = futures_util::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx).map_err(BoxEngineError::new)
}

fn call(&mut self, req: HttpRequest) -> Self::Future {
let inner_fut = self.0.call(req);
Box::pin(async move { inner_fut.await.map_err(BoxEngineError::new) })
}
}

BoxEngine::new(S(engine))
}
5 changes: 4 additions & 1 deletion rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ mod server;
mod streaming;
mod unary;

pub use client::{Client, ReqwestEngine, ReqwestError};
pub use client::{
engines::{box_engine, BoxEngine, BoxEngineError, ReqwestEngine, ReqwestError},
Client,
};
pub use server::{Handler, MakeRouter, Router, RouterBuilder};
pub use streaming::{
RecvError as StreamingRecvError, SendError as StreamingSendError, Streaming, StreamingRx,
Expand Down