diff --git a/benches/bench.rs b/benches/bench.rs index 774cfba2e7..6669e409ad 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -13,7 +13,7 @@ use tokio::runtime::Runtime as TokioRuntime; mod helpers; -criterion_group!(benches, http_requests, websocket_requests, jsonrpsee_types_v2); +criterion_group!(benches, http_requests, batched_http_requests, websocket_requests, jsonrpsee_types_v2); criterion_main!(benches); fn v2_serialize<'a>(req: JsonRpcCallSer<'a>) -> String { @@ -47,6 +47,13 @@ pub fn http_requests(crit: &mut Criterion) { run_concurrent_round_trip(&rt, crit, client.clone(), "http_concurrent_round_trip"); } +pub fn batched_http_requests(crit: &mut Criterion) { + let rt = TokioRuntime::new().unwrap(); + let url = rt.block_on(helpers::http_server()); + let client = Arc::new(HttpClientBuilder::default().build(&url).unwrap()); + run_round_trip_with_batch(&rt, crit, client.clone(), "http batch requests"); +} + pub fn websocket_requests(crit: &mut Criterion) { let rt = TokioRuntime::new().unwrap(); let url = rt.block_on(helpers::ws_server()); @@ -66,6 +73,19 @@ fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc, name: &str) { + let mut group = crit.benchmark_group(name); + for batch_size in [2, 5, 10, 50, 100usize].iter() { + let batch = vec![("say_hello", JsonRpcParams::NoParams); *batch_size]; + group.throughput(Throughput::Elements(*batch_size as u64)); + group.bench_with_input(BenchmarkId::from_parameter(batch_size), batch_size, |b, _| { + b.iter(|| rt.block_on(async { client.batch_request::(batch.clone()).await.unwrap() })) + }); + } + group.finish(); +} + fn run_concurrent_round_trip( rt: &TokioRuntime, crit: &mut Criterion, diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 69a119f9b1..6bc9aff72a 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -39,10 +39,15 @@ use hyper::{ use jsonrpsee_types::error::{Error, GenericTransportError, RpcError}; use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest}; use jsonrpsee_types::v2::{error::JsonRpcErrorCode, params::RpcParams}; -use jsonrpsee_utils::{hyper_helpers::read_response_to_body, server::send_error}; +use jsonrpsee_utils::{ + hyper_helpers::read_response_to_body, + server::{send_error, RpcSender}, +}; use serde::Serialize; +use serde_json::value::RawValue; use socket2::{Domain, Socket, Type}; use std::{ + cmp, net::{SocketAddr, TcpListener}, sync::Arc, }; @@ -153,6 +158,30 @@ impl Server { Ok::<_, HyperError>(service_fn(move |request| { let methods = methods.clone(); let access_control = access_control.clone(); + + // Look up the "method" (i.e. function pointer) from the registered methods and run it passing in + // the params from the request. The result of the computation is sent back over the `tx` channel and + // the result(s) are collected into a `String` and sent back over the wire. + let execute = + move |id: Option<&RawValue>, tx: RpcSender, method_name: &str, params: Option<&RawValue>| { + if let Some(method) = methods.get(method_name) { + let params = RpcParams::new(params.map(|params| params.get())); + // NOTE(niklasad1): connection ID is unused thus hardcoded to `0`. + if let Err(err) = (method)(id, params, &tx, 0) { + log::error!( + "execution of method call '{}' failed: {:?}, request id={:?}", + method_name, + err, + id + ); + } + } else { + send_error(id, tx, JsonRpcErrorCode::MethodNotFound.into()); + } + }; + + // Run some validation on the http request, then read the body and try to deserialize it into one of + // two cases: a single RPC request or a batch of RPC requests. async move { if let Err(e) = access_control_is_valid(&access_control, &request) { return Ok::<_, HyperError>(e); @@ -175,31 +204,48 @@ impl Server { // NOTE(niklasad1): it's a channel because it's needed for batch requests. let (tx, mut rx) = mpsc::unbounded(); + // Is this a single request or a batch (or error)? + let mut single = true; - match serde_json::from_slice::(&body) { - Ok(req) => { - log::debug!("recv: {:?}", req); - let params = RpcParams::new(req.params.map(|params| params.get())); - if let Some(method) = methods.get(&*req.method) { - // NOTE(niklasad1): connection ID is unused thus hardcoded to `0`. - if let Err(err) = (method)(req.id, params, &tx, 0) { - log::error!("method_call: {} failed: {:?}", req.method, err); - } - } else { - send_error(req.id, &tx, JsonRpcErrorCode::MethodNotFound.into()); + // For reasons outlined [here](https://github.com/serde-rs/json/issues/497), `RawValue` can't be + // used with untagged enums at the moment. This means we can't use an `SingleOrBatch` untagged + // enum here and have to try each case individually: first the single request case, then the + // batch case and lastly the error. For the worst case – unparseable input – we make three calls + // to [`serde_json::from_slice`] which is pretty annoying. + // Our [issue](https://github.com/paritytech/jsonrpsee/issues/296). + if let Ok(JsonRpcRequest { id, method: method_name, params, .. }) = + serde_json::from_slice::(&body) + { + execute(id, &tx, &method_name, params); + } else if let Ok(batch) = serde_json::from_slice::>(&body) { + if !batch.is_empty() { + single = false; + for JsonRpcRequest { id, method: method_name, params, .. } in batch { + execute(id, &tx, &method_name, params); } + } else { + send_error(None, &tx, JsonRpcErrorCode::InvalidRequest.into()); } - Err(_e) => { - let (id, code) = match serde_json::from_slice::(&body) { - Ok(req) => (req.id, JsonRpcErrorCode::InvalidRequest), - Err(_) => (None, JsonRpcErrorCode::ParseError), - }; - send_error(id, &tx, code.into()); - } + } else { + log::error!( + "[service_fn], Cannot parse request body={:?}", + String::from_utf8_lossy(&body[..cmp::min(body.len(), 1024)]) + ); + let (id, code) = match serde_json::from_slice::(&body) { + Ok(req) => (req.id, JsonRpcErrorCode::InvalidRequest), + Err(_) => (None, JsonRpcErrorCode::ParseError), + }; + send_error(id, &tx, code.into()); + } + // Closes the receiving half of a channel without dropping it. This prevents any further + // messages from being sent on the channel. + rx.close(); + let response = if single { + rx.next().await.expect("Sender is still alive managed by us above; qed") + } else { + collect_batch_responses(rx).await }; - - let response = rx.next().await.expect("Sender is still alive managed by us above; qed"); - log::debug!("send: {:?}", response); + log::debug!("[service_fn] sending back: {:?}", &response[..cmp::min(response.len(), 1024)]); Ok::<_, HyperError>(response::ok_response(response)) } })) @@ -211,6 +257,24 @@ impl Server { } } +// Collect the results of all computations sent back on the ['Stream'] into a single `String` appropriately wrapped in +// `[`/`]`. +async fn collect_batch_responses(rx: mpsc::UnboundedReceiver) -> String { + let mut buf = String::with_capacity(2048); + buf.push('['); + let mut buf = rx + .fold(buf, |mut acc, response| async { + acc = [acc, response].concat(); + acc.push(','); + acc + }) + .await; + // Remove trailing comma + buf.pop(); + buf.push(']'); + buf +} + // Checks to that access control of the received request is the same as configured. fn access_control_is_valid( access_control: &AccessControl, diff --git a/http-server/src/tests.rs b/http-server/src/tests.rs index 95c80382cc..318d8192b4 100644 --- a/http-server/src/tests.rs +++ b/http-server/src/tests.rs @@ -37,6 +37,18 @@ async fn single_method_call_works() { } } +#[tokio::test] +async fn invalid_single_method_call() { + let _ = env_logger::try_init(); + let addr = server().await; + let uri = to_http_uri(addr); + + let req = r#"{"jsonrpc":"2.0","method":1, "params": "bar"}"#; + let response = http_request(req.into(), uri.clone()).await.unwrap(); + assert_eq!(response.status, StatusCode::OK); + assert_eq!(response.body, invalid_request(Id::Null)); +} + #[tokio::test] async fn single_method_call_with_params() { let addr = server().await; @@ -50,6 +62,81 @@ async fn single_method_call_with_params() { assert_eq!(response.body, ok_response(JsonValue::Number(3.into()), Id::Num(1))); } +#[tokio::test] +async fn valid_batched_method_calls() { + let _ = env_logger::try_init(); + + let addr = server().await; + let uri = to_http_uri(addr); + + let req = r#"[ + {"jsonrpc":"2.0","method":"add", "params":[1, 2],"id":1}, + {"jsonrpc":"2.0","method":"add", "params":[3, 4],"id":2}, + {"jsonrpc":"2.0","method":"say_hello","id":3}, + {"jsonrpc":"2.0","method":"add", "params":[5, 6],"id":4} + ]"#; + let response = http_request(req.into(), uri).await.unwrap(); + assert_eq!(response.status, StatusCode::OK); + assert_eq!( + response.body, + r#"[{"jsonrpc":"2.0","result":3,"id":1},{"jsonrpc":"2.0","result":7,"id":2},{"jsonrpc":"2.0","result":"lo","id":3},{"jsonrpc":"2.0","result":11,"id":4}]"# + ); +} + +#[tokio::test] +async fn batched_notifications() { + let _ = env_logger::try_init(); + + let addr = server().await; + let uri = to_http_uri(addr); + + let req = r#"[ + {"jsonrpc": "2.0", "method": "notif", "params": [1,2,4]}, + {"jsonrpc": "2.0", "method": "notif", "params": [7]} + ]"#; + let response = http_request(req.into(), uri).await.unwrap(); + assert_eq!(response.status, StatusCode::OK); + // Note: this is *not* according to spec. Response should be the empty string, `""`. + assert_eq!(response.body, r#"[{"jsonrpc":"2.0","result":"","id":null},{"jsonrpc":"2.0","result":"","id":null}]"#); +} + +#[tokio::test] +async fn invalid_batched_method_calls() { + let _ = env_logger::try_init(); + + let addr = server().await; + let uri = to_http_uri(addr); + + // batch with no requests + let req = r#"[]"#; + let response = http_request(req.into(), uri.clone()).await.unwrap(); + assert_eq!(response.status, StatusCode::OK); + assert_eq!(response.body, invalid_request(Id::Null)); + + // batch with invalid request + let req = r#"[123]"#; + let response = http_request(req.into(), uri.clone()).await.unwrap(); + assert_eq!(response.status, StatusCode::OK); + // Note: according to the spec the `id` should be `null` here, not 123. + assert_eq!(response.body, invalid_request(Id::Num(123))); + + // batch with invalid request + let req = r#"[1, 2, 3]"#; + let response = http_request(req.into(), uri.clone()).await.unwrap(); + assert_eq!(response.status, StatusCode::OK); + // Note: according to the spec this should return an array of three `Invalid Request`s + assert_eq!(response.body, parse_error(Id::Null)); + + // invalid JSON in batch + let req = r#"[ + {"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"}, + {"jsonrpc": "2.0", "method" + ]"#; + let response = http_request(req.into(), uri.clone()).await.unwrap(); + assert_eq!(response.status, StatusCode::OK); + assert_eq!(response.body, parse_error(Id::Null)); +} + #[tokio::test] async fn should_return_method_not_found() { let addr = server().await; diff --git a/types/src/v2/params.rs b/types/src/v2/params.rs index 0053e2b35c..0a324de0c6 100644 --- a/types/src/v2/params.rs +++ b/types/src/v2/params.rs @@ -102,7 +102,7 @@ impl<'a> RpcParams<'a> { /// If your type implement `Into` call that favor of `serde_json::to:value` to /// construct the parameters. Because `serde_json::to_value` serializes the type which /// allocates whereas `Into` doesn't in most cases. -#[derive(Serialize, Debug)] +#[derive(Serialize, Debug, Clone)] #[serde(untagged)] pub enum JsonRpcParams<'a> { /// No params.