diff --git a/libs/sdk-core/src/breez_services.rs b/libs/sdk-core/src/breez_services.rs index a4b5ffdbe..167591ba2 100644 --- a/libs/sdk-core/src/breez_services.rs +++ b/libs/sdk-core/src/breez_services.rs @@ -170,6 +170,7 @@ pub struct BreezServices { shutdown_sender: watch::Sender<()>, shutdown_receiver: watch::Receiver<()>, hibernation_sender: watch::Sender<()>, + hibernation_receiver: watch::Receiver<()>, } impl BreezServices { @@ -1465,7 +1466,7 @@ impl BreezServices { self.detect_hibernation(); // start the signer - let (shutdown_signer_sender, signer_signer_receiver) = mpsc::channel(1); + let (shutdown_signer_sender, signer_signer_receiver) = watch::channel(()); self.start_signer(signer_signer_receiver).await; self.start_node_keep_alive(self.shutdown_receiver.clone()) .await; @@ -1507,7 +1508,7 @@ impl BreezServices { tokio::spawn(async move { // start the backup watcher _ = shutdown_receiver.changed().await; - _ = shutdown_signer_sender.send(()).await; + _ = shutdown_signer_sender.send(()); debug!("Received the signal to exit event polling loop"); }); @@ -1541,11 +1542,24 @@ impl BreezServices { }); } - async fn start_signer(self: &Arc, shutdown_receiver: mpsc::Receiver<()>) { - let signer_api = self.clone(); + async fn start_signer(self: &Arc, shutdown_receiver: watch::Receiver<()>) { + let node_api = self.node_api.clone(); tokio::spawn(async move { tokio::time::sleep(std::time::Duration::from_secs(2)).await; - signer_api.node_api.start_signer(shutdown_receiver).await; + node_api.start_signer(shutdown_receiver).await; + }); + + let node_api = self.node_api.clone(); + let mut hibernation_receiver = self.hibernation_receiver.clone(); + tokio::spawn(async move { + loop { + if hibernation_receiver.changed().await.is_err() { + return; + } + + debug!("hibernation: reconnecting node api."); + node_api.reconnect().await; + } }); } @@ -2364,12 +2378,12 @@ impl BreezServicesBuilder { self.seed.clone().unwrap(), restore_only, persister.clone(), - hibernation_receiver.clone(), ) .await?; - node_api = Some(greenlight.clone()); + let gl_arc = Arc::new(greenlight); + node_api = Some(gl_arc.clone()); if backup_transport.is_none() { - backup_transport = Some(Arc::new(GLBackupTransport { inner: greenlight })); + backup_transport = Some(Arc::new(GLBackupTransport { inner: gl_arc })); } } @@ -2513,6 +2527,7 @@ impl BreezServicesBuilder { shutdown_sender, shutdown_receiver, hibernation_sender, + hibernation_receiver, }); Ok(breez_services) diff --git a/libs/sdk-core/src/greenlight/node_api.rs b/libs/sdk-core/src/greenlight/node_api.rs index f415b17bb..90986931f 100644 --- a/libs/sdk-core/src/greenlight/node_api.rs +++ b/libs/sdk-core/src/greenlight/node_api.rs @@ -67,10 +67,13 @@ pub(crate) struct Greenlight { sdk_config: Config, signer: Mutex>, device: Device, + seed: Vec, gl_client: Mutex>, node_client: Mutex>, persister: Arc, inprogress_payments: AtomicU16, + internal_shutdown: Mutex>>, + external_shutdown: Mutex>>, } #[derive(Serialize, Deserialize)] @@ -99,7 +102,7 @@ impl Greenlight { seed: Vec, restore_only: Option, persister: Arc, - ) -> NodeResult> { + ) -> NodeResult { // Derive the encryption key from the seed let temp_signer = Arc::new(Signer::new( seed.clone(), @@ -177,20 +180,52 @@ impl Greenlight { seed: Vec, device: Device, persister: Arc, - ) -> NodeResult> { + ) -> NodeResult { let greenlight_network = sdk_config.network.into(); let signer = Signer::new(seed.clone(), greenlight_network, device.clone())?; - - let greenlight = Arc::new(Greenlight { + let (tx, _) = mpsc::channel(1); + Ok(Greenlight { sdk_config, signer: Mutex::new(Arc::new(signer)), - device: device.clone(), + device, + seed, gl_client: Mutex::new(None), node_client: Mutex::new(None), persister, inprogress_payments: AtomicU16::new(0), + internal_shutdown: Mutex::new(Arc::new(tx)), + external_shutdown: Mutex::new(None), + }) + } + + async fn do_start_signer(&self) { + // Create a new shutdown receiver that can be invoked by either the + // internal sender for reconnects, or the external sender for shutdown + // requests. + let (shutdown_sender, shutdown_receiver) = mpsc::channel(1); + let shutdown_sender = Arc::new(shutdown_sender); + *self.internal_shutdown.lock().await = Arc::clone(&shutdown_sender); + if let Some(mut external_shutdown) = (*self.external_shutdown.lock().await).clone() { + // Listen to the external shutdown to ensure the signer is stopped. + tokio::spawn(async move { + if external_shutdown.changed().await.is_ok() { + debug!("signer shutdown requested"); + let _ = shutdown_sender.send(()).await; + } + }); + } + + let signer = self.get_signer().await; + + // Run the signer in the background. The exit receiver will be invoked + // when the signer exits. + tokio::spawn(async move { + debug!("starting signer"); + match signer.run_forever(shutdown_receiver).await { + Ok(_) => info!("signer exited gracefully"), + Err(e) => error!("signer exited with error: {e}"), + } }); - Ok(greenlight) } async fn get_signer(&self) -> Arc { @@ -999,6 +1034,42 @@ struct SyncState { #[tonic::async_trait] impl NodeAPI for Greenlight { + async fn reconnect(&self) { + debug!("Reconnect hibernation: request received"); + + // Force refresh existing grpc clients + *self.gl_client.lock().await = None; + *self.node_client.lock().await = None; + + // Shutdown the existing signer + debug!("Reconnect: shutting down old signer"); + let _ = self.internal_shutdown.lock().await.send(()).await; + + // Create a new signer + debug!("Reconnect hibernation: creating new signer"); + let new_signer = match Signer::new( + self.seed.clone(), + self.sdk_config.network.into(), + self.device.clone(), + ) { + Ok(new_signer) => new_signer, + Err(e) => { + error!( + "Reconnect hibernation: failed to create new signer after reconnect request: {:?}", + e + ); + return; + } + }; + *self.signer.lock().await = Arc::new(new_signer); + + // Start the new signer if the previous one was started. + if self.external_shutdown.lock().await.is_some() { + debug!("Reconnect hibernation: starting new signer"); + self.do_start_signer().await; + } + } + async fn node_credentials(&self) -> NodeResult> { Ok(Self::get_node_credentials( self.sdk_config.network, @@ -1530,11 +1601,9 @@ impl NodeAPI for Greenlight { } /// Starts the signer that listens in a loop until the shutdown signal is received - async fn start_signer(&self, shutdown: mpsc::Receiver<()>) { - match self.get_signer().await.run_forever(shutdown).await { - Ok(_) => info!("signer exited gracefully"), - Err(e) => error!("signer exited with error: {e}"), - } + async fn start_signer(&self, shutdown: watch::Receiver<()>) { + *self.external_shutdown.lock().await = Some(shutdown); + self.do_start_signer().await; } async fn start_keep_alive(&self, mut shutdown: watch::Receiver<()>) { diff --git a/libs/sdk-core/src/node_api.rs b/libs/sdk-core/src/node_api.rs index 05dba8306..0e7063517 100644 --- a/libs/sdk-core/src/node_api.rs +++ b/libs/sdk-core/src/node_api.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use anyhow::Result; use serde_json::Value; -use tokio::sync::{mpsc, watch}; +use tokio::sync::watch; use tokio_stream::Stream; use tonic::Streaming; @@ -114,6 +114,7 @@ pub struct FetchBolt11Result { /// Trait covering functions affecting the LN node #[tonic::async_trait] pub trait NodeAPI: Send + Sync { + async fn reconnect(&self); async fn node_credentials(&self) -> NodeResult>; async fn configure_node(&self, close_to_address: Option) -> NodeResult<()>; async fn create_invoice(&self, request: CreateInvoiceRequest) -> NodeResult; @@ -168,7 +169,7 @@ pub trait NodeAPI: Send + Sync { &self, req: PrepareRedeemOnchainFundsRequest, ) -> NodeResult; - async fn start_signer(&self, shutdown: mpsc::Receiver<()>); + async fn start_signer(&self, shutdown: watch::Receiver<()>); async fn start_keep_alive(&self, shutdown: watch::Receiver<()>); async fn connect_peer(&self, node_id: String, addr: String) -> NodeResult<()>; async fn sign_invoice(&self, invoice: RawBolt11Invoice) -> NodeResult; diff --git a/libs/sdk-core/src/test_utils.rs b/libs/sdk-core/src/test_utils.rs index d237ba7ad..a02ae14dd 100644 --- a/libs/sdk-core/src/test_utils.rs +++ b/libs/sdk-core/src/test_utils.rs @@ -330,6 +330,7 @@ pub struct MockNodeAPI { #[tonic::async_trait] impl NodeAPI for MockNodeAPI { + async fn reconnect(&self) {} async fn node_credentials(&self) -> NodeResult> { Err(NodeError::Generic("Not implemented".to_string())) } @@ -423,7 +424,7 @@ impl NodeAPI for MockNodeAPI { Err(NodeError::Generic("Not implemented".to_string())) } - async fn start_signer(&self, _shutdown: mpsc::Receiver<()>) {} + async fn start_signer(&self, _shutdown: watch::Receiver<()>) {} async fn start_keep_alive(&self, _shutdown: watch::Receiver<()>) {}