From f0478796dd2a626572b78a797641719af465c806 Mon Sep 17 00:00:00 2001 From: David Palm Date: Mon, 26 Apr 2021 09:15:08 +0200 Subject: [PATCH 01/19] WIP --- http-server/src/server.rs | 56 ++++++++++++++++++++++++++++++--------- http-server/src/tests.rs | 36 +++++++++++++++++++++++++ types/src/v2/request.rs | 17 ++++++++++++ 3 files changed, 96 insertions(+), 13 deletions(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 648ea5a853..b3cde642d4 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -37,7 +37,7 @@ use hyper::{ Error as HyperError, }; use jsonrpsee_types::error::{Error, GenericTransportError, RpcError}; -use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest}; +use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest, SingleOrBatch}; use jsonrpsee_types::v2::{error::ErrorCode, params::RpcParams}; use jsonrpsee_utils::{hyper_helpers::read_response_to_body, server::send_error}; use serde::Serialize; @@ -175,28 +175,58 @@ impl Server { // NOTE(niklasad1): it's a channel because it's needed for batch requests. let (tx, mut rx) = mpsc::unbounded(); - - match serde_json::from_slice::(&body) { - Ok(req) => { - log::debug!("recv: {:?}", req); - let params = RpcParams::new(req.params.map(|params| params.get())); - if let Some(method) = methods.get(&*req.method) { + use SingleOrBatch::*; + match serde_json::from_slice::(&body) { + Ok(Single(JsonRpcRequest{ id, method: method_name, params, ..})) => { + log::debug!("SINGLE"); + // log::debug!("recv: {:?}", req); + let params = RpcParams::new(params.map(|params| params.get())); + if let Some(method) = methods.get(&*method_name) { // NOTE(niklasad1): connection ID is unused thus hardcoded to `0`. - if let Err(err) = (method)(req.id, params, &tx, 0) { - log::error!("method_call: {} failed: {:?}", req.method, err); + if let Err(err) = (method)(id, params, &tx, 0) { + log::error!("method_call: {} failed: {:?}", method_name, err); } } else { - send_error(req.id, &tx, ErrorCode::MethodNotFound); + send_error(id, &tx, ErrorCode::MethodNotFound); } - } - Err(_e) => { + }, + Ok(Batch(_requests)) => { + log::debug!("BATCH"); + }, + Err(e) => { + let bdy = std::str::from_utf8(&body); + log::debug!("recv (err): body={:?}, err={:?}", bdy, e); let (id, err) = match serde_json::from_slice::(&body) { Ok(req) => (req.id, ErrorCode::InvalidRequest), Err(_) => (None, ErrorCode::ParseError), }; send_error(id, &tx, err); } - }; + } + + + // match serde_json::from_slice::(&body) { + // Ok(req) => { + // log::debug!("recv: {:?}", req); + // let params = RpcParams::new(req.params.map(|params| params.get())); + // if let Some(method) = methods.get(&*req.method) { + // // NOTE(niklasad1): connection ID is unused thus hardcoded to `0`. + // if let Err(err) = (method)(req.id, params, &tx, 0) { + // log::error!("method_call: {} failed: {:?}", req.method, err); + // } + // } else { + // send_error(req.id, &tx, ErrorCode::MethodNotFound); + // } + // } + // Err(e) => { + // log::debug!("recv (err): body={:?}, err={:?}", body, e); + // let (id, err) = match serde_json::from_slice::(&body) { + // Ok(req) => (req.id, ErrorCode::InvalidRequest), + // Err(_) => (None, ErrorCode::ParseError), + // }; + // send_error(id, &tx, err); + // } + // }; let response = rx.next().await.expect("Sender is still alive managed by us above; qed"); log::debug!("send: {:?}", response); diff --git a/http-server/src/tests.rs b/http-server/src/tests.rs index 95c80382cc..bd3c49bf39 100644 --- a/http-server/src/tests.rs +++ b/http-server/src/tests.rs @@ -50,6 +50,42 @@ async fn single_method_call_with_params() { assert_eq!(response.body, ok_response(JsonValue::Number(3.into()), Id::Num(1))); } +// --> [ +// {"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"}, +// {"jsonrpc": "2.0", "method": "notify_hello", "params": [7]}, +// {"jsonrpc": "2.0", "method": "subtract", "params": [42,23], "id": "2"}, +// {"foo": "boo"}, +// {"jsonrpc": "2.0", "method": "foo.get", "params": {"name": "myself"}, "id": "5"}, +// {"jsonrpc": "2.0", "method": "get_data", "id": "9"} +// ] +// <-- [ +// {"jsonrpc": "2.0", "result": 7, "id": "1"}, +// {"jsonrpc": "2.0", "result": 19, "id": "2"}, +// {"jsonrpc": "2.0", "error": {"code": -32600, "message": "Invalid Request"}, "id": null}, +// {"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": "5"}, +// {"jsonrpc": "2.0", "result": ["hello", 5], "id": "9"} +// ] +#[tokio::test] +async fn batched_method_calls() { + let _ = env_logger::try_init(); + + let addr = server().await; + let uri = to_http_uri(addr); + + let req = r#"{"jsonrpc":"2.0","method":"add", "params":[1, 2],"id":1}"#; + // let req = r#"[{"jsonrpc":"2.0","method":"add", "params":[1, 2],"id":1}]"#; + // let req = r#"[ + // {"jsonrpc":"2.0","method":"add", "params":[1, 2],"id":1}, + // {"jsonrpc":"2.0","method":"add", "params":[3, 4],"id":2}, + // {"jsonrpc":"2.0","method":"say_hello","id":3}, + // {"jsonrpc":"2.0","method":"add", "params":[5, 6],"id":4} + // ]"#; + let response = http_request(req.into(), uri).await.unwrap(); + assert_eq!(response.status, StatusCode::OK); + log::info!("Response body: {:?}", response.body); + // assert_eq!(response.body, method_not_found(Id::Str("foo".into()))); +} + #[tokio::test] async fn should_return_method_not_found() { let addr = server().await; diff --git a/types/src/v2/request.rs b/types/src/v2/request.rs index 8baa1b392c..00563430f2 100644 --- a/types/src/v2/request.rs +++ b/types/src/v2/request.rs @@ -20,6 +20,23 @@ pub struct JsonRpcRequest<'a> { pub params: Option<&'a RawValue>, } +// /// TODO: docs +// #[derive(Deserialize, Debug)] +// pub struct JsonRpcBatchRequest<'a>( +// #[serde(borrow)] +// pub +// Vec> +// ); + +/// TODO: docs +#[derive(Deserialize, Debug)] +#[serde(untagged)] +pub enum SingleOrBatch<'a> { + Single(#[serde(borrow)] JsonRpcRequest<'a>), + Batch(#[serde(borrow)]Vec>), + // Batch(JsonRpcBatchRequest<'a>), +} + /// Invalid request with known request ID. #[derive(Deserialize, Debug)] pub struct JsonRpcInvalidRequest<'a> { From dd70b49e3ddd232afbef2a5161dd9e175a11c7d8 Mon Sep 17 00:00:00 2001 From: David Palm Date: Mon, 26 Apr 2021 22:37:56 +0200 Subject: [PATCH 02/19] Implement draft of batch requests --- http-server/src/server.rs | 91 +++++++++++++++++---------------------- http-server/src/tests.rs | 20 ++++----- types/src/v2/request.rs | 17 -------- 3 files changed, 49 insertions(+), 79 deletions(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 8fe038486b..ec789666fd 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -37,10 +37,11 @@ use hyper::{ Error as HyperError, }; use jsonrpsee_types::error::{Error, GenericTransportError, RpcError}; -use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest, SingleOrBatch}; +use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest}; use jsonrpsee_types::v2::{error::JsonRpcErrorCode, params::RpcParams}; -use jsonrpsee_utils::{hyper_helpers::read_response_to_body, server::send_error}; +use jsonrpsee_utils::{hyper_helpers::read_response_to_body, server::{send_error, RpcSender}}; use serde::Serialize; +use serde_json::value::RawValue; use socket2::{Domain, Socket, Type}; use std::{ net::{SocketAddr, TcpListener}, @@ -153,6 +154,18 @@ impl Server { Ok::<_, HyperError>(service_fn(move |request| { let methods = methods.clone(); let access_control = access_control.clone(); + + let execute = move |id: Option<&RawValue>, tx: RpcSender, method_name: &str, params: Option<&RawValue>| { + if let Some(method) = methods.get(method_name) { + let params = RpcParams::new(params.map(|params| params.get())); + // NOTE(niklasad1): connection ID is unused thus hardcoded to `0`. + if let Err(err) = (method)(id, params, &tx, 0) { + log::error!("execution of method call {} failed: {:?}, request id={:?}", method_name, err, id); + } + } else { + send_error(id, tx, JsonRpcErrorCode::MethodNotFound.into()); + } + }; async move { if let Err(e) = access_control_is_valid(&access_control, &request) { return Ok::<_, HyperError>(e); @@ -175,58 +188,32 @@ impl Server { // NOTE(niklasad1): it's a channel because it's needed for batch requests. let (tx, mut rx) = mpsc::unbounded(); - use SingleOrBatch::*; - match serde_json::from_slice::(&body) { - Ok(Single(JsonRpcRequest{ id, method: method_name, params, ..})) => { - log::debug!("SINGLE"); - // log::debug!("recv: {:?}", req); - let params = RpcParams::new(params.map(|params| params.get())); - if let Some(method) = methods.get(&*method_name) { - // NOTE(niklasad1): connection ID is unused thus hardcoded to `0`. - if let Err(err) = (method)(id, params, &tx, 0) { - log::error!("method_call: {} failed: {:?}", method_name, err); - } - } else { - send_error(id, &tx, JsonRpcErrorCode::MethodNotFound.into()); - } - }, - Ok(Batch(_requests)) => log::debug!("BATCH"), - Err(_e) => { - let (id, code) = match serde_json::from_slice::(&body) { - Ok(req) => (req.id, JsonRpcErrorCode::InvalidRequest), - Err(_) => (None, JsonRpcErrorCode::ParseError), - }; - send_error(id, &tx, code.into()); - } - } - - // match serde_json::from_slice::(&body) { - // Ok(req) => { - // log::debug!("recv: {:?}", req); - // let params = RpcParams::new(req.params.map(|params| params.get())); - // if let Some(method) = methods.get(&*req.method) { - // // NOTE(niklasad1): connection ID is unused thus hardcoded to `0`. - // if let Err(err) = (method)(req.id, params, &tx, 0) { - // log::error!("method_call: {} failed: {:?}", req.method, err); - // } - // } else { - // send_error(req.id, &tx, ErrorCode::MethodNotFound); - // } - // } - // Err(e) => { - // log::debug!("recv (err): body={:?}, err={:?}", body, e); - // let (id, err) = match serde_json::from_slice::(&body) { - // Ok(req) => (req.id, ErrorCode::InvalidRequest), - // Err(_) => (None, ErrorCode::ParseError), - // }; - // send_error(id, &tx, err); - // } - // }; + if let Ok(JsonRpcRequest{ id, method: method_name, params, ..}) + = serde_json::from_slice::(&body) { + log::debug!("SINGLE"); + execute(id, &tx, &method_name, params); - let response = rx.next().await.expect("Sender is still alive managed by us above; qed"); - log::debug!("send: {:?}", response); - Ok::<_, HyperError>(response::ok_response(response)) + } else if let Ok(batch) = serde_json::from_slice::>(&body) { + log::debug!("BATCH len={}", batch.len()); + for JsonRpcRequest { id, method: method_name, params, .. } in batch { + execute(id, &tx, &method_name, params); + } + } else { + log::error!("[service_fn], Cannot parse request body={:?}", String::from_utf8_lossy(&body)); + let (id, code) = match serde_json::from_slice::(&body) { + Ok(req) => (req.id, JsonRpcErrorCode::InvalidRequest), + Err(_) => (None, JsonRpcErrorCode::ParseError), + }; + send_error(id, &tx, code.into()); + } + // TODO: the [docs]() seem to say that it's good practise to close the receiving end before reading all the items from the stream. Is it true? + rx.close(); + // TODO: this allocates a `Vec` even for single requests, which is annoying. Find a better way (reusable Vec? Pre-allocate? Build a `String` directly?) + let responses = rx.collect::>().await; + log::debug!("[service_fn] sending back: {:?}", responses); + // TODO: `join` will loop over the vec of responses again, which is dumb. Build the string directly. + Ok::<_, HyperError>(response::ok_response(responses.join(","))) } })) } diff --git a/http-server/src/tests.rs b/http-server/src/tests.rs index bd3c49bf39..082fc7d67d 100644 --- a/http-server/src/tests.rs +++ b/http-server/src/tests.rs @@ -50,6 +50,7 @@ async fn single_method_call_with_params() { assert_eq!(response.body, ok_response(JsonValue::Number(3.into()), Id::Num(1))); } +// Batch request example from spec (https://www.jsonrpc.org/specification) // --> [ // {"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"}, // {"jsonrpc": "2.0", "method": "notify_hello", "params": [7]}, @@ -72,18 +73,17 @@ async fn batched_method_calls() { let addr = server().await; let uri = to_http_uri(addr); - let req = r#"{"jsonrpc":"2.0","method":"add", "params":[1, 2],"id":1}"#; - // let req = r#"[{"jsonrpc":"2.0","method":"add", "params":[1, 2],"id":1}]"#; - // let req = r#"[ - // {"jsonrpc":"2.0","method":"add", "params":[1, 2],"id":1}, - // {"jsonrpc":"2.0","method":"add", "params":[3, 4],"id":2}, - // {"jsonrpc":"2.0","method":"say_hello","id":3}, - // {"jsonrpc":"2.0","method":"add", "params":[5, 6],"id":4} - // ]"#; + let req = r#"[ + {"jsonrpc":"2.0","method":"add", "params":[1, 2],"id":1}, + {"jsonrpc":"2.0","method":"add", "params":[3, 4],"id":2}, + {"jsonrpc":"2.0","method":"say_hello","id":3}, + {"jsonrpc":"2.0","method":"add", "params":[5, 6],"id":4} + ]"#; let response = http_request(req.into(), uri).await.unwrap(); assert_eq!(response.status, StatusCode::OK); - log::info!("Response body: {:?}", response.body); - // assert_eq!(response.body, method_not_found(Id::Str("foo".into()))); + log::info!("[test] Response body: {:?}", response.body); + // TODO: the response should be wrapped in `[]`, but it's a straight up `String` + assert_eq!(response.body, r#"[{"jsonrpc":"2.0","result":3,"id":1},{"jsonrpc":"2.0","result":7,"id":2},{"jsonrpc":"2.0","result":"lo","id":3},{"jsonrpc":"2.0","result":11,"id":4}]"#); } #[tokio::test] diff --git a/types/src/v2/request.rs b/types/src/v2/request.rs index 700aa12e9f..8baa1b392c 100644 --- a/types/src/v2/request.rs +++ b/types/src/v2/request.rs @@ -20,23 +20,6 @@ pub struct JsonRpcRequest<'a> { pub params: Option<&'a RawValue>, } -// /// TODO: docs -// #[derive(Deserialize, Debug)] -// pub struct JsonRpcBatchRequest<'a>( -// #[serde(borrow)] -// pub -// Vec> -// ); - -/// TODO: docs -#[derive(Deserialize, Debug)] -#[serde(untagged)] -pub enum SingleOrBatch<'a> { - Single(#[serde(borrow)] JsonRpcRequest<'a>), - Batch(#[serde(borrow)] Vec>), - // Batch(JsonRpcBatchRequest<'a>), -} - /// Invalid request with known request ID. #[derive(Deserialize, Debug)] pub struct JsonRpcInvalidRequest<'a> { From 3b6e47d40e1248435470aa5759cdea5fee8b461e Mon Sep 17 00:00:00 2001 From: David Palm Date: Mon, 26 Apr 2021 22:39:45 +0200 Subject: [PATCH 03/19] fmt --- http-server/src/server.rs | 37 +++++++++++++++++++++++-------------- http-server/src/tests.rs | 5 ++++- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index ec789666fd..e280d3bff1 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -39,7 +39,10 @@ use hyper::{ use jsonrpsee_types::error::{Error, GenericTransportError, RpcError}; use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest}; use jsonrpsee_types::v2::{error::JsonRpcErrorCode, params::RpcParams}; -use jsonrpsee_utils::{hyper_helpers::read_response_to_body, server::{send_error, RpcSender}}; +use jsonrpsee_utils::{ + hyper_helpers::read_response_to_body, + server::{send_error, RpcSender}, +}; use serde::Serialize; use serde_json::value::RawValue; use socket2::{Domain, Socket, Type}; @@ -155,17 +158,23 @@ impl Server { let methods = methods.clone(); let access_control = access_control.clone(); - let execute = move |id: Option<&RawValue>, tx: RpcSender, method_name: &str, params: Option<&RawValue>| { - if let Some(method) = methods.get(method_name) { - let params = RpcParams::new(params.map(|params| params.get())); - // NOTE(niklasad1): connection ID is unused thus hardcoded to `0`. - if let Err(err) = (method)(id, params, &tx, 0) { - log::error!("execution of method call {} failed: {:?}, request id={:?}", method_name, err, id); + let execute = + move |id: Option<&RawValue>, tx: RpcSender, method_name: &str, params: Option<&RawValue>| { + if let Some(method) = methods.get(method_name) { + let params = RpcParams::new(params.map(|params| params.get())); + // NOTE(niklasad1): connection ID is unused thus hardcoded to `0`. + if let Err(err) = (method)(id, params, &tx, 0) { + log::error!( + "execution of method call {} failed: {:?}, request id={:?}", + method_name, + err, + id + ); + } + } else { + send_error(id, tx, JsonRpcErrorCode::MethodNotFound.into()); } - } else { - send_error(id, tx, JsonRpcErrorCode::MethodNotFound.into()); - } - }; + }; async move { if let Err(e) = access_control_is_valid(&access_control, &request) { return Ok::<_, HyperError>(e); @@ -189,11 +198,11 @@ impl Server { // NOTE(niklasad1): it's a channel because it's needed for batch requests. let (tx, mut rx) = mpsc::unbounded(); - if let Ok(JsonRpcRequest{ id, method: method_name, params, ..}) - = serde_json::from_slice::(&body) { + if let Ok(JsonRpcRequest { id, method: method_name, params, .. }) = + serde_json::from_slice::(&body) + { log::debug!("SINGLE"); execute(id, &tx, &method_name, params); - } else if let Ok(batch) = serde_json::from_slice::>(&body) { log::debug!("BATCH len={}", batch.len()); for JsonRpcRequest { id, method: method_name, params, .. } in batch { diff --git a/http-server/src/tests.rs b/http-server/src/tests.rs index 082fc7d67d..65b8d5afa9 100644 --- a/http-server/src/tests.rs +++ b/http-server/src/tests.rs @@ -83,7 +83,10 @@ async fn batched_method_calls() { assert_eq!(response.status, StatusCode::OK); log::info!("[test] Response body: {:?}", response.body); // TODO: the response should be wrapped in `[]`, but it's a straight up `String` - assert_eq!(response.body, r#"[{"jsonrpc":"2.0","result":3,"id":1},{"jsonrpc":"2.0","result":7,"id":2},{"jsonrpc":"2.0","result":"lo","id":3},{"jsonrpc":"2.0","result":11,"id":4}]"#); + assert_eq!( + response.body, + r#"[{"jsonrpc":"2.0","result":3,"id":1},{"jsonrpc":"2.0","result":7,"id":2},{"jsonrpc":"2.0","result":"lo","id":3},{"jsonrpc":"2.0","result":11,"id":4}]"# + ); } #[tokio::test] From 3e7d3c90251c8a47858f89b69802b2250affabfb Mon Sep 17 00:00:00 2001 From: David Palm Date: Mon, 26 Apr 2021 22:43:16 +0200 Subject: [PATCH 04/19] cleanup --- http-server/src/server.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index e280d3bff1..12616d1766 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -201,10 +201,8 @@ impl Server { if let Ok(JsonRpcRequest { id, method: method_name, params, .. }) = serde_json::from_slice::(&body) { - log::debug!("SINGLE"); execute(id, &tx, &method_name, params); } else if let Ok(batch) = serde_json::from_slice::>(&body) { - log::debug!("BATCH len={}", batch.len()); for JsonRpcRequest { id, method: method_name, params, .. } in batch { execute(id, &tx, &method_name, params); } From d45be62a94e7c14a571a0c7ee5fb52b270e65ae1 Mon Sep 17 00:00:00 2001 From: David Palm Date: Tue, 27 Apr 2021 12:25:12 +0200 Subject: [PATCH 05/19] Explain why we don't use an untagged enum --- http-server/src/server.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 12616d1766..212ddc396b 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -198,6 +198,11 @@ impl Server { // NOTE(niklasad1): it's a channel because it's needed for batch requests. let (tx, mut rx) = mpsc::unbounded(); + // For [technical reasons](https://github.com/serde-rs/json/issues/497), `RawValue` can't be + // used with untagged enums at the moment. This means we can't use an `SingleOrBatch` untagged + // enum here and have to try each case individually: first the single request case, then the + // batch case and lastly the error. For the worst case – unparseable input – we make three calls + // to [`serde_json::from_slice`] which is pretty annoying. if let Ok(JsonRpcRequest { id, method: method_name, params, .. }) = serde_json::from_slice::(&body) { @@ -214,12 +219,17 @@ impl Server { }; send_error(id, &tx, code.into()); } - // TODO: the [docs]() seem to say that it's good practise to close the receiving end before reading all the items from the stream. Is it true? + // TODO: the + // [docs](https://docs.rs/futures/0.3.14/futures/channel/mpsc/struct.Receiver.html#method.close) + // seem to say that it's good practise to close the receiving end before reading all the items + // from the stream. Is it true? rx.close(); - // TODO: this allocates a `Vec` even for single requests, which is annoying. Find a better way (reusable Vec? Pre-allocate? Build a `String` directly?) + // TODO: this allocates a `Vec` even for single requests, which is annoying. Find a better way + // (reusable Vec? Pre-allocate? Build a `String` directly?) let responses = rx.collect::>().await; log::debug!("[service_fn] sending back: {:?}", responses); - // TODO: `join` will loop over the vec of responses again, which is dumb. Build the string directly. + // TODO: `join` will loop over the vec of responses again, which is dumb. Build the string + // directly. Ok::<_, HyperError>(response::ok_response(responses.join(","))) } })) From 2db102825a43e2d27991255b77408a908177fad8 Mon Sep 17 00:00:00 2001 From: David Palm Date: Thu, 29 Apr 2021 14:32:52 +0200 Subject: [PATCH 06/19] Avoid allocating a Vec for single requets --- http-server/src/server.rs | 30 +++++++++++++++++++----------- http-server/src/tests.rs | 2 -- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 212ddc396b..0f41641042 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -203,11 +203,13 @@ impl Server { // enum here and have to try each case individually: first the single request case, then the // batch case and lastly the error. For the worst case – unparseable input – we make three calls // to [`serde_json::from_slice`] which is pretty annoying. + let mut single = true; if let Ok(JsonRpcRequest { id, method: method_name, params, .. }) = serde_json::from_slice::(&body) { execute(id, &tx, &method_name, params); } else if let Ok(batch) = serde_json::from_slice::>(&body) { + single = false; for JsonRpcRequest { id, method: method_name, params, .. } in batch { execute(id, &tx, &method_name, params); } @@ -219,18 +221,24 @@ impl Server { }; send_error(id, &tx, code.into()); } - // TODO: the - // [docs](https://docs.rs/futures/0.3.14/futures/channel/mpsc/struct.Receiver.html#method.close) - // seem to say that it's good practise to close the receiving end before reading all the items - // from the stream. Is it true? rx.close(); - // TODO: this allocates a `Vec` even for single requests, which is annoying. Find a better way - // (reusable Vec? Pre-allocate? Build a `String` directly?) - let responses = rx.collect::>().await; - log::debug!("[service_fn] sending back: {:?}", responses); - // TODO: `join` will loop over the vec of responses again, which is dumb. Build the string - // directly. - Ok::<_, HyperError>(response::ok_response(responses.join(","))) + let response = + if single { + rx.next().await.expect("Sender is still alive managed by us above; qed") + } else { + let mut buf = String::with_capacity(2048); + buf.push('['); + let mut buf = rx.fold(buf, move |mut acc, response| async move { + acc = [acc, response].concat(); + acc.push(','); + acc + }).await; + buf.pop(); + buf.push(']'); + buf + }; + log::debug!("[service_fn] sending back: {:?}", response); + Ok::<_, HyperError>(response::ok_response(response)) } })) } diff --git a/http-server/src/tests.rs b/http-server/src/tests.rs index 65b8d5afa9..958a572561 100644 --- a/http-server/src/tests.rs +++ b/http-server/src/tests.rs @@ -81,8 +81,6 @@ async fn batched_method_calls() { ]"#; let response = http_request(req.into(), uri).await.unwrap(); assert_eq!(response.status, StatusCode::OK); - log::info!("[test] Response body: {:?}", response.body); - // TODO: the response should be wrapped in `[]`, but it's a straight up `String` assert_eq!( response.body, r#"[{"jsonrpc":"2.0","result":3,"id":1},{"jsonrpc":"2.0","result":7,"id":2},{"jsonrpc":"2.0","result":"lo","id":3},{"jsonrpc":"2.0","result":11,"id":4}]"# From a229deba4ed5a8cf2cc18eeb3d52fbf438b4e6a0 Mon Sep 17 00:00:00 2001 From: David Palm Date: Thu, 29 Apr 2021 14:37:36 +0200 Subject: [PATCH 07/19] Add comment --- http-server/src/server.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 0f41641042..633e51cba5 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -197,13 +197,14 @@ impl Server { // NOTE(niklasad1): it's a channel because it's needed for batch requests. let (tx, mut rx) = mpsc::unbounded(); + // Is this a single request or a batch (or error)? + let mut single = true; // For [technical reasons](https://github.com/serde-rs/json/issues/497), `RawValue` can't be // used with untagged enums at the moment. This means we can't use an `SingleOrBatch` untagged // enum here and have to try each case individually: first the single request case, then the // batch case and lastly the error. For the worst case – unparseable input – we make three calls // to [`serde_json::from_slice`] which is pretty annoying. - let mut single = true; if let Ok(JsonRpcRequest { id, method: method_name, params, .. }) = serde_json::from_slice::(&body) { From 361e6cf4b65715d71a594dfe4c76a031992e4aaf Mon Sep 17 00:00:00 2001 From: David Palm Date: Thu, 29 Apr 2021 18:34:41 +0200 Subject: [PATCH 08/19] Add a benchmark for batch requests --- benches/bench.rs | 21 ++++++++++++++++++++- types/src/v2/params.rs | 2 +- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/benches/bench.rs b/benches/bench.rs index 774cfba2e7..26bcb8329c 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -9,11 +9,12 @@ use jsonrpsee::{ ws_client::WsClientBuilder, }; use std::sync::Arc; +use std::time::Instant; use tokio::runtime::Runtime as TokioRuntime; mod helpers; -criterion_group!(benches, http_requests, websocket_requests, jsonrpsee_types_v2); +criterion_group!(benches, http_requests, batched_http_requests, websocket_requests, jsonrpsee_types_v2); criterion_main!(benches); fn v2_serialize<'a>(req: JsonRpcCallSer<'a>) -> String { @@ -47,6 +48,13 @@ pub fn http_requests(crit: &mut Criterion) { run_concurrent_round_trip(&rt, crit, client.clone(), "http_concurrent_round_trip"); } +pub fn batched_http_requests(crit: &mut Criterion) { + let rt = TokioRuntime::new().unwrap(); + let url = rt.block_on(helpers::http_server()); + let client = Arc::new(HttpClientBuilder::default().build(&url).unwrap()); + run_round_trip_with_batch(&rt, crit, client.clone(), "batched_http_round_trip", 10); +} + pub fn websocket_requests(crit: &mut Criterion) { let rt = TokioRuntime::new().unwrap(); let url = rt.block_on(helpers::ws_server()); @@ -66,6 +74,17 @@ fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc, name: &str, batch_size: usize) { + let batch = vec![("say_hello", JsonRpcParams::NoParams); batch_size]; + crit.bench_function(name, |b| { + b.iter(|| { + rt.block_on(async { + black_box(client.batch_request::(batch.clone()).await.unwrap()); + }) + }) + }); +} + fn run_concurrent_round_trip( rt: &TokioRuntime, crit: &mut Criterion, diff --git a/types/src/v2/params.rs b/types/src/v2/params.rs index 0053e2b35c..0a324de0c6 100644 --- a/types/src/v2/params.rs +++ b/types/src/v2/params.rs @@ -102,7 +102,7 @@ impl<'a> RpcParams<'a> { /// If your type implement `Into` call that favor of `serde_json::to:value` to /// construct the parameters. Because `serde_json::to_value` serializes the type which /// allocates whereas `Into` doesn't in most cases. -#[derive(Serialize, Debug)] +#[derive(Serialize, Debug, Clone)] #[serde(untagged)] pub enum JsonRpcParams<'a> { /// No params. From baf2d08d398fa3f37cd9c547a90c00ee4d9bd0af Mon Sep 17 00:00:00 2001 From: David Palm Date: Thu, 29 Apr 2021 19:34:24 +0200 Subject: [PATCH 09/19] Add more tests, noting where we diverge from the spec Fix empty batch case, i.e. `[]` --- http-server/src/server.rs | 12 ++++-- http-server/src/tests.rs | 87 +++++++++++++++++++++++++++++++-------- 2 files changed, 78 insertions(+), 21 deletions(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 633e51cba5..e64bf0ab21 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -200,7 +200,7 @@ impl Server { // Is this a single request or a batch (or error)? let mut single = true; - // For [technical reasons](https://github.com/serde-rs/json/issues/497), `RawValue` can't be + // For reasons outlined [here](https://github.com/serde-rs/json/issues/497), `RawValue` can't be // used with untagged enums at the moment. This means we can't use an `SingleOrBatch` untagged // enum here and have to try each case individually: first the single request case, then the // batch case and lastly the error. For the worst case – unparseable input – we make three calls @@ -210,9 +210,13 @@ impl Server { { execute(id, &tx, &method_name, params); } else if let Ok(batch) = serde_json::from_slice::>(&body) { - single = false; - for JsonRpcRequest { id, method: method_name, params, .. } in batch { - execute(id, &tx, &method_name, params); + if batch.len() > 0 { + single = false; + for JsonRpcRequest { id, method: method_name, params, .. } in batch { + execute(id, &tx, &method_name, params); + } + } else { + send_error(None, &tx, JsonRpcErrorCode::InvalidRequest.into()); } } else { log::error!("[service_fn], Cannot parse request body={:?}", String::from_utf8_lossy(&body)); diff --git a/http-server/src/tests.rs b/http-server/src/tests.rs index 958a572561..bfb3fa11e1 100644 --- a/http-server/src/tests.rs +++ b/http-server/src/tests.rs @@ -37,6 +37,18 @@ async fn single_method_call_works() { } } +#[tokio::test] +async fn invalid_single_method_call() { + let _ = env_logger::try_init(); + let addr = server().await; + let uri = to_http_uri(addr); + + let req = r#"{"jsonrpc":"2.0","method":1, "params": "bar"}"#; + let response = http_request(req.into(), uri.clone()).await.unwrap(); + assert_eq!(response.status, StatusCode::OK); + assert_eq!(response.body, invalid_request(Id::Null)); +} + #[tokio::test] async fn single_method_call_with_params() { let addr = server().await; @@ -50,24 +62,8 @@ async fn single_method_call_with_params() { assert_eq!(response.body, ok_response(JsonValue::Number(3.into()), Id::Num(1))); } -// Batch request example from spec (https://www.jsonrpc.org/specification) -// --> [ -// {"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"}, -// {"jsonrpc": "2.0", "method": "notify_hello", "params": [7]}, -// {"jsonrpc": "2.0", "method": "subtract", "params": [42,23], "id": "2"}, -// {"foo": "boo"}, -// {"jsonrpc": "2.0", "method": "foo.get", "params": {"name": "myself"}, "id": "5"}, -// {"jsonrpc": "2.0", "method": "get_data", "id": "9"} -// ] -// <-- [ -// {"jsonrpc": "2.0", "result": 7, "id": "1"}, -// {"jsonrpc": "2.0", "result": 19, "id": "2"}, -// {"jsonrpc": "2.0", "error": {"code": -32600, "message": "Invalid Request"}, "id": null}, -// {"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": "5"}, -// {"jsonrpc": "2.0", "result": ["hello", 5], "id": "9"} -// ] #[tokio::test] -async fn batched_method_calls() { +async fn valid_batched_method_calls() { let _ = env_logger::try_init(); let addr = server().await; @@ -87,6 +83,63 @@ async fn batched_method_calls() { ); } +#[tokio::test] +async fn batched_notifications() { + let _ = env_logger::try_init(); + + let addr = server().await; + let uri = to_http_uri(addr); + + let req = r#"[ + {"jsonrpc": "2.0", "method": "notif", "params": [1,2,4]}, + {"jsonrpc": "2.0", "method": "notif", "params": [7]} + ]"#; + let response = http_request(req.into(), uri).await.unwrap(); + assert_eq!(response.status, StatusCode::OK); + // Note: this is *not* according to spec. Response should be the empty string, `""`. + assert_eq!( + response.body, + r#"[{"jsonrpc":"2.0","result":"","id":null},{"jsonrpc":"2.0","result":"","id":null}]"# + ); +} + +#[tokio::test] +async fn invalid_batched_method_calls() { + let _ = env_logger::try_init(); + + let addr = server().await; + let uri = to_http_uri(addr); + + // batch with no requests + let req = r#"[]"#; + let response = http_request(req.into(), uri.clone()).await.unwrap(); + assert_eq!(response.status, StatusCode::OK); + assert_eq!(response.body, invalid_request(Id::Null)); + + // batch with invalid request + let req = r#"[123]"#; + let response = http_request(req.into(), uri.clone()).await.unwrap(); + assert_eq!(response.status, StatusCode::OK); + // Note: according to the spec the `id` should be `null` here, not 123. + assert_eq!(response.body, invalid_request(Id::Num(123))); + + // batch with invalid request + let req = r#"[1, 2, 3]"#; + let response = http_request(req.into(), uri.clone()).await.unwrap(); + assert_eq!(response.status, StatusCode::OK); + // Note: according to the spec this should return an array of three `Invalid Request`s + assert_eq!(response.body, parse_error(Id::Null)); + + // invalid JSON in batch + let req = r#"[ + {"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"}, + {"jsonrpc": "2.0", "method" + ]"#; + let response = http_request(req.into(), uri.clone()).await.unwrap(); + assert_eq!(response.status, StatusCode::OK); + assert_eq!(response.body, parse_error(Id::Null)); +} + #[tokio::test] async fn should_return_method_not_found() { let addr = server().await; From c106710a2f7c14c2deacd549bcd5d7beccf8d121 Mon Sep 17 00:00:00 2001 From: David Palm Date: Thu, 29 Apr 2021 19:56:11 +0200 Subject: [PATCH 10/19] Obey the fmt --- benches/bench.rs | 8 +++++++- http-server/src/server.rs | 25 +++++++++++++------------ http-server/src/tests.rs | 5 +---- 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/benches/bench.rs b/benches/bench.rs index 26bcb8329c..cc58fb376c 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -74,7 +74,13 @@ fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc, name: &str, batch_size: usize) { +fn run_round_trip_with_batch( + rt: &TokioRuntime, + crit: &mut Criterion, + client: Arc, + name: &str, + batch_size: usize, +) { let batch = vec![("say_hello", JsonRpcParams::NoParams); batch_size]; crit.bench_function(name, |b| { b.iter(|| { diff --git a/http-server/src/server.rs b/http-server/src/server.rs index e64bf0ab21..acf24dd9a6 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -227,21 +227,22 @@ impl Server { send_error(id, &tx, code.into()); } rx.close(); - let response = - if single { - rx.next().await.expect("Sender is still alive managed by us above; qed") - } else { - let mut buf = String::with_capacity(2048); - buf.push('['); - let mut buf = rx.fold(buf, move |mut acc, response| async move { + let response = if single { + rx.next().await.expect("Sender is still alive managed by us above; qed") + } else { + let mut buf = String::with_capacity(2048); + buf.push('['); + let mut buf = rx + .fold(buf, move |mut acc, response| async move { acc = [acc, response].concat(); acc.push(','); acc - }).await; - buf.pop(); - buf.push(']'); - buf - }; + }) + .await; + buf.pop(); + buf.push(']'); + buf + }; log::debug!("[service_fn] sending back: {:?}", response); Ok::<_, HyperError>(response::ok_response(response)) } diff --git a/http-server/src/tests.rs b/http-server/src/tests.rs index bfb3fa11e1..318d8192b4 100644 --- a/http-server/src/tests.rs +++ b/http-server/src/tests.rs @@ -97,10 +97,7 @@ async fn batched_notifications() { let response = http_request(req.into(), uri).await.unwrap(); assert_eq!(response.status, StatusCode::OK); // Note: this is *not* according to spec. Response should be the empty string, `""`. - assert_eq!( - response.body, - r#"[{"jsonrpc":"2.0","result":"","id":null},{"jsonrpc":"2.0","result":"","id":null}]"# - ); + assert_eq!(response.body, r#"[{"jsonrpc":"2.0","result":"","id":null},{"jsonrpc":"2.0","result":"","id":null}]"#); } #[tokio::test] From ee40258abdd4b78df36a67a772de33aae69ccb9c Mon Sep 17 00:00:00 2001 From: David Date: Fri, 30 Apr 2021 08:03:22 +0200 Subject: [PATCH 11/19] Update benches/bench.rs Co-authored-by: Andrew Plaza --- benches/bench.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/benches/bench.rs b/benches/bench.rs index cc58fb376c..99a5f8f5cf 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -9,7 +9,6 @@ use jsonrpsee::{ ws_client::WsClientBuilder, }; use std::sync::Arc; -use std::time::Instant; use tokio::runtime::Runtime as TokioRuntime; mod helpers; From 8b6e213561b13c79ea22a5f0eb81d8569081de2c Mon Sep 17 00:00:00 2001 From: David Date: Fri, 30 Apr 2021 08:03:27 +0200 Subject: [PATCH 12/19] Update http-server/src/server.rs Co-authored-by: Andrew Plaza --- http-server/src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index acf24dd9a6..ad2ba80887 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -210,7 +210,7 @@ impl Server { { execute(id, &tx, &method_name, params); } else if let Ok(batch) = serde_json::from_slice::>(&body) { - if batch.len() > 0 { + if !batch.is_empty() { single = false; for JsonRpcRequest { id, method: method_name, params, .. } in batch { execute(id, &tx, &method_name, params); From ec844083569c52b69815ddec02f9a1f5bef996c6 Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 30 Apr 2021 08:24:46 +0200 Subject: [PATCH 13/19] Add link to issue --- http-server/src/server.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index ad2ba80887..fe9d8affbc 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -205,6 +205,7 @@ impl Server { // enum here and have to try each case individually: first the single request case, then the // batch case and lastly the error. For the worst case – unparseable input – we make three calls // to [`serde_json::from_slice`] which is pretty annoying. + // Our [issue](https://github.com/paritytech/jsonrpsee/issues/296). if let Ok(JsonRpcRequest { id, method: method_name, params, .. }) = serde_json::from_slice::(&body) { From f5212a69c439b457312c71d4e7edc6df9675eb51 Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 30 Apr 2021 10:23:47 +0200 Subject: [PATCH 14/19] Explain why we're closing the receiving end of the channel. --- http-server/src/server.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index fe9d8affbc..bd06d50eec 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -227,6 +227,7 @@ impl Server { }; send_error(id, &tx, code.into()); } + // Closes the receiving half of a channel without dropping it. This prevents any further messages from being sent on the channel. rx.close(); let response = if single { rx.next().await.expect("Sender is still alive managed by us above; qed") From 1fea05e8ed191db4d73122c03638b31afd21e087 Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 30 Apr 2021 13:18:35 +0200 Subject: [PATCH 15/19] Limit logging of requests and response to 1kb Add more comments Factor out batch response collection --- http-server/src/server.rs | 47 ++++++++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index bd06d50eec..66b9235ae1 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -47,6 +47,7 @@ use serde::Serialize; use serde_json::value::RawValue; use socket2::{Domain, Socket, Type}; use std::{ + cmp, net::{SocketAddr, TcpListener}, sync::Arc, }; @@ -158,6 +159,9 @@ impl Server { let methods = methods.clone(); let access_control = access_control.clone(); + // Look up the "method" (i.e. function pointer) from the registered methods and run it passing in + // the params from the request. The result of the computation is sent back over the `tx` channel and + // the result(s) are collected into a `String` and sent back over the wire. let execute = move |id: Option<&RawValue>, tx: RpcSender, method_name: &str, params: Option<&RawValue>| { if let Some(method) = methods.get(method_name) { @@ -175,6 +179,9 @@ impl Server { send_error(id, tx, JsonRpcErrorCode::MethodNotFound.into()); } }; + + // Run some validation on the http request, then read the body and try to deserialize it into one of + // two cases: a single RPC request or a batch of RPC requests. async move { if let Err(e) = access_control_is_valid(&access_control, &request) { return Ok::<_, HyperError>(e); @@ -220,32 +227,25 @@ impl Server { send_error(None, &tx, JsonRpcErrorCode::InvalidRequest.into()); } } else { - log::error!("[service_fn], Cannot parse request body={:?}", String::from_utf8_lossy(&body)); + log::error!( + "[service_fn], Cannot parse request body={:?}", + String::from_utf8_lossy(&body[..cmp::min(body.len(), 1024)]) + ); let (id, code) = match serde_json::from_slice::(&body) { Ok(req) => (req.id, JsonRpcErrorCode::InvalidRequest), Err(_) => (None, JsonRpcErrorCode::ParseError), }; send_error(id, &tx, code.into()); } - // Closes the receiving half of a channel without dropping it. This prevents any further messages from being sent on the channel. + // Closes the receiving half of a channel without dropping it. This prevents any further + // messages from being sent on the channel. rx.close(); let response = if single { rx.next().await.expect("Sender is still alive managed by us above; qed") } else { - let mut buf = String::with_capacity(2048); - buf.push('['); - let mut buf = rx - .fold(buf, move |mut acc, response| async move { - acc = [acc, response].concat(); - acc.push(','); - acc - }) - .await; - buf.pop(); - buf.push(']'); - buf + collect_batch_responses(rx).await }; - log::debug!("[service_fn] sending back: {:?}", response); + log::debug!("[service_fn] sending back: {:?}", &response[..cmp::min(response.len(), 1024)]); Ok::<_, HyperError>(response::ok_response(response)) } })) @@ -257,6 +257,23 @@ impl Server { } } +// Collect the results of all computations sent back on the ['Stream'] into a single `String` appropriately mapped in `[`/`]`. +async fn collect_batch_responses(rx: mpsc::UnboundedReceiver) -> String { + let mut buf = String::with_capacity(2048); + buf.push('['); + let mut buf = rx + .fold(buf, |mut acc, response| async { + acc = [acc, response].concat(); + acc.push(','); + acc + }) + .await; + // Remove trailing comma + buf.pop(); + buf.push(']'); + buf +} + // Checks to that access control of the received request is the same as configured. fn access_control_is_valid( access_control: &AccessControl, From 8debdfc952a8937fe35af793cc375fd53a3301c6 Mon Sep 17 00:00:00 2001 From: David Palm Date: Fri, 30 Apr 2021 17:37:49 +0200 Subject: [PATCH 16/19] Wrap comment --- http-server/src/server.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 66b9235ae1..004fe5ff19 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -257,7 +257,8 @@ impl Server { } } -// Collect the results of all computations sent back on the ['Stream'] into a single `String` appropriately mapped in `[`/`]`. +// Collect the results of all computations sent back on the ['Stream'] into a single `String` appropriately wrapped in +// `[`/`]`. async fn collect_batch_responses(rx: mpsc::UnboundedReceiver) -> String { let mut buf = String::with_capacity(2048); buf.push('['); From af873d00cf0c9a89a32474b740624c7dcfbf155f Mon Sep 17 00:00:00 2001 From: David Palm Date: Mon, 3 May 2021 09:35:41 +0200 Subject: [PATCH 17/19] tweak log line --- http-server/src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 004fe5ff19..6bc9aff72a 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -169,7 +169,7 @@ impl Server { // NOTE(niklasad1): connection ID is unused thus hardcoded to `0`. if let Err(err) = (method)(id, params, &tx, 0) { log::error!( - "execution of method call {} failed: {:?}, request id={:?}", + "execution of method call '{}' failed: {:?}, request id={:?}", method_name, err, id From 78f3b25c3a02277153f2c85e9f6e1e712296faeb Mon Sep 17 00:00:00 2001 From: David Palm Date: Tue, 4 May 2021 09:19:14 +0200 Subject: [PATCH 18/19] Benchmark batch request over different batch sizes --- benches/bench.rs | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/benches/bench.rs b/benches/bench.rs index 99a5f8f5cf..ad3f5efced 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -51,7 +51,7 @@ pub fn batched_http_requests(crit: &mut Criterion) { let rt = TokioRuntime::new().unwrap(); let url = rt.block_on(helpers::http_server()); let client = Arc::new(HttpClientBuilder::default().build(&url).unwrap()); - run_round_trip_with_batch(&rt, crit, client.clone(), "batched_http_round_trip", 10); + run_round_trip_with_batch(&rt, crit, client.clone(), "http batch requests"); } pub fn websocket_requests(crit: &mut Criterion) { @@ -73,21 +73,26 @@ fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc, name: &str, - batch_size: usize, ) { - let batch = vec![("say_hello", JsonRpcParams::NoParams); batch_size]; - crit.bench_function(name, |b| { - b.iter(|| { - rt.block_on(async { - black_box(client.batch_request::(batch.clone()).await.unwrap()); + let mut group = crit.benchmark_group(name); + for batch_size in [2, 5, 10, 50, 100usize].iter() { + let batch = vec![("say_hello", JsonRpcParams::NoParams); *batch_size]; + group.throughput(Throughput::Elements(*batch_size as u64)); + group.bench_with_input(BenchmarkId::from_parameter(batch_size), batch_size, |b, _| { + b.iter(|| { + rt.block_on( async { + client.batch_request::(batch.clone()).await.unwrap() + }) }) - }) - }); + }); + } + group.finish(); } fn run_concurrent_round_trip( From e2cd2947de133b6a5a966d87b19ab63c580f583e Mon Sep 17 00:00:00 2001 From: David Palm Date: Tue, 4 May 2021 09:57:48 +0200 Subject: [PATCH 19/19] fmt --- benches/bench.rs | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/benches/bench.rs b/benches/bench.rs index ad3f5efced..6669e409ad 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -74,22 +74,13 @@ fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc, - name: &str, -) { +fn run_round_trip_with_batch(rt: &TokioRuntime, crit: &mut Criterion, client: Arc, name: &str) { let mut group = crit.benchmark_group(name); for batch_size in [2, 5, 10, 50, 100usize].iter() { let batch = vec![("say_hello", JsonRpcParams::NoParams); *batch_size]; group.throughput(Throughput::Elements(*batch_size as u64)); group.bench_with_input(BenchmarkId::from_parameter(batch_size), batch_size, |b, _| { - b.iter(|| { - rt.block_on( async { - client.batch_request::(batch.clone()).await.unwrap() - }) - }) + b.iter(|| rt.block_on(async { client.batch_request::(batch.clone()).await.unwrap() })) }); } group.finish();