diff --git a/Cargo.lock b/Cargo.lock
index f622d477bb..1dbd41b122 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2940,6 +2940,7 @@ dependencies = [
  "kaspa-muhash",
  "kaspa-notify",
  "kaspa-p2p-lib",
+ "kaspa-p2p-mining",
  "kaspa-utils",
  "kaspa-utils-tower",
  "log",
@@ -2982,6 +2983,23 @@ dependencies = [
  "uuid 1.10.0",
 ]
 
+[[package]]
+name = "kaspa-p2p-mining"
+version = "0.16.1"
+dependencies = [
+ "kaspa-consensus-core",
+ "kaspa-consensusmanager",
+ "kaspa-core",
+ "kaspa-hashes",
+ "kaspa-math",
+ "kaspa-mining-errors",
+ "kaspa-p2p-lib",
+ "kaspa-utils",
+ "kaspa-utils-tower",
+ "log",
+ "tokio",
+]
+
 [[package]]
 name = "kaspa-perf-monitor"
 version = "0.16.1"
@@ -3081,6 +3099,7 @@ dependencies = [
  "kaspa-notify",
  "kaspa-p2p-flows",
  "kaspa-p2p-lib",
+ "kaspa-p2p-mining",
  "kaspa-perf-monitor",
  "kaspa-rpc-core",
  "kaspa-txscript",
@@ -3649,6 +3668,8 @@ dependencies = [
  "kaspa-mining",
  "kaspa-notify",
  "kaspa-p2p-flows",
+ "kaspa-p2p-lib",
+ "kaspa-p2p-mining",
  "kaspa-perf-monitor",
  "kaspa-rpc-core",
  "kaspa-rpc-service",
diff --git a/Cargo.toml b/Cargo.toml
index c1955f268a..6e89e3f0f4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -110,6 +110,7 @@ kaspa-muhash = { version = "0.16.1", path = "crypto/muhash" }
 kaspa-notify = { version = "0.16.1", path = "notify" }
 kaspa-p2p-flows = { version = "0.16.1", path = "protocol/flows" }
 kaspa-p2p-lib = { version = "0.16.1", path = "protocol/p2p" }
+kaspa-p2p-mining = { version = "0.16.1", path = "protocol/mining" }
 kaspa-perf-monitor = { version = "0.16.1", path = "metrics/perf_monitor" }
 kaspa-pow = { version = "0.16.1", path = "consensus/pow" }
 kaspa-rpc-core = { version = "0.16.1", path = "rpc/core" }
diff --git a/components/consensusmanager/src/session.rs b/components/consensusmanager/src/session.rs
index c67caf07d4..eeb37859b4 100644
--- a/components/consensusmanager/src/session.rs
+++ b/components/consensusmanager/src/session.rs
@@ -249,6 +249,10 @@ impl ConsensusSessionOwned {
         self.clone().spawn_blocking(|c| c.get_sink_timestamp()).await
     }
 
+    pub async fn async_get_sink_daa_score_timestamp(&self) -> DaaScoreTimestamp {
+        self.clone().spawn_blocking(|c| c.get_sink_daa_score_timestamp()).await
+    }
+
     pub async fn async_get_current_block_color(&self, hash: Hash) -> Option<bool> {
         self.clone().spawn_blocking(move |c| c.get_current_block_color(hash)).await
     }
@@ -262,13 +266,6 @@ impl ConsensusSessionOwned {
         self.clone().spawn_blocking(|c| c.estimate_block_count()).await
     }
 
-    /// Returns whether this consensus is considered synced or close to being synced.
-    ///
-    /// This info is used to determine if it's ok to use a block template from this node for mining purposes.
-    pub async fn async_is_nearly_synced(&self) -> bool {
-        self.clone().spawn_blocking(|c| c.is_nearly_synced()).await
-    }
-
     pub async fn async_get_virtual_chain_from_block(
         &self,
         low: Hash,
diff --git a/consensus/core/src/api/mod.rs b/consensus/core/src/api/mod.rs
index f8df0c0e14..caa0d72db5 100644
--- a/consensus/core/src/api/mod.rs
+++ b/consensus/core/src/api/mod.rs
@@ -134,6 +134,10 @@ pub trait ConsensusApi: Send + Sync {
         unimplemented!()
     }
 
+    fn get_sink_daa_score_timestamp(&self) -> DaaScoreTimestamp {
+        unimplemented!()
+    }
+
     fn get_current_block_color(&self, hash: Hash) -> Option<bool> {
         unimplemented!()
     }
@@ -151,13 +155,6 @@ pub trait ConsensusApi: Send + Sync {
         unimplemented!()
     }
 
-    /// Returns whether this consensus is considered synced or close to being synced.
-    ///
-    /// This info is used to determine if it's ok to use a block template from this node for mining purposes.
-    fn is_nearly_synced(&self) -> bool {
-        unimplemented!()
-    }
-
     /// Gets the virtual chain paths from `low` to the `sink` hash, or until `chain_path_added_limit` is reached
     ///
     /// Note:
diff --git a/consensus/core/src/config/params.rs b/consensus/core/src/config/params.rs
index b0ab02e98e..931409bbdc 100644
--- a/consensus/core/src/config/params.rs
+++ b/consensus/core/src/config/params.rs
@@ -10,10 +10,7 @@ use crate::{
 };
 use kaspa_addresses::Prefix;
 use kaspa_math::Uint256;
-use std::{
-    cmp::min,
-    time::{SystemTime, UNIX_EPOCH},
-};
+use std::cmp::min;
 
 #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
 pub struct ForkActivation(u64);
@@ -136,10 +133,6 @@ pub struct Params {
     pub runtime_sig_op_counting: ForkActivation,
 }
 
-fn unix_now() -> u64 {
-    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64
-}
-
 impl Params {
     /// Returns the size of the full blocks window that is inspected to calculate the past median time (legacy)
     #[inline]
@@ -238,7 +231,7 @@ impl Params {
         }
     }
 
-    fn expected_daa_window_duration_in_milliseconds(&self, selected_parent_daa_score: u64) -> u64 {
+    pub fn expected_daa_window_duration_in_milliseconds(&self, selected_parent_daa_score: u64) -> u64 {
         if self.sampling_activation.is_active(selected_parent_daa_score) {
             self.target_time_per_block * self.difficulty_sample_rate * self.sampled_difficulty_window_size as u64
         } else {
@@ -264,26 +257,6 @@ impl Params {
         min(self.pruning_depth, anticone_finalization_depth)
     }
 
-    /// Returns whether the sink timestamp is recent enough and the node is considered synced or nearly synced.
-    pub fn is_nearly_synced(&self, sink_timestamp: u64, sink_daa_score: u64) -> bool {
-        if self.net.is_mainnet() {
-            // We consider the node close to being synced if the sink (virtual selected parent) block
-            // timestamp is within DAA window duration far in the past. Blocks mined over such DAG state would
-            // enter the DAA window of fully-synced nodes and thus contribute to overall network difficulty
-            unix_now() < sink_timestamp + self.expected_daa_window_duration_in_milliseconds(sink_daa_score)
-        } else {
-            // For testnets we consider the node to be synced if the sink timestamp is within a time range which
-            // is overwhelmingly unlikely to pass without mined blocks even if net hashrate decreased dramatically.
-            //
-            // This period is smaller than the above mainnet calculation in order to ensure that an IBDing miner
-            // with significant testnet hashrate does not overwhelm the network with deep side-DAGs.
-            //
-            // We use DAA duration as baseline and scale it down with BPS (and divide by 3 for mining only when very close to current time on TN11)
-            let max_expected_duration_without_blocks_in_milliseconds = self.target_time_per_block * NEW_DIFFICULTY_WINDOW_DURATION / 3; // = DAA duration in milliseconds / bps / 3
-            unix_now() < sink_timestamp + max_expected_duration_without_blocks_in_milliseconds
-        }
-    }
-
     pub fn network_name(&self) -> String {
         self.net.to_prefixed()
     }
diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs
index 21e5bf5573..d5edf74b63 100644
--- a/consensus/src/consensus/mod.rs
+++ b/consensus/src/consensus/mod.rs
@@ -502,6 +502,12 @@ impl ConsensusApi for Consensus {
         self.headers_store.get_timestamp(self.get_sink()).unwrap()
     }
 
+    fn get_sink_daa_score_timestamp(&self) -> DaaScoreTimestamp {
+        let sink = self.get_sink();
+        let compact = self.headers_store.get_compact_header_data(sink).unwrap();
+        DaaScoreTimestamp { daa_score: compact.daa_score, timestamp: compact.timestamp }
+    }
+
     fn get_current_block_color(&self, hash: Hash) -> Option<bool> {
         let _guard = self.pruning_lock.blocking_read();
 
@@ -592,13 +598,6 @@ impl ConsensusApi for Consensus {
         BlockCount { header_count, block_count }
     }
 
-    fn is_nearly_synced(&self) -> bool {
-        // See comment within `config.is_nearly_synced`
-        let sink = self.get_sink();
-        let compact = self.headers_store.get_compact_header_data(sink).unwrap();
-        self.config.is_nearly_synced(compact.timestamp, compact.daa_score)
-    }
-
     fn get_virtual_chain_from_block(&self, low: Hash, chain_path_added_limit: Option<usize>) -> ConsensusResult<ChainPath> {
         // Calculate chain changes between the given `low` and the current sink hash (up to `limit` amount of block hashes).
         // Note:
diff --git a/kaspad/Cargo.toml b/kaspad/Cargo.toml
index 3507339f29..d79ee36a63 100644
--- a/kaspad/Cargo.toml
+++ b/kaspad/Cargo.toml
@@ -31,6 +31,8 @@ kaspa-index-processor.workspace = true
 kaspa-mining.workspace = true
 kaspa-notify.workspace = true
 kaspa-p2p-flows.workspace = true
+kaspa-p2p-lib.workspace = true
+kaspa-p2p-mining.workspace = true
 kaspa-perf-monitor.workspace = true
 kaspa-rpc-core.workspace = true
 kaspa-rpc-service.workspace = true
diff --git a/kaspad/src/daemon.rs b/kaspad/src/daemon.rs
index db9f32c165..2e8ced2218 100644
--- a/kaspad/src/daemon.rs
+++ b/kaspad/src/daemon.rs
@@ -14,6 +14,8 @@ use kaspa_database::{
 };
 use kaspa_grpc_server::service::GrpcService;
 use kaspa_notify::{address::tracker::Tracker, subscription::context::SubscriptionContext};
+use kaspa_p2p_lib::Hub;
+use kaspa_p2p_mining::rule_engine::MiningRuleEngine;
 use kaspa_rpc_service::service::RpcCoreService;
 use kaspa_txscript::caches::TxScriptCacheCounters;
 use kaspa_utils::git;
@@ -537,6 +539,14 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
     let mining_monitor =
         Arc::new(MiningMonitor::new(mining_manager.clone(), mining_counters, tx_script_cache_counters.clone(), tick_service.clone()));
 
+    let hub = Hub::new();
+    let mining_rule_engine = Arc::new(MiningRuleEngine::new(
+        consensus_manager.clone(),
+        config.clone(),
+        processing_counters.clone(),
+        tick_service.clone(),
+        hub.clone(),
+    ));
     let flow_context = Arc::new(FlowContext::new(
         consensus_manager.clone(),
         address_manager,
@@ -544,6 +554,8 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
         mining_manager.clone(),
         tick_service.clone(),
         notification_root,
+        hub.clone(),
+        mining_rule_engine.clone(),
     ));
     let p2p_service = Arc::new(P2pService::new(
         flow_context.clone(),
@@ -574,6 +586,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
         p2p_tower_counters.clone(),
         grpc_tower_counters.clone(),
         system_info,
+        mining_rule_engine.clone(),
     ));
     let grpc_service_broadcasters: usize = 3; // TODO: add a command line argument or derive from other arg/config/host-related fields
     let grpc_service = if !args.disable_grpc {
@@ -607,6 +620,8 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
     async_runtime.register(consensus_monitor);
     async_runtime.register(mining_monitor);
     async_runtime.register(perf_monitor);
+    async_runtime.register(mining_rule_engine);
+
     let wrpc_service_tasks: usize = 2; // num_cpus::get() / 2;
     // Register wRPC servers based on command line arguments
     [
diff --git a/protocol/flows/Cargo.toml b/protocol/flows/Cargo.toml
index ff282d4e79..12bab051fe 100644
--- a/protocol/flows/Cargo.toml
+++ b/protocol/flows/Cargo.toml
@@ -14,6 +14,7 @@ kaspa-core.workspace = true
 kaspa-consensus-core.workspace = true
 kaspa-consensus-notify.workspace = true
 kaspa-p2p-lib.workspace = true
+kaspa-p2p-mining.workspace = true
 kaspa-utils.workspace = true
 kaspa-utils-tower.workspace = true
 kaspa-hashes.workspace = true
diff --git a/protocol/flows/src/flow_context.rs b/protocol/flows/src/flow_context.rs
index 14d4168aca..3f4d6e7d10 100644
--- a/protocol/flows/src/flow_context.rs
+++ b/protocol/flows/src/flow_context.rs
@@ -17,7 +17,7 @@ use kaspa_consensus_notify::{
     notification::{Notification, PruningPointUtxoSetOverrideNotification},
     root::ConsensusNotificationRoot,
 };
-use kaspa_consensusmanager::{BlockProcessingBatch, ConsensusInstance, ConsensusManager, ConsensusProxy};
+use kaspa_consensusmanager::{BlockProcessingBatch, ConsensusInstance, ConsensusManager, ConsensusProxy, ConsensusSessionOwned};
 use kaspa_core::{
     debug, info,
     kaspad_env::{name, version},
@@ -35,6 +35,7 @@ use kaspa_p2p_lib::{
     pb::{kaspad_message::Payload, InvRelayBlockMessage},
     ConnectionInitializer, Hub, KaspadHandshake, PeerKey, PeerProperties, Router,
 };
+use kaspa_p2p_mining::rule_engine::MiningRuleEngine;
 use kaspa_utils::iter::IterExtensions;
 use kaspa_utils::networking::PeerId;
 use parking_lot::{Mutex, RwLock};
@@ -231,6 +232,9 @@ pub struct FlowContextInner {
     // Orphan parameters
     orphan_resolution_range: u32,
     max_orphans: usize,
+
+    // Mining rule engine
+    mining_rule_engine: Arc<MiningRuleEngine>,
 }
 
 #[derive(Clone)]
@@ -303,9 +307,9 @@ impl FlowContext {
         mining_manager: MiningManagerProxy,
         tick_service: Arc<TickService>,
         notification_root: Arc<ConsensusNotificationRoot>,
+        hub: Hub,
+        mining_rule_engine: Arc<MiningRuleEngine>,
     ) -> Self {
-        let hub = Hub::new();
-
         let orphan_resolution_range = BASELINE_ORPHAN_RESOLUTION_RANGE + (config.bps() as f64).log2().ceil() as u32;
 
         // The maximum amount of orphans allowed in the orphans pool. This number is an approximation
@@ -331,6 +335,7 @@ impl FlowContext {
                 orphan_resolution_range,
                 max_orphans,
                 config,
+                mining_rule_engine,
             }),
         }
     }
@@ -556,7 +561,7 @@ impl FlowContext {
         }
 
         // Transaction relay is disabled if the node is out of sync and thus not mining
-        if !consensus.async_is_nearly_synced().await {
+        if !self.is_nearly_synced(consensus).await {
             return;
         }
 
@@ -595,6 +600,11 @@ impl FlowContext {
         }
     }
 
+    pub async fn is_nearly_synced(&self, session: &ConsensusSessionOwned) -> bool {
+        let sink_daa_score_and_timestamp = session.async_get_sink_daa_score_timestamp().await;
+        self.mining_rule_engine.is_nearly_synced(sink_daa_score_and_timestamp)
+    }
+
     /// Notifies that the UTXO set was reset due to pruning point change via IBD.
     pub fn on_pruning_point_utxoset_override(&self) {
         // Notifications from the flow context might be ignored if the inner channel is already closing
diff --git a/protocol/flows/src/v5/blockrelay/flow.rs b/protocol/flows/src/v5/blockrelay/flow.rs
index 49353c2865..d31d464fd2 100644
--- a/protocol/flows/src/v5/blockrelay/flow.rs
+++ b/protocol/flows/src/v5/blockrelay/flow.rs
@@ -116,7 +116,7 @@ impl HandleRelayInvsFlow {
             }
         }
 
-        if self.ctx.is_ibd_running() && !session.async_is_nearly_synced().await {
+        if self.ctx.is_ibd_running() && !self.ctx.is_nearly_synced(&session).await {
             // Note: If the node is considered nearly synced we continue processing relay blocks even though an IBD is in progress.
             // For instance this means that downloading a side-chain from a delayed node does not interop the normal flow of live blocks.
             debug!("Got relay block {} while in IBD and the node is out of sync, continuing...", inv.hash);
diff --git a/protocol/flows/src/v5/txrelay/flow.rs b/protocol/flows/src/v5/txrelay/flow.rs
index af7e2b6c7d..25e6dd6172 100644
--- a/protocol/flows/src/v5/txrelay/flow.rs
+++ b/protocol/flows/src/v5/txrelay/flow.rs
@@ -116,7 +116,7 @@ impl RelayTransactionsFlow {
             let session = self.ctx.consensus().unguarded_session();
 
             // Transaction relay is disabled if the node is out of sync and thus not mining
-            if !session.async_is_nearly_synced().await {
+            if !self.ctx.is_nearly_synced(&session).await {
                 continue;
             }
diff --git a/protocol/mining/Cargo.toml b/protocol/mining/Cargo.toml
new file mode 100644
index 0000000000..03f1266324
--- /dev/null
+++ b/protocol/mining/Cargo.toml
@@ -0,0 +1,23 @@
+[package]
+name = "kaspa-p2p-mining"
+description = "Kaspa p2p mining"
+rust-version.workspace = true
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+include.workspace = true
+license.workspace = true
+repository.workspace = true
+
+[dependencies]
+kaspa-core.workspace = true
+kaspa-consensus-core.workspace = true
+kaspa-consensusmanager.workspace = true
+kaspa-mining-errors.workspace = true
+kaspa-hashes.workspace = true
+kaspa-math.workspace = true
+kaspa-p2p-lib.workspace = true
+kaspa-utils.workspace = true
+kaspa-utils-tower.workspace = true
+log.workspace = true
+tokio.workspace = true
diff --git a/protocol/mining/src/lib.rs b/protocol/mining/src/lib.rs
new file mode 100644
index 0000000000..4603c3a8b7
--- /dev/null
+++ b/protocol/mining/src/lib.rs
@@ -0,0 +1 @@
+pub mod rule_engine;
diff --git a/protocol/mining/src/rule_engine.rs b/protocol/mining/src/rule_engine.rs
new file mode 100644
index 0000000000..de62281055
--- /dev/null
+++ b/protocol/mining/src/rule_engine.rs
@@ -0,0 +1,192 @@
+use std::{
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc,
+    },
+    time::{Duration, Instant},
+};
+
+use kaspa_consensus_core::{
+    api::counters::ProcessingCounters,
+    config::{params::NEW_DIFFICULTY_WINDOW_DURATION, Config},
+    daa_score_timestamp::DaaScoreTimestamp,
+    network::NetworkType::{Mainnet, Testnet},
+};
+use kaspa_consensusmanager::ConsensusManager;
+use kaspa_core::{
+    task::{
+        service::{AsyncService, AsyncServiceFuture},
+        tick::{TickReason, TickService},
+    },
+    time::unix_now,
+    trace, warn,
+};
+use kaspa_p2p_lib::Hub;
+
+const RULE_ENGINE: &str = "mining-rule-engine";
+const SYNC_RATE_THRESHOLD: f64 = 0.10;
+
+#[derive(Clone)]
+pub struct MiningRuleEngine {
+    config: Arc<Config>,
+    processing_counters: Arc<ProcessingCounters>,
+    tick_service: Arc<TickService>,
+    // Sync Rate Rule: Allow mining if sync rate is below threshold AND finality point is "recent" (defined below)
+    use_sync_rate_rule: Arc<AtomicBool>,
+    consensus_manager: Arc<ConsensusManager>,
+    hub: Hub,
+}
+
+impl MiningRuleEngine {
+    pub async fn worker(self: &Arc<Self>) {
+        println!(module_path!());
+        let snapshot_interval = 10;
+        let mut last_snapshot = self.processing_counters.snapshot();
+        let mut last_log_time = Instant::now();
+        loop {
+            if let TickReason::Shutdown = self.tick_service.tick(Duration::from_secs(snapshot_interval)).await {
+                // Let the system print final logs before exiting
+                tokio::time::sleep(Duration::from_millis(500)).await;
+                break;
+            }
+
+            let now = Instant::now();
+            let elapsed_time = now - last_log_time;
+            if elapsed_time.as_secs() == 0 {
+                continue;
+            }
+
+            let snapshot = self.processing_counters.snapshot();
+
+            // Subtract the snapshots
+            let delta = &snapshot - &last_snapshot;
+
+            if elapsed_time.as_secs() > 0 {
+                let expected_blocks = (elapsed_time.as_millis() as u64) / self.config.target_time_per_block;
+                let received_blocks = delta.body_counts.max(delta.header_counts);
+                let rate: f64 = (received_blocks as f64) / (expected_blocks as f64);
+
+                let session = self.consensus_manager.consensus().unguarded_session();
+
+                let finality_point = session.async_finality_point().await;
+                let finality_point_timestamp = session.async_get_header(finality_point).await.unwrap().timestamp;
+                // Finality point is considered "recent" if it is within 3 finality durations from the current time
+                let is_finality_recent = finality_point_timestamp >= unix_now().saturating_sub(self.config.finality_duration() * 3);
+
+                trace!(
+                    "Sync rate: {:.2} | Finality point recent: {} | Elapsed time: {}s | Found/Expected blocks: {}/{}",
+                    rate,
+                    is_finality_recent,
+                    elapsed_time.as_secs(),
+                    delta.body_counts,
+                    expected_blocks,
+                );
+
+                if is_finality_recent && rate < SYNC_RATE_THRESHOLD {
+                    // if sync rate rule conditions are met:
+                    if let Ok(false) = self.use_sync_rate_rule.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) {
+                        warn!("Sync rate {:.2} is below threshold: {}", rate, SYNC_RATE_THRESHOLD);
+                    }
+                } else {
+                    // else when sync rate conditions are not met:
+                    if let Ok(true) = self.use_sync_rate_rule.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) {
+                        if !is_finality_recent {
+                            warn!("Sync rate {:.2} recovered: {} by entering IBD", rate, SYNC_RATE_THRESHOLD);
+                        } else {
+                            warn!("Sync rate {:.2} recovered: {}", rate, SYNC_RATE_THRESHOLD);
+                        }
+                    } else if !is_finality_recent {
+                        trace!("Finality period is old. Timestamp: {}. Sync rate: {:.2}", finality_point_timestamp, rate);
+                    }
+                }
+            }
+
+            last_snapshot = snapshot;
+            last_log_time = now;
+        }
+    }
+
+    pub fn new(
+        consensus_manager: Arc<ConsensusManager>,
+        config: Arc<Config>,
+        processing_counters: Arc<ProcessingCounters>,
+        tick_service: Arc<TickService>,
+        hub: Hub,
+    ) -> Self {
+        Self {
+            consensus_manager,
+            config,
+            processing_counters,
+            tick_service,
+            hub,
+            use_sync_rate_rule: Arc::new(AtomicBool::new(false)),
+        }
+    }
+
+    pub fn should_mine(&self, sink_daa_score_timestamp: DaaScoreTimestamp) -> bool {
+        if !self.has_sufficient_peer_connectivity() {
+            return false;
+        }
+
+        let is_nearly_synced = self.is_nearly_synced(sink_daa_score_timestamp);
+
+        is_nearly_synced || self.use_sync_rate_rule.load(std::sync::atomic::Ordering::Relaxed)
+    }
+
+    /// Returns whether the sink timestamp is recent enough and the node is considered synced or nearly synced.
+    ///
+    /// This info is used to determine if it's ok to use a block template from this node for mining purposes.
+    pub fn is_nearly_synced(&self, sink_daa_score_timestamp: DaaScoreTimestamp) -> bool {
+        let sink_timestamp = sink_daa_score_timestamp.timestamp;
+
+        if self.config.net.is_mainnet() {
+            // We consider the node close to being synced if the sink (virtual selected parent) block
+            // timestamp is within DAA window duration far in the past. Blocks mined over such DAG state would
+            // enter the DAA window of fully-synced nodes and thus contribute to overall network difficulty
+            let daa_window_duration = self.config.expected_daa_window_duration_in_milliseconds(sink_daa_score_timestamp.daa_score);
+
+            unix_now() < sink_timestamp + daa_window_duration
+        } else {
+            // For testnets we consider the node to be synced if the sink timestamp is within a time range which
+            // is overwhelmingly unlikely to pass without mined blocks even if net hashrate decreased dramatically.
+            //
+            // This period is smaller than the above mainnet calculation in order to ensure that an IBDing miner
+            // with significant testnet hashrate does not overwhelm the network with deep side-DAGs.
+            //
+            // We use DAA duration as baseline and scale it down with BPS (and divide by 3 for mining only when very close to current time on TN11)
+            let max_expected_duration_without_blocks_in_milliseconds =
+                self.config.target_time_per_block * NEW_DIFFICULTY_WINDOW_DURATION / 3; // = DAA duration in milliseconds / bps / 3
+
+            unix_now() < sink_timestamp + max_expected_duration_without_blocks_in_milliseconds
+        }
+    }
+
+    fn has_sufficient_peer_connectivity(&self) -> bool {
+        // Other network types can be used in an isolated environment without peers
+        !matches!(self.config.net.network_type, Mainnet | Testnet) || self.hub.has_peers()
+    }
+}
+
+impl AsyncService for MiningRuleEngine {
+    fn ident(self: Arc<Self>) -> &'static str {
+        RULE_ENGINE
+    }
+
+    fn start(self: Arc<Self>) -> AsyncServiceFuture {
+        Box::pin(async move {
+            self.worker().await;
+            Ok(())
+        })
+    }
+
+    fn signal_exit(self: Arc<Self>) {
+        trace!("sending an exit signal to {}", RULE_ENGINE);
+    }
+
+    fn stop(self: Arc<Self>) -> AsyncServiceFuture {
+        Box::pin(async move {
+            trace!("{} stopped", RULE_ENGINE);
+            Ok(())
+        })
+    }
+}
diff --git a/rpc/service/Cargo.toml b/rpc/service/Cargo.toml
index 54e9764088..9da3b10702 100644
--- a/rpc/service/Cargo.toml
+++ b/rpc/service/Cargo.toml
@@ -22,6 +22,7 @@ kaspa-mining.workspace = true
 kaspa-notify.workspace = true
 kaspa-p2p-flows.workspace = true
 kaspa-p2p-lib.workspace = true
+kaspa-p2p-mining.workspace = true
 kaspa-perf-monitor.workspace = true
 kaspa-rpc-core.workspace = true
 kaspa-txscript.workspace = true
diff --git a/rpc/service/src/service.rs b/rpc/service/src/service.rs
index c8c40c7707..eec507b2bc 100644
--- a/rpc/service/src/service.rs
+++ b/rpc/service/src/service.rs
@@ -3,9 +3,9 @@
 use super::collector::{CollectorFromConsensus, CollectorFromIndex};
 use crate::converter::feerate_estimate::{FeeEstimateConverter, FeeEstimateVerboseConverter};
 use crate::converter::{consensus::ConsensusConverter, index::IndexConverter, protocol::ProtocolConverter};
-use crate::service::NetworkType::{Mainnet, Testnet};
 use async_trait::async_trait;
 use kaspa_consensus_core::api::counters::ProcessingCounters;
+use kaspa_consensus_core::daa_score_timestamp::DaaScoreTimestamp;
 use kaspa_consensus_core::errors::block::RuleError;
 use kaspa_consensus_core::utxo::utxo_inquirer::UtxoInquirerError;
 use kaspa_consensus_core::{
@@ -53,6 +53,7 @@ use kaspa_notify::{
 };
 use kaspa_p2p_flows::flow_context::FlowContext;
 use kaspa_p2p_lib::common::ProtocolError;
+use kaspa_p2p_mining::rule_engine::MiningRuleEngine;
 use kaspa_perf_monitor::{counters::CountersSnapshot, Monitor as PerfMonitor};
 use kaspa_rpc_core::{
     api::{
@@ -119,6 +120,7 @@ pub struct RpcCoreService {
     system_info: SystemInfo,
     fee_estimate_cache: ExpiringCache,
     fee_estimate_verbose_cache: ExpiringCache>,
+    mining_rule_engine: Arc<MiningRuleEngine>,
 }
 
 const RPC_CORE: &str = "rpc-core";
@@ -144,6 +146,7 @@ impl RpcCoreService {
         p2p_tower_counters: Arc<TowerConnectionCounters>,
         grpc_tower_counters: Arc<TowerConnectionCounters>,
         system_info: SystemInfo,
+        mining_rule_engine: Arc<MiningRuleEngine>,
     ) -> Self {
         // This notifier UTXOs subscription granularity to index-processor or consensus notifier
         let policies = match index_notifier {
@@ -222,6 +225,7 @@ impl RpcCoreService {
             system_info,
             fee_estimate_cache: ExpiringCache::new(Duration::from_millis(500), Duration::from_millis(1000)),
             fee_estimate_verbose_cache: ExpiringCache::new(Duration::from_millis(500), Duration::from_millis(1000)),
+            mining_rule_engine,
         }
     }
 
@@ -270,11 +274,6 @@ impl RpcCoreService {
             .unwrap_or_default()
     }
 
-    fn has_sufficient_peer_connectivity(&self) -> bool {
-        // Other network types can be used in an isolated environment without peers
-        !matches!(self.flow_context.config.net.network_type, Mainnet | Testnet) || self.flow_context.hub().has_peers()
-    }
-
     fn extract_tx_query(&self, filter_transaction_pool: bool, include_orphan_pool: bool) -> RpcResult<TransactionQuery> {
         match (filter_transaction_pool, include_orphan_pool) {
             (true, true) => Ok(TransactionQuery::OrphansOnly),
@@ -295,9 +294,10 @@ impl RpcApi for RpcCoreService {
         request: SubmitBlockRequest,
     ) -> RpcResult<SubmitBlockResponse> {
         let session = self.consensus_manager.consensus().unguarded_session();
+        let sink_daa_score_timestamp = session.async_get_sink_daa_score_timestamp().await;
 
         // TODO: consider adding an error field to SubmitBlockReport to document both the report and error fields
-        let is_synced: bool = self.has_sufficient_peer_connectivity() && session.async_is_nearly_synced().await;
+        let is_synced: bool = self.mining_rule_engine.should_mine(sink_daa_score_timestamp);
 
         if !self.config.enable_unsynced_mining && !is_synced {
             // error = "Block not submitted - node is not synced"
@@ -379,11 +379,12 @@ NOTE: This error usually indicates an RPC conversion error between the node and
             return Err(RpcError::CoinbasePayloadLengthAboveMax(self.config.max_coinbase_payload_len));
         }
 
-        let is_nearly_synced =
-            self.config.is_nearly_synced(block_template.selected_parent_timestamp, block_template.selected_parent_daa_score);
         Ok(GetBlockTemplateResponse {
             block: block_template.block.into(),
-            is_synced: self.has_sufficient_peer_connectivity() && is_nearly_synced,
+            is_synced: self.mining_rule_engine.should_mine(DaaScoreTimestamp {
+                timestamp: block_template.selected_parent_timestamp,
+                daa_score: block_template.selected_parent_daa_score,
+            }),
         })
     }
 
@@ -466,13 +467,14 @@ NOTE: This error usually indicates an RPC conversion error between the node and
     }
 
     async fn get_info_call(&self, _connection: Option<&DynRpcConnection>, _request: GetInfoRequest) -> RpcResult<GetInfoResponse> {
-        let is_nearly_synced = self.consensus_manager.consensus().unguarded_session().async_is_nearly_synced().await;
+        let sink_daa_score_timestamp =
+            self.consensus_manager.consensus().unguarded_session().async_get_sink_daa_score_timestamp().await;
         Ok(GetInfoResponse {
             p2p_id: self.flow_context.node_id.to_string(),
             mempool_size: self.mining_manager.transaction_count_sample(TransactionQuery::TransactionsOnly),
             server_version: version().to_string(),
             is_utxo_indexed: self.config.utxoindex,
-            is_synced: self.has_sufficient_peer_connectivity() && is_nearly_synced,
+            is_synced: self.mining_rule_engine.should_mine(sink_daa_score_timestamp),
             has_notify_command: true,
             has_message_id: true,
         })
@@ -1120,7 +1122,8 @@ NOTE: This error usually indicates an RPC conversion error between the node and
         _request: GetServerInfoRequest,
     ) -> RpcResult<GetServerInfoResponse> {
         let session = self.consensus_manager.consensus().unguarded_session();
-        let is_synced: bool = self.has_sufficient_peer_connectivity() && session.async_is_nearly_synced().await;
+        let sink_daa_score_timestamp = session.async_get_sink_daa_score_timestamp().await;
+        let is_synced: bool = self.mining_rule_engine.should_mine(sink_daa_score_timestamp);
         let virtual_daa_score = session.get_virtual_daa_score();
 
         Ok(GetServerInfoResponse {
@@ -1139,8 +1142,9 @@ NOTE: This error usually indicates an RPC conversion error between the node and
         _connection: Option<&DynRpcConnection>,
         _request: GetSyncStatusRequest,
     ) -> RpcResult<GetSyncStatusResponse> {
-        let session = self.consensus_manager.consensus().unguarded_session();
-        let is_synced: bool = self.has_sufficient_peer_connectivity() && session.async_is_nearly_synced().await;
+        let sink_daa_score_timestamp =
+            self.consensus_manager.consensus().unguarded_session().async_get_sink_daa_score_timestamp().await;
+        let is_synced: bool = self.mining_rule_engine.should_mine(sink_daa_score_timestamp);
 
         Ok(GetSyncStatusResponse { is_synced })
     }
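For orientation, below is a minimal, self-contained sketch (not part of the diff) of the decision that the new `MiningRuleEngine::should_mine` encodes: mining and template/relay usability require peer connectivity on public networks, plus either near-sync (sink timestamp within the expected DAA window) or the sync-rate escape hatch set by the worker loop. The names `SketchRuleEngine`, `expected_daa_window_millis` and the numeric values are hypothetical stand-ins for the kaspa-* types, not taken from the diff.

```rust
// Minimal sketch under the assumptions stated above: plain u64 millisecond
// timestamps instead of DaaScoreTimestamp, and a bool instead of Hub/NetworkType.
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

fn unix_now_millis() -> u64 {
    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64
}

struct SketchRuleEngine {
    has_peers: bool,                 // stands in for Hub::has_peers()
    is_public_network: bool,         // Mainnet | Testnet in the diff
    expected_daa_window_millis: u64, // Params::expected_daa_window_duration_in_milliseconds(..)
    use_sync_rate_rule: AtomicBool,  // set by the worker when sync rate < 10% while finality is recent
}

impl SketchRuleEngine {
    // Nearly synced: the sink timestamp is at most one expected DAA window in the past
    fn is_nearly_synced(&self, sink_timestamp_millis: u64) -> bool {
        unix_now_millis() < sink_timestamp_millis + self.expected_daa_window_millis
    }

    // Peer connectivity is required on public networks; isolated devnets/simnets are exempt.
    // Mining is then allowed when nearly synced, or when the sync-rate rule is engaged.
    fn should_mine(&self, sink_timestamp_millis: u64) -> bool {
        let connectivity_ok = !self.is_public_network || self.has_peers;
        connectivity_ok
            && (self.is_nearly_synced(sink_timestamp_millis) || self.use_sync_rate_rule.load(Ordering::Relaxed))
    }
}

fn main() {
    let engine = SketchRuleEngine {
        has_peers: true,
        is_public_network: true,
        expected_daa_window_millis: 2_640_000, // illustrative value only
        use_sync_rate_rule: AtomicBool::new(false),
    };
    println!("should_mine: {}", engine.should_mine(unix_now_millis()));
}
```

On non-mainnet networks the diff replaces the full DAA window with the smaller allowance `target_time_per_block * NEW_DIFFICULTY_WINDOW_DURATION / 3`, so an IBDing miner with significant testnet hashrate does not flood the network with deep side-DAGs.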