Skip to content

Commit

Permalink
Retry failed RPC calls on error
Browse files Browse the repository at this point in the history
Whenever the transport errors with "WouldBlock" we wait a bit and retry
the call.

Related to bitcoindevkit#650, should at least fix the errors with RPC
  • Loading branch information
afilini committed Aug 9, 2022
1 parent 9f9ffd0 commit 9aa08ed
Showing 1 changed file with 95 additions and 1 deletion.
96 changes: 95 additions & 1 deletion src/blockchain/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,17 @@ use bitcoincore_rpc::json::{
ListUnspentResultEntry, ScanningDetails,
};
use bitcoincore_rpc::jsonrpc::serde_json::{json, Value};
use bitcoincore_rpc::jsonrpc::{
self, simple_http::SimpleHttpTransport, Error as JsonRpcError, Request, Response, Transport,
};
use bitcoincore_rpc::Auth as RpcAuth;
use bitcoincore_rpc::{Client, RpcApi};
use log::{debug, info};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU8, Ordering};
use std::thread;
use std::time::Duration;

Expand Down Expand Up @@ -80,6 +85,10 @@ pub struct RpcConfig {
pub wallet_name: String,
/// Sync parameters
pub sync_params: Option<RpcSyncParams>,
/// Max number of attempts before giving up and returning an error
///
/// Set to `0` preserve the old behavior of erroring immediately
pub max_tries: u8,
}

/// Sync parameters for Bitcoin Core RPC.
Expand Down Expand Up @@ -195,6 +204,68 @@ impl WalletSync for RpcBlockchain {
}
}

struct SimpleHttpWithRetry {
inner: SimpleHttpTransport,
attempts: AtomicU8,
limit: u8,
}

macro_rules! impl_inner {
($self:expr, $method:ident, $req:expr) => {{
while $self.attempts.load(Ordering::Relaxed) <= $self.limit {
match $self.inner.$method($req.clone()) {
Ok(r) => {
$self.attempts.store(0, Ordering::Relaxed);
return Ok(r);
}
Err(JsonRpcError::Transport(e)) => {
match e.downcast_ref::<jsonrpc::simple_http::Error>() {
Some(jsonrpc::simple_http::Error::SocketError(io))
if io.kind() == std::io::ErrorKind::WouldBlock =>
{
let attempt = $self.attempts.fetch_add(1, Ordering::Relaxed);
let delay = std::cmp::min(1000, 100 << attempt as u64);

debug!(
"Got a WouldBlock error at attempt {}, sleeping for {}ms",
attempt, delay
);
std::thread::sleep(std::time::Duration::from_millis(delay));

continue;
}
_ => {}
}

$self.attempts.store(0, Ordering::Relaxed);
return Err(JsonRpcError::Transport(e));
}
Err(e) => {
$self.attempts.store(0, Ordering::Relaxed);
return Err(e);
}
}
}

$self.attempts.store(0, Ordering::Relaxed);
Err(JsonRpcError::Transport("All attempts errored".into()))
}};
}

impl Transport for SimpleHttpWithRetry {
fn send_request(&self, req: Request) -> Result<Response, JsonRpcError> {
impl_inner!(self, send_request, req)
}

fn send_batch(&self, reqs: &[Request]) -> Result<Vec<Response>, JsonRpcError> {
impl_inner!(self, send_batch, reqs)
}

fn fmt_target(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.inner.fmt_target(f)
}
}

impl ConfigurableBlockchain for RpcBlockchain {
type Config = RpcConfig;

Expand All @@ -203,7 +274,23 @@ impl ConfigurableBlockchain for RpcBlockchain {
fn from_config(config: &Self::Config) -> Result<Self, Error> {
let wallet_url = format!("{}/wallet/{}", config.url, &config.wallet_name);

let client = Client::new(wallet_url.as_str(), config.auth.clone().into())?;
let mut builder = SimpleHttpTransport::builder()
.url(&wallet_url)
.map_err(|e| bitcoincore_rpc::Error::JsonRpc(e.into()))?;

let (user, pass) = bitcoincore_rpc::Auth::from(config.auth.clone()).get_user_pass()?;
if let Some(user) = user {
builder = builder.auth(user, pass);
}

let transport = SimpleHttpWithRetry {
inner: builder.build(),
attempts: AtomicU8::new(0),
limit: config.max_tries,
};
let jsonrpc_client = jsonrpc::client::Client::with_transport(transport);

let client = Client::from_jsonrpc(jsonrpc_client);
let rpc_version = client.version()?;

info!("connected to '{}' with auth: {:?}", wallet_url, config.auth);
Expand Down Expand Up @@ -835,6 +922,10 @@ pub struct RpcBlockchainFactory {
pub default_skip_blocks: u32,
/// Sync parameters
pub sync_params: Option<RpcSyncParams>,
/// Max number of attempts before giving up and returning an error
///
/// Set to `0` preserve the old behavior of erroring immediately
pub max_tries: u8,
}

impl BlockchainFactory for RpcBlockchainFactory {
Expand All @@ -855,6 +946,7 @@ impl BlockchainFactory for RpcBlockchainFactory {
checksum
),
sync_params: self.sync_params.clone(),
max_tries: self.max_tries,
})
}
}
Expand Down Expand Up @@ -882,6 +974,7 @@ mod test {
network: Network::Regtest,
wallet_name: format!("client-wallet-test-{}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos() ),
sync_params: None,
max_tries: 5,
};
RpcBlockchain::from_config(&config).unwrap()
}
Expand All @@ -899,6 +992,7 @@ mod test {
wallet_name_prefix: Some("prefix-".into()),
default_skip_blocks: 0,
sync_params: None,
max_tries: 3,
};

(test_client, factory)
Expand Down

0 comments on commit 9aa08ed

Please sign in to comment.