Skip to content

Commit 122d94d

Browse files
committed
Support syncing with compact block filters
1 parent 9b58d36 commit 122d94d

File tree

9 files changed

+437
-28
lines changed

9 files changed

+437
-28
lines changed

client/src/bin/spaced.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,15 @@ impl Composer {
5353
}
5454
}
5555

56-
async fn setup_rpc_wallet(&mut self, spaced: &Spaced, rx: mpsc::Receiver<WalletLoadRequest>) {
56+
async fn setup_rpc_wallet(&mut self, spaced: &Spaced, rx: mpsc::Receiver<WalletLoadRequest>, cbf: bool) {
5757
let wallet_service = RpcWallet::service(
5858
spaced.network,
5959
spaced.rpc.clone(),
6060
spaced.chain.state.clone(),
6161
rx,
6262
self.shutdown.clone(),
6363
spaced.num_workers,
64+
cbf
6465
);
6566

6667
self.services.spawn(async move {
@@ -107,7 +108,7 @@ impl Composer {
107108
.map_err(|e| anyhow!("RPC Server error: {}", e))
108109
});
109110

110-
self.setup_rpc_wallet(spaced, wallet_loader_rx).await;
111+
self.setup_rpc_wallet(spaced, wallet_loader_rx, spaced.cbf).await;
111112
}
112113

113114
async fn setup_sync_service(&mut self, mut spaced: Spaced) {

client/src/cbf.rs

+258
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
use std::collections::{BTreeMap, HashSet, VecDeque};
2+
use std::time::Duration;
3+
use anyhow::anyhow;
4+
use log::info;
5+
use tokio::time::Instant;
6+
use spaces_protocol::bitcoin::BlockHash;
7+
use spaces_protocol::constants::ChainAnchor;
8+
use spaces_wallet::bdk_wallet::chain::{local_chain, BlockId, ConfirmationBlockTime, IndexedTxGraph, TxUpdate};
9+
use spaces_wallet::bdk_wallet::chain::keychain_txout::KeychainTxOutIndex;
10+
use spaces_wallet::bdk_wallet::{KeychainKind, Update};
11+
use spaces_wallet::bitcoin::bip158::BlockFilter;
12+
use spaces_wallet::bitcoin::ScriptBuf;
13+
use spaces_wallet::SpacesWallet;
14+
use crate::client::{BlockSource, BlockchainInfo};
15+
use crate::source::BitcoinBlockSource;
16+
17+
pub struct CompactFilterSync {
18+
graph: IndexedTxGraph<ConfirmationBlockTime, KeychainTxOutIndex<KeychainKind>>,
19+
chain: local_chain::LocalChain,
20+
chain_changeset: BTreeMap<u32, Option<BlockHash>>,
21+
scripts: HashSet<ScriptBuf>,
22+
last_peek_index: u32,
23+
initial_tip: ChainAnchor,
24+
queued_blocks: BTreeMap<u32, BlockHash>,
25+
queued_filters: VecDeque<u32>,
26+
filters_tip: u32,
27+
wait: Option<Instant>,
28+
state: SyncState,
29+
}
30+
31+
enum SyncState {
32+
SyncChecks,
33+
LoadFilterRange(BlockchainInfo),
34+
ProcessFilters,
35+
QueueBlocks,
36+
WaitForBlocks,
37+
ProcessBlocks,
38+
ApplyUpdate,
39+
Synced
40+
}
41+
42+
impl CompactFilterSync {
43+
pub fn new(wallet: &SpacesWallet) -> Self {
44+
let initial_tip = {
45+
let tip = wallet.local_chain().tip();
46+
ChainAnchor { height: tip.height(), hash: tip.hash() }
47+
};
48+
49+
let mut cbf = Self {
50+
graph: IndexedTxGraph::new(wallet.spk_index().clone()),
51+
chain: wallet.local_chain().clone(),
52+
chain_changeset: BTreeMap::new(),
53+
scripts: HashSet::new(),
54+
last_peek_index: 0,
55+
initial_tip,
56+
queued_blocks: BTreeMap::new(),
57+
queued_filters: Default::default(),
58+
filters_tip: 0,
59+
wait: None,
60+
state: SyncState::SyncChecks,
61+
};
62+
cbf.load_scripts(wallet);
63+
cbf
64+
}
65+
66+
fn load_scripts(&mut self, wallet: &SpacesWallet) {
67+
let lookahead = wallet.spk_index().lookahead();
68+
let mut max_idx = 0;
69+
for keychain in [KeychainKind::External, KeychainKind::Internal] {
70+
let last_revealed = wallet
71+
.spk_index()
72+
.last_revealed_index(keychain)
73+
.unwrap_or(0);
74+
let chain_limit = last_revealed + lookahead;
75+
for idx in 0..=chain_limit {
76+
let script = wallet.peek_address(keychain, idx).script_pubkey();
77+
self.scripts.insert(script);
78+
}
79+
max_idx = max_idx.max(chain_limit);
80+
}
81+
self.last_peek_index = max_idx;
82+
}
83+
84+
/// Expand scripts by an additional fixed window beyond the last peek
85+
fn load_more_scripts(&mut self, wallet: &SpacesWallet) {
86+
let end = self.last_peek_index + 10;
87+
for keychain in [KeychainKind::External, KeychainKind::Internal] {
88+
for idx in self.last_peek_index..=end {
89+
let script = wallet.peek_address(keychain, idx).script_pubkey();
90+
self.scripts.insert(script);
91+
}
92+
}
93+
self.last_peek_index = end;
94+
}
95+
96+
pub fn synced(&self) -> bool {
97+
matches!(self.state, SyncState::Synced)
98+
}
99+
100+
pub fn sync_next(
101+
&mut self,
102+
wallet: &mut SpacesWallet,
103+
source: &BitcoinBlockSource,
104+
) -> anyhow::Result<()> {
105+
if self.wait.is_some_and(|w| w.elapsed() < Duration::from_secs(10)) {
106+
return Ok(());
107+
}
108+
self.wait = None;
109+
110+
match &self.state {
111+
SyncState::SyncChecks => {
112+
let info = source.get_blockchain_info()?;
113+
if info.headers != info.blocks {
114+
info!("Source still syncing, retrying...");
115+
self.wait = Some(Instant::now());
116+
return Ok(());
117+
}
118+
if info.filters != info.filter_headers {
119+
info!("Filters syncing, retrying...");
120+
self.wait = Some(Instant::now());
121+
return Ok(());
122+
}
123+
// if wallet already past filter headers, we're done
124+
if let Some(filter_headers) = info.filter_headers {
125+
if self.initial_tip.height >= filter_headers {
126+
info!("wallet({}): tip {} >= filters {}, cbf done", wallet.name(), self.initial_tip.height, filter_headers);
127+
self.state = SyncState::Synced;
128+
return Ok(());
129+
}
130+
}
131+
self.state = SyncState::LoadFilterRange(info);
132+
}
133+
SyncState::LoadFilterRange(info) => {
134+
let checkpoint = info
135+
.checkpoint
136+
.ok_or_else(|| anyhow!("filter sync: checkpoint missing"))?;
137+
if self.initial_tip.height < checkpoint.height {
138+
return Err(anyhow!(
139+
"Wallet birthday {} < checkpoint {}", self.initial_tip.height, checkpoint.height
140+
));
141+
}
142+
143+
let start = self.initial_tip.height;
144+
let end = info
145+
.prune_height
146+
.ok_or(anyhow!("Prune height missing"))?;
147+
let available_filters = info.filters.ok_or(anyhow!("Filters missing"))?;
148+
if end > available_filters {
149+
return Err(anyhow!("Prune height {} > {} available filters", end, available_filters));
150+
}
151+
152+
if start >= end {
153+
return Ok(());
154+
}
155+
for height in start..=end {
156+
self.queued_filters.push_back(height);
157+
}
158+
self.filters_tip = end;
159+
self.state = SyncState::ProcessFilters;
160+
}
161+
SyncState::ProcessFilters => {
162+
let height = match self.queued_filters.pop_front() {
163+
None => {
164+
self.state = SyncState::QueueBlocks;
165+
return Ok(())
166+
},
167+
Some(f) => f,
168+
};
169+
let idx_filter = source.get_block_filter_by_height(height)?;
170+
let idx_filter = idx_filter
171+
.ok_or_else(|| anyhow!("filter sync: block filter missing {}", height))?;
172+
let filter = BlockFilter::new(&idx_filter.content);
173+
if filter.match_any(&idx_filter.hash, self.scripts.iter().map(|s| s.as_bytes()))? {
174+
self.queued_blocks.insert(height, idx_filter.hash);
175+
self.load_more_scripts(wallet);
176+
info!("wallet({}) processed block filter {} - match found", wallet.name(), height);
177+
} else {
178+
info!("wallet({}) processed block filter {} - no match", wallet.name(), height);
179+
}
180+
}
181+
SyncState::QueueBlocks => {
182+
if !self.queued_blocks.is_empty() {
183+
let heights: Vec<u32> = self.queued_blocks.keys().copied().collect();
184+
info!("wallet({}): queueing {} blocks", wallet.name(), heights.len());
185+
source.queue_blocks(heights)?;
186+
}
187+
self.state = SyncState::WaitForBlocks;
188+
}
189+
SyncState::WaitForBlocks => {
190+
let info = source.get_blockchain_info()?;
191+
let status = info
192+
.block_queue
193+
.as_ref()
194+
.ok_or_else(|| anyhow!("filter sync: block queue missing"))?;
195+
196+
if status.pending > 0 {
197+
info!("wallet({}): waiting for {} pending blocks", wallet.name(), status.pending);
198+
self.wait = Some(Instant::now());
199+
return Ok(());
200+
}
201+
202+
if status.completed < self.queued_blocks.len() as u32 {
203+
return Err(anyhow!(
204+
"incomplete downloads: {} of {}", status.completed, self.queued_blocks.len()
205+
));
206+
}
207+
self.state = SyncState::ProcessBlocks;
208+
}
209+
SyncState::ProcessBlocks => {
210+
let (height, hash) = match self.queued_blocks.pop_first() {
211+
None => {
212+
self.state = SyncState::Synced;
213+
return Ok(());
214+
}
215+
Some(f) => f,
216+
};
217+
info!("wallet({}): processing block {} {}", wallet.name(), height, hash);
218+
let block = source.get_block(&hash)?
219+
.ok_or(anyhow!("block {} {} not found", height, hash))?;
220+
self.chain_changeset.insert(height, Some(hash));
221+
let _ = self.graph.apply_block_relevant(&block, height);
222+
self.state = SyncState::ApplyUpdate;
223+
}
224+
SyncState::ApplyUpdate => {
225+
info!("wallet({}): updating wallet tip to {}", wallet.name(), self.filters_tip);
226+
let filters_anchor = BlockId {
227+
height: self.filters_tip,
228+
hash: source.get_block_hash(self.filters_tip)?,
229+
};
230+
231+
let update = self.get_scan_response();
232+
wallet.apply_update(update)?;
233+
wallet.insert_checkpoint(filters_anchor)?;
234+
info!("wallet({}): compact filter sync portion complete at {}", wallet.name(), self.filters_tip);
235+
self.state = SyncState::Synced;
236+
}
237+
SyncState::Synced => {}
238+
}
239+
Ok(())
240+
}
241+
242+
// based on https://github.com/bitcoindevkit/bdk-kyoto/blob/master/src/lib.rs#L137
243+
fn get_scan_response(&mut self) -> Update {
244+
let changes = std::mem::take(&mut self.chain_changeset);
245+
self.chain
246+
.apply_changeset(&local_chain::ChangeSet::from(changes))
247+
.expect("initialized from genesis");
248+
let tx_update = TxUpdate::from(self.graph.graph().clone());
249+
let graph = std::mem::take(&mut self.graph);
250+
let last_indices = graph.index.last_used_indices();
251+
self.graph = IndexedTxGraph::new(graph.index);
252+
Update {
253+
tx_update,
254+
last_active_indices: last_indices,
255+
chain: Some(self.chain.tip()),
256+
}
257+
}
258+
}

client/src/client.rs

+56-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use std::{error::Error, fmt};
55

66
use anyhow::{anyhow, Result};
77
use bincode::{Decode, Encode};
8-
use serde::{Deserialize, Serialize};
8+
use serde::{Deserialize, Deserializer, Serialize, Serializer};
9+
use serde::de::Error as SerdeError;
910
use spaces_protocol::{
1011
bitcoin::{Amount, Block, BlockHash, OutPoint, Txid},
1112
constants::{ChainAnchor, ROLLOUT_BATCH_SIZE, ROLLOUT_BLOCK_INTERVAL},
@@ -20,16 +21,53 @@ use crate::{
2021
source::BitcoinRpcError,
2122
store::{ChainState, ChainStore, LiveSnapshot, LiveStore, Sha256},
2223
};
24+
use crate::source::BlockQueueResult;
2325

2426
pub trait BlockSource {
2527
fn get_block_hash(&self, height: u32) -> Result<BlockHash, BitcoinRpcError>;
26-
fn get_block(&self, hash: &BlockHash) -> Result<Block, BitcoinRpcError>;
28+
fn get_block(&self, hash: &BlockHash) -> Result<Option<Block>, BitcoinRpcError>;
2729
fn get_median_time(&self) -> Result<u64, BitcoinRpcError>;
2830
fn in_mempool(&self, txid: &Txid, height: u32) -> Result<bool, BitcoinRpcError>;
2931
fn get_block_count(&self) -> Result<u64, BitcoinRpcError>;
3032
fn get_best_chain(&self, tip: Option<u32>, expected_chain: Network) -> Result<Option<ChainAnchor>, BitcoinRpcError>;
33+
fn get_blockchain_info(&self) -> Result<BlockchainInfo, BitcoinRpcError>;
34+
fn get_block_filter_by_height(&self, height: u32) -> Result<Option<BlockFilterRpc>, BitcoinRpcError>;
35+
fn queue_blocks(&self, heights: Vec<u32>) -> Result<(), BitcoinRpcError>;
3136
}
3237

38+
#[derive(Debug, Clone, Serialize, Deserialize)]
39+
pub struct BlockFilterRpc {
40+
pub hash: BlockHash,
41+
pub height: u32,
42+
#[serde(
43+
serialize_with = "serialize_hex",
44+
deserialize_with = "deserialize_hex"
45+
)]
46+
pub content: Vec<u8>,
47+
}
48+
49+
#[derive(Serialize, Deserialize)]
50+
pub struct BlockchainInfo {
51+
pub chain: String,
52+
pub blocks: u32,
53+
pub headers: u32,
54+
#[serde(rename = "bestblockhash")]
55+
pub best_block_hash: BlockHash,
56+
#[serde(rename = "pruneheight", skip_serializing_if = "Option::is_none")]
57+
pub prune_height: Option<u32>,
58+
pub pruned: bool,
59+
// Light sync specific info
60+
#[serde(skip_serializing_if = "Option::is_none")]
61+
pub filters: Option<u32>,
62+
#[serde(rename = "filterheaders", skip_serializing_if = "Option::is_none")]
63+
pub filter_headers: Option<u32>,
64+
#[serde(rename = "blockqueue", skip_serializing_if = "Option::is_none")]
65+
pub block_queue: Option<BlockQueueResult>,
66+
#[serde(skip_serializing_if = "Option::is_none")]
67+
pub checkpoint: Option<ChainAnchor>,
68+
}
69+
70+
3371
#[derive(Debug, Clone)]
3472
pub struct Client {
3573
validator: Validator,
@@ -354,3 +392,19 @@ fn unwrap_bid_value(spaceout: &SpaceOut) -> (Amount, Amount) {
354392
}
355393
panic!("expected a bid covenant")
356394
}
395+
396+
397+
fn serialize_hex<S>(bytes: &Vec<u8>, s: S) -> std::result::Result<S::Ok, S::Error>
398+
where
399+
S: Serializer,
400+
{
401+
s.serialize_str(hex::encode(bytes).as_str())
402+
}
403+
404+
fn deserialize_hex<'de, D>(d: D) -> std::result::Result<Vec<u8>, D::Error>
405+
where
406+
D: Deserializer<'de>,
407+
{
408+
let s = String::deserialize(d)?;
409+
hex::decode(s).map_err(D::Error::custom)
410+
}

client/src/config.rs

+1
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ impl Args {
224224
num_workers: args.jobs as usize,
225225
anchors_path,
226226
synced: false,
227+
cbf: args.bitcoin_rpc_light
227228
})
228229
}
229230

client/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub mod source;
1818
pub mod store;
1919
pub mod sync;
2020
pub mod wallets;
21+
mod cbf;
2122

2223
fn std_wait<F>(mut predicate: F, wait: Duration)
2324
where

0 commit comments

Comments
 (0)