Skip to content

Commit

Permalink
detect and act on network change
Browse files Browse the repository at this point in the history
When there is a network change, tonic behaves as follows:
- After the keepalive timeout, reconnect automatically.
- Before the keepalive timeout, any grpc call will time out. After the
  timeout, the connection is reestablished.

This commit adds a mechanism to reconnect all grpc clients after one of
the clients detects a network change. Initially it was attempted to only
retry based on a keepalive timeout error, but a network change affects
all grpc clients, so subsequent requests to other grpc endpoints would
still fail with a timeout. Ofcourse those grpc clients can also add a
retry-on-timeout, but since it is known at this point the grpc clients
are temporarily dead, reconnect them immediately. This ensures
subsequent calls to other endpoints won't add additional time by waiting
for a timeout.
  • Loading branch information
JssDWt committed Nov 29, 2024
1 parent 18ab125 commit aebafe7
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 95 deletions.
27 changes: 13 additions & 14 deletions libs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion libs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ serde_json = "1.0"
strum = "0.25"
strum_macros = "0.25"
thiserror = "1.0.56"
tokio = { version = "1", features = ["full"] }
tokio = { version = "1.41", features = ["full"] }
tonic = "^0.8"
tonic-build = "^0.8"
uniffi = "0.23.0"
Expand Down
4 changes: 2 additions & 2 deletions libs/sdk-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ flutter_rust_bridge = "=1.82.6"
aes = { workspace = true }
anyhow = { workspace = true }
hex = { workspace = true }
gl-client = { git = "https://github.com/Blockstream/greenlight.git", features = [
gl-client = { path = "../../../greenlight/libs/gl-client", features = [
"permissive",
], rev = "c09c1be59994b35aadfe4747b78bcdc8fffbe45a" }
]}
zbase32 = "0.1.2"
base64 = { workspace = true }
chrono = "0.4"
Expand Down
79 changes: 51 additions & 28 deletions libs/sdk-core/src/breez_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,9 @@ pub struct BreezServices {
backup_watcher: Arc<BackupWatcher>,
shutdown_sender: watch::Sender<()>,
shutdown_receiver: watch::Receiver<()>,
hibernation_sender: watch::Sender<()>,
hibernation_receiver: watch::Receiver<()>,
reconnect_sender: watch::Sender<()>,
reconnect_receiver: watch::Receiver<()>,
network_change_receiver: watch::Receiver<()>,
}

impl BreezServices {
Expand Down Expand Up @@ -1439,6 +1440,7 @@ impl BreezServices {
async fn start_background_tasks(self: &Arc<BreezServices>) -> SdkResult<()> {
// Detect hibernation
self.detect_hibernation();
self.detect_network_change();

// start the signer
let (shutdown_signer_sender, signer_signer_receiver) = watch::channel(());
Expand Down Expand Up @@ -1491,6 +1493,34 @@ impl BreezServices {
Ok(())
}

fn detect_network_change(self: &Arc<BreezServices>) {
let cloned = Arc::clone(self);
let mut network_change_receiver = cloned.network_change_receiver.clone();
tokio::spawn(async move {
loop {
if network_change_receiver.changed().await.is_err() {
debug!("network change detector stopped");
return;
}

debug!("Network change detected.");
cloned.reconnect().await;
}
});
}

async fn reconnect(self: &Arc<BreezServices>) {
// Reconnect node api before notifying anything else, to
// ensure there are no races reconnecting dependant
// services.
debug!("Reconnecting node api.");
self.node_api.reconnect().await;

// Now notify dependant services.
debug!("Notifying services about reconnect.");
let _ = self.reconnect_sender.send(());
}

fn detect_hibernation(self: &Arc<BreezServices>) {
let cloned = Arc::clone(self);
tokio::spawn(async move {
Expand All @@ -1509,25 +1539,15 @@ impl BreezServices {
.saturating_sub(DETECT_HIBERNATE_SLEEP_DURATION)
.ge(&DETECT_HIBERNATE_MAX_OFFSET)
{
// Reconnect node api before notifying anything else, to
// ensure there are no races reconnecting dependant
// services.
debug!(
"Hibernation detected, time diff {}s, reconnecting node api.",
elapsed.as_secs_f32()
);
cloned.node_api.reconnect().await;

// Now notify dependant services.
debug!("Hibernation detected, notifying services.");
let _ = cloned.hibernation_sender.send(());
debug!("Hibernation detected, time diff {}s", elapsed.as_secs_f32());
cloned.reconnect().await;
}
}
});
}

async fn start_signer(self: &Arc<BreezServices>, mut shutdown_receiver: watch::Receiver<()>) {
let mut hibernation_receiver = self.hibernation_receiver.clone();
let mut reconnect_receiver = self.reconnect_receiver.clone();
let node_api = self.node_api.clone();

tokio::spawn(async move {
Expand All @@ -1543,9 +1563,9 @@ impl BreezServices {
true
}

_ = hibernation_receiver.changed() => {
_ = reconnect_receiver.changed() => {
// NOTE: The node api is reconnected already inside the
// detect_hibernation function, to avoid races.
// reconnect function, to avoid races.
false
}
};
Expand Down Expand Up @@ -1648,7 +1668,7 @@ impl BreezServices {
let cloned = self.clone();
tokio::spawn(async move {
let mut shutdown_receiver = cloned.shutdown_receiver.clone();
let mut reconnect_receiver = cloned.hibernation_receiver.clone();
let mut reconnect_receiver = cloned.reconnect_receiver.clone();
loop {
if shutdown_receiver.has_changed().unwrap_or(true) {
return;
Expand All @@ -1674,7 +1694,7 @@ impl BreezServices {
}

_ = reconnect_receiver.changed() => {
debug!("Reconnect hibernation: track invoices");
debug!("Reconnect: track invoices");
break;
}
};
Expand Down Expand Up @@ -1733,7 +1753,7 @@ impl BreezServices {
let cloned = self.clone();
tokio::spawn(async move {
let mut shutdown_receiver = cloned.shutdown_receiver.clone();
let mut reconnect_receiver = cloned.hibernation_receiver.clone();
let mut reconnect_receiver = cloned.reconnect_receiver.clone();
loop {
if shutdown_receiver.has_changed().unwrap_or(true) {
return;
Expand All @@ -1759,7 +1779,7 @@ impl BreezServices {
}

_ = reconnect_receiver.changed() => {
debug!("Reconnect hibernation: track logs");
debug!("Reconnect: track logs");
break;
}
};
Expand Down Expand Up @@ -2394,7 +2414,8 @@ impl BreezServicesBuilder {
});
}

let (hibernation_sender, hibernation_receiver) = watch::channel(());
let (reconnect_sender, reconnect_receiver) = watch::channel(());
let (network_change_sender, network_change_receiver) = watch::channel(());
// The storage is implemented via sqlite.
let persister = self
.persister
Expand All @@ -2410,6 +2431,7 @@ impl BreezServicesBuilder {
self.seed.clone().unwrap(),
restore_only,
persister.clone(),
network_change_sender.clone(),
)
.await?;
let gl_arc = Arc::new(greenlight);
Expand Down Expand Up @@ -2468,16 +2490,16 @@ impl BreezServicesBuilder {
}
});

// Reconnect breez server on hibernation.
// Reconnect breez server on when requested.
let cloned_breez_server = breez_server.clone();
let mut cloned_hibernation_receiver = hibernation_receiver.clone();
let mut cloned_reconnect_receiver = reconnect_receiver.clone();
tokio::spawn(async move {
loop {
if cloned_hibernation_receiver.changed().await.is_err() {
if cloned_reconnect_receiver.changed().await.is_err() {
return;
}

debug!("Reconnect hibernation: reconnecting breez server");
debug!("Reconnect: reconnecting breez server");
let _ = cloned_breez_server.reconnect().await;
}
});
Expand Down Expand Up @@ -2572,8 +2594,9 @@ impl BreezServicesBuilder {
backup_watcher: Arc::new(backup_watcher),
shutdown_sender,
shutdown_receiver,
hibernation_sender,
hibernation_receiver,
reconnect_sender,
reconnect_receiver,
network_change_receiver,
});

Ok(breez_services)
Expand Down
Loading

0 comments on commit aebafe7

Please sign in to comment.