Skip to content

Commit

Permalink
Retry failed RPC calls on error
Browse files Browse the repository at this point in the history
Whenever the transport errors with "WouldBlock" we wait a bit and retry
the call.

Related to #650, should at least fix the errors with RPC
  • Loading branch information
afilini authored and evanlinjin committed Sep 1, 2022
1 parent ff398ec commit 09e5e61
Showing 1 changed file with 97 additions and 1 deletion.
98 changes: 97 additions & 1 deletion src/blockchain/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
//! wallet_name: "wallet_name".to_string(),
//! sync_params: None,
//! import_params: None,
//! max_tries: 3,
//! };
//! let blockchain = RpcBlockchain::from_config(&config);
//! ```
Expand All @@ -46,14 +47,19 @@ use bitcoincore_rpc::json::{
ListUnspentResultEntry, ScanningDetails,
};
use bitcoincore_rpc::jsonrpc::serde_json::{json, Value};
use bitcoincore_rpc::jsonrpc::{
self, simple_http::SimpleHttpTransport, Error as JsonRpcError, Request, Response, Transport,
};
use bitcoincore_rpc::Auth as RpcAuth;
use bitcoincore_rpc::{Client, RpcApi};
use log::{debug, info, warn};
use serde::{Deserialize, Serialize};
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU8, Ordering};
use std::thread;
use std::time::Duration;

Expand Down Expand Up @@ -97,6 +103,10 @@ pub struct RpcConfig {
/// [`RpcBlockchain`] for every sync to reflect what was imported. The updated params can be
/// obtained via [`RpcBlockchain::import_parameters`].
pub import_params: Option<RpcImportParams>,
/// Max number of attempts before giving up and returning an error
///
/// Set to `0` preserve the old behavior of erroring immediately
pub max_tries: u8,
}

/// Sync parameters for Bitcoin Core RPC.
Expand Down Expand Up @@ -244,6 +254,68 @@ impl WalletSync for RpcBlockchain {
}
}

struct SimpleHttpWithRetry {
inner: SimpleHttpTransport,
attempts: AtomicU8,
limit: u8,
}

macro_rules! impl_inner {
($self:expr, $method:ident, $req:expr) => {{
while $self.attempts.load(Ordering::Relaxed) <= $self.limit {
match $self.inner.$method($req.clone()) {
Ok(r) => {
$self.attempts.store(0, Ordering::Relaxed);
return Ok(r);
}
Err(JsonRpcError::Transport(e)) => {
match e.downcast_ref::<jsonrpc::simple_http::Error>() {
Some(jsonrpc::simple_http::Error::SocketError(io))
if io.kind() == std::io::ErrorKind::WouldBlock =>
{
let attempt = $self.attempts.fetch_add(1, Ordering::Relaxed);
let delay = std::cmp::min(1000, 100 << attempt as u64);

debug!(
"Got a WouldBlock error at attempt {}, sleeping for {}ms",
attempt, delay
);
std::thread::sleep(std::time::Duration::from_millis(delay));

continue;
}
_ => {}
}

$self.attempts.store(0, Ordering::Relaxed);
return Err(JsonRpcError::Transport(e));
}
Err(e) => {
$self.attempts.store(0, Ordering::Relaxed);
return Err(e);
}
}
}

$self.attempts.store(0, Ordering::Relaxed);
Err(JsonRpcError::Transport("All attempts errored".into()))
}};
}

impl Transport for SimpleHttpWithRetry {
fn send_request(&self, req: Request) -> Result<Response, JsonRpcError> {
impl_inner!(self, send_request, req)
}

fn send_batch(&self, reqs: &[Request]) -> Result<Vec<Response>, JsonRpcError> {
impl_inner!(self, send_batch, reqs)
}

fn fmt_target(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.inner.fmt_target(f)
}
}

impl ConfigurableBlockchain for RpcBlockchain {
type Config = RpcConfig;

Expand All @@ -252,7 +324,23 @@ impl ConfigurableBlockchain for RpcBlockchain {
fn from_config(config: &Self::Config) -> Result<Self, Error> {
let wallet_url = format!("{}/wallet/{}", config.url, &config.wallet_name);

let client = Client::new(wallet_url.as_str(), config.auth.clone().into())?;
let mut builder = SimpleHttpTransport::builder()
.url(&wallet_url)
.map_err(|e| bitcoincore_rpc::Error::JsonRpc(e.into()))?;

let (user, pass) = bitcoincore_rpc::Auth::from(config.auth.clone()).get_user_pass()?;
if let Some(user) = user {
builder = builder.auth(user, pass);
}

let transport = SimpleHttpWithRetry {
inner: builder.build(),
attempts: AtomicU8::new(0),
limit: config.max_tries,
};
let jsonrpc_client = jsonrpc::client::Client::with_transport(transport);

let client = Client::from_jsonrpc(jsonrpc_client);
let rpc_version = client.version()?;

info!("connected to '{}' with auth: {:?}", wallet_url, config.auth);
Expand Down Expand Up @@ -933,6 +1021,7 @@ fn descriptor_from_script_pubkey(script: &Script) -> String {
/// wallet_name_prefix: Some("prefix-".to_string()),
/// default_skip_blocks: 100_000,
/// sync_params: None,
/// max_tries: 3,
/// };
/// let main_wallet_blockchain = factory.build("main_wallet", Some(200_000))?;
/// # Ok(())
Expand All @@ -952,6 +1041,10 @@ pub struct RpcBlockchainFactory {
pub default_skip_blocks: u32,
/// Sync parameters
pub sync_params: Option<RpcSyncParams>,
/// Max number of attempts before giving up and returning an error
///
/// Set to `0` preserve the old behavior of erroring immediately
pub max_tries: u8,
}

impl BlockchainFactory for RpcBlockchainFactory {
Expand All @@ -974,6 +1067,7 @@ impl BlockchainFactory for RpcBlockchainFactory {
sync_params: self.sync_params.clone(),
// TODO @evanlinjin: How can be set this individually for each build?
import_params: Default::default(),
max_tries: self.max_tries,
})
}
}
Expand Down Expand Up @@ -1002,6 +1096,7 @@ mod test {
wallet_name: format!("client-wallet-test-{}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos() ),
sync_params: None,
import_params: None,
max_tries: 5,
};
RpcBlockchain::from_config(&config).unwrap()
}
Expand All @@ -1019,6 +1114,7 @@ mod test {
wallet_name_prefix: Some("prefix-".into()),
default_skip_blocks: 0,
sync_params: None,
max_tries: 3,
};

(test_client, factory)
Expand Down

0 comments on commit 09e5e61

Please sign in to comment.