Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use oneshot channel to bridge async and blocking-io tasks, this can avoid blocking task block async runtime #3093

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ members = [
"crates/pos/types/types",
"crates/pos/config/management/network-address-encryption",
"crates/rpc/rpc-middlewares",
"crates/tasks",
]

resolver = "2"
Expand Down Expand Up @@ -236,6 +237,7 @@ move-core-types = { path = "./crates/pos/types/move-core-types" }
pow-types = { path = "./crates/pos/types/pow-types" }
diem-types = { path = "./crates/pos/types/types" }
diem-network-address-encryption = { path = "./crates/pos/config/management/network-address-encryption" }
cfx-tasks = { path = "./crates/tasks" }

# basics
bytes = "1.9"
Expand Down Expand Up @@ -387,6 +389,8 @@ synstructure = "0.12"
lru-cache = "0.1"
lru_time_cache = "0.9.0"
slice-group-by = "0.3.1"
auto_impl = "1"
dyn-clone = "1.0.17"

# num misc
bigdecimal = "0.1.0"
Expand Down Expand Up @@ -431,4 +435,4 @@ influx_db_client = "0.5.1"
rocksdb = { git = "https://github.com/Conflux-Chain/rust-rocksdb.git", rev = "3773afe5b953997188f37c39308105b5deb0faac" }

[patch.crates-io]
sqlite3-sys = { git = "https://github.com/Conflux-Chain/sqlite3-sys.git", rev = "1de8e5998f7c2d919336660b8ef4e8f52ac43844" }
sqlite3-sys = { git = "https://github.com/Conflux-Chain/sqlite3-sys.git", rev = "1de8e5998f7c2d919336660b8ef4e8f52ac43844" }
1 change: 1 addition & 0 deletions crates/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ cfx-rpc-builder = { workspace = true }
jsonrpsee = { workspace = true }
cfx-rpc-common-impl = { workspace = true }
cfx-parity-trace-types = { workspace = true }
cfx-tasks = { workspace = true }

[dev-dependencies]
criterion = { workspace = true }
Expand Down
15 changes: 15 additions & 0 deletions crates/client/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use blockgen::BlockGenerator;
use cfx_executor::machine::{Machine, VmFactory};
use cfx_parameters::genesis::DEV_GENESIS_KEY_PAIR_2;
use cfx_storage::StorageManager;
use cfx_tasks::TaskManager;
use cfx_types::{address_util::AddressUtil, Address, Space, U256};
pub use cfxcore::pos::pos::PosDropHandle;
use cfxcore::{
Expand Down Expand Up @@ -517,6 +518,7 @@ pub fn initialize_not_light_node_modules(
Option<WSServer>,
Arc<TokioRuntime>,
Option<RpcServerHandle>,
TaskManager,
),
String,
> {
Expand Down Expand Up @@ -668,6 +670,9 @@ pub fn initialize_not_light_node_modules(
accounts,
));

let task_manager = TaskManager::new(tokio_runtime.handle().clone());
let task_executor = task_manager.executor();

let debug_rpc_http_server = super::rpc::start_http(
conf.local_http_config(),
setup_debug_rpc_apis(
Expand All @@ -676,6 +681,7 @@ pub fn initialize_not_light_node_modules(
pubsub.clone(),
eth_pubsub.clone(),
&conf,
task_executor.clone(),
),
)?;

Expand All @@ -687,6 +693,7 @@ pub fn initialize_not_light_node_modules(
pubsub.clone(),
eth_pubsub.clone(),
&conf,
task_executor.clone(),
),
RpcExtractor,
)?;
Expand All @@ -699,6 +706,7 @@ pub fn initialize_not_light_node_modules(
pubsub.clone(),
eth_pubsub.clone(),
&conf,
task_executor.clone(),
),
RpcExtractor,
)?;
Expand All @@ -711,6 +719,7 @@ pub fn initialize_not_light_node_modules(
pubsub.clone(),
eth_pubsub.clone(),
&conf,
task_executor.clone(),
),
RpcExtractor,
)?;
Expand All @@ -723,6 +732,7 @@ pub fn initialize_not_light_node_modules(
pubsub.clone(),
eth_pubsub.clone(),
&conf,
task_executor.clone(),
),
RpcExtractor,
)?;
Expand All @@ -735,6 +745,7 @@ pub fn initialize_not_light_node_modules(
pubsub.clone(),
eth_pubsub.clone(),
&conf,
task_executor.clone(),
),
)?;

Expand All @@ -746,6 +757,7 @@ pub fn initialize_not_light_node_modules(
pubsub.clone(),
eth_pubsub.clone(),
&conf,
task_executor.clone(),
),
RpcExtractor,
)?;
Expand All @@ -758,6 +770,7 @@ pub fn initialize_not_light_node_modules(
pubsub,
eth_pubsub.clone(),
&conf,
task_executor.clone(),
),
)?;

Expand All @@ -778,6 +791,7 @@ pub fn initialize_not_light_node_modules(
sync.clone(),
txpool.clone(),
eth_rpc_http_server_addr,
task_executor.clone(),
))?;

Ok((
Expand All @@ -798,6 +812,7 @@ pub fn initialize_not_light_node_modules(
eth_rpc_ws_server,
tokio_runtime,
async_eth_rpc_http_server,
task_manager,
))
}

Expand Down
4 changes: 4 additions & 0 deletions crates/client/src/node_types/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
};
use blockgen::BlockGenerator;
use cfx_rpc_builder::RpcServerHandle;
use cfx_tasks::TaskManager;
use cfxcore::{
pow::PowComputer, ConsensusGraph, NodeType, SynchronizationService,
TransactionPool,
Expand All @@ -38,6 +39,7 @@ pub struct ArchiveClientExtraComponents {
/// Which use Rust async I/O
pub eth_rpc_server_handle: Option<RpcServerHandle>,
pub tokio_runtime: Arc<TokioRuntime>,
pub task_manager: TaskManager,
}

impl MallocSizeOf for ArchiveClientExtraComponents {
Expand Down Expand Up @@ -79,6 +81,7 @@ impl ArchiveClient {
eth_rpc_ws_server,
tokio_runtime,
eth_rpc_server_handle,
task_manager,
) = initialize_not_light_node_modules(
&mut conf,
exit,
Expand All @@ -103,6 +106,7 @@ impl ArchiveClient {
eth_rpc_ws_server,
eth_rpc_server_handle,
tokio_runtime,
task_manager,
},
}))
}
Expand Down
4 changes: 4 additions & 0 deletions crates/client/src/node_types/full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
};
use blockgen::BlockGenerator;
use cfx_rpc_builder::RpcServerHandle;
use cfx_tasks::TaskManager;
use cfxcore::{
pow::PowComputer, ConsensusGraph, NodeType, SynchronizationService,
TransactionPool,
Expand All @@ -38,6 +39,7 @@ pub struct FullClientExtraComponents {
/// Which use Rust async I/O
pub eth_rpc_server_handle: Option<RpcServerHandle>,
pub tokio_runtime: Arc<TokioRuntime>,
pub task_manager: TaskManager,
}

impl MallocSizeOf for FullClientExtraComponents {
Expand Down Expand Up @@ -72,6 +74,7 @@ impl FullClient {
eth_rpc_ws_server,
tokio_runtime,
eth_rpc_server_handle,
task_manager,
) = initialize_not_light_node_modules(&mut conf, exit, NodeType::Full)?;
Ok(Box::new(ClientComponents {
data_manager_weak_ptr: Arc::downgrade(&data_man),
Expand All @@ -92,6 +95,7 @@ impl FullClient {
eth_rpc_ws_server,
eth_rpc_server_handle,
tokio_runtime,
task_manager,
},
}))
}
Expand Down
4 changes: 3 additions & 1 deletion crates/client/src/rpc/impls/eth/eth_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::rpc::{
},
};
use cfx_rpc::EthApi;
use cfx_tasks::TaskExecutor;
use cfx_types::{Address, AddressSpaceUtil, Space, H160, H256, U256, U64};
use cfx_util_macros::bail;
use cfxcore::{
Expand All @@ -33,9 +34,10 @@ impl EthHandler {
pub fn new(
config: RpcImplConfiguration, consensus: SharedConsensusGraph,
sync: SharedSynchronizationService, tx_pool: SharedTransactionPool,
executor: TaskExecutor,
) -> Self {
EthHandler {
inner: EthApi::new(config, consensus, sync, tx_pool),
inner: EthApi::new(config, consensus, sync, tx_pool, executor),
}
}
}
Expand Down
17 changes: 11 additions & 6 deletions crates/client/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use cfx_rpc_builder::{
RpcModuleBuilder, RpcModuleSelection, RpcServerConfig, RpcServerHandle,
TransportRpcModuleConfig,
};
use cfx_tasks::TaskExecutor;
use cfxcore::{
SharedConsensusGraph, SharedSynchronizationService, SharedTransactionPool,
};
Expand Down Expand Up @@ -97,7 +98,7 @@ use std::collections::HashSet;

pub fn setup_public_rpc_apis(
common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
eth_pubsub: EthPubSubClient, conf: &Configuration,
eth_pubsub: EthPubSubClient, conf: &Configuration, executor: TaskExecutor,
) -> MetaIoHandler<Metadata> {
setup_rpc_apis(
common,
Expand All @@ -107,12 +108,13 @@ pub fn setup_public_rpc_apis(
&conf.raw_conf.throttling_conf,
"rpc",
conf.raw_conf.public_rpc_apis.list_apis(),
executor,
)
}

pub fn setup_public_eth_rpc_apis(
common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
eth_pubsub: EthPubSubClient, conf: &Configuration,
eth_pubsub: EthPubSubClient, conf: &Configuration, executor: TaskExecutor,
) -> MetaIoHandler<Metadata> {
setup_rpc_apis(
common,
Expand All @@ -122,12 +124,13 @@ pub fn setup_public_eth_rpc_apis(
&conf.raw_conf.throttling_conf,
"rpc",
conf.raw_conf.public_evm_rpc_apis.list_apis(),
executor,
)
}

pub fn setup_debug_rpc_apis(
common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
eth_pubsub: EthPubSubClient, conf: &Configuration,
eth_pubsub: EthPubSubClient, conf: &Configuration, executor: TaskExecutor,
) -> MetaIoHandler<Metadata> {
setup_rpc_apis(
common,
Expand All @@ -137,13 +140,14 @@ pub fn setup_debug_rpc_apis(
&conf.raw_conf.throttling_conf,
"rpc_local",
ApiSet::All.list_apis(),
executor,
)
}

fn setup_rpc_apis(
common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
eth_pubsub: EthPubSubClient, throttling_conf: &Option<String>,
throttling_section: &str, apis: HashSet<Api>,
throttling_section: &str, apis: HashSet<Api>, executor: TaskExecutor,
) -> MetaIoHandler<Metadata> {
let mut handler = MetaIoHandler::default();
for api in &apis {
Expand Down Expand Up @@ -190,6 +194,7 @@ fn setup_rpc_apis(
rpc.consensus.clone(),
rpc.sync.clone(),
rpc.tx_pool.clone(),
executor.clone(),
)
.to_delegate();
let evm_trace_handler = EthTraceHandler {
Expand Down Expand Up @@ -521,7 +526,7 @@ pub async fn launch_async_rpc_servers(
rpc_conf: RpcImplConfiguration, throttling_conf_file: Option<String>,
apis: RpcModuleSelection, consensus: SharedConsensusGraph,
sync: SharedSynchronizationService, tx_pool: SharedTransactionPool,
addr: Option<SocketAddr>,
addr: Option<SocketAddr>, executor: TaskExecutor,
) -> Result<Option<RpcServerHandle>, String> {
if addr.is_none() {
return Ok(None);
Expand All @@ -530,7 +535,7 @@ pub async fn launch_async_rpc_servers(
let enable_metrics = rpc_conf.enable_metrics;

let rpc_module_builder =
RpcModuleBuilder::new(rpc_conf, consensus, sync, tx_pool);
RpcModuleBuilder::new(rpc_conf, consensus, sync, tx_pool, executor);

info!(
"Enabled evm async rpc modules: {:?}",
Expand Down
1 change: 1 addition & 0 deletions crates/rpc/rpc-builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ cfxcore = { workspace = true }
cfx-rpc-cfx-types = { workspace = true }
log = { workspace = true }
cfx-rpc-middlewares = { workspace = true }
cfx-tasks = { workspace = true }

[dev-dependencies]
Loading