Skip to content

Commit

Permalink
put retry logic in a macro
Browse files Browse the repository at this point in the history
A macro makes for a little less code duplication. The macro returns a
single awaitable future. So the result of the macro can be used in join!
calls. I was unable to do this without consuming the grpc client object.
Therefore the grpc client object has to be cloned if it's used later
again.
  • Loading branch information
JssDWt committed Nov 30, 2024
1 parent e60aa4c commit 41f8860
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 402 deletions.
48 changes: 21 additions & 27 deletions libs/sdk-common/src/breez_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::grpc::support_client::SupportClient;
use crate::grpc::swapper_client::SwapperClient;
use crate::grpc::{ChainApiServersRequest, PingRequest};
use crate::prelude::{ServiceConnectivityError, ServiceConnectivityErrorKind};
use crate::tonic_wrap::with_connection_fallback;
use crate::with_connection_retry;

pub static PRODUCTION_BREEZSERVER_URL: &str = "https://bs1.breez.technology:443";
pub static STAGING_BREEZSERVER_URL: &str = "https://bs1-st.breez.technology:443";
Expand Down Expand Up @@ -113,20 +113,17 @@ impl BreezServer {

pub async fn fetch_mempoolspace_urls(&self) -> Result<Vec<String>, ServiceConnectivityError> {
let mut client = self.get_information_client().await;
let mut client_clone = client.clone();
let chain_api_servers =
with_connection_fallback(client.chain_api_servers(ChainApiServersRequest {}), || {
client_clone.chain_api_servers(ChainApiServersRequest {})
})
.await
.map_err(|e| {
ServiceConnectivityError::new(
ServiceConnectivityErrorKind::Other,
format!("(Breez: {e:?}) Failed to fetch ChainApiServers"),
)
})?
.into_inner()
.servers;
with_connection_retry!(client.chain_api_servers(ChainApiServersRequest {}))
.await
.map_err(|e| {
ServiceConnectivityError::new(
ServiceConnectivityErrorKind::Other,
format!("(Breez: {e:?}) Failed to fetch ChainApiServers"),
)
})?
.into_inner()
.servers;
trace!("Received chain_api_servers: {chain_api_servers:?}");

let mempoolspace_urls = chain_api_servers
Expand All @@ -141,21 +138,18 @@ impl BreezServer {

pub async fn fetch_boltz_swapper_urls(&self) -> Result<Vec<String>, ServiceConnectivityError> {
let mut client = self.get_information_client().await;
let mut client_clone = client.clone();

let chain_api_servers =
with_connection_fallback(client.chain_api_servers(ChainApiServersRequest {}), || {
client_clone.chain_api_servers(ChainApiServersRequest {})
})
.await
.map_err(|e| {
ServiceConnectivityError::new(
ServiceConnectivityErrorKind::Other,
format!("(Breez: {e:?}) Failed to fetch ChainApiServers"),
)
})?
.into_inner()
.servers;
with_connection_retry!(client.chain_api_servers(ChainApiServersRequest {}))
.await
.map_err(|e| {
ServiceConnectivityError::new(
ServiceConnectivityErrorKind::Other,
format!("(Breez: {e:?}) Failed to fetch ChainApiServers"),
)
})?
.into_inner()
.servers;
trace!("Received chain_api_servers: {chain_api_servers:?}");

let boltz_swapper_urls = chain_api_servers
Expand Down
11 changes: 4 additions & 7 deletions libs/sdk-common/src/fiat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};

use crate::grpc::RatesRequest;
use crate::prelude::BreezServer;
use crate::tonic_wrap::with_connection_fallback;
use crate::with_connection_retry;

/// Trait covering fiat-related functionality
#[tonic::async_trait]
Expand Down Expand Up @@ -97,14 +97,11 @@ impl FiatAPI for BreezServer {

async fn fetch_fiat_rates(&self) -> Result<Vec<Rate>> {
let mut client = self.get_information_client().await;
let mut client_clone = client.clone();

let request = RatesRequest {};
let response = with_connection_fallback(client.rates(request.clone()), || {
client_clone.rates(request)
})
.await
.map_err(|e| anyhow!("Fetch rates request failed: {e}"))?;
let response = with_connection_retry!(client.rates(request.clone()))
.await
.map_err(|e| anyhow!("Fetch rates request failed: {e}"))?;

let mut rates = response.into_inner().rates;
rates.sort_by(|a, b| a.coin.cmp(&b.coin));
Expand Down
105 changes: 52 additions & 53 deletions libs/sdk-common/src/tonic_wrap/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::{error::Error, fmt::Display, future::Future};

use log::debug;
use std::{error::Error, fmt::Display};

pub struct Status(pub tonic::Status);

Expand All @@ -20,13 +18,6 @@ impl Display for Status {

pub struct TransportError(pub tonic::transport::Error);

const BROKEN_CONNECTION_STRINGS: [&str; 4] = [
"http2 error: keep-alive timed out",
"connection error: address not available",
"connection error: timed out",
"connection error: unexpected end of file",
];

impl Display for TransportError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
Expand All @@ -38,54 +29,62 @@ impl Display for TransportError {
}
}

pub async fn with_connection_fallback<T, M, F>(
main: M,
fallback: impl FnOnce() -> F,
) -> Result<T, tonic::Status>
where
M: Future<Output = Result<T, tonic::Status>>,
F: Future<Output = Result<T, tonic::Status>>,
T: std::fmt::Debug,
{
let res = main.await;
let status = match res {
Ok(t) => return Ok(t),
Err(s) => s,
};
/// Executes the given grpc call function. If an error is returned that
/// indicates the connection broke, the call is tried again.
#[macro_export]
macro_rules! with_connection_retry {
($f:expr) => {{
use log::debug;
use std::error::Error;
const BROKEN_CONNECTION_STRINGS: [&str; 4] = [
"http2 error: keep-alive timed out",
"connection error: address not available",
"connection error: timed out",
"connection error: unexpected end of file",
];

debug!(
"with_connection_fallback: initial call failed with: {:?}",
status
);
let source = match status.source() {
Some(source) => source,
None => return Err(status),
};
async {
let res = $f.await;
let status = match res {
Ok(t) => return Ok(t),
Err(s) => s,
};

let error: &tonic::transport::Error = match source.downcast_ref() {
Some(error) => error,
None => return Err(status),
};
debug!(
"with_connection_fallback: initial call failed with: {:?}",
status
);
let source = match status.source() {
Some(source) => source,
None => return Err(status),
};

if error.to_string() != "transport error" {
return Err(status);
}
let error: &tonic::transport::Error = match source.downcast_ref() {
Some(error) => error,
None => return Err(status),
};

let source = match error.source() {
Some(source) => source,
None => return Err(status),
};
if error.to_string() != "transport error" {
return Err(status);
}

// It's a bit of a guess which errors can occur here. hyper Io errors start
// with 'connection error'. These are some of the errors seen before.
if !BROKEN_CONNECTION_STRINGS.contains(&source.to_string().as_str()) {
debug!("transport error string is: '{}'", source.to_string());
return Err(status);
}
let source = match error.source() {
Some(source) => source,
None => return Err(status),
};

// It's a bit of a guess which errors can occur here. hyper Io errors start
// with 'connection error'. These are some of the errors seen before.
if !BROKEN_CONNECTION_STRINGS.contains(&source.to_string().as_str()) {
debug!("transport error string is: '{}'", source.to_string());
return Err(status);
}

debug!(
"with_connection_fallback: initial call failed due to broken connection. Retrying fallback."
);
debug!(
"with_connection_fallback: initial call failed due to broken connection. Retrying fallback."
);

fallback().await
$f.await
}
}};
}
20 changes: 7 additions & 13 deletions libs/sdk-core/src/greenlight/backup_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{

use super::node_api::Greenlight;
use gl_client::pb::cln;
use sdk_common::tonic_wrap::with_connection_fallback;
use sdk_common::with_connection_retry;
use std::sync::Arc;

const BREEZ_SDK_DATASTORE_PATH: [&str; 2] = ["breez-sdk", "backup"];
Expand All @@ -25,15 +25,12 @@ impl BackupTransport for GLBackupTransport {
async fn pull(&self) -> SdkResult<Option<BackupState>> {
let key = self.gl_key();
let mut client = self.inner.get_node_client().await?;
let mut client_clone = client.clone();

let req = cln::ListdatastoreRequest { key };
let response: cln::ListdatastoreResponse =
with_connection_fallback(client.list_datastore(req.clone()), || {
client_clone.list_datastore(req)
})
.await?
.into_inner();
with_connection_retry!(client.list_datastore(req.clone()))
.await?
.into_inner();
let store = response.datastore;
match store.len() {
0 => Ok(None),
Expand All @@ -51,7 +48,6 @@ impl BackupTransport for GLBackupTransport {
let key = self.gl_key();
info!("set_value key = {:?} data length={:?}", key, hex.len());
let mut client = self.inner.get_node_client().await?;
let mut client_clone = client.clone();

let mut mode = cln::datastore_request::DatastoreMode::MustCreate;
if version.is_some() {
Expand All @@ -64,11 +60,9 @@ impl BackupTransport for GLBackupTransport {
generation: version,
mode: Some(mode.into()),
};
let response = with_connection_fallback(client.datastore(req.clone()), || {
client_clone.datastore(req)
})
.await?
.into_inner();
let response = with_connection_retry!(client.datastore(req.clone()))
.await?
.into_inner();
Ok(response.generation.unwrap())
}
}
Loading

0 comments on commit 41f8860

Please sign in to comment.