Skip to content

Commit 915906b

Browse files
committed
send cached share on custom job arrival
1 parent b53852f commit 915906b

File tree

2 files changed

+136
-23
lines changed

2 files changed

+136
-23
lines changed

miner-apps/jd-client/src/lib/channel_manager/upstream_message_handler.rs

Lines changed: 54 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::{
2424
error::{ChannelSv2Error, JDCError},
2525
jd_mode::{get_jd_mode, JdMode},
2626
status::{State, Status},
27-
utils::{create_close_channel_msg, UpstreamState},
27+
utils::{create_close_channel_msg, validate_share, UpstreamState},
2828
};
2929

3030
#[hotpath::measure_all]
@@ -557,32 +557,66 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager {
557557
_tlv_fields: Option<&[Tlv]>,
558558
) -> Result<(), Self::Error> {
559559
info!("Received: {} ✅", msg);
560+
561+
let mut messages = Vec::new();
562+
560563
self.channel_manager_data.super_safe_lock(|data| {
561-
if let Some(last_declare_job) = data.last_declare_job_store.remove(&msg.request_id) {
562-
let template_id = last_declare_job.template.template_id;
563-
data.last_declare_job_store
564-
.retain(|_, job| job.template.template_id != template_id);
565-
566-
data.template_id_to_upstream_job_id
567-
.insert(last_declare_job.template.template_id, msg.job_id);
568-
debug!(job_id = msg.job_id, "Mapped custom job into template store");
569-
if let (Some(upstream_channel), Some(set_custom_job)) = (
570-
data.upstream_channel.as_mut(),
571-
last_declare_job.set_custom_mining_job,
572-
) {
573-
if let Err(e) =
574-
upstream_channel.on_set_custom_mining_job_success(set_custom_job, msg)
575-
{
576-
error!("Custom mining job success validation failed: {e:#?}");
577-
}
578-
}
579-
} else {
564+
let Some(last_declare_job) = data.last_declare_job_store.remove(&msg.request_id) else {
580565
warn!(
581566
request_id = msg.request_id,
582567
"No matching declare job found for custom job success"
583568
);
569+
return;
570+
};
571+
572+
let template_id = last_declare_job.template.template_id;
573+
574+
data.last_declare_job_store
575+
.retain(|_, job| job.template.template_id != template_id);
576+
577+
data.template_id_to_upstream_job_id
578+
.insert(template_id, msg.job_id);
579+
580+
let job_id = msg.job_id;
581+
582+
let cached = data.cached_shares.remove(&template_id);
583+
584+
let Some(upstream_channel) = data.upstream_channel.as_mut() else {
585+
debug!("No upstream channel available");
586+
return;
587+
};
588+
589+
let Some(set_custom_job) = last_declare_job.set_custom_mining_job else {
590+
debug!("No custom job found");
591+
return;
592+
};
593+
594+
if let Err(e) = upstream_channel.on_set_custom_mining_job_success(set_custom_job, msg) {
595+
error!("Custom mining job success validation failed: {e:#?}");
596+
return;
597+
}
598+
599+
let Some(cached) = cached else {
600+
return;
601+
};
602+
603+
debug!(
604+
"Flushing {} cached shares for template_id={}",
605+
cached.len(),
606+
template_id
607+
);
608+
609+
for mut share in cached {
610+
share.job_id = job_id;
611+
612+
validate_share(share, data, &mut messages);
584613
}
585614
});
615+
616+
for msg in messages {
617+
_ = msg.forward(&self.channel_manager_channel).await;
618+
}
619+
586620
Ok(())
587621
}
588622

miner-apps/jd-client/src/lib/utils.rs

Lines changed: 82 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,24 @@ use std::{
2222
use stratum_apps::{
2323
stratum_core::{
2424
binary_sv2::Str0255,
25+
channels_sv2::client,
2526
common_messages_sv2::{Protocol, SetupConnection},
26-
mining_sv2::{CloseChannel, OpenExtendedMiningChannel, OpenStandardMiningChannel},
27-
parsers_sv2::Mining,
27+
job_declaration_sv2::PushSolution,
28+
mining_sv2::{
29+
CloseChannel, OpenExtendedMiningChannel, OpenStandardMiningChannel,
30+
SubmitSharesExtended,
31+
},
32+
parsers_sv2::{JobDeclaration, Mining},
2833
},
2934
utils::types::{ChannelId, DownstreamId, Hashrate, JobId},
3035
};
36+
use tracing::{debug, info};
3137

32-
use crate::{config::ConfigJDCMode, error::JDCError};
38+
use crate::{
39+
channel_manager::{downstream_message_handler::RouteMessageTo, ChannelManagerData},
40+
config::ConfigJDCMode,
41+
error::JDCError,
42+
};
3343

3444
/// Represents a message that can trigger shutdown of various system components.
3545
#[derive(Debug, Clone)]
@@ -303,3 +313,72 @@ impl From<(DownstreamId, ChannelId, JobId)> for DownstreamChannelJobId {
303313
}
304314
}
305315
}
316+
317+
pub fn validate_share(
318+
mut upstream_message: SubmitSharesExtended<'static>,
319+
channel_manager_data: &mut ChannelManagerData,
320+
messages: &mut Vec<RouteMessageTo>,
321+
) {
322+
let Some(upstream_channel) = channel_manager_data.upstream_channel.as_mut() else {
323+
return;
324+
};
325+
let Some(prev_hash) = channel_manager_data.last_new_prev_hash.as_ref() else {
326+
return;
327+
};
328+
329+
match upstream_channel.validate_share(upstream_message.clone()) {
330+
Ok(client::share_accounting::ShareValidationResult::Valid(share_hash)) => {
331+
upstream_message.sequence_number = channel_manager_data
332+
.sequence_number_factory
333+
.fetch_add(1, Ordering::Relaxed);
334+
335+
info!(
336+
"Flushed cached extended share: valid | ch={}, seq={}, hash={} ✅",
337+
upstream_message.channel_id, upstream_message.sequence_number, share_hash
338+
);
339+
340+
messages.push(Mining::SubmitSharesExtended(upstream_message.into_static()).into());
341+
}
342+
343+
Ok(client::share_accounting::ShareValidationResult::BlockFound(share_hash)) => {
344+
upstream_message.sequence_number = channel_manager_data
345+
.sequence_number_factory
346+
.fetch_add(1, Ordering::Relaxed);
347+
348+
info!("💰 Block Found (cached extended)!!! 💰 {share_hash}");
349+
350+
let mut channel_extranonce = upstream_channel.get_extranonce_prefix().to_vec();
351+
channel_extranonce.extend_from_slice(&upstream_message.extranonce.to_vec());
352+
353+
let push_solution = PushSolution {
354+
extranonce: channel_extranonce.try_into().expect("extranonce"),
355+
ntime: upstream_message.ntime,
356+
nonce: upstream_message.nonce,
357+
version: upstream_message.version,
358+
nbits: prev_hash.n_bits,
359+
prev_hash: prev_hash.prev_hash.clone(),
360+
};
361+
362+
messages.push(JobDeclaration::PushSolution(push_solution).into());
363+
messages.push(Mining::SubmitSharesExtended(upstream_message.into_static()).into());
364+
}
365+
366+
Err(err) => {
367+
let code = match err {
368+
client::share_accounting::ShareValidationError::Invalid => "invalid-share",
369+
client::share_accounting::ShareValidationError::Stale => "stale-share",
370+
client::share_accounting::ShareValidationError::InvalidJobId => "invalid-job-id",
371+
client::share_accounting::ShareValidationError::DoesNotMeetTarget => {
372+
"difficulty-too-low"
373+
}
374+
client::share_accounting::ShareValidationError::DuplicateShare => "duplicate-share",
375+
_ => unreachable!(),
376+
};
377+
378+
debug!(
379+
"❌ Cached extended share rejected: ch={}, seq={}, error={}",
380+
upstream_message.channel_id, upstream_message.sequence_number, code
381+
);
382+
}
383+
}
384+
}

0 commit comments

Comments
 (0)