diff --git a/crates/cdk-lnbits/Cargo.toml b/crates/cdk-lnbits/Cargo.toml index e8d23058..ee9ba6d8 100644 --- a/crates/cdk-lnbits/Cargo.toml +++ b/crates/cdk-lnbits/Cargo.toml @@ -17,6 +17,7 @@ bitcoin = { version = "0.32.2", default-features = false } cdk = { path = "../cdk", version = "0.4.0", default-features = false, features = ["mint"] } futures = { version = "0.3.28", default-features = false } tokio = { version = "1", default-features = false } +tokio-util = { version = "0.7.11", default-features = false } tracing = { version = "0.1", default-features = false, features = ["attributes", "log"] } thiserror = "1" # lnbits-rs = "0.2.0" diff --git a/crates/cdk-lnbits/src/lib.rs b/crates/cdk-lnbits/src/lib.rs index f0100b85..64e7bef7 100644 --- a/crates/cdk-lnbits/src/lib.rs +++ b/crates/cdk-lnbits/src/lib.rs @@ -4,6 +4,7 @@ #![warn(rustdoc::bare_urls)] use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use anyhow::anyhow; @@ -26,6 +27,7 @@ use futures::Stream; use lnbits_rs::api::invoice::CreateInvoiceRequest; use lnbits_rs::LNBitsClient; use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; pub mod error; @@ -38,6 +40,8 @@ pub struct LNbits { fee_reserve: FeeReserve, receiver: Arc>>>, webhook_url: String, + wait_invoice_cancel_token: CancellationToken, + wait_invoice_is_active: Arc, } impl LNbits { @@ -62,6 +66,8 @@ impl LNbits { receiver, fee_reserve, webhook_url, + wait_invoice_cancel_token: CancellationToken::new(), + wait_invoice_is_active: Arc::new(AtomicBool::new(false)), }) } } @@ -81,13 +87,14 @@ impl MintLightning for LNbits { } fn is_wait_invoice_active(&self) -> bool { - todo!() + self.wait_invoice_is_active.load(Ordering::SeqCst) } fn cancel_wait_invoice(&self) { - todo!() + self.wait_invoice_cancel_token.cancel() } + #[allow(clippy::incompatible_msrv)] async fn wait_any_invoice( &self, ) -> Result + Send>>, Self::Err> { @@ -100,25 +107,48 @@ impl MintLightning for LNbits { let lnbits_api = self.lnbits_api.clone(); + let cancel_token = self.wait_invoice_cancel_token.clone(); + Ok(futures::stream::unfold( - (receiver, lnbits_api), - |(mut receiver, lnbits_api)| async move { - match receiver.recv().await { - Some(msg) => { - let check = lnbits_api.is_invoice_paid(&msg).await; - - match check { - Ok(state) => { - if state { - Some((msg, (receiver, lnbits_api))) - } else { - None + ( + receiver, + lnbits_api, + cancel_token, + Arc::clone(&self.wait_invoice_is_active), + ), + |(mut receiver, lnbits_api, cancel_token, is_active)| async move { + is_active.store(true, Ordering::SeqCst); + + tokio::select! { + _ = cancel_token.cancelled() => { + // Stream is cancelled + is_active.store(false, Ordering::SeqCst); + tracing::info!("Waiting for phonixd invoice ending"); + None + } + msg_option = receiver.recv() => { + match msg_option { + Some(msg) => { + let check = lnbits_api.is_invoice_paid(&msg).await; + + match check { + Ok(state) => { + if state { + Some((msg, (receiver, lnbits_api, cancel_token, is_active))) + } else { + None + } } + _ => None, } - _ => None, } + None => { + is_active.store(true, Ordering::SeqCst); + None + }, + } + } - None => None, } }, ) diff --git a/crates/cdk-strike/Cargo.toml b/crates/cdk-strike/Cargo.toml index f48d5332..c1ccf737 100644 --- a/crates/cdk-strike/Cargo.toml +++ b/crates/cdk-strike/Cargo.toml @@ -17,6 +17,7 @@ bitcoin = { version = "0.32.2", default-features = false } cdk = { path = "../cdk", version = "0.4.0", default-features = false, features = ["mint"] } futures = { version = "0.3.28", default-features = false } tokio = { version = "1", default-features = false } +tokio-util = { version = "0.7.11", default-features = false } tracing = { version = "0.1", default-features = false, features = ["attributes", "log"] } thiserror = "1" uuid = { version = "1", features = ["v4"] } diff --git a/crates/cdk-strike/src/lib.rs b/crates/cdk-strike/src/lib.rs index a9e3de93..76eb0f25 100644 --- a/crates/cdk-strike/src/lib.rs +++ b/crates/cdk-strike/src/lib.rs @@ -4,6 +4,7 @@ #![warn(rustdoc::bare_urls)] use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use anyhow::{anyhow, bail}; @@ -27,6 +28,7 @@ use strike_rs::{ PayInvoiceQuoteRequest, Strike as StrikeApi, }; use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; use uuid::Uuid; pub mod error; @@ -40,6 +42,8 @@ pub struct Strike { unit: CurrencyUnit, receiver: Arc>>>, webhook_url: String, + wait_invoice_cancel_token: CancellationToken, + wait_invoice_is_active: Arc, } impl Strike { @@ -60,6 +64,8 @@ impl Strike { receiver, unit, webhook_url, + wait_invoice_cancel_token: CancellationToken::new(), + wait_invoice_is_active: Arc::new(AtomicBool::new(false)), }) } } @@ -79,13 +85,14 @@ impl MintLightning for Strike { } fn is_wait_invoice_active(&self) -> bool { - todo!() + self.wait_invoice_is_active.load(Ordering::SeqCst) } fn cancel_wait_invoice(&self) { - todo!() + self.wait_invoice_cancel_token.cancel() } + #[allow(clippy::incompatible_msrv)] async fn wait_any_invoice( &self, ) -> Result + Send>>, Self::Err> { @@ -101,18 +108,34 @@ impl MintLightning for Strike { .ok_or(anyhow!("No receiver"))?; let strike_api = self.strike_api.clone(); + let cancel_token = self.wait_invoice_cancel_token.clone(); Ok(futures::stream::unfold( - (receiver, strike_api), - |(mut receiver, strike_api)| async move { - match receiver.recv().await { + ( + receiver, + strike_api, + cancel_token, + Arc::clone(&self.wait_invoice_is_active), + ), + |(mut receiver, strike_api, cancel_token, is_active)| async move { + tokio::select! { + + _ = cancel_token.cancelled() => { + // Stream is cancelled + is_active.store(false, Ordering::SeqCst); + tracing::info!("Waiting for phonixd invoice ending"); + None + } + + msg_option = receiver.recv() => { + match msg_option { Some(msg) => { let check = strike_api.get_incoming_invoice(&msg).await; match check { Ok(state) => { if state.state == InvoiceState::Paid { - Some((msg, (receiver, strike_api))) + Some((msg, (receiver, strike_api, cancel_token, is_active))) } else { None } @@ -122,6 +145,9 @@ impl MintLightning for Strike { } None => None, } + + } + } }, ) .boxed())