diff --git a/integration-tests/tests/pool_integration.rs b/integration-tests/tests/pool_integration.rs index 9fe7cbde..474dae52 100644 --- a/integration-tests/tests/pool_integration.rs +++ b/integration-tests/tests/pool_integration.rs @@ -1,11 +1,15 @@ // This file contains integration tests for the `PoolSv2` module. // // `PoolSv2` is a module that implements the Pool role in the Stratum V2 protocol. -use integration_tests_sv2::{interceptor::MessageDirection, template_provider::DifficultyLevel, *}; +use integration_tests_sv2::{ + interceptor::{MessageDirection, ReplaceMessage}, + template_provider::DifficultyLevel, + *, +}; use stratum_apps::stratum_core::{ common_messages_sv2::{has_work_selection, Protocol, SetupConnection, *}, mining_sv2::*, - parsers_sv2::{AnyMessage, CommonMessages, Mining, TemplateDistribution}, + parsers_sv2::{self, AnyMessage, CommonMessages, Mining, TemplateDistribution}, template_distribution_sv2::*, }; @@ -339,3 +343,68 @@ async fn pool_does_not_send_jobs_to_jdc() { "Pool should NOT send non-future NewExtendedMiningJob messages to JDC" ); } + +// The test runs pool and translator, with translator sending a SetupConnection message +// with a wrong protocol, this test asserts whether pool sends SetupConnection error or +// not to such downstream. +#[tokio::test] +async fn pool_reject_setup_connection_with_non_mining_protocol() { + start_tracing(); + let (_tp, tp_addr) = start_template_provider(None, DifficultyLevel::Low); + let (_pool, pool_addr) = start_pool(sv2_tp_config(tp_addr), vec![], vec![]).await; + let endpoint_host = "127.0.0.1".to_string().into_bytes().try_into().unwrap(); + let vendor = String::new().try_into().unwrap(); + let hardware_version = String::new().try_into().unwrap(); + let firmware = String::new().try_into().unwrap(); + let device_id = String::new().try_into().unwrap(); + + let setup_connection_replace = ReplaceMessage::new( + MessageDirection::ToUpstream, + MESSAGE_TYPE_SETUP_CONNECTION, + AnyMessage::Common(parsers_sv2::CommonMessages::SetupConnection( + SetupConnection { + protocol: Protocol::TemplateDistributionProtocol, + min_version: 2, + max_version: 2, + flags: 0b0000_0000_0000_0000_0000_0000_0000_0000, + endpoint_host, + endpoint_port: 1212, + vendor, + hardware_version, + firmware, + device_id, + }, + )), + ); + let (pool_translator_sniffer, pool_translator_sniffer_addr) = start_sniffer( + "0", + pool_addr, + false, + vec![setup_connection_replace.into()], + None, + ); + let (_tproxy, _) = + start_sv2_translator(&[pool_translator_sniffer_addr], false, vec![], vec![], None).await; + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + pool_translator_sniffer + .wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SETUP_CONNECTION) + .await; + pool_translator_sniffer + .wait_for_message_type( + MessageDirection::ToDownstream, + MESSAGE_TYPE_SETUP_CONNECTION_ERROR, + ) + .await; + let setup_connection_error = pool_translator_sniffer.next_message_from_upstream(); + let setup_connection_error = match setup_connection_error { + Some((_, AnyMessage::Common(CommonMessages::SetupConnectionError(msg)))) => msg, + msg => panic!("Expected SetupConnectionError message, found: {:?}", msg), + }; + assert_eq!( + setup_connection_error.error_code.as_utf8_or_hex(), + "unsupported-protocol", + "SetupConnectionError message error code should be unsupported-protocol" + ); +} diff --git a/miner-apps/jd-client/src/lib/downstream/mod.rs b/miner-apps/jd-client/src/lib/downstream/mod.rs index c5e4f99e..b08461bd 100644 --- a/miner-apps/jd-client/src/lib/downstream/mod.rs +++ b/miner-apps/jd-client/src/lib/downstream/mod.rs @@ -165,6 +165,11 @@ impl Downstream { // Setup initial connection if let Err(e) = self.setup_connection_with_downstream().await { error!(?e, "Failed to set up downstream connection"); + + // sleep to make sure SetupConnectionError is sent + // before we break the TCP connection + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + handle_error(&status_sender, e).await; return; } diff --git a/miner-apps/jd-client/src/lib/upstream/mod.rs b/miner-apps/jd-client/src/lib/upstream/mod.rs index 1116745c..11576fa9 100644 --- a/miner-apps/jd-client/src/lib/upstream/mod.rs +++ b/miner-apps/jd-client/src/lib/upstream/mod.rs @@ -69,6 +69,8 @@ pub struct Upstream { upstream_channel: UpstreamChannel, /// Protocol extensions that the JDC requires required_extensions: Vec, + /// Upstream address + address: SocketAddr, } #[cfg_attr(not(test), hotpath::measure_all)] @@ -129,6 +131,7 @@ impl Upstream { upstream_data, upstream_channel, required_extensions, + address: *addr, }) } @@ -142,7 +145,8 @@ impl Upstream { ) -> JDCResult<(), error::Upstream> { info!("Upstream: initiating SV2 handshake..."); let setup_connection = - get_setup_connection_message(min_version, max_version).map_err(JDCError::shutdown)?; + get_setup_connection_message(min_version, max_version, &self.address) + .map_err(JDCError::shutdown)?; debug!(?setup_connection, "Prepared `SetupConnection` message"); let sv2_frame: Sv2Frame = Message::Common(setup_connection.into()) .try_into() diff --git a/miner-apps/jd-client/src/lib/utils.rs b/miner-apps/jd-client/src/lib/utils.rs index 58b61e22..4d4adc4c 100644 --- a/miner-apps/jd-client/src/lib/utils.rs +++ b/miner-apps/jd-client/src/lib/utils.rs @@ -54,8 +54,9 @@ pub enum ShutdownMessage { pub fn get_setup_connection_message( min_version: u16, max_version: u16, + address: &SocketAddr, ) -> Result, JDCErrorKind> { - let endpoint_host = "0.0.0.0".to_string().into_bytes().try_into()?; + let endpoint_host = address.ip().to_string().into_bytes().try_into()?; let vendor = String::new().try_into()?; let hardware_version = String::new().try_into()?; let firmware = String::new().try_into()?; @@ -67,7 +68,7 @@ pub fn get_setup_connection_message( max_version, flags, endpoint_host, - endpoint_port: 50, + endpoint_port: address.port(), vendor, hardware_version, firmware, diff --git a/miner-apps/translator/src/lib/sv2/upstream/upstream.rs b/miner-apps/translator/src/lib/sv2/upstream/upstream.rs index 5676f23d..d0877562 100644 --- a/miner-apps/translator/src/lib/sv2/upstream/upstream.rs +++ b/miner-apps/translator/src/lib/sv2/upstream/upstream.rs @@ -6,7 +6,7 @@ use crate::{ utils::{ShutdownMessage, UpstreamEntry}, }; use async_channel::{unbounded, Receiver, Sender}; -use std::sync::Arc; +use std::{net::SocketAddr, sync::Arc}; use stratum_apps::{ network_helpers::noise_stream::NoiseTcpStream, stratum_core::{ @@ -48,6 +48,7 @@ pub struct Upstream { pub upstream_channel_state: UpstreamChannelState, /// Extensions that the translator requires (must be supported by server) pub required_extensions: Vec, + address: SocketAddr, } #[cfg_attr(not(test), hotpath::measure_all)] @@ -130,6 +131,7 @@ impl Upstream { return Ok(Self { upstream_channel_state, required_extensions: required_extensions.clone(), + address: upstream.addr, }); } Err(e) => { @@ -228,8 +230,8 @@ impl Upstream { pub async fn setup_connection(&mut self) -> TproxyResult<(), error::Upstream> { debug!("Upstream: initiating SV2 handshake..."); // Build SetupConnection message - let setup_conn_msg = - Self::get_setup_connection_message(2, 2, false).map_err(TproxyError::shutdown)?; + let setup_conn_msg = Self::get_setup_connection_message(2, 2, &self.address, false) + .map_err(TproxyError::shutdown)?; let sv2_frame: Sv2Frame = Message::Common(setup_conn_msg.into()) .try_into() @@ -482,9 +484,10 @@ impl Upstream { fn get_setup_connection_message( min_version: u16, max_version: u16, + address: &SocketAddr, is_work_selection_enabled: bool, ) -> Result, TproxyErrorKind> { - let endpoint_host = "0.0.0.0".to_string().into_bytes().try_into()?; + let endpoint_host = address.ip().to_string().into_bytes().try_into()?; let vendor = "SRI".to_string().try_into()?; let hardware_version = "Translator Proxy".to_string().try_into()?; let firmware = String::new().try_into()?; @@ -501,7 +504,7 @@ impl Upstream { max_version, flags, endpoint_host, - endpoint_port: 50, + endpoint_port: address.port(), vendor, hardware_version, firmware, diff --git a/pool-apps/pool/src/lib/downstream/common_message_handler.rs b/pool-apps/pool/src/lib/downstream/common_message_handler.rs index 3759f4ee..faafe5a5 100644 --- a/pool-apps/pool/src/lib/downstream/common_message_handler.rs +++ b/pool-apps/pool/src/lib/downstream/common_message_handler.rs @@ -6,7 +6,8 @@ use std::{convert::TryInto, sync::atomic::Ordering}; use stratum_apps::{ stratum_core::{ common_messages_sv2::{ - has_requires_std_job, has_work_selection, SetupConnection, SetupConnectionSuccess, + has_requires_std_job, has_work_selection, Protocol, SetupConnection, + SetupConnectionError, SetupConnectionSuccess, }, handlers_sv2::HandleCommonMessagesFromClientAsync, parsers_sv2::{AnyMessage, Tlv}, @@ -30,7 +31,7 @@ impl HandleCommonMessagesFromClientAsync for Downstream { async fn handle_setup_connection( &mut self, - _client_id: Option, + client_id: Option, msg: SetupConnection<'_>, _tlv_fields: Option<&[Tlv]>, ) -> Result<(), Self::Error> { @@ -39,6 +40,34 @@ impl HandleCommonMessagesFromClientAsync for Downstream { msg.min_version, msg.flags ); + let downstream_id = client_id.expect("downstream id should be present"); + + if msg.protocol != Protocol::MiningProtocol { + info!("Rejecting connection from {downstream_id}: SetupConnection asking for other protocols than mining protocol."); + let response = SetupConnectionError { + flags: 0, + error_code: "unsupported-protocol" + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + let frame: Sv2Frame = AnyMessage::Common(response.into_static().into()) + .try_into() + .map_err(PoolError::shutdown)?; + self.downstream_channel + .downstream_sender + .send(frame) + .await + .map_err(|_| { + PoolError::disconnect(PoolErrorKind::ChannelErrorSender, downstream_id) + })?; + + return Err(PoolError::disconnect( + PoolErrorKind::UnsupportedProtocol, + downstream_id, + )); + } + self.requires_custom_work .store(has_work_selection(msg.flags), Ordering::SeqCst); self.requires_standard_jobs diff --git a/pool-apps/pool/src/lib/downstream/mod.rs b/pool-apps/pool/src/lib/downstream/mod.rs index a876482b..e84f31b6 100644 --- a/pool-apps/pool/src/lib/downstream/mod.rs +++ b/pool-apps/pool/src/lib/downstream/mod.rs @@ -172,6 +172,11 @@ impl Downstream { // Setup initial connection if let Err(e) = self.setup_connection_with_downstream().await { error!(?e, "Failed to set up downstream connection"); + + // sleep to make sure SetupConnectionError is sent + // before we break the TCP connection + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + handle_error(&status_sender, e).await; return; } @@ -234,8 +239,12 @@ impl Downstream { // The first ever message received on a new downstream connection // should always be a setup connection message. if header.msg_type() == MESSAGE_TYPE_SETUP_CONNECTION { - self.handle_common_message_frame_from_client(None, header, frame.payload()) - .await?; + self.handle_common_message_frame_from_client( + Some(self.downstream_id), + header, + frame.payload(), + ) + .await?; return Ok(()); } Err(PoolError::disconnect( diff --git a/pool-apps/pool/src/lib/error.rs b/pool-apps/pool/src/lib/error.rs index c078ce99..73291a69 100644 --- a/pool-apps/pool/src/lib/error.rs +++ b/pool-apps/pool/src/lib/error.rs @@ -179,6 +179,8 @@ pub enum PoolErrorKind { FailedToSendCoinbaseOutputConstraints, /// BitcoinCoreSv2 cancellation token activated BitcoinCoreSv2CancellationTokenActivated, + /// Unsupported Protocol + UnsupportedProtocol, /// Setup connection error SetupConnectionError, /// endpoint change error @@ -268,7 +270,8 @@ impl std::fmt::Display for PoolErrorKind { } BitcoinCoreSv2CancellationTokenActivated => { write!(f, "BitcoinCoreSv2 cancellation token activated") - } + }, + UnsupportedProtocol => write!(f, "Protocol not supported"), SetupConnectionError => { write!(f, "Failed to Setup connection") } diff --git a/pool-apps/pool/src/lib/utils.rs b/pool-apps/pool/src/lib/utils.rs index ffe98495..9cf3d0f3 100644 --- a/pool-apps/pool/src/lib/utils.rs +++ b/pool-apps/pool/src/lib/utils.rs @@ -26,8 +26,9 @@ pub enum ShutdownMessage { pub fn get_setup_connection_message( min_version: u16, max_version: u16, + address: &SocketAddr, ) -> Result, PoolErrorKind> { - let endpoint_host = "0.0.0.0".to_string().into_bytes().try_into()?; + let endpoint_host = address.ip().to_string().into_bytes().try_into()?; let vendor = String::new().try_into()?; let hardware_version = String::new().try_into()?; let firmware = String::new().try_into()?; @@ -39,7 +40,7 @@ pub fn get_setup_connection_message( max_version, flags, endpoint_host, - endpoint_port: 50, + endpoint_port: address.port(), vendor, hardware_version, firmware,