Skip to content

Commit

Permalink
reconnect node api on hibernation
Browse files Browse the repository at this point in the history
  • Loading branch information
JssDWt committed Oct 29, 2024
1 parent 9abd28b commit 23c95fe
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 22 deletions.
31 changes: 23 additions & 8 deletions libs/sdk-core/src/breez_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ pub struct BreezServices {
shutdown_sender: watch::Sender<()>,
shutdown_receiver: watch::Receiver<()>,
hibernation_sender: watch::Sender<()>,
hibernation_receiver: watch::Receiver<()>,
}

impl BreezServices {
Expand Down Expand Up @@ -1465,7 +1466,7 @@ impl BreezServices {
self.detect_hibernation();

// start the signer
let (shutdown_signer_sender, signer_signer_receiver) = mpsc::channel(1);
let (shutdown_signer_sender, signer_signer_receiver) = watch::channel(());
self.start_signer(signer_signer_receiver).await;
self.start_node_keep_alive(self.shutdown_receiver.clone())
.await;
Expand Down Expand Up @@ -1507,7 +1508,7 @@ impl BreezServices {
tokio::spawn(async move {
// start the backup watcher
_ = shutdown_receiver.changed().await;
_ = shutdown_signer_sender.send(()).await;
_ = shutdown_signer_sender.send(());
debug!("Received the signal to exit event polling loop");
});

Expand Down Expand Up @@ -1541,11 +1542,24 @@ impl BreezServices {
});
}

async fn start_signer(self: &Arc<BreezServices>, shutdown_receiver: mpsc::Receiver<()>) {
let signer_api = self.clone();
async fn start_signer(self: &Arc<BreezServices>, shutdown_receiver: watch::Receiver<()>) {
let node_api = self.node_api.clone();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
signer_api.node_api.start_signer(shutdown_receiver).await;
node_api.start_signer(shutdown_receiver).await;
});

let node_api = self.node_api.clone();
let mut hibernation_receiver = self.hibernation_receiver.clone();
tokio::spawn(async move {
loop {
if hibernation_receiver.changed().await.is_err() {
return;
}

debug!("hibernation: reconnecting node api.");
node_api.reconnect().await;
}
});
}

Expand Down Expand Up @@ -2364,12 +2378,12 @@ impl BreezServicesBuilder {
self.seed.clone().unwrap(),
restore_only,
persister.clone(),
hibernation_receiver.clone(),
)
.await?;
node_api = Some(greenlight.clone());
let gl_arc = Arc::new(greenlight);
node_api = Some(gl_arc.clone());
if backup_transport.is_none() {
backup_transport = Some(Arc::new(GLBackupTransport { inner: greenlight }));
backup_transport = Some(Arc::new(GLBackupTransport { inner: gl_arc }));
}
}

Expand Down Expand Up @@ -2513,6 +2527,7 @@ impl BreezServicesBuilder {
shutdown_sender,
shutdown_receiver,
hibernation_sender,
hibernation_receiver,
});

Ok(breez_services)
Expand Down
91 changes: 80 additions & 11 deletions libs/sdk-core/src/greenlight/node_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,13 @@ pub(crate) struct Greenlight {
sdk_config: Config,
signer: Mutex<Arc<Signer>>,
device: Device,
seed: Vec<u8>,
gl_client: Mutex<Option<node::Client>>,
node_client: Mutex<Option<ClnClient>>,
persister: Arc<SqliteStorage>,
inprogress_payments: AtomicU16,
internal_shutdown: Mutex<Arc<mpsc::Sender<()>>>,
external_shutdown: Mutex<Option<watch::Receiver<()>>>,
}

#[derive(Serialize, Deserialize)]
Expand Down Expand Up @@ -99,7 +102,7 @@ impl Greenlight {
seed: Vec<u8>,
restore_only: Option<bool>,
persister: Arc<SqliteStorage>,
) -> NodeResult<Arc<Self>> {
) -> NodeResult<Self> {
// Derive the encryption key from the seed
let temp_signer = Arc::new(Signer::new(
seed.clone(),
Expand Down Expand Up @@ -177,20 +180,52 @@ impl Greenlight {
seed: Vec<u8>,
device: Device,
persister: Arc<SqliteStorage>,
) -> NodeResult<Arc<Self>> {
) -> NodeResult<Self> {
let greenlight_network = sdk_config.network.into();
let signer = Signer::new(seed.clone(), greenlight_network, device.clone())?;

let greenlight = Arc::new(Greenlight {
let (tx, _) = mpsc::channel(1);
Ok(Greenlight {
sdk_config,
signer: Mutex::new(Arc::new(signer)),
device: device.clone(),
device,
seed,
gl_client: Mutex::new(None),
node_client: Mutex::new(None),
persister,
inprogress_payments: AtomicU16::new(0),
internal_shutdown: Mutex::new(Arc::new(tx)),
external_shutdown: Mutex::new(None),
})
}

async fn do_start_signer(&self) {
// Create a new shutdown receiver that can be invoked by either the
// internal sender for reconnects, or the external sender for shutdown
// requests.
let (shutdown_sender, shutdown_receiver) = mpsc::channel(1);
let shutdown_sender = Arc::new(shutdown_sender);
*self.internal_shutdown.lock().await = Arc::clone(&shutdown_sender);
if let Some(mut external_shutdown) = (*self.external_shutdown.lock().await).clone() {
// Listen to the external shutdown to ensure the signer is stopped.
tokio::spawn(async move {
if external_shutdown.changed().await.is_ok() {
debug!("signer shutdown requested");
let _ = shutdown_sender.send(()).await;
}
});
}

let signer = self.get_signer().await;

// Run the signer in the background. The exit receiver will be invoked
// when the signer exits.
tokio::spawn(async move {
debug!("starting signer");
match signer.run_forever(shutdown_receiver).await {
Ok(_) => info!("signer exited gracefully"),
Err(e) => error!("signer exited with error: {e}"),
}
});
Ok(greenlight)
}

async fn get_signer(&self) -> Arc<Signer> {
Expand Down Expand Up @@ -999,6 +1034,42 @@ struct SyncState {

#[tonic::async_trait]
impl NodeAPI for Greenlight {
async fn reconnect(&self) {
debug!("Reconnect hibernation: request received");

// Force refresh existing grpc clients
*self.gl_client.lock().await = None;
*self.node_client.lock().await = None;

// Shutdown the existing signer
debug!("Reconnect: shutting down old signer");
let _ = self.internal_shutdown.lock().await.send(()).await;

// Create a new signer
debug!("Reconnect hibernation: creating new signer");
let new_signer = match Signer::new(
self.seed.clone(),
self.sdk_config.network.into(),
self.device.clone(),
) {
Ok(new_signer) => new_signer,
Err(e) => {
error!(
"Reconnect hibernation: failed to create new signer after reconnect request: {:?}",
e
);
return;
}
};
*self.signer.lock().await = Arc::new(new_signer);

// Start the new signer if the previous one was started.
if self.external_shutdown.lock().await.is_some() {
debug!("Reconnect hibernation: starting new signer");
self.do_start_signer().await;
}
}

async fn node_credentials(&self) -> NodeResult<Option<NodeCredentials>> {
Ok(Self::get_node_credentials(
self.sdk_config.network,
Expand Down Expand Up @@ -1530,11 +1601,9 @@ impl NodeAPI for Greenlight {
}

/// Starts the signer that listens in a loop until the shutdown signal is received
async fn start_signer(&self, shutdown: mpsc::Receiver<()>) {
match self.get_signer().await.run_forever(shutdown).await {
Ok(_) => info!("signer exited gracefully"),
Err(e) => error!("signer exited with error: {e}"),
}
async fn start_signer(&self, shutdown: watch::Receiver<()>) {
*self.external_shutdown.lock().await = Some(shutdown);
self.do_start_signer().await;
}

async fn start_keep_alive(&self, mut shutdown: watch::Receiver<()>) {
Expand Down
5 changes: 3 additions & 2 deletions libs/sdk-core/src/node_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::pin::Pin;

use anyhow::Result;
use serde_json::Value;
use tokio::sync::{mpsc, watch};
use tokio::sync::watch;
use tokio_stream::Stream;
use tonic::Streaming;

Expand Down Expand Up @@ -114,6 +114,7 @@ pub struct FetchBolt11Result {
/// Trait covering functions affecting the LN node
#[tonic::async_trait]
pub trait NodeAPI: Send + Sync {
async fn reconnect(&self);
async fn node_credentials(&self) -> NodeResult<Option<NodeCredentials>>;
async fn configure_node(&self, close_to_address: Option<String>) -> NodeResult<()>;
async fn create_invoice(&self, request: CreateInvoiceRequest) -> NodeResult<String>;
Expand Down Expand Up @@ -168,7 +169,7 @@ pub trait NodeAPI: Send + Sync {
&self,
req: PrepareRedeemOnchainFundsRequest,
) -> NodeResult<PrepareRedeemOnchainFundsResponse>;
async fn start_signer(&self, shutdown: mpsc::Receiver<()>);
async fn start_signer(&self, shutdown: watch::Receiver<()>);
async fn start_keep_alive(&self, shutdown: watch::Receiver<()>);
async fn connect_peer(&self, node_id: String, addr: String) -> NodeResult<()>;
async fn sign_invoice(&self, invoice: RawBolt11Invoice) -> NodeResult<String>;
Expand Down
3 changes: 2 additions & 1 deletion libs/sdk-core/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ pub struct MockNodeAPI {

#[tonic::async_trait]
impl NodeAPI for MockNodeAPI {
async fn reconnect(&self) {}
async fn node_credentials(&self) -> NodeResult<Option<NodeCredentials>> {
Err(NodeError::Generic("Not implemented".to_string()))
}
Expand Down Expand Up @@ -423,7 +424,7 @@ impl NodeAPI for MockNodeAPI {
Err(NodeError::Generic("Not implemented".to_string()))
}

async fn start_signer(&self, _shutdown: mpsc::Receiver<()>) {}
async fn start_signer(&self, _shutdown: watch::Receiver<()>) {}

async fn start_keep_alive(&self, _shutdown: watch::Receiver<()>) {}

Expand Down

0 comments on commit 23c95fe

Please sign in to comment.