Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 71 additions & 2 deletions integration-tests/tests/pool_integration.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
// This file contains integration tests for the `PoolSv2` module.
//
// `PoolSv2` is a module that implements the Pool role in the Stratum V2 protocol.
use integration_tests_sv2::{interceptor::MessageDirection, template_provider::DifficultyLevel, *};
use integration_tests_sv2::{
interceptor::{MessageDirection, ReplaceMessage},
template_provider::DifficultyLevel,
*,
};
use stratum_apps::stratum_core::{
common_messages_sv2::{has_work_selection, Protocol, SetupConnection, *},
mining_sv2::*,
parsers_sv2::{AnyMessage, CommonMessages, Mining, TemplateDistribution},
parsers_sv2::{self, AnyMessage, CommonMessages, Mining, TemplateDistribution},
template_distribution_sv2::*,
};

Expand Down Expand Up @@ -339,3 +343,68 @@ async fn pool_does_not_send_jobs_to_jdc() {
"Pool should NOT send non-future NewExtendedMiningJob messages to JDC"
);
}

// The test runs pool and translator, with translator sending a SetupConnection message
// with a wrong protocol, this test asserts whether pool sends SetupConnection error or
// not to such downstream.
#[tokio::test]
async fn pool_reject_setup_connection_with_non_mining_protocol() {
start_tracing();
let (_tp, tp_addr) = start_template_provider(None, DifficultyLevel::Low);
let (_pool, pool_addr) = start_pool(sv2_tp_config(tp_addr), vec![], vec![]).await;
let endpoint_host = "127.0.0.1".to_string().into_bytes().try_into().unwrap();
let vendor = String::new().try_into().unwrap();
let hardware_version = String::new().try_into().unwrap();
let firmware = String::new().try_into().unwrap();
let device_id = String::new().try_into().unwrap();

let setup_connection_replace = ReplaceMessage::new(
MessageDirection::ToUpstream,
MESSAGE_TYPE_SETUP_CONNECTION,
AnyMessage::Common(parsers_sv2::CommonMessages::SetupConnection(
SetupConnection {
protocol: Protocol::TemplateDistributionProtocol,
min_version: 2,
max_version: 2,
flags: 0b0000_0000_0000_0000_0000_0000_0000_0000,
endpoint_host,
endpoint_port: 1212,
vendor,
hardware_version,
firmware,
device_id,
},
)),
);
let (pool_translator_sniffer, pool_translator_sniffer_addr) = start_sniffer(
"0",
pool_addr,
false,
vec![setup_connection_replace.into()],
None,
);
let (_tproxy, _) =
start_sv2_translator(&[pool_translator_sniffer_addr], false, vec![], vec![], None).await;

tokio::time::sleep(std::time::Duration::from_secs(1)).await;

pool_translator_sniffer
.wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SETUP_CONNECTION)
.await;
pool_translator_sniffer
.wait_for_message_type(
MessageDirection::ToDownstream,
MESSAGE_TYPE_SETUP_CONNECTION_ERROR,
)
.await;
let setup_connection_error = pool_translator_sniffer.next_message_from_upstream();
let setup_connection_error = match setup_connection_error {
Some((_, AnyMessage::Common(CommonMessages::SetupConnectionError(msg)))) => msg,
msg => panic!("Expected SetupConnectionError message, found: {:?}", msg),
};
assert_eq!(
setup_connection_error.error_code.as_utf8_or_hex(),
"unsupported-protocol",
"SetupConnectionError message error code should be unsupported-protocol"
);
}
5 changes: 5 additions & 0 deletions miner-apps/jd-client/src/lib/downstream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ impl Downstream {
// Setup initial connection
if let Err(e) = self.setup_connection_with_downstream().await {
error!(?e, "Failed to set up downstream connection");

// sleep to make sure SetupConnectionError is sent
// before we break the TCP connection
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

handle_error(&status_sender, e).await;
return;
}
Expand Down
6 changes: 5 additions & 1 deletion miner-apps/jd-client/src/lib/upstream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ pub struct Upstream {
upstream_channel: UpstreamChannel,
/// Protocol extensions that the JDC requires
required_extensions: Vec<u16>,
/// Upstream address
address: SocketAddr,
}

#[cfg_attr(not(test), hotpath::measure_all)]
Expand Down Expand Up @@ -129,6 +131,7 @@ impl Upstream {
upstream_data,
upstream_channel,
required_extensions,
address: *addr,
})
}

Expand All @@ -142,7 +145,8 @@ impl Upstream {
) -> JDCResult<(), error::Upstream> {
info!("Upstream: initiating SV2 handshake...");
let setup_connection =
get_setup_connection_message(min_version, max_version).map_err(JDCError::shutdown)?;
get_setup_connection_message(min_version, max_version, &self.address)
.map_err(JDCError::shutdown)?;
debug!(?setup_connection, "Prepared `SetupConnection` message");
let sv2_frame: Sv2Frame = Message::Common(setup_connection.into())
.try_into()
Expand Down
5 changes: 3 additions & 2 deletions miner-apps/jd-client/src/lib/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ pub enum ShutdownMessage {
pub fn get_setup_connection_message(
min_version: u16,
max_version: u16,
address: &SocketAddr,
) -> Result<SetupConnection<'static>, JDCErrorKind> {
let endpoint_host = "0.0.0.0".to_string().into_bytes().try_into()?;
let endpoint_host = address.ip().to_string().into_bytes().try_into()?;
let vendor = String::new().try_into()?;
let hardware_version = String::new().try_into()?;
let firmware = String::new().try_into()?;
Expand All @@ -67,7 +68,7 @@ pub fn get_setup_connection_message(
max_version,
flags,
endpoint_host,
endpoint_port: 50,
endpoint_port: address.port(),
vendor,
hardware_version,
firmware,
Expand Down
13 changes: 8 additions & 5 deletions miner-apps/translator/src/lib/sv2/upstream/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
utils::{ShutdownMessage, UpstreamEntry},
};
use async_channel::{unbounded, Receiver, Sender};
use std::sync::Arc;
use std::{net::SocketAddr, sync::Arc};
use stratum_apps::{
network_helpers::noise_stream::NoiseTcpStream,
stratum_core::{
Expand Down Expand Up @@ -48,6 +48,7 @@ pub struct Upstream {
pub upstream_channel_state: UpstreamChannelState,
/// Extensions that the translator requires (must be supported by server)
pub required_extensions: Vec<u16>,
address: SocketAddr,
}

#[cfg_attr(not(test), hotpath::measure_all)]
Expand Down Expand Up @@ -130,6 +131,7 @@ impl Upstream {
return Ok(Self {
upstream_channel_state,
required_extensions: required_extensions.clone(),
address: upstream.addr,
});
}
Err(e) => {
Expand Down Expand Up @@ -228,8 +230,8 @@ impl Upstream {
pub async fn setup_connection(&mut self) -> TproxyResult<(), error::Upstream> {
debug!("Upstream: initiating SV2 handshake...");
// Build SetupConnection message
let setup_conn_msg =
Self::get_setup_connection_message(2, 2, false).map_err(TproxyError::shutdown)?;
let setup_conn_msg = Self::get_setup_connection_message(2, 2, &self.address, false)
.map_err(TproxyError::shutdown)?;
let sv2_frame: Sv2Frame =
Message::Common(setup_conn_msg.into())
.try_into()
Expand Down Expand Up @@ -482,9 +484,10 @@ impl Upstream {
fn get_setup_connection_message(
min_version: u16,
max_version: u16,
address: &SocketAddr,
is_work_selection_enabled: bool,
) -> Result<SetupConnection<'static>, TproxyErrorKind> {
let endpoint_host = "0.0.0.0".to_string().into_bytes().try_into()?;
let endpoint_host = address.ip().to_string().into_bytes().try_into()?;
let vendor = "SRI".to_string().try_into()?;
let hardware_version = "Translator Proxy".to_string().try_into()?;
let firmware = String::new().try_into()?;
Expand All @@ -501,7 +504,7 @@ impl Upstream {
max_version,
flags,
endpoint_host,
endpoint_port: 50,
endpoint_port: address.port(),
vendor,
hardware_version,
firmware,
Expand Down
33 changes: 31 additions & 2 deletions pool-apps/pool/src/lib/downstream/common_message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::{convert::TryInto, sync::atomic::Ordering};
use stratum_apps::{
stratum_core::{
common_messages_sv2::{
has_requires_std_job, has_work_selection, SetupConnection, SetupConnectionSuccess,
has_requires_std_job, has_work_selection, Protocol, SetupConnection,
SetupConnectionError, SetupConnectionSuccess,
},
handlers_sv2::HandleCommonMessagesFromClientAsync,
parsers_sv2::{AnyMessage, Tlv},
Expand All @@ -30,7 +31,7 @@ impl HandleCommonMessagesFromClientAsync for Downstream {

async fn handle_setup_connection(
&mut self,
_client_id: Option<usize>,
client_id: Option<usize>,
msg: SetupConnection<'_>,
_tlv_fields: Option<&[Tlv]>,
) -> Result<(), Self::Error> {
Expand All @@ -39,6 +40,34 @@ impl HandleCommonMessagesFromClientAsync for Downstream {
msg.min_version, msg.flags
);

let downstream_id = client_id.expect("downstream id should be present");

if msg.protocol != Protocol::MiningProtocol {
info!("Rejecting connection from {downstream_id}: SetupConnection asking for other protocols than mining protocol.");
let response = SetupConnectionError {
flags: 0,
error_code: "unsupported-protocol"
.to_string()
.try_into()
.expect("error code must be valid string"),
};
let frame: Sv2Frame = AnyMessage::Common(response.into_static().into())
.try_into()
.map_err(PoolError::shutdown)?;
self.downstream_channel
.downstream_sender
.send(frame)
.await
.map_err(|_| {
PoolError::disconnect(PoolErrorKind::ChannelErrorSender, downstream_id)
})?;

return Err(PoolError::disconnect(
PoolErrorKind::UnsupportedProtocol,
downstream_id,
));
}

self.requires_custom_work
.store(has_work_selection(msg.flags), Ordering::SeqCst);
self.requires_standard_jobs
Expand Down
13 changes: 11 additions & 2 deletions pool-apps/pool/src/lib/downstream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ impl Downstream {
// Setup initial connection
if let Err(e) = self.setup_connection_with_downstream().await {
error!(?e, "Failed to set up downstream connection");

// sleep to make sure SetupConnectionError is sent
// before we break the TCP connection
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

handle_error(&status_sender, e).await;
return;
}
Expand Down Expand Up @@ -234,8 +239,12 @@ impl Downstream {
// The first ever message received on a new downstream connection
// should always be a setup connection message.
if header.msg_type() == MESSAGE_TYPE_SETUP_CONNECTION {
self.handle_common_message_frame_from_client(None, header, frame.payload())
.await?;
self.handle_common_message_frame_from_client(
Some(self.downstream_id),
header,
frame.payload(),
)
.await?;
return Ok(());
}
Err(PoolError::disconnect(
Expand Down
5 changes: 4 additions & 1 deletion pool-apps/pool/src/lib/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ pub enum PoolErrorKind {
FailedToSendCoinbaseOutputConstraints,
/// BitcoinCoreSv2 cancellation token activated
BitcoinCoreSv2CancellationTokenActivated,
/// Unsupported Protocol
UnsupportedProtocol,
/// Setup connection error
SetupConnectionError,
/// endpoint change error
Expand Down Expand Up @@ -268,7 +270,8 @@ impl std::fmt::Display for PoolErrorKind {
}
BitcoinCoreSv2CancellationTokenActivated => {
write!(f, "BitcoinCoreSv2 cancellation token activated")
}
},
UnsupportedProtocol => write!(f, "Protocol not supported"),
SetupConnectionError => {
write!(f, "Failed to Setup connection")
}
Expand Down
5 changes: 3 additions & 2 deletions pool-apps/pool/src/lib/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ pub enum ShutdownMessage {
pub fn get_setup_connection_message(
min_version: u16,
max_version: u16,
address: &SocketAddr,
) -> Result<SetupConnection<'static>, PoolErrorKind> {
let endpoint_host = "0.0.0.0".to_string().into_bytes().try_into()?;
let endpoint_host = address.ip().to_string().into_bytes().try_into()?;
let vendor = String::new().try_into()?;
let hardware_version = String::new().try_into()?;
let firmware = String::new().try_into()?;
Expand All @@ -39,7 +40,7 @@ pub fn get_setup_connection_message(
max_version,
flags,
endpoint_host,
endpoint_port: 50,
endpoint_port: address.port(),
vendor,
hardware_version,
firmware,
Expand Down
Loading