Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit e329576

Browse files
committedFeb 25, 2025·
use oneshot channel to avoid blocking task block async runtime
1 parent 9dee81c commit e329576

File tree

22 files changed

+1380
-18
lines changed

22 files changed

+1380
-18
lines changed
 

‎Cargo.lock

+19
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Cargo.toml

+5-1
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ members = [
103103
"crates/pos/types/types",
104104
"crates/pos/config/management/network-address-encryption",
105105
"crates/rpc/rpc-middlewares",
106+
"crates/tasks",
106107
]
107108

108109
resolver = "2"
@@ -236,6 +237,7 @@ move-core-types = { path = "./crates/pos/types/move-core-types" }
236237
pow-types = { path = "./crates/pos/types/pow-types" }
237238
diem-types = { path = "./crates/pos/types/types" }
238239
diem-network-address-encryption = { path = "./crates/pos/config/management/network-address-encryption" }
240+
cfx-tasks = { path = "./crates/tasks" }
239241

240242
# basics
241243
bytes = "1.9"
@@ -387,6 +389,8 @@ synstructure = "0.12"
387389
lru-cache = "0.1"
388390
lru_time_cache = "0.9.0"
389391
slice-group-by = "0.3.1"
392+
auto_impl = "1"
393+
dyn-clone = "1.0.17"
390394

391395
# num misc
392396
bigdecimal = "0.1.0"
@@ -431,4 +435,4 @@ influx_db_client = "0.5.1"
431435
rocksdb = { git = "https://github.com/Conflux-Chain/rust-rocksdb.git", rev = "3773afe5b953997188f37c39308105b5deb0faac" }
432436

433437
[patch.crates-io]
434-
sqlite3-sys = { git = "https://github.com/Conflux-Chain/sqlite3-sys.git", rev = "1de8e5998f7c2d919336660b8ef4e8f52ac43844" }
438+
sqlite3-sys = { git = "https://github.com/Conflux-Chain/sqlite3-sys.git", rev = "1de8e5998f7c2d919336660b8ef4e8f52ac43844" }

‎crates/client/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ cfx-rpc-builder = { workspace = true }
8181
jsonrpsee = { workspace = true }
8282
cfx-rpc-common-impl = { workspace = true }
8383
cfx-parity-trace-types = { workspace = true }
84+
cfx-tasks = { workspace = true }
8485

8586
[dev-dependencies]
8687
criterion = { workspace = true }

‎crates/client/src/common/mod.rs

+15
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use blockgen::BlockGenerator;
2727
use cfx_executor::machine::{Machine, VmFactory};
2828
use cfx_parameters::genesis::DEV_GENESIS_KEY_PAIR_2;
2929
use cfx_storage::StorageManager;
30+
use cfx_tasks::TaskManager;
3031
use cfx_types::{address_util::AddressUtil, Address, Space, U256};
3132
pub use cfxcore::pos::pos::PosDropHandle;
3233
use cfxcore::{
@@ -517,6 +518,7 @@ pub fn initialize_not_light_node_modules(
517518
Option<WSServer>,
518519
Arc<TokioRuntime>,
519520
Option<RpcServerHandle>,
521+
TaskManager,
520522
),
521523
String,
522524
> {
@@ -668,6 +670,9 @@ pub fn initialize_not_light_node_modules(
668670
accounts,
669671
));
670672

673+
let task_manager = TaskManager::new(tokio_runtime.handle().clone());
674+
let task_executor = task_manager.executor();
675+
671676
let debug_rpc_http_server = super::rpc::start_http(
672677
conf.local_http_config(),
673678
setup_debug_rpc_apis(
@@ -676,6 +681,7 @@ pub fn initialize_not_light_node_modules(
676681
pubsub.clone(),
677682
eth_pubsub.clone(),
678683
&conf,
684+
task_executor.clone(),
679685
),
680686
)?;
681687

@@ -687,6 +693,7 @@ pub fn initialize_not_light_node_modules(
687693
pubsub.clone(),
688694
eth_pubsub.clone(),
689695
&conf,
696+
task_executor.clone(),
690697
),
691698
RpcExtractor,
692699
)?;
@@ -699,6 +706,7 @@ pub fn initialize_not_light_node_modules(
699706
pubsub.clone(),
700707
eth_pubsub.clone(),
701708
&conf,
709+
task_executor.clone(),
702710
),
703711
RpcExtractor,
704712
)?;
@@ -711,6 +719,7 @@ pub fn initialize_not_light_node_modules(
711719
pubsub.clone(),
712720
eth_pubsub.clone(),
713721
&conf,
722+
task_executor.clone(),
714723
),
715724
RpcExtractor,
716725
)?;
@@ -723,6 +732,7 @@ pub fn initialize_not_light_node_modules(
723732
pubsub.clone(),
724733
eth_pubsub.clone(),
725734
&conf,
735+
task_executor.clone(),
726736
),
727737
RpcExtractor,
728738
)?;
@@ -735,6 +745,7 @@ pub fn initialize_not_light_node_modules(
735745
pubsub.clone(),
736746
eth_pubsub.clone(),
737747
&conf,
748+
task_executor.clone(),
738749
),
739750
)?;
740751

@@ -746,6 +757,7 @@ pub fn initialize_not_light_node_modules(
746757
pubsub.clone(),
747758
eth_pubsub.clone(),
748759
&conf,
760+
task_executor.clone(),
749761
),
750762
RpcExtractor,
751763
)?;
@@ -758,6 +770,7 @@ pub fn initialize_not_light_node_modules(
758770
pubsub,
759771
eth_pubsub.clone(),
760772
&conf,
773+
task_executor.clone(),
761774
),
762775
)?;
763776

@@ -778,6 +791,7 @@ pub fn initialize_not_light_node_modules(
778791
sync.clone(),
779792
txpool.clone(),
780793
eth_rpc_http_server_addr,
794+
task_executor.clone(),
781795
))?;
782796

783797
Ok((
@@ -798,6 +812,7 @@ pub fn initialize_not_light_node_modules(
798812
eth_rpc_ws_server,
799813
tokio_runtime,
800814
async_eth_rpc_http_server,
815+
task_manager,
801816
))
802817
}
803818

‎crates/client/src/node_types/archive.rs

+4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::{
1212
};
1313
use blockgen::BlockGenerator;
1414
use cfx_rpc_builder::RpcServerHandle;
15+
use cfx_tasks::TaskManager;
1516
use cfxcore::{
1617
pow::PowComputer, ConsensusGraph, NodeType, SynchronizationService,
1718
TransactionPool,
@@ -38,6 +39,7 @@ pub struct ArchiveClientExtraComponents {
3839
/// Which use Rust async I/O
3940
pub eth_rpc_server_handle: Option<RpcServerHandle>,
4041
pub tokio_runtime: Arc<TokioRuntime>,
42+
pub task_manager: TaskManager,
4143
}
4244

4345
impl MallocSizeOf for ArchiveClientExtraComponents {
@@ -79,6 +81,7 @@ impl ArchiveClient {
7981
eth_rpc_ws_server,
8082
tokio_runtime,
8183
eth_rpc_server_handle,
84+
task_manager,
8285
) = initialize_not_light_node_modules(
8386
&mut conf,
8487
exit,
@@ -103,6 +106,7 @@ impl ArchiveClient {
103106
eth_rpc_ws_server,
104107
eth_rpc_server_handle,
105108
tokio_runtime,
109+
task_manager,
106110
},
107111
}))
108112
}

‎crates/client/src/node_types/full.rs

+4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::{
1212
};
1313
use blockgen::BlockGenerator;
1414
use cfx_rpc_builder::RpcServerHandle;
15+
use cfx_tasks::TaskManager;
1516
use cfxcore::{
1617
pow::PowComputer, ConsensusGraph, NodeType, SynchronizationService,
1718
TransactionPool,
@@ -38,6 +39,7 @@ pub struct FullClientExtraComponents {
3839
/// Which use Rust async I/O
3940
pub eth_rpc_server_handle: Option<RpcServerHandle>,
4041
pub tokio_runtime: Arc<TokioRuntime>,
42+
pub task_manager: TaskManager,
4143
}
4244

4345
impl MallocSizeOf for FullClientExtraComponents {
@@ -72,6 +74,7 @@ impl FullClient {
7274
eth_rpc_ws_server,
7375
tokio_runtime,
7476
eth_rpc_server_handle,
77+
task_manager,
7578
) = initialize_not_light_node_modules(&mut conf, exit, NodeType::Full)?;
7679
Ok(Box::new(ClientComponents {
7780
data_manager_weak_ptr: Arc::downgrade(&data_man),
@@ -92,6 +95,7 @@ impl FullClient {
9295
eth_rpc_ws_server,
9396
eth_rpc_server_handle,
9497
tokio_runtime,
98+
task_manager,
9599
},
96100
}))
97101
}

‎crates/client/src/rpc/impls/eth/eth_handler.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use crate::rpc::{
1616
},
1717
};
1818
use cfx_rpc::EthApi;
19+
use cfx_tasks::TaskExecutor;
1920
use cfx_types::{Address, AddressSpaceUtil, Space, H160, H256, U256, U64};
2021
use cfx_util_macros::bail;
2122
use cfxcore::{
@@ -33,9 +34,10 @@ impl EthHandler {
3334
pub fn new(
3435
config: RpcImplConfiguration, consensus: SharedConsensusGraph,
3536
sync: SharedSynchronizationService, tx_pool: SharedTransactionPool,
37+
executor: TaskExecutor,
3638
) -> Self {
3739
EthHandler {
38-
inner: EthApi::new(config, consensus, sync, tx_pool),
40+
inner: EthApi::new(config, consensus, sync, tx_pool, executor),
3941
}
4042
}
4143
}

‎crates/client/src/rpc/mod.rs

+11-6
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use cfx_rpc_builder::{
66
RpcModuleBuilder, RpcModuleSelection, RpcServerConfig, RpcServerHandle,
77
TransportRpcModuleConfig,
88
};
9+
use cfx_tasks::TaskExecutor;
910
use cfxcore::{
1011
SharedConsensusGraph, SharedSynchronizationService, SharedTransactionPool,
1112
};
@@ -97,7 +98,7 @@ use std::collections::HashSet;
9798

9899
pub fn setup_public_rpc_apis(
99100
common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
100-
eth_pubsub: EthPubSubClient, conf: &Configuration,
101+
eth_pubsub: EthPubSubClient, conf: &Configuration, executor: TaskExecutor,
101102
) -> MetaIoHandler<Metadata> {
102103
setup_rpc_apis(
103104
common,
@@ -107,12 +108,13 @@ pub fn setup_public_rpc_apis(
107108
&conf.raw_conf.throttling_conf,
108109
"rpc",
109110
conf.raw_conf.public_rpc_apis.list_apis(),
111+
executor,
110112
)
111113
}
112114

113115
pub fn setup_public_eth_rpc_apis(
114116
common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
115-
eth_pubsub: EthPubSubClient, conf: &Configuration,
117+
eth_pubsub: EthPubSubClient, conf: &Configuration, executor: TaskExecutor,
116118
) -> MetaIoHandler<Metadata> {
117119
setup_rpc_apis(
118120
common,
@@ -122,12 +124,13 @@ pub fn setup_public_eth_rpc_apis(
122124
&conf.raw_conf.throttling_conf,
123125
"rpc",
124126
conf.raw_conf.public_evm_rpc_apis.list_apis(),
127+
executor,
125128
)
126129
}
127130

128131
pub fn setup_debug_rpc_apis(
129132
common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
130-
eth_pubsub: EthPubSubClient, conf: &Configuration,
133+
eth_pubsub: EthPubSubClient, conf: &Configuration, executor: TaskExecutor,
131134
) -> MetaIoHandler<Metadata> {
132135
setup_rpc_apis(
133136
common,
@@ -137,13 +140,14 @@ pub fn setup_debug_rpc_apis(
137140
&conf.raw_conf.throttling_conf,
138141
"rpc_local",
139142
ApiSet::All.list_apis(),
143+
executor,
140144
)
141145
}
142146

143147
fn setup_rpc_apis(
144148
common: Arc<CommonImpl>, rpc: Arc<RpcImpl>, pubsub: PubSubClient,
145149
eth_pubsub: EthPubSubClient, throttling_conf: &Option<String>,
146-
throttling_section: &str, apis: HashSet<Api>,
150+
throttling_section: &str, apis: HashSet<Api>, executor: TaskExecutor,
147151
) -> MetaIoHandler<Metadata> {
148152
let mut handler = MetaIoHandler::default();
149153
for api in &apis {
@@ -190,6 +194,7 @@ fn setup_rpc_apis(
190194
rpc.consensus.clone(),
191195
rpc.sync.clone(),
192196
rpc.tx_pool.clone(),
197+
executor.clone(),
193198
)
194199
.to_delegate();
195200
let evm_trace_handler = EthTraceHandler {
@@ -521,7 +526,7 @@ pub async fn launch_async_rpc_servers(
521526
rpc_conf: RpcImplConfiguration, throttling_conf_file: Option<String>,
522527
apis: RpcModuleSelection, consensus: SharedConsensusGraph,
523528
sync: SharedSynchronizationService, tx_pool: SharedTransactionPool,
524-
addr: Option<SocketAddr>,
529+
addr: Option<SocketAddr>, executor: TaskExecutor,
525530
) -> Result<Option<RpcServerHandle>, String> {
526531
if addr.is_none() {
527532
return Ok(None);
@@ -530,7 +535,7 @@ pub async fn launch_async_rpc_servers(
530535
let enable_metrics = rpc_conf.enable_metrics;
531536

532537
let rpc_module_builder =
533-
RpcModuleBuilder::new(rpc_conf, consensus, sync, tx_pool);
538+
RpcModuleBuilder::new(rpc_conf, consensus, sync, tx_pool, executor);
534539

535540
info!(
536541
"Enabled evm async rpc modules: {:?}",

‎crates/rpc/rpc-builder/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,6 @@ cfxcore = { workspace = true }
2929
cfx-rpc-cfx-types = { workspace = true }
3030
log = { workspace = true }
3131
cfx-rpc-middlewares = { workspace = true }
32+
cfx-tasks = { workspace = true }
3233

3334
[dev-dependencies]

0 commit comments

Comments
 (0)
Please sign in to comment.