From b61bcd27caab3ef0689c1f9df0f24bdda6f508b4 Mon Sep 17 00:00:00 2001 From: Armando Dutra Date: Fri, 24 Mar 2023 10:23:34 -0300 Subject: [PATCH] monitoring funding tx mining --- src/bus/ctl.rs | 13 +++++++- src/channeld/automata/accept.rs | 8 +++++ src/channeld/automata/propose.rs | 6 +++- src/watchd/runtime.rs | 16 ++++----- src/watchd/worker.rs | 56 ++++++++++++++++++++++++++------ 5 files changed, 78 insertions(+), 21 deletions(-) diff --git a/src/bus/ctl.rs b/src/bus/ctl.rs index 066e3cc..ed988f6 100644 --- a/src/bus/ctl.rs +++ b/src/bus/ctl.rs @@ -86,7 +86,7 @@ pub enum CtlMsg { /// Reports changes in the mining status for previously requested transaction tracked by an /// on-chain service #[display("tx_found({0})")] - TxFound(TxStatus), + TxFound(TxConfirmation), // Routing & payments /// Request to channel daemon to perform payment using provided route @@ -260,6 +260,17 @@ pub struct TxStatus { pub block_pos: Option, } +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Display)] +#[derive(NetworkEncode, NetworkDecode)] +#[display("{txid}, ...")] +pub struct TxConfirmation { + /// Id of a transaction previously requested to be tracked + pub txid: Txid, + + /// number of block confirmations + pub confirmations: u32, +} + #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Display, NetworkEncode, NetworkDecode)] #[display("{client}, {status}")] pub struct Report { diff --git a/src/channeld/automata/accept.rs b/src/channeld/automata/accept.rs index 0ff70f6..a4e9719 100644 --- a/src/channeld/automata/accept.rs +++ b/src/channeld/automata/accept.rs @@ -167,6 +167,14 @@ fn finish_signed(event: Event, runtime: &mut Runtime) -> Result { diff --git a/src/channeld/automata/propose.rs b/src/channeld/automata/propose.rs index b8b5d7e..e7e8d00 100644 --- a/src/channeld/automata/propose.rs +++ b/src/channeld/automata/propose.rs @@ -284,7 +284,11 @@ fn complete_funding( let txid = runtime.state.channel.funding().txid(); debug!("Waiting for funding transaction {} to be mined", txid); - runtime.send_ctl(event.endpoints, ServiceId::Watch, CtlMsg::Track { txid, depth: 0 })?; + let core = runtime.state.channel.constructor(); + runtime.send_ctl(event.endpoints, ServiceId::Watch, CtlMsg::Track { + txid, + depth: core.common_params().minimum_depth, + })?; Ok(ChannelPropose::Published) } diff --git a/src/watchd/runtime.rs b/src/watchd/runtime.rs index c0798d3..e00df3a 100644 --- a/src/watchd/runtime.rs +++ b/src/watchd/runtime.rs @@ -34,7 +34,7 @@ pub fn run(config: Config) -> Result<(), Error> { rx.bind("inproc://electrum-bridge")?; let (sender, receiver) = mpsc::channel::(); - let electrum_worker = ElectrumWorker::with(sender, &config.electrum_url, 5)?; + let electrum_worker = ElectrumWorker::with(sender, &config.electrum_url, 15)?; debug!("Starting electrum watcher thread"); let watcher_runtime = WatcherRuntime::with(receiver, tx)?; @@ -88,16 +88,14 @@ impl WatcherRuntime { // TODO: Forward all electrum notifications over the bridge // self.send_over_bridge(msg.into()).expect("watcher bridge is halted"); match msg { - ElectrumUpdate::TxBatch(transactions, _) => { + ElectrumUpdate::TxConfirmations(transactions, _) => { for transaction in transactions { - self.send_over_bridge(BusMsg::Ctl(CtlMsg::TxFound(crate::bus::TxStatus { - txid: transaction.txid(), - block_pos: None, - }))) - .expect("unable forward electrum notifications over the bridge"); + self.send_over_bridge(BusMsg::Ctl(CtlMsg::TxFound(transaction))) + .expect("unable forward electrum notifications over the bridge"); } } - ElectrumUpdate::Connecting + ElectrumUpdate::TxBatch(..) + | ElectrumUpdate::Connecting | ElectrumUpdate::Connected | ElectrumUpdate::Complete | ElectrumUpdate::FeeEstimate(..) @@ -170,7 +168,7 @@ impl Runtime { match request { CtlMsg::TxFound(tx_status) => { if let Some((required_height, service_id)) = self.track_list.get(&tx_status.txid) { - if *required_height >= tx_status.block_pos.map(|b| b.pos).unwrap_or_default() { + if *required_height <= tx_status.confirmations { let service_id = service_id.clone(); self.untrack(tx_status.txid); match self.electrum_worker.untrack_transaction(tx_status.txid) { diff --git a/src/watchd/worker.rs b/src/watchd/worker.rs index 2fea972..39cd1a2 100644 --- a/src/watchd/worker.rs +++ b/src/watchd/worker.rs @@ -13,13 +13,16 @@ // TODO: Consider making it part of descriptor wallet onchain library +use std::collections::BTreeMap; use std::sync::mpsc; use std::thread::{self, JoinHandle}; use std::time::Duration; -use bitcoin::{Transaction, Txid}; +use bitcoin::{Script, Transaction, Txid}; use electrum_client::{Client as ElectrumClient, ElectrumApi, HeaderNotification}; +use crate::bus::TxConfirmation; + #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug, Display, Error, From)] #[display("failed electrum watcher channel")] #[from(mpsc::SendError)] @@ -48,6 +51,9 @@ pub enum ElectrumUpdate { #[display("tx_batch(...)")] TxBatch(Vec, f32), + #[display("tx_confirmations(...)")] + TxConfirmations(Vec, u32), + #[display("channel_disconnected")] ChannelDisconnected, @@ -82,7 +88,6 @@ impl ElectrumWorker { .spawn(move || loop { thread::sleep(Duration::from_secs(interval)); sender.send(ElectrumCmd::GetTrasactions).expect("Electrum thread is dead"); - sender.send(ElectrumCmd::PopHeader).expect("Electrum thread is dead") }) .expect("unable to start blockchain watcher pacemaker thread"); @@ -190,10 +195,45 @@ impl ElectrumProcessor { &mut self, txids: &Vec, ) -> Result, electrum_client::Error> { - if self.tracks.is_empty() { + if self.tracks.is_empty() || txids.is_empty() { + return Ok(None); + } + let transactions = self.client.batch_transaction_get(txids)?; + let scripts: Vec