From 04913c568d02022f633dbc5952a4f00f47102f21 Mon Sep 17 00:00:00 2001 From: Pana Date: Tue, 25 Feb 2025 14:00:02 +0800 Subject: [PATCH] use oneshot channel to avoid blocking task block async runtime --- Cargo.lock | 19 + Cargo.toml | 6 +- crates/client/Cargo.toml | 1 + crates/client/src/common/mod.rs | 15 + crates/client/src/node_types/archive.rs | 4 + crates/client/src/node_types/full.rs | 4 + .../client/src/rpc/impls/eth/eth_handler.rs | 4 +- crates/client/src/rpc/mod.rs | 17 +- crates/rpc/rpc-builder/Cargo.toml | 1 + crates/rpc/rpc-builder/src/lib.rs | 15 +- crates/rpc/rpc-eth-impl/Cargo.toml | 3 +- crates/rpc/rpc-eth-impl/src/eth.rs | 41 +- crates/rpc/rpc-utils/Cargo.toml | 5 +- crates/rpc/rpc-utils/src/error/api.rs | 67 ++ crates/rpc/rpc-utils/src/error/mod.rs | 6 + .../rpc-utils/src/helpers/blocking_tasks.rs | 42 + crates/rpc/rpc-utils/src/helpers/mod.rs | 3 + crates/rpc/rpc-utils/src/lib.rs | 1 + crates/tasks/Cargo.toml | 30 + crates/tasks/src/lib.rs | 718 ++++++++++++++++++ crates/tasks/src/pool.rs | 203 +++++ crates/tasks/src/shutdown.rs | 193 +++++ 22 files changed, 1380 insertions(+), 18 deletions(-) create mode 100644 crates/rpc/rpc-utils/src/error/api.rs create mode 100644 crates/rpc/rpc-utils/src/helpers/blocking_tasks.rs create mode 100644 crates/rpc/rpc-utils/src/helpers/mod.rs create mode 100644 crates/tasks/Cargo.toml create mode 100644 crates/tasks/src/lib.rs create mode 100644 crates/tasks/src/pool.rs create mode 100644 crates/tasks/src/shutdown.rs diff --git a/Cargo.lock b/Cargo.lock index b1b2ee2936..d2e94ddcd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1341,6 +1341,7 @@ dependencies = [ "cfx-rpc-primitives", "cfx-rpc-utils", "cfx-statedb", + "cfx-tasks", "cfx-types", "cfx-util-macros", "cfx-vm-types", @@ -1372,6 +1373,7 @@ dependencies = [ "cfx-rpc-cfx-types", "cfx-rpc-eth-api", "cfx-rpc-middlewares", + "cfx-tasks", "cfxcore", "jsonrpsee", "jsonrpsee-core", @@ -1504,13 +1506,16 @@ dependencies = [ "alloy-primitives", "alloy-rpc-types", "alloy-sol-types", + "cfx-tasks", "cfx-types", + "futures 0.3.30", "jsonrpc-core 18.0.0", "jsonrpsee", "jsonrpsee-core", "revm", "serde_json", "thiserror 2.0.11", + "tokio 1.40.0", ] [[package]] @@ -1585,6 +1590,19 @@ dependencies = [ "tokio 0.2.25", ] +[[package]] +name = "cfx-tasks" +version = "2.4.0" +dependencies = [ + "auto_impl", + "dyn-clone", + "futures-util", + "pin-project", + "rayon", + "thiserror 2.0.11", + "tokio 1.40.0", +] + [[package]] name = "cfx-types" version = "0.2.0" @@ -1935,6 +1953,7 @@ dependencies = [ "cfx-rpc-utils", "cfx-statedb", "cfx-storage", + "cfx-tasks", "cfx-types", "cfx-util-macros", "cfx-vm-types", diff --git a/Cargo.toml b/Cargo.toml index 0a7055968a..6b6362cc2c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -103,6 +103,7 @@ members = [ "crates/pos/types/types", "crates/pos/config/management/network-address-encryption", "crates/rpc/rpc-middlewares", + "crates/tasks", ] resolver = "2" @@ -236,6 +237,7 @@ move-core-types = { path = "./crates/pos/types/move-core-types" } pow-types = { path = "./crates/pos/types/pow-types" } diem-types = { path = "./crates/pos/types/types" } diem-network-address-encryption = { path = "./crates/pos/config/management/network-address-encryption" } +cfx-tasks = { path = "./crates/tasks" } # basics bytes = "1.9" @@ -387,6 +389,8 @@ synstructure = "0.12" lru-cache = "0.1" lru_time_cache = "0.9.0" slice-group-by = "0.3.1" +auto_impl = "1" +dyn-clone = "1.0.17" # num misc bigdecimal = "0.1.0" @@ -431,4 +435,4 @@ influx_db_client = "0.5.1" rocksdb = { git = "https://github.com/Conflux-Chain/rust-rocksdb.git", rev = "3773afe5b953997188f37c39308105b5deb0faac" } [patch.crates-io] -sqlite3-sys = { git = "https://github.com/Conflux-Chain/sqlite3-sys.git", rev = "1de8e5998f7c2d919336660b8ef4e8f52ac43844" } \ No newline at end of file +sqlite3-sys = { git = "https://github.com/Conflux-Chain/sqlite3-sys.git", rev = "1de8e5998f7c2d919336660b8ef4e8f52ac43844" } diff --git a/crates/client/Cargo.toml b/crates/client/Cargo.toml index a565669e65..936997decd 100644 --- a/crates/client/Cargo.toml +++ b/crates/client/Cargo.toml @@ -81,6 +81,7 @@ cfx-rpc-builder = { workspace = true } jsonrpsee = { workspace = true } cfx-rpc-common-impl = { workspace = true } cfx-parity-trace-types = { workspace = true } +cfx-tasks = { workspace = true } [dev-dependencies] criterion = { workspace = true } diff --git a/crates/client/src/common/mod.rs b/crates/client/src/common/mod.rs index cb83ac84ab..6d95c0bbc8 100644 --- a/crates/client/src/common/mod.rs +++ b/crates/client/src/common/mod.rs @@ -27,6 +27,7 @@ use blockgen::BlockGenerator; use cfx_executor::machine::{Machine, VmFactory}; use cfx_parameters::genesis::DEV_GENESIS_KEY_PAIR_2; use cfx_storage::StorageManager; +use cfx_tasks::TaskManager; use cfx_types::{address_util::AddressUtil, Address, Space, U256}; pub use cfxcore::pos::pos::PosDropHandle; use cfxcore::{ @@ -517,6 +518,7 @@ pub fn initialize_not_light_node_modules( Option, Arc, Option, + TaskManager, ), String, > { @@ -668,6 +670,9 @@ pub fn initialize_not_light_node_modules( accounts, )); + let task_manager = TaskManager::new(tokio_runtime.handle().clone()); + let task_executor = task_manager.executor(); + let debug_rpc_http_server = super::rpc::start_http( conf.local_http_config(), setup_debug_rpc_apis( @@ -676,6 +681,7 @@ pub fn initialize_not_light_node_modules( pubsub.clone(), eth_pubsub.clone(), &conf, + task_executor.clone(), ), )?; @@ -687,6 +693,7 @@ pub fn initialize_not_light_node_modules( pubsub.clone(), eth_pubsub.clone(), &conf, + task_executor.clone(), ), RpcExtractor, )?; @@ -699,6 +706,7 @@ pub fn initialize_not_light_node_modules( pubsub.clone(), eth_pubsub.clone(), &conf, + task_executor.clone(), ), RpcExtractor, )?; @@ -711,6 +719,7 @@ pub fn initialize_not_light_node_modules( pubsub.clone(), eth_pubsub.clone(), &conf, + task_executor.clone(), ), RpcExtractor, )?; @@ -723,6 +732,7 @@ pub fn initialize_not_light_node_modules( pubsub.clone(), eth_pubsub.clone(), &conf, + task_executor.clone(), ), RpcExtractor, )?; @@ -735,6 +745,7 @@ pub fn initialize_not_light_node_modules( pubsub.clone(), eth_pubsub.clone(), &conf, + task_executor.clone(), ), )?; @@ -746,6 +757,7 @@ pub fn initialize_not_light_node_modules( pubsub.clone(), eth_pubsub.clone(), &conf, + task_executor.clone(), ), RpcExtractor, )?; @@ -758,6 +770,7 @@ pub fn initialize_not_light_node_modules( pubsub, eth_pubsub.clone(), &conf, + task_executor.clone(), ), )?; @@ -778,6 +791,7 @@ pub fn initialize_not_light_node_modules( sync.clone(), txpool.clone(), eth_rpc_http_server_addr, + task_executor.clone(), ))?; Ok(( @@ -798,6 +812,7 @@ pub fn initialize_not_light_node_modules( eth_rpc_ws_server, tokio_runtime, async_eth_rpc_http_server, + task_manager, )) } diff --git a/crates/client/src/node_types/archive.rs b/crates/client/src/node_types/archive.rs index 5de730eda3..b0da99ddc5 100644 --- a/crates/client/src/node_types/archive.rs +++ b/crates/client/src/node_types/archive.rs @@ -12,6 +12,7 @@ use crate::{ }; use blockgen::BlockGenerator; use cfx_rpc_builder::RpcServerHandle; +use cfx_tasks::TaskManager; use cfxcore::{ pow::PowComputer, ConsensusGraph, NodeType, SynchronizationService, TransactionPool, @@ -38,6 +39,7 @@ pub struct ArchiveClientExtraComponents { /// Which use Rust async I/O pub eth_rpc_server_handle: Option, pub tokio_runtime: Arc, + pub task_manager: TaskManager, } impl MallocSizeOf for ArchiveClientExtraComponents { @@ -79,6 +81,7 @@ impl ArchiveClient { eth_rpc_ws_server, tokio_runtime, eth_rpc_server_handle, + task_manager, ) = initialize_not_light_node_modules( &mut conf, exit, @@ -103,6 +106,7 @@ impl ArchiveClient { eth_rpc_ws_server, eth_rpc_server_handle, tokio_runtime, + task_manager, }, })) } diff --git a/crates/client/src/node_types/full.rs b/crates/client/src/node_types/full.rs index 23d0ea20fd..4ec12d1d48 100644 --- a/crates/client/src/node_types/full.rs +++ b/crates/client/src/node_types/full.rs @@ -12,6 +12,7 @@ use crate::{ }; use blockgen::BlockGenerator; use cfx_rpc_builder::RpcServerHandle; +use cfx_tasks::TaskManager; use cfxcore::{ pow::PowComputer, ConsensusGraph, NodeType, SynchronizationService, TransactionPool, @@ -38,6 +39,7 @@ pub struct FullClientExtraComponents { /// Which use Rust async I/O pub eth_rpc_server_handle: Option, pub tokio_runtime: Arc, + pub task_manager: TaskManager, } impl MallocSizeOf for FullClientExtraComponents { @@ -72,6 +74,7 @@ impl FullClient { eth_rpc_ws_server, tokio_runtime, eth_rpc_server_handle, + task_manager, ) = initialize_not_light_node_modules(&mut conf, exit, NodeType::Full)?; Ok(Box::new(ClientComponents { data_manager_weak_ptr: Arc::downgrade(&data_man), @@ -92,6 +95,7 @@ impl FullClient { eth_rpc_ws_server, eth_rpc_server_handle, tokio_runtime, + task_manager, }, })) } diff --git a/crates/client/src/rpc/impls/eth/eth_handler.rs b/crates/client/src/rpc/impls/eth/eth_handler.rs index d1468732c9..9b30a7bb77 100644 --- a/crates/client/src/rpc/impls/eth/eth_handler.rs +++ b/crates/client/src/rpc/impls/eth/eth_handler.rs @@ -16,6 +16,7 @@ use crate::rpc::{ }, }; use cfx_rpc::EthApi; +use cfx_tasks::TaskExecutor; use cfx_types::{Address, AddressSpaceUtil, Space, H160, H256, U256, U64}; use cfx_util_macros::bail; use cfxcore::{ @@ -33,9 +34,10 @@ impl EthHandler { pub fn new( config: RpcImplConfiguration, consensus: SharedConsensusGraph, sync: SharedSynchronizationService, tx_pool: SharedTransactionPool, + executor: TaskExecutor, ) -> Self { EthHandler { - inner: EthApi::new(config, consensus, sync, tx_pool), + inner: EthApi::new(config, consensus, sync, tx_pool, executor), } } } diff --git a/crates/client/src/rpc/mod.rs b/crates/client/src/rpc/mod.rs index ca98074f52..f3c809d478 100644 --- a/crates/client/src/rpc/mod.rs +++ b/crates/client/src/rpc/mod.rs @@ -6,6 +6,7 @@ use cfx_rpc_builder::{ RpcModuleBuilder, RpcModuleSelection, RpcServerConfig, RpcServerHandle, TransportRpcModuleConfig, }; +use cfx_tasks::TaskExecutor; use cfxcore::{ SharedConsensusGraph, SharedSynchronizationService, SharedTransactionPool, }; @@ -97,7 +98,7 @@ use std::collections::HashSet; pub fn setup_public_rpc_apis( common: Arc, rpc: Arc, pubsub: PubSubClient, - eth_pubsub: EthPubSubClient, conf: &Configuration, + eth_pubsub: EthPubSubClient, conf: &Configuration, executor: TaskExecutor, ) -> MetaIoHandler { setup_rpc_apis( common, @@ -107,12 +108,13 @@ pub fn setup_public_rpc_apis( &conf.raw_conf.throttling_conf, "rpc", conf.raw_conf.public_rpc_apis.list_apis(), + executor, ) } pub fn setup_public_eth_rpc_apis( common: Arc, rpc: Arc, pubsub: PubSubClient, - eth_pubsub: EthPubSubClient, conf: &Configuration, + eth_pubsub: EthPubSubClient, conf: &Configuration, executor: TaskExecutor, ) -> MetaIoHandler { setup_rpc_apis( common, @@ -122,12 +124,13 @@ pub fn setup_public_eth_rpc_apis( &conf.raw_conf.throttling_conf, "rpc", conf.raw_conf.public_evm_rpc_apis.list_apis(), + executor, ) } pub fn setup_debug_rpc_apis( common: Arc, rpc: Arc, pubsub: PubSubClient, - eth_pubsub: EthPubSubClient, conf: &Configuration, + eth_pubsub: EthPubSubClient, conf: &Configuration, executor: TaskExecutor, ) -> MetaIoHandler { setup_rpc_apis( common, @@ -137,13 +140,14 @@ pub fn setup_debug_rpc_apis( &conf.raw_conf.throttling_conf, "rpc_local", ApiSet::All.list_apis(), + executor, ) } fn setup_rpc_apis( common: Arc, rpc: Arc, pubsub: PubSubClient, eth_pubsub: EthPubSubClient, throttling_conf: &Option, - throttling_section: &str, apis: HashSet, + throttling_section: &str, apis: HashSet, executor: TaskExecutor, ) -> MetaIoHandler { let mut handler = MetaIoHandler::default(); for api in &apis { @@ -190,6 +194,7 @@ fn setup_rpc_apis( rpc.consensus.clone(), rpc.sync.clone(), rpc.tx_pool.clone(), + executor.clone(), ) .to_delegate(); let evm_trace_handler = EthTraceHandler { @@ -521,7 +526,7 @@ pub async fn launch_async_rpc_servers( rpc_conf: RpcImplConfiguration, throttling_conf_file: Option, apis: RpcModuleSelection, consensus: SharedConsensusGraph, sync: SharedSynchronizationService, tx_pool: SharedTransactionPool, - addr: Option, + addr: Option, executor: TaskExecutor, ) -> Result, String> { if addr.is_none() { return Ok(None); @@ -530,7 +535,7 @@ pub async fn launch_async_rpc_servers( let enable_metrics = rpc_conf.enable_metrics; let rpc_module_builder = - RpcModuleBuilder::new(rpc_conf, consensus, sync, tx_pool); + RpcModuleBuilder::new(rpc_conf, consensus, sync, tx_pool, executor); info!( "Enabled evm async rpc modules: {:?}", diff --git a/crates/rpc/rpc-builder/Cargo.toml b/crates/rpc/rpc-builder/Cargo.toml index 46c2d45f9f..55f800c109 100644 --- a/crates/rpc/rpc-builder/Cargo.toml +++ b/crates/rpc/rpc-builder/Cargo.toml @@ -29,5 +29,6 @@ cfxcore = { workspace = true } cfx-rpc-cfx-types = { workspace = true } log = { workspace = true } cfx-rpc-middlewares = { workspace = true } +cfx-tasks = { workspace = true } [dev-dependencies] diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index adb8941065..24c6609574 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -62,6 +62,7 @@ use std::{ }; pub use tower::layer::util::{Identity, Stack}; // use tower::Layer; +use cfx_tasks::TaskExecutor; /// A builder type to configure the RPC module: See [`RpcModule`] /// @@ -72,18 +73,21 @@ pub struct RpcModuleBuilder { consensus: SharedConsensusGraph, sync: SharedSynchronizationService, tx_pool: SharedTransactionPool, + executor: TaskExecutor, } impl RpcModuleBuilder { pub fn new( config: RpcImplConfiguration, consensus: SharedConsensusGraph, sync: SharedSynchronizationService, tx_pool: SharedTransactionPool, + executor: TaskExecutor, ) -> Self { Self { config, consensus, sync, tx_pool, + executor, } } @@ -103,10 +107,12 @@ impl RpcModuleBuilder { consensus, sync, tx_pool, + executor, } = self; - let mut registry = - RpcRegistryInner::new(config, consensus, sync, tx_pool); + let mut registry = RpcRegistryInner::new( + config, consensus, sync, tx_pool, executor, + ); modules.config = module_config; modules.http = registry.maybe_module(http.as_ref()); @@ -125,12 +131,14 @@ pub struct RpcRegistryInner { sync: SharedSynchronizationService, tx_pool: SharedTransactionPool, modules: HashMap, + executor: TaskExecutor, } impl RpcRegistryInner { pub fn new( config: RpcImplConfiguration, consensus: SharedConsensusGraph, sync: SharedSynchronizationService, tx_pool: SharedTransactionPool, + executor: TaskExecutor, ) -> Self { Self { consensus, @@ -138,6 +146,7 @@ impl RpcRegistryInner { sync, tx_pool, modules: Default::default(), + executor, } } @@ -226,6 +235,7 @@ impl RpcRegistryInner { self.consensus.clone(), self.sync.clone(), self.tx_pool.clone(), + self.executor.clone(), ) .into_rpc() .into(), @@ -250,6 +260,7 @@ impl RpcRegistryInner { self.consensus.clone(), self.sync.clone(), self.tx_pool.clone(), + self.executor.clone(), ); ParityApi::new(eth_api).into_rpc().into() } diff --git a/crates/rpc/rpc-eth-impl/Cargo.toml b/crates/rpc/rpc-eth-impl/Cargo.toml index 70bf3de2cf..6837c80b0f 100644 --- a/crates/rpc/rpc-eth-impl/Cargo.toml +++ b/crates/rpc/rpc-eth-impl/Cargo.toml @@ -47,4 +47,5 @@ tokio-stream = { workspace = true } serde = { workspace = true } cfx-addr = { workspace = true } solidity-abi = { workspace = true } -cfx-rpc-common-impl = { workspace = true } \ No newline at end of file +cfx-rpc-common-impl = { workspace = true } +cfx-tasks = { workspace = true} \ No newline at end of file diff --git a/crates/rpc/rpc-eth-impl/src/eth.rs b/crates/rpc/rpc-eth-impl/src/eth.rs index e59fe82361..95622fff80 100644 --- a/crates/rpc/rpc-eth-impl/src/eth.rs +++ b/crates/rpc/rpc-eth-impl/src/eth.rs @@ -16,11 +16,15 @@ use cfx_rpc_eth_types::{ Transaction, TransactionRequest, }; use cfx_rpc_primitives::{Bytes, Index, U64 as HexU64}; -use cfx_rpc_utils::error::{ - errors::*, jsonrpc_error_helpers::*, - jsonrpsee_error_helpers::internal_error as jsonrpsee_internal_error, +use cfx_rpc_utils::{ + error::{ + errors::*, jsonrpc_error_helpers::*, + jsonrpsee_error_helpers::internal_error as jsonrpsee_internal_error, + }, + helpers::SpawnBlocking, }; use cfx_statedb::StateDbExt; +use cfx_tasks::{TaskExecutor, TaskSpawner}; use cfx_types::{ Address, AddressSpaceUtil, BigEndianHash, Space, H160, H256, H64, U256, U64, }; @@ -32,7 +36,7 @@ use cfxcore::{ SharedSynchronizationService, SharedTransactionPool, }; use jsonrpc_core::Error as RpcError; -use jsonrpsee::core::RpcResult; +use jsonrpsee::{core::RpcResult, types::ErrorObjectOwned}; use primitives::{ filter::LogFilter, receipt::EVM_SPACE_SUCCESS, Action, BlockHashOrEpochNumber, EpochNumber, StorageKey, StorageValue, @@ -40,7 +44,7 @@ use primitives::{ }; use rustc_hex::ToHex; use solidity_abi::string_revert_reason_decode; -use std::collections::HashMap; +use std::{collections::HashMap, future::Future}; type BlockNumber = BlockId; type BlockNumberOrTag = BlockId; @@ -48,18 +52,21 @@ type BlockNumberOrTag = BlockId; type JsonStorageKey = U256; type RpcBlock = Block; +#[derive(Clone)] pub struct EthApi { config: RpcImplConfiguration, consensus: SharedConsensusGraph, sync: SharedSynchronizationService, tx_pool: SharedTransactionPool, fee_history_cache: FeeHistoryCache, + executor: TaskExecutor, } impl EthApi { pub fn new( config: RpcImplConfiguration, consensus: SharedConsensusGraph, sync: SharedSynchronizationService, tx_pool: SharedTransactionPool, + executor: TaskExecutor, ) -> Self { EthApi { config, @@ -67,6 +74,7 @@ impl EthApi { sync, tx_pool, fee_history_cache: FeeHistoryCache::new(), + executor, } } @@ -1047,6 +1055,27 @@ impl EthApi { } } +impl SpawnBlocking for EthApi { + fn io_task_spawner(&self) -> impl TaskSpawner { self.executor.clone() } +} + +impl EthApi { + pub fn async_transaction_by_hash( + &self, hash: H256, + ) -> impl Future, ErrorObjectOwned>> + Send + { + let self_clone = self.clone(); + async move { + let resp = self_clone + .spawn_blocking_io(move |this| { + this.transaction_by_hash(hash).map_err(|err| err.into()) + }) + .await; + resp + } + } +} + impl BlockProvider for &EthApi { fn get_block_epoch_number(&self, hash: &H256) -> Option { self.consensus_graph().get_block_epoch_number(hash) @@ -1174,7 +1203,7 @@ impl EthApiServer for EthApi { async fn transaction_by_hash( &self, hash: H256, ) -> RpcResult> { - self.transaction_by_hash(hash).map_err(|err| err.into()) + self.async_transaction_by_hash(hash).await } /// Returns information about a raw transaction by block hash and diff --git a/crates/rpc/rpc-utils/Cargo.toml b/crates/rpc/rpc-utils/Cargo.toml index 5e9ebfa2d2..feb42f2aae 100644 --- a/crates/rpc/rpc-utils/Cargo.toml +++ b/crates/rpc/rpc-utils/Cargo.toml @@ -22,4 +22,7 @@ serde_json = { workspace = true } alloy-sol-types = { workspace = true } revm = { workspace = true } thiserror = { workspace = true } -jsonrpsee-core = { workspace = true } \ No newline at end of file +jsonrpsee-core = { workspace = true } +futures.workspace = true +tokio.workspace = true +cfx-tasks = { workspace = true } \ No newline at end of file diff --git a/crates/rpc/rpc-utils/src/error/api.rs b/crates/rpc/rpc-utils/src/error/api.rs new file mode 100644 index 0000000000..e2870e2bd9 --- /dev/null +++ b/crates/rpc/rpc-utils/src/error/api.rs @@ -0,0 +1,67 @@ +//! Helper traits to wrap generic l1 errors, in network specific error type +//! configured in `reth_rpc_eth_api::EthApiTypes`. + +use crate::error::EthApiError; + +/// Helper trait to wrap core [`EthApiError`]. +pub trait FromEthApiError: From { + /// Converts from error via [`EthApiError`]. + fn from_eth_err(err: E) -> Self + where EthApiError: From; +} + +impl FromEthApiError for T +where T: From +{ + fn from_eth_err(err: E) -> Self + where EthApiError: From { + T::from(EthApiError::from(err)) + } +} + +/// Helper trait to wrap core [`EthApiError`]. +pub trait IntoEthApiError: Into { + /// Converts into error via [`EthApiError`]. + fn into_eth_err(self) -> E + where E: FromEthApiError; +} + +impl IntoEthApiError for T +where EthApiError: From +{ + fn into_eth_err(self) -> E + where E: FromEthApiError { + E::from_eth_err(self) + } +} + +/// Helper trait to access wrapped core error. +pub trait AsEthApiError { + /// Returns reference to [`EthApiError`], if this an error variant inherited + /// from core functionality. + fn as_err(&self) -> Option<&EthApiError>; + + /// Returns `true` if error is + /// [`RpcInvalidTransactionError::GasTooHigh`]. + fn is_gas_too_high(&self) -> bool { + if let Some(err) = self.as_err() { + return err.is_gas_too_high(); + } + + false + } + + /// Returns `true` if error is + /// [`RpcInvalidTransactionError::GasTooLow`]. + fn is_gas_too_low(&self) -> bool { + if let Some(err) = self.as_err() { + return err.is_gas_too_low(); + } + + false + } +} + +impl AsEthApiError for EthApiError { + fn as_err(&self) -> Option<&EthApiError> { Some(self) } +} diff --git a/crates/rpc/rpc-utils/src/error/mod.rs b/crates/rpc/rpc-utils/src/error/mod.rs index ae3a4db599..c2d4c9a9bc 100644 --- a/crates/rpc/rpc-utils/src/error/mod.rs +++ b/crates/rpc/rpc-utils/src/error/mod.rs @@ -1,4 +1,10 @@ +pub mod api; pub mod error_codes; pub mod errors; pub mod jsonrpc_error_helpers; pub mod jsonrpsee_error_helpers; + +pub use errors::{ + EthApiError, EthResult, RevertError, RpcInvalidTransactionError, + RpcPoolError, +}; diff --git a/crates/rpc/rpc-utils/src/helpers/blocking_tasks.rs b/crates/rpc/rpc-utils/src/helpers/blocking_tasks.rs new file mode 100644 index 0000000000..e07d30c3d1 --- /dev/null +++ b/crates/rpc/rpc-utils/src/helpers/blocking_tasks.rs @@ -0,0 +1,42 @@ +//! Spawns a blocking task. CPU heavy tasks are executed with the `rayon` +//! library. IO heavy tasks are executed on the `tokio` runtime. + +use crate::error::EthApiError; +use cfx_tasks::TaskSpawner; +use futures::Future; +use jsonrpsee::types::ErrorObjectOwned; +use tokio::sync::oneshot; + +/// Executes code on a blocking thread. +pub trait SpawnBlocking: Clone + Send + Sync + 'static { + /// Returns a handle for spawning IO heavy blocking tasks. + /// + /// Runtime access in default trait method implementations. + fn io_task_spawner(&self) -> impl TaskSpawner; + + /// Executes the future on a new blocking task. + /// + /// Note: This is expected for futures that are dominated by blocking IO + /// operations, for tracing or CPU bound operations in general use + /// [`spawn_tracing`](Self::spawn_tracing). + fn spawn_blocking_io( + &self, f: F, + ) -> impl Future> + Send + where + F: FnOnce(Self) -> Result + Send + 'static, + R: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + let this = self.clone(); + self.io_task_spawner().spawn_blocking(Box::pin(async move { + let res = f(this); + let _ = tx.send(res); + })); + + async move { + rx.await.map_err(|_| { + ErrorObjectOwned::from(EthApiError::InternalEthError) + })? + } + } +} diff --git a/crates/rpc/rpc-utils/src/helpers/mod.rs b/crates/rpc/rpc-utils/src/helpers/mod.rs new file mode 100644 index 0000000000..24f50f1310 --- /dev/null +++ b/crates/rpc/rpc-utils/src/helpers/mod.rs @@ -0,0 +1,3 @@ +pub mod blocking_tasks; + +pub use blocking_tasks::SpawnBlocking; diff --git a/crates/rpc/rpc-utils/src/lib.rs b/crates/rpc/rpc-utils/src/lib.rs index a91e735174..17d5321ccf 100644 --- a/crates/rpc/rpc-utils/src/lib.rs +++ b/crates/rpc/rpc-utils/src/lib.rs @@ -1 +1,2 @@ pub mod error; +pub mod helpers; diff --git a/crates/tasks/Cargo.toml b/crates/tasks/Cargo.toml new file mode 100644 index 0000000000..0347fd9373 --- /dev/null +++ b/crates/tasks/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "cfx-tasks" +edition = "2021" +version.workspace = true +authors.workspace = true +documentation.workspace = true +homepage.workspace = true +keywords.workspace = true +repository.workspace = true +license-file.workspace = true +description = "Task management" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +futures-util = { workspace = true, features = ["std"] } +tokio = { workspace = true, features = ["sync", "rt"] } +auto_impl = { workspace = true } +dyn-clone = { workspace = true } +thiserror = { workspace = true } + +# feature `rayon` +rayon = { workspace = true, optional = true } +pin-project = { workspace = true, optional = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["sync", "rt", "rt-multi-thread", "time", "macros"] } + +[features] +rayon = ["dep:rayon", "pin-project"] \ No newline at end of file diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs new file mode 100644 index 0000000000..eea1304551 --- /dev/null +++ b/crates/tasks/src/lib.rs @@ -0,0 +1,718 @@ +// Copyright 2023-2024 Paradigm.xyz +// This file is part of reth. +// Reth is a modular, contributor-friendly and blazing-fast implementation of +// the Ethereum protocol + +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: + +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. + +/// This crate is migrated from the `reth` repository, +/// without the tracing and metrics logic. +use crate::shutdown::{ + signal, GracefulShutdown, GracefulShutdownGuard, Shutdown, Signal, +}; +use dyn_clone::DynClone; +use futures_util::{ + future::{select, BoxFuture}, + Future, FutureExt, TryFutureExt, +}; +use std::{ + any::Any, + fmt::{Display, Formatter}, + pin::{pin, Pin}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + task::{ready, Context, Poll}, +}; +use tokio::{ + runtime::Handle, + sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + task::JoinHandle, +}; + +#[cfg(feature = "rayon")] +pub mod pool; +pub mod shutdown; + +#[auto_impl::auto_impl(&, Arc)] +pub trait TaskSpawner: + Send + Sync + Unpin + std::fmt::Debug + DynClone +{ + /// Spawns the task onto the runtime. + /// See also [`Handle::spawn`]. + fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()>; + + /// This spawns a critical task onto the runtime. + fn spawn_critical( + &self, name: &'static str, fut: BoxFuture<'static, ()>, + ) -> JoinHandle<()>; + + /// Spawns a blocking task onto the runtime. + fn spawn_blocking(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()>; + + /// This spawns a critical blocking task onto the runtime. + fn spawn_critical_blocking( + &self, name: &'static str, fut: BoxFuture<'static, ()>, + ) -> JoinHandle<()>; +} + +dyn_clone::clone_trait_object!(TaskSpawner); + +/// An [`TaskSpawner`] that uses [`tokio::task::spawn`] to execute tasks +#[derive(Debug, Clone, Default)] +#[non_exhaustive] +pub struct TokioTaskExecutor; + +impl TokioTaskExecutor { + /// Converts the instance to a boxed [`TaskSpawner`]. + pub fn boxed(self) -> Box { Box::new(self) } +} + +impl TaskSpawner for TokioTaskExecutor { + fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { + tokio::task::spawn(fut) + } + + fn spawn_critical( + &self, _name: &'static str, fut: BoxFuture<'static, ()>, + ) -> JoinHandle<()> { + tokio::task::spawn(fut) + } + + fn spawn_blocking(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { + tokio::task::spawn_blocking(move || { + tokio::runtime::Handle::current().block_on(fut) + }) + } + + fn spawn_critical_blocking( + &self, _name: &'static str, fut: BoxFuture<'static, ()>, + ) -> JoinHandle<()> { + tokio::task::spawn_blocking(move || { + tokio::runtime::Handle::current().block_on(fut) + }) + } +} + +/// Many reth components require to spawn tasks for long-running jobs. For +/// example `discovery` spawns tasks to handle egress and ingress of udp traffic +/// or `network` that spawns session tasks that handle the traffic to and from a +/// peer. +/// +/// To unify how tasks are created, the [`TaskManager`] provides access to the +/// configured Tokio runtime. A [`TaskManager`] stores the +/// [`tokio::runtime::Handle`] it is associated with. In this way it is possible +/// to configure on which runtime a task is executed. +/// +/// The main purpose of this type is to be able to monitor if a critical task +/// panicked, for diagnostic purposes, since tokio task essentially fail +/// silently. Therefore, this type is a Stream that yields the name of panicked +/// task, See [`TaskExecutor::spawn_critical`]. In order to execute Tasks use +/// the [`TaskExecutor`] type [`TaskManager::executor`]. +#[derive(Debug)] +#[must_use = "TaskManager must be polled to monitor critical tasks"] +pub struct TaskManager { + /// Handle to the tokio runtime this task manager is associated with. + /// + /// See [`Handle`] docs. + handle: Handle, + /// Sender half for sending panic signals to this type + panicked_tasks_tx: UnboundedSender, + /// Listens for panicked tasks + panicked_tasks_rx: UnboundedReceiver, + /// The [Signal] to fire when all tasks should be shutdown. + /// + /// This is fired when dropped. + signal: Option, + /// Receiver of the shutdown signal. + on_shutdown: Shutdown, + /// How many [`GracefulShutdown`] tasks are currently active + graceful_tasks: Arc, +} + +// === impl TaskManager === + +impl TaskManager { + /// Returns a a [`TaskManager`] over the currently running Runtime. + /// + /// # Panics + /// + /// This will panic if called outside the context of a Tokio runtime. + pub fn current() -> Self { + let handle = Handle::current(); + Self::new(handle) + } + + /// Create a new instance connected to the given handle's tokio runtime. + pub fn new(handle: Handle) -> Self { + let (panicked_tasks_tx, panicked_tasks_rx) = unbounded_channel(); + let (signal, on_shutdown) = signal(); + Self { + handle, + panicked_tasks_tx, + panicked_tasks_rx, + signal: Some(signal), + on_shutdown, + graceful_tasks: Arc::new(AtomicUsize::new(0)), + } + } + + /// Returns a new [`TaskExecutor`] that can spawn new tasks onto the tokio + /// runtime this type is connected to. + pub fn executor(&self) -> TaskExecutor { + TaskExecutor { + handle: self.handle.clone(), + on_shutdown: self.on_shutdown.clone(), + panicked_tasks_tx: self.panicked_tasks_tx.clone(), + graceful_tasks: Arc::clone(&self.graceful_tasks), + } + } + + /// Fires the shutdown signal and awaits until all tasks are shutdown. + pub fn graceful_shutdown(self) { let _ = self.do_graceful_shutdown(None); } + + /// Fires the shutdown signal and awaits until all tasks are shutdown. + /// + /// Returns true if all tasks were shutdown before the timeout elapsed. + pub fn graceful_shutdown_with_timeout( + self, timeout: std::time::Duration, + ) -> bool { + self.do_graceful_shutdown(Some(timeout)) + } + + fn do_graceful_shutdown( + self, timeout: Option, + ) -> bool { + drop(self.signal); + let when = timeout.map(|t| std::time::Instant::now() + t); + while self.graceful_tasks.load(Ordering::Relaxed) > 0 { + if when + .map(|when| std::time::Instant::now() > when) + .unwrap_or(false) + { + return false; + } + std::hint::spin_loop(); + } + + true + } +} + +/// An endless future that resolves if a critical task panicked. +/// +/// See [`TaskExecutor::spawn_critical`] +impl Future for TaskManager { + type Output = PanickedTaskError; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let err = ready!(self.get_mut().panicked_tasks_rx.poll_recv(cx)); + Poll::Ready(err.expect("stream can not end")) + } +} + +/// Error with the name of the task that panicked and an error downcasted to +/// string, if possible. +#[derive(Debug, thiserror::Error)] +pub struct PanickedTaskError { + task_name: &'static str, + error: Option, +} + +impl Display for PanickedTaskError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let task_name = self.task_name; + if let Some(error) = &self.error { + write!(f, "Critical task `{task_name}` panicked: `{error}`") + } else { + write!(f, "Critical task `{task_name}` panicked") + } + } +} + +impl PanickedTaskError { + fn new(task_name: &'static str, error: Box) -> Self { + let error = match error.downcast::() { + Ok(value) => Some(*value), + Err(error) => match error.downcast::<&str>() { + Ok(value) => Some(value.to_string()), + Err(_) => None, + }, + }; + + Self { task_name, error } + } +} + +/// A type that can spawn new tokio tasks +#[derive(Debug, Clone)] +pub struct TaskExecutor { + /// Handle to the tokio runtime this task manager is associated with. + /// + /// See [`Handle`] docs. + handle: Handle, + /// Receiver of the shutdown signal. + on_shutdown: Shutdown, + /// Sender half for sending panic signals to this type + panicked_tasks_tx: UnboundedSender, + /// Task Executor Metrics + // metrics: TaskExecutorMetrics, + /// How many [`GracefulShutdown`] tasks are currently active + graceful_tasks: Arc, +} + +// === impl TaskExecutor === + +impl TaskExecutor { + /// Returns the [Handle] to the tokio runtime. + pub const fn handle(&self) -> &Handle { &self.handle } + + /// Returns the receiver of the shutdown signal. + pub const fn on_shutdown_signal(&self) -> &Shutdown { &self.on_shutdown } + + /// Spawns a future on the tokio runtime depending on the [`TaskKind`] + fn spawn_on_rt(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()> + where F: Future + Send + 'static { + match task_kind { + TaskKind::Default => self.handle.spawn(fut), + TaskKind::Blocking => { + let handle = self.handle.clone(); + self.handle.spawn_blocking(move || handle.block_on(fut)) + } + } + } + + /// Spawns a regular task depending on the given [`TaskKind`] + fn spawn_task_as(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()> + where F: Future + Send + 'static { + let on_shutdown = self.on_shutdown.clone(); + + // Wrap the original future to increment the finished tasks counter upon + // completion + let task = { + async move { + let fut = pin!(fut); + let _ = select(on_shutdown, fut).await; + } + }; + + self.spawn_on_rt(task, task_kind) + } + + /// Spawns the task onto the runtime. + /// The given future resolves as soon as the [Shutdown] signal is received. + /// + /// See also [`Handle::spawn`]. + pub fn spawn(&self, fut: F) -> JoinHandle<()> + where F: Future + Send + 'static { + self.spawn_task_as(fut, TaskKind::Default) + } + + /// Spawns a blocking task onto the runtime. + /// The given future resolves as soon as the [Shutdown] signal is received. + /// + /// See also [`Handle::spawn_blocking`]. + pub fn spawn_blocking(&self, fut: F) -> JoinHandle<()> + where F: Future + Send + 'static { + self.spawn_task_as(fut, TaskKind::Blocking) + } + + /// Spawns the task onto the runtime. + /// The given future resolves as soon as the [Shutdown] signal is received. + /// + /// See also [`Handle::spawn`]. + pub fn spawn_with_signal( + &self, f: impl FnOnce(Shutdown) -> F, + ) -> JoinHandle<()> + where F: Future + Send + 'static { + let on_shutdown = self.on_shutdown.clone(); + let fut = f(on_shutdown); + + let task = fut; + + self.handle.spawn(task) + } + + /// Spawns a critical task depending on the given [`TaskKind`] + fn spawn_critical_as( + &self, name: &'static str, fut: F, task_kind: TaskKind, + ) -> JoinHandle<()> + where F: Future + Send + 'static { + let panicked_tasks_tx = self.panicked_tasks_tx.clone(); + let on_shutdown = self.on_shutdown.clone(); + + // wrap the task in catch unwind + let task = std::panic::AssertUnwindSafe(fut).catch_unwind().map_err( + move |error| { + let task_error = PanickedTaskError::new(name, error); + let _ = panicked_tasks_tx.send(task_error); + }, + ); + + let task = async move { + let task = pin!(task); + let _ = select(on_shutdown, task).await; + }; + + self.spawn_on_rt(task, task_kind) + } + + /// This spawns a critical blocking task onto the runtime. + /// The given future resolves as soon as the [Shutdown] signal is received. + /// + /// If this task panics, the [`TaskManager`] is notified. + pub fn spawn_critical_blocking( + &self, name: &'static str, fut: F, + ) -> JoinHandle<()> + where F: Future + Send + 'static { + self.spawn_critical_as(name, fut, TaskKind::Blocking) + } + + /// This spawns a critical task onto the runtime. + /// The given future resolves as soon as the [Shutdown] signal is received. + /// + /// If this task panics, the [`TaskManager`] is notified. + pub fn spawn_critical( + &self, name: &'static str, fut: F, + ) -> JoinHandle<()> + where F: Future + Send + 'static { + self.spawn_critical_as(name, fut, TaskKind::Default) + } + + /// This spawns a critical task onto the runtime. + /// + /// If this task panics, the [`TaskManager`] is notified. + pub fn spawn_critical_with_shutdown_signal( + &self, name: &'static str, f: impl FnOnce(Shutdown) -> F, + ) -> JoinHandle<()> + where F: Future + Send + 'static { + let panicked_tasks_tx = self.panicked_tasks_tx.clone(); + let on_shutdown = self.on_shutdown.clone(); + let fut = f(on_shutdown); + + // wrap the task in catch unwind + let task = std::panic::AssertUnwindSafe(fut) + .catch_unwind() + .map_err(move |error| { + let task_error = PanickedTaskError::new(name, error); + let _ = panicked_tasks_tx.send(task_error); + }) + .map(drop); + + self.handle.spawn(task) + } + + /// This spawns a critical task onto the runtime. + /// + /// If this task panics, the [`TaskManager`] is notified. + /// The [`TaskManager`] will wait until the given future has completed + /// before shutting down. + /// + /// # Example + /// + /// ```no_run + /// # async fn t(executor: cfx_tasks::TaskExecutor) { + /// + /// executor.spawn_critical_with_graceful_shutdown_signal( + /// "grace", + /// |shutdown| async move { + /// // await the shutdown signal + /// let guard = shutdown.await; + /// // do work before exiting the program + /// tokio::time::sleep(std::time::Duration::from_secs(1)).await; + /// // allow graceful shutdown + /// drop(guard); + /// }, + /// ); + /// # } + /// ``` + pub fn spawn_critical_with_graceful_shutdown_signal( + &self, name: &'static str, f: impl FnOnce(GracefulShutdown) -> F, + ) -> JoinHandle<()> + where F: Future + Send + 'static { + let panicked_tasks_tx = self.panicked_tasks_tx.clone(); + let on_shutdown = GracefulShutdown::new( + self.on_shutdown.clone(), + GracefulShutdownGuard::new(Arc::clone(&self.graceful_tasks)), + ); + let fut = f(on_shutdown); + + // wrap the task in catch unwind + let task = std::panic::AssertUnwindSafe(fut) + .catch_unwind() + .map_err(move |error| { + let task_error = PanickedTaskError::new(name, error); + let _ = panicked_tasks_tx.send(task_error); + }) + .map(drop); + + self.handle.spawn(task) + } + + /// This spawns a regular task onto the runtime. + /// + /// The [`TaskManager`] will wait until the given future has completed + /// before shutting down. + /// + /// # Example + /// + /// ```no_run + /// # async fn t(executor: cfx_tasks::TaskExecutor) { + /// + /// executor.spawn_with_graceful_shutdown_signal(|shutdown| async move { + /// // await the shutdown signal + /// let guard = shutdown.await; + /// // do work before exiting the program + /// tokio::time::sleep(std::time::Duration::from_secs(1)).await; + /// // allow graceful shutdown + /// drop(guard); + /// }); + /// # } + /// ``` + pub fn spawn_with_graceful_shutdown_signal( + &self, f: impl FnOnce(GracefulShutdown) -> F, + ) -> JoinHandle<()> + where F: Future + Send + 'static { + let on_shutdown = GracefulShutdown::new( + self.on_shutdown.clone(), + GracefulShutdownGuard::new(Arc::clone(&self.graceful_tasks)), + ); + let fut = f(on_shutdown); + + self.handle.spawn(fut) + } +} + +impl TaskSpawner for TaskExecutor { + fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { + self.spawn(fut) + } + + fn spawn_critical( + &self, name: &'static str, fut: BoxFuture<'static, ()>, + ) -> JoinHandle<()> { + Self::spawn_critical(self, name, fut) + } + + fn spawn_blocking(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { + self.spawn_blocking(fut) + } + + fn spawn_critical_blocking( + &self, name: &'static str, fut: BoxFuture<'static, ()>, + ) -> JoinHandle<()> { + Self::spawn_critical_blocking(self, name, fut) + } +} + +/// `TaskSpawner` with extended behaviour +#[auto_impl::auto_impl(&, Arc)] +pub trait TaskSpawnerExt: + Send + Sync + Unpin + std::fmt::Debug + DynClone +{ + /// This spawns a critical task onto the runtime. + /// + /// If this task panics, the [`TaskManager`] is notified. + /// The [`TaskManager`] will wait until the given future has completed + /// before shutting down. + fn spawn_critical_with_graceful_shutdown_signal( + &self, name: &'static str, f: impl FnOnce(GracefulShutdown) -> F, + ) -> JoinHandle<()> + where F: Future + Send + 'static; + + /// This spawns a regular task onto the runtime. + /// + /// The [`TaskManager`] will wait until the given future has completed + /// before shutting down. + fn spawn_with_graceful_shutdown_signal( + &self, f: impl FnOnce(GracefulShutdown) -> F, + ) -> JoinHandle<()> + where F: Future + Send + 'static; +} + +impl TaskSpawnerExt for TaskExecutor { + fn spawn_critical_with_graceful_shutdown_signal( + &self, name: &'static str, f: impl FnOnce(GracefulShutdown) -> F, + ) -> JoinHandle<()> + where F: Future + Send + 'static { + Self::spawn_critical_with_graceful_shutdown_signal(self, name, f) + } + + fn spawn_with_graceful_shutdown_signal( + &self, f: impl FnOnce(GracefulShutdown) -> F, + ) -> JoinHandle<()> + where F: Future + Send + 'static { + Self::spawn_with_graceful_shutdown_signal(self, f) + } +} + +/// Determines how a task is spawned +enum TaskKind { + /// Spawn the task to the default executor [`Handle::spawn`] + Default, + /// Spawn the task to the blocking executor [`Handle::spawn_blocking`] + Blocking, +} + +#[cfg(test)] +mod tests { + use super::*; + use std::{sync::atomic::AtomicBool, time::Duration}; + + #[test] + fn test_cloneable() { + #[derive(Clone)] + struct ExecutorWrapper { + _e: Box, + } + + let executor: Box = + Box::::default(); + let _e = dyn_clone::clone_box(&*executor); + + let e = ExecutorWrapper { _e }; + let _e2 = e; + } + + #[test] + fn test_critical() { + let runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let manager = TaskManager::new(handle); + let executor = manager.executor(); + + executor.spawn_critical("this is a critical task", async { + panic!("intentionally panic") + }); + + runtime.block_on(async move { + let err = manager.await; + assert_eq!(err.task_name, "this is a critical task"); + assert_eq!(err.error, Some("intentionally panic".to_string())); + }) + } + + // Tests that spawned tasks are terminated if the `TaskManager` drops + #[test] + fn test_manager_shutdown_critical() { + let runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let manager = TaskManager::new(handle.clone()); + let executor = manager.executor(); + + let (signal, shutdown) = signal(); + + executor.spawn_critical("this is a critical task", async move { + tokio::time::sleep(Duration::from_millis(200)).await; + drop(signal); + }); + + drop(manager); + + handle.block_on(shutdown); + } + + // Tests that spawned tasks are terminated if the `TaskManager` drops + #[test] + fn test_manager_shutdown() { + let runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let manager = TaskManager::new(handle.clone()); + let executor = manager.executor(); + + let (signal, shutdown) = signal(); + + executor.spawn(Box::pin(async move { + tokio::time::sleep(Duration::from_millis(200)).await; + drop(signal); + })); + + drop(manager); + + handle.block_on(shutdown); + } + + #[test] + fn test_manager_graceful_shutdown() { + let runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let manager = TaskManager::new(handle); + let executor = manager.executor(); + + let val = Arc::new(AtomicBool::new(false)); + let c = val.clone(); + executor.spawn_critical_with_graceful_shutdown_signal( + "grace", + |shutdown| async move { + let _guard = shutdown.await; + tokio::time::sleep(Duration::from_millis(200)).await; + c.store(true, Ordering::Relaxed); + }, + ); + + manager.graceful_shutdown(); + assert!(val.load(Ordering::Relaxed)); + } + + #[test] + fn test_manager_graceful_shutdown_many() { + let runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let manager = TaskManager::new(handle); + let executor = manager.executor(); + + let counter = Arc::new(AtomicUsize::new(0)); + let num = 10; + for _ in 0..num { + let c = counter.clone(); + executor.spawn_critical_with_graceful_shutdown_signal( + "grace", + move |shutdown| async move { + let _guard = shutdown.await; + tokio::time::sleep(Duration::from_millis(200)).await; + c.fetch_add(1, Ordering::SeqCst); + }, + ); + } + + manager.graceful_shutdown(); + assert_eq!(counter.load(Ordering::Relaxed), num); + } + + #[test] + fn test_manager_graceful_shutdown_timeout() { + let runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let manager = TaskManager::new(handle); + let executor = manager.executor(); + + let timeout = Duration::from_millis(500); + let val = Arc::new(AtomicBool::new(false)); + let val2 = val.clone(); + executor.spawn_critical_with_graceful_shutdown_signal( + "grace", + |shutdown| async move { + let _guard = shutdown.await; + tokio::time::sleep(timeout * 3).await; + val2.store(true, Ordering::Relaxed); + unreachable!("should not be reached"); + }, + ); + + manager.graceful_shutdown_with_timeout(timeout); + assert!(!val.load(Ordering::Relaxed)); + } +} diff --git a/crates/tasks/src/pool.rs b/crates/tasks/src/pool.rs new file mode 100644 index 0000000000..519ecc4bae --- /dev/null +++ b/crates/tasks/src/pool.rs @@ -0,0 +1,203 @@ +// Copyright 2023-2024 Paradigm.xyz +// This file is part of reth. +// Reth is a modular, contributor-friendly and blazing-fast implementation of +// the Ethereum protocol + +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: + +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. + +//! Additional helpers for executing tracing calls + +use std::{ + future::Future, + panic::{catch_unwind, AssertUnwindSafe}, + pin::Pin, + sync::Arc, + task::{ready, Context, Poll}, + thread, +}; +use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit, Semaphore}; + +/// RPC Tracing call guard semaphore. +/// +/// This is used to restrict the number of concurrent RPC requests to tracing +/// methods like `debug_traceTransaction` as well as `eth_getProof` because they +/// can consume a lot of memory and CPU. +/// +/// This types serves as an entry guard for the [`BlockingTaskPool`] and is used +/// to rate limit parallel blocking tasks in the pool. +#[derive(Clone, Debug)] +pub struct BlockingTaskGuard(Arc); + +impl BlockingTaskGuard { + /// Create a new `BlockingTaskGuard` with the given maximum number of + /// blocking tasks in parallel. + pub fn new(max_blocking_tasks: usize) -> Self { + Self(Arc::new(Semaphore::new(max_blocking_tasks))) + } + + /// See also [`Semaphore::acquire_owned`] + pub async fn acquire_owned( + self, + ) -> Result { + self.0.acquire_owned().await + } + + /// See also [`Semaphore::acquire_many_owned`] + pub async fn acquire_many_owned( + self, n: u32, + ) -> Result { + self.0.acquire_many_owned(n).await + } +} + +/// Used to execute blocking tasks on a rayon threadpool from within a tokio +/// runtime. +/// +/// This is a dedicated threadpool for blocking tasks which are CPU bound. +/// RPC calls that perform blocking IO (disk lookups) are not executed on this +/// pool but on the tokio runtime's blocking pool, which performs poorly with +/// CPU bound tasks (see ). Once the tokio blocking +/// pool is saturated it is converted into a queue, blocking tasks could then +/// interfere with the queue and block other RPC calls. +/// +/// See also [tokio-docs] for more information. +/// +/// [tokio-docs]: https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code +#[derive(Clone, Debug)] +pub struct BlockingTaskPool { + pool: Arc, +} + +impl BlockingTaskPool { + /// Create a new `BlockingTaskPool` with the given threadpool. + pub fn new(pool: rayon::ThreadPool) -> Self { + Self { + pool: Arc::new(pool), + } + } + + /// Convenience function to start building a new threadpool. + pub fn builder() -> rayon::ThreadPoolBuilder { + rayon::ThreadPoolBuilder::new() + } + + /// Convenience function to build a new threadpool with the default + /// configuration. + /// + /// Uses [`rayon::ThreadPoolBuilder::build`](rayon::ThreadPoolBuilder::build) defaults but + /// increases the stack size to 8MB. + pub fn build() -> Result { + Self::builder().build().map(Self::new) + } + + /// Asynchronous wrapper around Rayon's + /// [`ThreadPool::spawn`](rayon::ThreadPool::spawn). + /// + /// Runs a function on the configured threadpool, returning a future that + /// resolves with the function's return value. + /// + /// If the function panics, the future will resolve to an error. + pub fn spawn(&self, func: F) -> BlockingTaskHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + + self.pool.spawn(move || { + let _result = tx.send(catch_unwind(AssertUnwindSafe(func))); + }); + + BlockingTaskHandle { rx } + } + + /// Asynchronous wrapper around Rayon's + /// [`ThreadPool::spawn_fifo`](rayon::ThreadPool::spawn_fifo). + /// + /// Runs a function on the configured threadpool, returning a future that + /// resolves with the function's return value. + /// + /// If the function panics, the future will resolve to an error. + pub fn spawn_fifo(&self, func: F) -> BlockingTaskHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + + self.pool.spawn_fifo(move || { + let _result = tx.send(catch_unwind(AssertUnwindSafe(func))); + }); + + BlockingTaskHandle { rx } + } +} + +/// Async handle for a blocking task running in a Rayon thread pool. +/// +/// ## Panics +/// +/// If polled from outside a tokio runtime. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[pin_project::pin_project] +pub struct BlockingTaskHandle { + #[pin] + pub(crate) rx: oneshot::Receiver>, +} + +impl Future for BlockingTaskHandle { + type Output = thread::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match ready!(self.project().rx.poll(cx)) { + Ok(res) => Poll::Ready(res), + Err(_) => { + Poll::Ready(Err(Box::::default())) + } + } + } +} + +/// An error returned when the Tokio channel is dropped while awaiting a result. +/// +/// This should only happen +#[derive(Debug, Default, thiserror::Error)] +#[error("tokio channel dropped while awaiting result")] +#[non_exhaustive] +pub struct TokioBlockingTaskError; + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn blocking_pool() { + let pool = BlockingTaskPool::build().unwrap(); + let res = pool.spawn(move || 5); + let res = res.await.unwrap(); + assert_eq!(res, 5); + } + + #[tokio::test] + async fn blocking_pool_panic() { + let pool = BlockingTaskPool::build().unwrap(); + let res = pool.spawn(move || -> i32 { + panic!(); + }); + let res = res.await; + assert!(res.is_err()); + } +} diff --git a/crates/tasks/src/shutdown.rs b/crates/tasks/src/shutdown.rs new file mode 100644 index 0000000000..c5464f6559 --- /dev/null +++ b/crates/tasks/src/shutdown.rs @@ -0,0 +1,193 @@ +// Copyright 2023-2024 Paradigm.xyz +// This file is part of reth. +// Reth is a modular, contributor-friendly and blazing-fast implementation of +// the Ethereum protocol + +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: + +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. + +//! Helper for shutdown signals + +use futures_util::{ + future::{FusedFuture, Shared}, + FutureExt, +}; +use std::{ + future::Future, + pin::Pin, + sync::{atomic::AtomicUsize, Arc}, + task::{ready, Context, Poll}, +}; +use tokio::sync::oneshot; + +/// A Future that resolves when the shutdown event has been fired. +#[derive(Debug)] +pub struct GracefulShutdown { + shutdown: Shutdown, + guard: Option, +} + +impl GracefulShutdown { + pub(crate) const fn new( + shutdown: Shutdown, guard: GracefulShutdownGuard, + ) -> Self { + Self { + shutdown, + guard: Some(guard), + } + } + + /// Returns a new shutdown future that is ignores the returned + /// [`GracefulShutdownGuard`]. + /// + /// This just maps the return value of the future to `()`, it does not drop + /// the guard. + pub fn ignore_guard( + self, + ) -> impl Future + Send + Sync + Unpin + 'static { + self.map(drop) + } +} + +impl Future for GracefulShutdown { + type Output = GracefulShutdownGuard; + + fn poll( + mut self: Pin<&mut Self>, cx: &mut Context<'_>, + ) -> Poll { + ready!(self.shutdown.poll_unpin(cx)); + Poll::Ready( + self.get_mut() + .guard + .take() + .expect("Future polled after completion"), + ) + } +} + +impl Clone for GracefulShutdown { + fn clone(&self) -> Self { + Self { + shutdown: self.shutdown.clone(), + guard: self + .guard + .as_ref() + .map(|g| GracefulShutdownGuard::new(Arc::clone(&g.0))), + } + } +} + +/// A guard that fires once dropped to signal the +/// [`TaskManager`](crate::TaskManager) that the [`GracefulShutdown`] has +/// completed. +#[derive(Debug)] +#[must_use = "if unused the task will not be gracefully shutdown"] +pub struct GracefulShutdownGuard(Arc); + +impl GracefulShutdownGuard { + pub(crate) fn new(counter: Arc) -> Self { + counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Self(counter) + } +} + +impl Drop for GracefulShutdownGuard { + fn drop(&mut self) { + self.0.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + } +} + +/// A Future that resolves when the shutdown event has been fired. +#[derive(Debug, Clone)] +pub struct Shutdown(Shared>); + +impl Future for Shutdown { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let pin = self.get_mut(); + if pin.0.is_terminated() || pin.0.poll_unpin(cx).is_ready() { + Poll::Ready(()) + } else { + Poll::Pending + } + } +} + +/// Shutdown signal that fires either manually or on drop by closing the channel +#[derive(Debug)] +pub struct Signal(oneshot::Sender<()>); + +impl Signal { + /// Fire the signal manually. + pub fn fire(self) { let _ = self.0.send(()); } +} + +/// Create a channel pair that's used to propagate shutdown event +pub fn signal() -> (Signal, Shutdown) { + let (sender, receiver) = oneshot::channel(); + (Signal(sender), Shutdown(receiver.shared())) +} + +#[cfg(test)] +mod tests { + use super::*; + use futures_util::future::join_all; + use std::time::Duration; + + #[tokio::test(flavor = "multi_thread")] + async fn test_shutdown() { let (_signal, _shutdown) = signal(); } + + #[tokio::test(flavor = "multi_thread")] + async fn test_drop_signal() { + let (signal, shutdown) = signal(); + + tokio::task::spawn(async move { + tokio::time::sleep(Duration::from_millis(500)).await; + drop(signal) + }); + + shutdown.await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_multi_shutdowns() { + let (signal, shutdown) = signal(); + + let mut tasks = Vec::with_capacity(100); + for _ in 0..100 { + let shutdown = shutdown.clone(); + let task = tokio::task::spawn(async move { + shutdown.await; + }); + tasks.push(task); + } + + drop(signal); + + join_all(tasks).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_drop_signal_from_thread() { + let (signal, shutdown) = signal(); + + let _thread = std::thread::spawn(|| { + std::thread::sleep(Duration::from_millis(500)); + drop(signal) + }); + + shutdown.await; + } +}