Skip to content

Retry failed RPC calls on error #707

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 97 additions & 1 deletion src/blockchain/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
//! network: bdk::bitcoin::Network::Testnet,
//! wallet_name: "wallet_name".to_string(),
//! sync_params: None,
//! max_tries: 3,
//! };
//! let blockchain = RpcBlockchain::from_config(&config);
//! ```
Expand All @@ -45,12 +46,17 @@ use bitcoincore_rpc::json::{
ListUnspentResultEntry, ScanningDetails,
};
use bitcoincore_rpc::jsonrpc::serde_json::{json, Value};
use bitcoincore_rpc::jsonrpc::{
self, simple_http::SimpleHttpTransport, Error as JsonRpcError, Request, Response, Transport,
};
use bitcoincore_rpc::Auth as RpcAuth;
use bitcoincore_rpc::{Client, RpcApi};
use log::{debug, info};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU8, Ordering};
use std::thread;
use std::time::Duration;

Expand Down Expand Up @@ -80,6 +86,10 @@ pub struct RpcConfig {
pub wallet_name: String,
/// Sync parameters
pub sync_params: Option<RpcSyncParams>,
/// Max number of attempts before giving up and returning an error
///
/// Set to `0` preserve the old behavior of erroring immediately
pub max_tries: u8,
}

/// Sync parameters for Bitcoin Core RPC.
Expand Down Expand Up @@ -195,6 +205,68 @@ impl WalletSync for RpcBlockchain {
}
}

struct SimpleHttpWithRetry {
inner: SimpleHttpTransport,
attempts: AtomicU8,
limit: u8,
}

macro_rules! impl_inner {
($self:expr, $method:ident, $req:expr) => {{
while $self.attempts.load(Ordering::Relaxed) <= $self.limit {
match $self.inner.$method($req.clone()) {
Ok(r) => {
$self.attempts.store(0, Ordering::Relaxed);
return Ok(r);
}
Err(JsonRpcError::Transport(e)) => {
match e.downcast_ref::<jsonrpc::simple_http::Error>() {
Some(jsonrpc::simple_http::Error::SocketError(io))
if io.kind() == std::io::ErrorKind::WouldBlock =>
{
let attempt = $self.attempts.fetch_add(1, Ordering::Relaxed);
let delay = std::cmp::min(1000, 100 << attempt as u64);

debug!(
"Got a WouldBlock error at attempt {}, sleeping for {}ms",
attempt, delay
);
std::thread::sleep(std::time::Duration::from_millis(delay));

continue;
}
_ => {}
}

$self.attempts.store(0, Ordering::Relaxed);
return Err(JsonRpcError::Transport(e));
}
Err(e) => {
$self.attempts.store(0, Ordering::Relaxed);
return Err(e);
}
}
}

$self.attempts.store(0, Ordering::Relaxed);
Err(JsonRpcError::Transport("All attempts errored".into()))
}};
}

impl Transport for SimpleHttpWithRetry {
fn send_request(&self, req: Request) -> Result<Response, JsonRpcError> {
impl_inner!(self, send_request, req)
}

fn send_batch(&self, reqs: &[Request]) -> Result<Vec<Response>, JsonRpcError> {
impl_inner!(self, send_batch, reqs)
}

fn fmt_target(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.inner.fmt_target(f)
}
}

impl ConfigurableBlockchain for RpcBlockchain {
type Config = RpcConfig;

Expand All @@ -203,7 +275,23 @@ impl ConfigurableBlockchain for RpcBlockchain {
fn from_config(config: &Self::Config) -> Result<Self, Error> {
let wallet_url = format!("{}/wallet/{}", config.url, &config.wallet_name);

let client = Client::new(wallet_url.as_str(), config.auth.clone().into())?;
let mut builder = SimpleHttpTransport::builder()
.url(&wallet_url)
.map_err(|e| bitcoincore_rpc::Error::JsonRpc(e.into()))?;

let (user, pass) = bitcoincore_rpc::Auth::from(config.auth.clone()).get_user_pass()?;
if let Some(user) = user {
builder = builder.auth(user, pass);
}

let transport = SimpleHttpWithRetry {
inner: builder.build(),
attempts: AtomicU8::new(0),
limit: config.max_tries,
};
let jsonrpc_client = jsonrpc::client::Client::with_transport(transport);

let client = Client::from_jsonrpc(jsonrpc_client);
let rpc_version = client.version()?;

info!("connected to '{}' with auth: {:?}", wallet_url, config.auth);
Expand Down Expand Up @@ -816,6 +904,7 @@ fn descriptor_from_script_pubkey(script: &Script) -> String {
/// wallet_name_prefix: Some("prefix-".to_string()),
/// default_skip_blocks: 100_000,
/// sync_params: None,
/// max_tries: 3,
/// };
/// let main_wallet_blockchain = factory.build("main_wallet", Some(200_000))?;
/// # Ok(())
Expand All @@ -835,6 +924,10 @@ pub struct RpcBlockchainFactory {
pub default_skip_blocks: u32,
/// Sync parameters
pub sync_params: Option<RpcSyncParams>,
/// Max number of attempts before giving up and returning an error
///
/// Set to `0` preserve the old behavior of erroring immediately
pub max_tries: u8,
}

impl BlockchainFactory for RpcBlockchainFactory {
Expand All @@ -855,6 +948,7 @@ impl BlockchainFactory for RpcBlockchainFactory {
checksum
),
sync_params: self.sync_params.clone(),
max_tries: self.max_tries,
})
}
}
Expand Down Expand Up @@ -882,6 +976,7 @@ mod test {
network: Network::Regtest,
wallet_name: format!("client-wallet-test-{}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos() ),
sync_params: None,
max_tries: 5,
};
RpcBlockchain::from_config(&config).unwrap()
}
Expand All @@ -899,6 +994,7 @@ mod test {
wallet_name_prefix: Some("prefix-".into()),
default_skip_blocks: 0,
sync_params: None,
max_tries: 3,
};

(test_client, factory)
Expand Down