Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions miner-apps/jd-client/src/args.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use clap::Parser;
use ext_config::{Config, File, FileFormat};
use jd_client_sv2::{config::JobDeclaratorClientConfig, error::JDCError};
use jd_client_sv2::{config::JobDeclaratorClientConfig, error::JDCErrorKind};

use std::path::PathBuf;
use tracing::error;
Expand All @@ -23,12 +23,12 @@ pub struct Args {
}

#[allow(clippy::result_large_err)]
pub fn process_cli_args() -> Result<JobDeclaratorClientConfig, JDCError> {
pub fn process_cli_args() -> Result<JobDeclaratorClientConfig, JDCErrorKind> {
let args = Args::parse();

let config_path = args.config_path.to_str().ok_or_else(|| {
error!("Invalid configuration path.");
JDCError::BadCliArgs
JDCErrorKind::BadCliArgs
})?;

let settings = Config::builder()
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{channel_manager::ChannelManager, error::JDCError};
use crate::{
channel_manager::ChannelManager,
error::{self, JDCError, JDCErrorKind},
};
use stratum_apps::{
stratum_core::{
binary_sv2::Seq064K,
Expand All @@ -12,7 +15,7 @@ use tracing::{error, info};

#[cfg_attr(not(test), hotpath::measure_all)]
impl HandleExtensionsFromServerAsync for ChannelManager {
type Error = JDCError;
type Error = JDCError<error::ChannelManager>;

fn get_negotiated_extensions_with_server(
&self,
Expand Down Expand Up @@ -47,7 +50,9 @@ impl HandleExtensionsFromServerAsync for ChannelManager {
"Server does not support our required extensions {:?}. Connection should fail over to another upstream.",
missing_required
);
return Err(JDCError::RequiredExtensionsNotSupported(missing_required));
return Err(JDCError::fallback(
JDCErrorKind::RequiredExtensionsNotSupported(missing_required),
));
}

// Store the negotiated extensions in the shared channel manager data
Expand Down Expand Up @@ -88,7 +93,9 @@ impl HandleExtensionsFromServerAsync for ChannelManager {
"Server does not support our required extensions {:?}. Connection should fail over to another upstream.",
missing_required
);
return Err(JDCError::RequiredExtensionsNotSupported(missing_required));
return Err(JDCError::fallback(
JDCErrorKind::RequiredExtensionsNotSupported(missing_required),
));
}

// Check if server requires extensions - if we support them, we should retry with them
Expand Down Expand Up @@ -117,8 +124,8 @@ impl HandleExtensionsFromServerAsync for ChannelManager {
"Server requires extensions {:?} that we don't support. Connection should fail over to another upstream.",
cannot_support
);
return Err(JDCError::ServerRequiresUnsupportedExtensions(
cannot_support,
return Err(JDCError::fallback(
JDCErrorKind::ServerRequiresUnsupportedExtensions(cannot_support),
));
}

Expand All @@ -134,15 +141,17 @@ impl HandleExtensionsFromServerAsync for ChannelManager {
};

let sv2_frame: Sv2Frame =
AnyMessage::Extensions(new_require_extensions.into_static().into()).try_into()?;
AnyMessage::Extensions(new_require_extensions.into_static().into())
.try_into()
.map_err(JDCError::shutdown)?;

self.channel_manager_channel
.upstream_sender
.send(sv2_frame)
.await
.map_err(|e| {
error!("Failed to send message to upstream: {:?}", e);
JDCError::ChannelErrorSender
JDCError::fallback(JDCErrorKind::ChannelErrorSender)
})?;
}

Expand Down
51 changes: 26 additions & 25 deletions miner-apps/jd-client/src/lib/channel_manager/jd_message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ use tracing::{debug, error, info, warn};

use crate::{
channel_manager::ChannelManager,
error::JDCError,
status::{State, Status},
error::{self, JDCError, JDCErrorKind},
};

#[cfg_attr(not(test), hotpath::measure_all)]
impl HandleJobDeclarationMessagesFromServerAsync for ChannelManager {
type Error = JDCError;
type Error = JDCError<error::ChannelManager>;

fn get_negotiated_extensions_with_server(
&self,
Expand Down Expand Up @@ -65,7 +64,7 @@ impl HandleJobDeclarationMessagesFromServerAsync for ChannelManager {
info!("Coinbase outputs from JDS changed, recalculating constraints");
let deserialized_jds_coinbase_outputs: Vec<TxOut> =
bitcoin::consensus::deserialize(&msg.coinbase_outputs.to_vec())
.map_err(JDCError::BitcoinEncodeError)?;
.map_err(JDCError::shutdown)?;

let max_additional_size: usize = deserialized_jds_coinbase_outputs
.iter()
Expand Down Expand Up @@ -103,7 +102,7 @@ impl HandleJobDeclarationMessagesFromServerAsync for ChannelManager {
.tp_sender
.send(coinbase_output_constraints_message)
.await
.map_err(|_e| JDCError::ChannelErrorSender)?;
.map_err(|_e| JDCError::shutdown(JDCErrorKind::ChannelErrorSender))?;

info!("Sent updated CoinbaseOutputConstraints to TP channel");
} else {
Expand Down Expand Up @@ -132,15 +131,7 @@ impl HandleJobDeclarationMessagesFromServerAsync for ChannelManager {
) -> Result<(), Self::Error> {
warn!("Received: {}", msg);
warn!("⚠️ JDS refused the declared job with a DeclareMiningJobError ❌. Starting fallback mechanism.");
self.channel_manager_channel
.status_sender
.send(Status {
state: State::JobDeclaratorShutdownFallback(JDCError::Shutdown),
})
.await
.map_err(|_e| JDCError::ChannelErrorSender)?;

Ok(())
Err(JDCError::fallback(JDCErrorKind::DeclareMiningJobError))
}

// Handles a `DeclareMiningJobSuccess` message from the JDS.
Expand Down Expand Up @@ -175,17 +166,23 @@ impl HandleJobDeclarationMessagesFromServerAsync for ChannelManager {
"No last_declare_job found for request_id={}",
msg.request_id
);
return Err(JDCError::LastDeclareJobNotFound(msg.request_id));
return Err(JDCError::log(JDCErrorKind::LastDeclareJobNotFound(
msg.request_id,
)));
};

let Some(prevhash) = last_declare_job.prev_hash else {
error!("Prevhash not found for request_id = {}", msg.request_id);
return Err(JDCError::LastNewPrevhashNotFound);
return Err(JDCError::log(JDCErrorKind::LastNewPrevhashNotFound));
};

let outputs = match deserialize_outputs(last_declare_job.coinbase_output.clone()) {
Ok(outputs) => outputs,
Err(_) => return Err(JDCError::ChannelManagerHasBadCoinbaseOutputs),
Err(_) => {
return Err(JDCError::shutdown(
JDCErrorKind::ChannelManagerHasBadCoinbaseOutputs,
))
}
};

let Some(custom_job) = self
Expand All @@ -206,10 +203,11 @@ impl HandleJobDeclarationMessagesFromServerAsync for ChannelManager {
Some(custom_job)
})
else {
return Err(JDCError::FailedToCreateCustomJob);
return Err(JDCError::log(JDCErrorKind::FailedToCreateCustomJob));
};

let custom_job = custom_job.map_err(|_e| JDCError::FailedToCreateCustomJob)?;
let custom_job =
custom_job.map_err(|_e| JDCError::log(JDCErrorKind::FailedToCreateCustomJob))?;

self.channel_manager_data.super_safe_lock(|data| {
if let Some(value) = data.last_declare_job_store.get_mut(&msg.request_id) {
Expand All @@ -221,12 +219,14 @@ impl HandleJobDeclarationMessagesFromServerAsync for ChannelManager {

debug!("Sending SetCustomMiningJob to the upstream with channel_id: {channel_id}");
let message = Mining::SetCustomMiningJob(custom_job).into_static();
let sv2_frame: Sv2Frame = AnyMessage::Mining(message).try_into()?;
let sv2_frame: Sv2Frame = AnyMessage::Mining(message)
.try_into()
.map_err(JDCError::shutdown)?;
self.channel_manager_channel
.upstream_sender
.send(sv2_frame)
.await
.map_err(|_e| JDCError::ChannelErrorSender)?;
.map_err(|_e| JDCError::fallback(JDCErrorKind::ChannelErrorSender))?;

info!("Successfully sent SetCustomMiningJob to the upstream with channel_id: {channel_id}");
Ok(())
Expand Down Expand Up @@ -259,7 +259,9 @@ impl HandleJobDeclarationMessagesFromServerAsync for ChannelManager {
"No transaction list found for request_id={}",
msg.request_id
);
return Err(JDCError::LastDeclareJobNotFound(msg.request_id));
return Err(JDCError::log(JDCErrorKind::LastDeclareJobNotFound(
msg.request_id,
)));
};

let full_tx_list: Vec<B016M> = entry
Expand All @@ -286,16 +288,15 @@ impl HandleJobDeclarationMessagesFromServerAsync for ChannelManager {

let response = ProvideMissingTransactionsSuccess {
request_id: msg.request_id,
transaction_list: binary_sv2::Seq064K::new(missing_txns)
.map_err(JDCError::BinarySv2)?,
transaction_list: binary_sv2::Seq064K::new(missing_txns).map_err(JDCError::shutdown)?,
};
let message = JobDeclaration::ProvideMissingTransactionsSuccess(response);

self.channel_manager_channel
.jd_sender
.send(message)
.await
.map_err(|_e| JDCError::ChannelErrorSender)?;
.map_err(|_e| JDCError::fallback(JDCErrorKind::ChannelErrorSender))?;

info!("Successfully sent ProvideMissingTransactionsSuccess to the JDS with request_id: {request_id}");

Expand Down
Loading
Loading