From 515991f8d297c8fc584aef30a38938ddca6e603d Mon Sep 17 00:00:00 2001 From: thesimplekid Date: Sun, 6 Oct 2024 07:53:13 +0200 Subject: [PATCH 1/9] fix: cache melt response --- crates/cdk-axum/src/router_handlers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/cdk-axum/src/router_handlers.rs b/crates/cdk-axum/src/router_handlers.rs index 10039f0bb..0ee99d896 100644 --- a/crates/cdk-axum/src/router_handlers.rs +++ b/crates/cdk-axum/src/router_handlers.rs @@ -47,7 +47,7 @@ macro_rules! post_cache_wrapper { post_cache_wrapper!(post_swap, SwapRequest, SwapResponse); post_cache_wrapper!(post_mint_bolt11, MintBolt11Request, MintBolt11Response); -post_cache_wrapper!(post_melt_bolt11, MeltBolt11Request, MeltBolt11Response); +post_cache_wrapper!(post_melt_bolt11, MeltBolt11Request, MeltQuoteBolt11Response); pub async fn get_keys(State(state): State) -> Result, Response> { let pubkeys = state.mint.pubkeys().await.map_err(|err| { From 7865f3dc17aafa2ab40ee2ebc324a13dcf93c4fb Mon Sep 17 00:00:00 2001 From: thesimplekid Date: Sun, 6 Oct 2024 16:38:38 +0200 Subject: [PATCH 2/9] feat: add cancel to wait invoice --- crates/cdk-cln/Cargo.toml | 1 + crates/cdk-cln/src/lib.rs | 165 ++++++++++++++++++++++++---- crates/cdk-fake-wallet/src/lib.rs | 8 ++ crates/cdk-lnbits/src/lib.rs | 8 ++ crates/cdk-lnd/src/lib.rs | 8 ++ crates/cdk-phoenixd/src/lib.rs | 8 ++ crates/cdk-strike/src/lib.rs | 8 ++ crates/cdk/src/cdk_lightning/mod.rs | 6 + crates/cdk/src/mint/mod.rs | 13 ++- 9 files changed, 196 insertions(+), 29 deletions(-) diff --git a/crates/cdk-cln/Cargo.toml b/crates/cdk-cln/Cargo.toml index 5a8e43f85..a5c505ede 100644 --- a/crates/cdk-cln/Cargo.toml +++ b/crates/cdk-cln/Cargo.toml @@ -16,6 +16,7 @@ cdk = { path = "../cdk", version = "0.4.0", default-features = false, features = cln-rpc = "0.2.0" futures = { version = "0.3.28", default-features = false } tokio = { version = "1", default-features = false } +tokio-util = { version = "0.7.11", default-features = false } tracing = { version = "0.1", default-features = false, features = ["attributes", "log"] } thiserror = "1" uuid = { version = "1", features = ["v4"] } diff --git a/crates/cdk-cln/src/lib.rs b/crates/cdk-cln/src/lib.rs index a3c203963..1e8f609c4 100644 --- a/crates/cdk-cln/src/lib.rs +++ b/crates/cdk-cln/src/lib.rs @@ -6,6 +6,7 @@ use std::path::PathBuf; use std::pin::Pin; use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -25,13 +26,15 @@ use cln_rpc::model::requests::{ InvoiceRequest, ListinvoicesRequest, ListpaysRequest, PayRequest, WaitanyinvoiceRequest, }; use cln_rpc::model::responses::{ - ListinvoicesInvoicesStatus, ListpaysPaysStatus, PayStatus, WaitanyinvoiceResponse, + ListinvoicesInvoices, ListinvoicesInvoicesStatus, ListpaysPaysStatus, PayStatus, + WaitanyinvoiceResponse, WaitanyinvoiceStatus, }; use cln_rpc::model::Request; use cln_rpc::primitives::{Amount as CLN_Amount, AmountOrAny}; use error::Error; use futures::{Stream, StreamExt}; use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; use uuid::Uuid; pub mod error; @@ -44,6 +47,8 @@ pub struct Cln { fee_reserve: FeeReserve, mint_settings: MintMethodSettings, melt_settings: MeltMethodSettings, + wait_invoice_cancel_token: CancellationToken, + wait_invoice_is_active: Arc, } impl Cln { @@ -62,6 +67,8 @@ impl Cln { fee_reserve, mint_settings, melt_settings, + wait_invoice_cancel_token: CancellationToken::new(), + wait_invoice_is_active: Arc::new(AtomicBool::new(false)), }) } } @@ -80,43 +87,123 @@ impl MintLightning for Cln { } } + /// Is wait invoice active + fn is_wait_invoice_active(&self) -> bool { + self.wait_invoice_is_active.load(Ordering::SeqCst) + } + + /// Cancel wait invoice + fn cancel_wait_invoice(&self) { + self.wait_invoice_cancel_token.cancel() + } + + #[allow(clippy::incompatible_msrv)] + // Clippy thinks select is not stable but it compiles fine on MSRV (1.63.0) async fn wait_any_invoice( &self, ) -> Result + Send>>, Self::Err> { let last_pay_index = self.get_last_pay_index().await?; let cln_client = cln_rpc::ClnRpc::new(&self.rpc_socket).await?; - Ok(futures::stream::unfold( - (cln_client, last_pay_index), - |(mut cln_client, mut last_pay_idx)| async move { + let stream = futures::stream::unfold( + ( + cln_client, + last_pay_index, + self.wait_invoice_cancel_token.clone(), + Arc::clone(&self.wait_invoice_is_active), + ), + |(mut cln_client, mut last_pay_idx, cancel_token, is_active)| async move { + // Set the stream as active + is_active.store(true, Ordering::SeqCst); + loop { - let invoice_res = cln_client - .call(cln_rpc::Request::WaitAnyInvoice(WaitanyinvoiceRequest { + tokio::select! { + _ = cancel_token.cancelled() => { + // Set the stream as inactive + is_active.store(false, Ordering::SeqCst); + // End the stream + return None; + } + result = cln_client.call(cln_rpc::Request::WaitAnyInvoice(WaitanyinvoiceRequest { timeout: None, lastpay_index: last_pay_idx, - })) - .await; - - let invoice: WaitanyinvoiceResponse = match invoice_res { - Ok(invoice) => invoice, - Err(e) => { - tracing::warn!("Error fetching invoice: {e}"); - // Let's not spam CLN with requests on failure - tokio::time::sleep(Duration::from_secs(1)).await; - // Retry same request - continue; + })) => { + match result { + Ok(invoice) => { + + // Try to convert the invoice to WaitanyinvoiceResponse + let wait_any_response_result: Result = + invoice.try_into(); + + let wait_any_response = match wait_any_response_result { + Ok(response) => response, + Err(e) => { + tracing::warn!( + "Failed to parse WaitAnyInvoice response: {:?}", + e + ); + // Continue to the next iteration without panicking + continue; + } + }; + + // Check the status of the invoice + // We only want to yield invoices that have been paid + match wait_any_response.status { + WaitanyinvoiceStatus::PAID => (), + WaitanyinvoiceStatus::EXPIRED => continue, + } + + last_pay_idx = wait_any_response.pay_index; + + let payment_hash = wait_any_response.payment_hash.to_string(); + + let request_look_up = match wait_any_response.bolt12 { + // If it is a bolt12 payment we need to get the offer_id as this is what we use as the request look up. + // Since this is not returned in the wait any response, + // we need to do a second query for it. + Some(_) => { + match fetch_invoice_by_payment_hash( + &mut cln_client, + &payment_hash, + ) + .await + { + Ok(Some(invoice)) => { + if let Some(local_offer_id) = invoice.local_offer_id { + local_offer_id.to_string() + } else { + continue; + } + } + Ok(None) => continue, + Err(e) => { + tracing::warn!( + "Error fetching invoice by payment hash: {e}" + ); + continue; + } + } + } + None => payment_hash, + }; + + return Some((request_look_up, (cln_client, last_pay_idx, cancel_token, is_active))); + } + Err(e) => { + tracing::warn!("Error fetching invoice: {e}"); + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + } + } } } - .try_into() - .expect("Wrong response from CLN"); - - last_pay_idx = invoice.pay_index; - - break Some((invoice.payment_hash.to_string(), (cln_client, last_pay_idx))); } }, ) - .boxed()) + .boxed(); + + Ok(stream) } async fn get_payment_quote( @@ -425,3 +512,33 @@ fn cln_pays_status_to_mint_state(status: ListpaysPaysStatus) -> MeltQuoteState { ListpaysPaysStatus::FAILED => MeltQuoteState::Failed, } } + +async fn fetch_invoice_by_payment_hash( + cln_client: &mut cln_rpc::ClnRpc, + payment_hash: &str, +) -> Result, Error> { + match cln_client + .call(cln_rpc::Request::ListInvoices(ListinvoicesRequest { + payment_hash: Some(payment_hash.to_string()), + index: None, + invstring: None, + label: None, + limit: None, + offer_id: None, + start: None, + })) + .await + { + Ok(cln_rpc::Response::ListInvoices(invoice_response)) => { + Ok(invoice_response.invoices.first().cloned()) + } + Ok(_) => { + tracing::warn!("CLN returned an unexpected response type"); + Err(Error::WrongClnResponse) + } + Err(e) => { + tracing::warn!("Error fetching invoice: {e}"); + Err(Error::from(e)) + } + } +} diff --git a/crates/cdk-fake-wallet/src/lib.rs b/crates/cdk-fake-wallet/src/lib.rs index 43cbcb963..fae9b90f5 100644 --- a/crates/cdk-fake-wallet/src/lib.rs +++ b/crates/cdk-fake-wallet/src/lib.rs @@ -112,6 +112,14 @@ impl MintLightning for FakeWallet { } } + fn is_wait_invoice_active(&self) -> bool { + todo!() + } + + fn cancel_wait_invoice(&self) { + todo!() + } + async fn wait_any_invoice( &self, ) -> Result + Send>>, Self::Err> { diff --git a/crates/cdk-lnbits/src/lib.rs b/crates/cdk-lnbits/src/lib.rs index 6dfe315ef..f0100b85a 100644 --- a/crates/cdk-lnbits/src/lib.rs +++ b/crates/cdk-lnbits/src/lib.rs @@ -80,6 +80,14 @@ impl MintLightning for LNbits { } } + fn is_wait_invoice_active(&self) -> bool { + todo!() + } + + fn cancel_wait_invoice(&self) { + todo!() + } + async fn wait_any_invoice( &self, ) -> Result + Send>>, Self::Err> { diff --git a/crates/cdk-lnd/src/lib.rs b/crates/cdk-lnd/src/lib.rs index f177a2442..9e9e6fd19 100644 --- a/crates/cdk-lnd/src/lib.rs +++ b/crates/cdk-lnd/src/lib.rs @@ -88,6 +88,14 @@ impl MintLightning for Lnd { } } + fn is_wait_invoice_active(&self) -> bool { + todo!() + } + + fn cancel_wait_invoice(&self) { + todo!() + } + async fn wait_any_invoice( &self, ) -> Result + Send>>, Self::Err> { diff --git a/crates/cdk-phoenixd/src/lib.rs b/crates/cdk-phoenixd/src/lib.rs index c3c66b85c..f73efcea4 100644 --- a/crates/cdk-phoenixd/src/lib.rs +++ b/crates/cdk-phoenixd/src/lib.rs @@ -86,6 +86,14 @@ impl MintLightning for Phoenixd { } } + fn is_wait_invoice_active(&self) -> bool { + todo!() + } + + fn cancel_wait_invoice(&self) { + todo!() + } + async fn wait_any_invoice( &self, ) -> Result + Send>>, Self::Err> { diff --git a/crates/cdk-strike/src/lib.rs b/crates/cdk-strike/src/lib.rs index c918005ec..a9e3de937 100644 --- a/crates/cdk-strike/src/lib.rs +++ b/crates/cdk-strike/src/lib.rs @@ -78,6 +78,14 @@ impl MintLightning for Strike { } } + fn is_wait_invoice_active(&self) -> bool { + todo!() + } + + fn cancel_wait_invoice(&self) { + todo!() + } + async fn wait_any_invoice( &self, ) -> Result + Send>>, Self::Err> { diff --git a/crates/cdk/src/cdk_lightning/mod.rs b/crates/cdk/src/cdk_lightning/mod.rs index 463a82aaa..eb94fb4b2 100644 --- a/crates/cdk/src/cdk_lightning/mod.rs +++ b/crates/cdk/src/cdk_lightning/mod.rs @@ -85,6 +85,12 @@ pub trait MintLightning { &self, ) -> Result + Send>>, Self::Err>; + /// Is wait invoice active + fn is_wait_invoice_active(&self) -> bool; + + /// Cancel wait invoice + fn cancel_wait_invoice(&self); + /// Check the status of an incoming payment async fn check_incoming_invoice_status( &self, diff --git a/crates/cdk/src/mint/mod.rs b/crates/cdk/src/mint/mod.rs index 802af0978..55d7c3249 100644 --- a/crates/cdk/src/mint/mod.rs +++ b/crates/cdk/src/mint/mod.rs @@ -190,15 +190,17 @@ impl Mint { let mut join_set = JoinSet::new(); for (key, ln) in self.ln.iter() { - let mint = Arc::clone(&mint_arc); - let ln = Arc::clone(ln); - let shutdown = Arc::clone(&shutdown); - let key = *key; - join_set.spawn(async move { + if !ln.is_wait_invoice_active() { + let mint = Arc::clone(&mint_arc); + let ln = Arc::clone(ln); + let shutdown = Arc::clone(&shutdown); + let key = *key; + join_set.spawn(async move { loop { tokio::select! { _ = shutdown.notified() => { tracing::info!("Shutdown signal received, stopping task for {:?}", key); + ln.cancel_wait_invoice(); break; } result = ln.wait_any_invoice() => { @@ -219,6 +221,7 @@ impl Mint { } } }); + } } // Spawn a task to manage the JoinSet From 1e4a7af34f89adf2a9392a28710e508573bdc6f5 Mon Sep 17 00:00:00 2001 From: thesimplekid Date: Sun, 6 Oct 2024 19:11:34 +0200 Subject: [PATCH 3/9] feat: phd cancel invoice --- crates/cdk-phoenixd/Cargo.toml | 1 + crates/cdk-phoenixd/src/lib.rs | 68 +++++++++++++++++++++++++--------- 2 files changed, 51 insertions(+), 18 deletions(-) diff --git a/crates/cdk-phoenixd/Cargo.toml b/crates/cdk-phoenixd/Cargo.toml index e39ca5fa6..cfeb98b81 100644 --- a/crates/cdk-phoenixd/Cargo.toml +++ b/crates/cdk-phoenixd/Cargo.toml @@ -17,6 +17,7 @@ bitcoin = { version = "0.32.2", default-features = false } cdk = { path = "../cdk", version = "0.4.0", default-features = false, features = ["mint"] } futures = { version = "0.3.28", default-features = false } tokio = { version = "1", default-features = false } +tokio-util = { version = "0.7.11", default-features = false } tracing = { version = "0.1", default-features = false, features = ["attributes", "log"] } thiserror = "1" # phoenixd-rs = "0.3.0" diff --git a/crates/cdk-phoenixd/src/lib.rs b/crates/cdk-phoenixd/src/lib.rs index f73efcea4..22f9cda68 100644 --- a/crates/cdk-phoenixd/src/lib.rs +++ b/crates/cdk-phoenixd/src/lib.rs @@ -4,6 +4,7 @@ #![warn(rustdoc::bare_urls)] use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use anyhow::anyhow; @@ -24,6 +25,7 @@ use futures::{Stream, StreamExt}; use phoenixd_rs::webhooks::WebhookResponse; use phoenixd_rs::{InvoiceRequest, Phoenixd as PhoenixdApi}; use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; pub mod error; @@ -36,6 +38,8 @@ pub struct Phoenixd { fee_reserve: FeeReserve, receiver: Arc>>>, webhook_url: String, + wait_invoice_cancel_token: CancellationToken, + wait_invoice_is_active: Arc, } impl Phoenixd { @@ -57,6 +61,8 @@ impl Phoenixd { fee_reserve, receiver, webhook_url, + wait_invoice_cancel_token: CancellationToken::new(), + wait_invoice_is_active: Arc::new(AtomicBool::new(false)), }) } @@ -87,11 +93,11 @@ impl MintLightning for Phoenixd { } fn is_wait_invoice_active(&self) -> bool { - todo!() + self.wait_invoice_is_active.load(Ordering::SeqCst) } fn cancel_wait_invoice(&self) { - todo!() + self.wait_invoice_cancel_token.cancel() } async fn wait_any_invoice( @@ -106,29 +112,55 @@ impl MintLightning for Phoenixd { let phoenixd_api = self.phoenixd_api.clone(); + let cancel_token = self.wait_invoice_cancel_token.clone(); + Ok(futures::stream::unfold( - (receiver, phoenixd_api), - |(mut receiver, phoenixd_api)| async move { - match receiver.recv().await { - Some(msg) => { - let check = phoenixd_api.get_incoming_invoice(&msg.payment_hash).await; - - match check { - Ok(state) => { - if state.is_paid { - Some((msg.payment_hash, (receiver, phoenixd_api))) - } else { + (receiver, phoenixd_api, cancel_token, + Arc::clone(&self.wait_invoice_is_active), + ), + |(mut receiver, phoenixd_api, cancel_token, is_active)| async move { + + is_active.store(true, Ordering::SeqCst); + tokio::select! { + _ = cancel_token.cancelled() => { + // Stream is cancelled + is_active.store(false, Ordering::SeqCst); + tracing::info!("Waiting for phonixd invoice ending"); + return None; + } + msg_option = receiver.recv() => { + match msg_option { + Some(msg) => { + let check = phoenixd_api.get_incoming_invoice(&msg.payment_hash).await; + + match check { + Ok(state) => { + if state.is_paid { + // Yield the payment hash and continue the stream + Some((msg.payment_hash, (receiver, phoenixd_api, cancel_token, is_active))) + } else { + // Invoice not paid yet, continue waiting + // We need to continue the stream, so we return the same state + None + } + } + Err(e) => { + // Log the error and continue + tracing::warn!("Error checking invoice state: {:?}", e); None } } - _ => None, + } + None => { + // The receiver stream has ended + None } } - None => None, } - }, - ) - .boxed()) + } + }, + ) + .boxed()) } async fn get_payment_quote( From e09084f02d2c27f5148affc30de2a024437c524f Mon Sep 17 00:00:00 2001 From: thesimplekid Date: Sun, 6 Oct 2024 19:25:27 +0200 Subject: [PATCH 4/9] feat: lnd cancel invoice --- crates/cdk-lnd/Cargo.toml | 1 + crates/cdk-lnd/src/lib.rs | 63 +++++++++++++++++++++++++++++++-------- 2 files changed, 52 insertions(+), 12 deletions(-) diff --git a/crates/cdk-lnd/Cargo.toml b/crates/cdk-lnd/Cargo.toml index e98e65e09..e0e99329b 100644 --- a/crates/cdk-lnd/Cargo.toml +++ b/crates/cdk-lnd/Cargo.toml @@ -15,5 +15,6 @@ cdk = { path = "../cdk", version= "0.4.0", default-features = false, features = fedimint-tonic-lnd = "0.2.0" futures = { version = "0.3.28", default-features = false } tokio = { version = "1", default-features = false } +tokio-util = { version = "0.7.11", default-features = false } tracing = { version = "0.1", default-features = false, features = ["attributes", "log"] } thiserror = "1" diff --git a/crates/cdk-lnd/src/lib.rs b/crates/cdk-lnd/src/lib.rs index 9e9e6fd19..0d6202204 100644 --- a/crates/cdk-lnd/src/lib.rs +++ b/crates/cdk-lnd/src/lib.rs @@ -8,6 +8,7 @@ use std::path::PathBuf; use std::pin::Pin; use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use anyhow::anyhow; @@ -30,6 +31,7 @@ use fedimint_tonic_lnd::lnrpc::FeeLimit; use fedimint_tonic_lnd::Client; use futures::{Stream, StreamExt}; use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; pub mod error; @@ -43,6 +45,8 @@ pub struct Lnd { fee_reserve: FeeReserve, mint_settings: MintMethodSettings, melt_settings: MeltMethodSettings, + wait_invoice_cancel_token: CancellationToken, + wait_invoice_is_active: Arc, } impl Lnd { @@ -70,6 +74,8 @@ impl Lnd { fee_reserve, mint_settings, melt_settings, + wait_invoice_cancel_token: CancellationToken::new(), + wait_invoice_is_active: Arc::new(AtomicBool::new(false)), }) } } @@ -89,11 +95,11 @@ impl MintLightning for Lnd { } fn is_wait_invoice_active(&self) -> bool { - todo!() + self.wait_invoice_is_active.load(Ordering::SeqCst) } fn cancel_wait_invoice(&self) { - todo!() + self.wait_invoice_cancel_token.cancel() } async fn wait_any_invoice( @@ -116,19 +122,52 @@ impl MintLightning for Lnd { .unwrap() .into_inner(); - Ok(futures::stream::unfold(stream, |mut stream| async move { - match stream.message().await { - Ok(Some(msg)) => { - if msg.state == 1 { - Some((hex::encode(msg.r_hash), stream)) - } else { + let cancel_token = self.wait_invoice_cancel_token.clone(); + + Ok(futures::stream::unfold( + ( + stream, + cancel_token, + Arc::clone(&self.wait_invoice_is_active), + ), + |(mut stream, cancel_token, is_active)| async move { + is_active.store(true, Ordering::SeqCst); + + tokio::select! { + _ = cancel_token.cancelled() => { + // Stream is cancelled + is_active.store(false, Ordering::SeqCst); + tracing::info!("Waiting for lnd invoice ending"); + return None; + + } + msg = stream.message() => { + + match msg { + Ok(Some(msg)) => { + if msg.state == 1 { + Some((hex::encode(msg.r_hash), (stream, cancel_token, is_active))) + } else { + None + } + } + Ok(None) => { + is_active.store(false, Ordering::SeqCst); + tracing::info!("LND invoice stream ended."); None + }, // End of stream + Err(err) => { + is_active.store(false, Ordering::SeqCst); + tracing::warn!("Encounrdered error in LND invoice stream. Stream ending"); + tracing::error!("{:?}", err); + None + + }, // Handle errors gracefully, ends the stream on error + } } } - Ok(None) => None, // End of stream - Err(_) => None, // Handle errors gracefully, ends the stream on error - } - }) + }, + ) .boxed()) } From 7212fa32a8342bc3231a0a9632051e7057157e33 Mon Sep 17 00:00:00 2001 From: thesimplekid Date: Sun, 6 Oct 2024 19:36:28 +0200 Subject: [PATCH 5/9] chore: clippy --- crates/cdk-lnd/src/lib.rs | 2 +- crates/cdk-phoenixd/src/lib.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/cdk-lnd/src/lib.rs b/crates/cdk-lnd/src/lib.rs index 0d6202204..c5ecbeb8f 100644 --- a/crates/cdk-lnd/src/lib.rs +++ b/crates/cdk-lnd/src/lib.rs @@ -138,7 +138,7 @@ impl MintLightning for Lnd { // Stream is cancelled is_active.store(false, Ordering::SeqCst); tracing::info!("Waiting for lnd invoice ending"); - return None; + None } msg = stream.message() => { diff --git a/crates/cdk-phoenixd/src/lib.rs b/crates/cdk-phoenixd/src/lib.rs index 22f9cda68..4aecfac6b 100644 --- a/crates/cdk-phoenixd/src/lib.rs +++ b/crates/cdk-phoenixd/src/lib.rs @@ -100,6 +100,7 @@ impl MintLightning for Phoenixd { self.wait_invoice_cancel_token.cancel() } + #[allow(clippy::incompatible_msrv)] async fn wait_any_invoice( &self, ) -> Result + Send>>, Self::Err> { @@ -126,7 +127,7 @@ impl MintLightning for Phoenixd { // Stream is cancelled is_active.store(false, Ordering::SeqCst); tracing::info!("Waiting for phonixd invoice ending"); - return None; + None } msg_option = receiver.recv() => { match msg_option { From c82e6bdcab7f390f7816833ebc6240aa45eb76d9 Mon Sep 17 00:00:00 2001 From: thesimplekid Date: Sun, 6 Oct 2024 19:56:43 +0200 Subject: [PATCH 6/9] feat: lnbits and strike kill --- crates/cdk-lnbits/Cargo.toml | 1 + crates/cdk-lnbits/src/lib.rs | 62 ++++++++++++++++++++++++++---------- crates/cdk-strike/Cargo.toml | 1 + crates/cdk-strike/src/lib.rs | 38 ++++++++++++++++++---- 4 files changed, 80 insertions(+), 22 deletions(-) diff --git a/crates/cdk-lnbits/Cargo.toml b/crates/cdk-lnbits/Cargo.toml index e8d230582..ee9ba6d8d 100644 --- a/crates/cdk-lnbits/Cargo.toml +++ b/crates/cdk-lnbits/Cargo.toml @@ -17,6 +17,7 @@ bitcoin = { version = "0.32.2", default-features = false } cdk = { path = "../cdk", version = "0.4.0", default-features = false, features = ["mint"] } futures = { version = "0.3.28", default-features = false } tokio = { version = "1", default-features = false } +tokio-util = { version = "0.7.11", default-features = false } tracing = { version = "0.1", default-features = false, features = ["attributes", "log"] } thiserror = "1" # lnbits-rs = "0.2.0" diff --git a/crates/cdk-lnbits/src/lib.rs b/crates/cdk-lnbits/src/lib.rs index f0100b85a..64e7bef7e 100644 --- a/crates/cdk-lnbits/src/lib.rs +++ b/crates/cdk-lnbits/src/lib.rs @@ -4,6 +4,7 @@ #![warn(rustdoc::bare_urls)] use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use anyhow::anyhow; @@ -26,6 +27,7 @@ use futures::Stream; use lnbits_rs::api::invoice::CreateInvoiceRequest; use lnbits_rs::LNBitsClient; use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; pub mod error; @@ -38,6 +40,8 @@ pub struct LNbits { fee_reserve: FeeReserve, receiver: Arc>>>, webhook_url: String, + wait_invoice_cancel_token: CancellationToken, + wait_invoice_is_active: Arc, } impl LNbits { @@ -62,6 +66,8 @@ impl LNbits { receiver, fee_reserve, webhook_url, + wait_invoice_cancel_token: CancellationToken::new(), + wait_invoice_is_active: Arc::new(AtomicBool::new(false)), }) } } @@ -81,13 +87,14 @@ impl MintLightning for LNbits { } fn is_wait_invoice_active(&self) -> bool { - todo!() + self.wait_invoice_is_active.load(Ordering::SeqCst) } fn cancel_wait_invoice(&self) { - todo!() + self.wait_invoice_cancel_token.cancel() } + #[allow(clippy::incompatible_msrv)] async fn wait_any_invoice( &self, ) -> Result + Send>>, Self::Err> { @@ -100,25 +107,48 @@ impl MintLightning for LNbits { let lnbits_api = self.lnbits_api.clone(); + let cancel_token = self.wait_invoice_cancel_token.clone(); + Ok(futures::stream::unfold( - (receiver, lnbits_api), - |(mut receiver, lnbits_api)| async move { - match receiver.recv().await { - Some(msg) => { - let check = lnbits_api.is_invoice_paid(&msg).await; - - match check { - Ok(state) => { - if state { - Some((msg, (receiver, lnbits_api))) - } else { - None + ( + receiver, + lnbits_api, + cancel_token, + Arc::clone(&self.wait_invoice_is_active), + ), + |(mut receiver, lnbits_api, cancel_token, is_active)| async move { + is_active.store(true, Ordering::SeqCst); + + tokio::select! { + _ = cancel_token.cancelled() => { + // Stream is cancelled + is_active.store(false, Ordering::SeqCst); + tracing::info!("Waiting for phonixd invoice ending"); + None + } + msg_option = receiver.recv() => { + match msg_option { + Some(msg) => { + let check = lnbits_api.is_invoice_paid(&msg).await; + + match check { + Ok(state) => { + if state { + Some((msg, (receiver, lnbits_api, cancel_token, is_active))) + } else { + None + } } + _ => None, } - _ => None, } + None => { + is_active.store(true, Ordering::SeqCst); + None + }, + } + } - None => None, } }, ) diff --git a/crates/cdk-strike/Cargo.toml b/crates/cdk-strike/Cargo.toml index f48d53329..c1ccf737c 100644 --- a/crates/cdk-strike/Cargo.toml +++ b/crates/cdk-strike/Cargo.toml @@ -17,6 +17,7 @@ bitcoin = { version = "0.32.2", default-features = false } cdk = { path = "../cdk", version = "0.4.0", default-features = false, features = ["mint"] } futures = { version = "0.3.28", default-features = false } tokio = { version = "1", default-features = false } +tokio-util = { version = "0.7.11", default-features = false } tracing = { version = "0.1", default-features = false, features = ["attributes", "log"] } thiserror = "1" uuid = { version = "1", features = ["v4"] } diff --git a/crates/cdk-strike/src/lib.rs b/crates/cdk-strike/src/lib.rs index a9e3de937..76eb0f256 100644 --- a/crates/cdk-strike/src/lib.rs +++ b/crates/cdk-strike/src/lib.rs @@ -4,6 +4,7 @@ #![warn(rustdoc::bare_urls)] use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use anyhow::{anyhow, bail}; @@ -27,6 +28,7 @@ use strike_rs::{ PayInvoiceQuoteRequest, Strike as StrikeApi, }; use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; use uuid::Uuid; pub mod error; @@ -40,6 +42,8 @@ pub struct Strike { unit: CurrencyUnit, receiver: Arc>>>, webhook_url: String, + wait_invoice_cancel_token: CancellationToken, + wait_invoice_is_active: Arc, } impl Strike { @@ -60,6 +64,8 @@ impl Strike { receiver, unit, webhook_url, + wait_invoice_cancel_token: CancellationToken::new(), + wait_invoice_is_active: Arc::new(AtomicBool::new(false)), }) } } @@ -79,13 +85,14 @@ impl MintLightning for Strike { } fn is_wait_invoice_active(&self) -> bool { - todo!() + self.wait_invoice_is_active.load(Ordering::SeqCst) } fn cancel_wait_invoice(&self) { - todo!() + self.wait_invoice_cancel_token.cancel() } + #[allow(clippy::incompatible_msrv)] async fn wait_any_invoice( &self, ) -> Result + Send>>, Self::Err> { @@ -101,18 +108,34 @@ impl MintLightning for Strike { .ok_or(anyhow!("No receiver"))?; let strike_api = self.strike_api.clone(); + let cancel_token = self.wait_invoice_cancel_token.clone(); Ok(futures::stream::unfold( - (receiver, strike_api), - |(mut receiver, strike_api)| async move { - match receiver.recv().await { + ( + receiver, + strike_api, + cancel_token, + Arc::clone(&self.wait_invoice_is_active), + ), + |(mut receiver, strike_api, cancel_token, is_active)| async move { + tokio::select! { + + _ = cancel_token.cancelled() => { + // Stream is cancelled + is_active.store(false, Ordering::SeqCst); + tracing::info!("Waiting for phonixd invoice ending"); + None + } + + msg_option = receiver.recv() => { + match msg_option { Some(msg) => { let check = strike_api.get_incoming_invoice(&msg).await; match check { Ok(state) => { if state.state == InvoiceState::Paid { - Some((msg, (receiver, strike_api))) + Some((msg, (receiver, strike_api, cancel_token, is_active))) } else { None } @@ -122,6 +145,9 @@ impl MintLightning for Strike { } None => None, } + + } + } }, ) .boxed()) From 065b4e248fab5aa5849cd75145d547d9ee6431c1 Mon Sep 17 00:00:00 2001 From: thesimplekid Date: Sun, 6 Oct 2024 19:58:22 +0200 Subject: [PATCH 7/9] feat: fake wallet --- crates/cdk-fake-wallet/Cargo.toml | 1 + crates/cdk-fake-wallet/src/lib.rs | 10 ++++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/crates/cdk-fake-wallet/Cargo.toml b/crates/cdk-fake-wallet/Cargo.toml index 0cf03b831..8d1b5dffe 100644 --- a/crates/cdk-fake-wallet/Cargo.toml +++ b/crates/cdk-fake-wallet/Cargo.toml @@ -15,6 +15,7 @@ bitcoin = { version = "0.32.2", default-features = false } cdk = { path = "../cdk", version = "0.4.0", default-features = false, features = ["mint"] } futures = { version = "0.3.28", default-features = false } tokio = { version = "1", default-features = false } +tokio-util = { version = "0.7.11", default-features = false } tracing = { version = "0.1", default-features = false, features = ["attributes", "log"] } thiserror = "1" serde = "1" diff --git a/crates/cdk-fake-wallet/src/lib.rs b/crates/cdk-fake-wallet/src/lib.rs index fae9b90f5..e0fba0727 100644 --- a/crates/cdk-fake-wallet/src/lib.rs +++ b/crates/cdk-fake-wallet/src/lib.rs @@ -8,6 +8,7 @@ use std::collections::{HashMap, HashSet}; use std::pin::Pin; use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use async_trait::async_trait; @@ -33,6 +34,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::Mutex; use tokio::time; use tokio_stream::wrappers::ReceiverStream; +use tokio_util::sync::CancellationToken; pub mod error; @@ -47,6 +49,8 @@ pub struct FakeWallet { payment_states: Arc>>, failed_payment_check: Arc>>, payment_delay: u64, + wait_invoice_cancel_token: CancellationToken, + wait_invoice_is_active: Arc, } impl FakeWallet { @@ -70,6 +74,8 @@ impl FakeWallet { payment_states: Arc::new(Mutex::new(payment_states)), failed_payment_check: Arc::new(Mutex::new(fail_payment_check)), payment_delay, + wait_invoice_cancel_token: CancellationToken::new(), + wait_invoice_is_active: Arc::new(AtomicBool::new(false)), } } } @@ -113,11 +119,11 @@ impl MintLightning for FakeWallet { } fn is_wait_invoice_active(&self) -> bool { - todo!() + self.wait_invoice_is_active.load(Ordering::SeqCst) } fn cancel_wait_invoice(&self) { - todo!() + self.wait_invoice_cancel_token.cancel() } async fn wait_any_invoice( From 943d00bf20e89ba15ef4dd8e400b974d6effdfab Mon Sep 17 00:00:00 2001 From: thesimplekid Date: Sun, 6 Oct 2024 20:57:33 +0200 Subject: [PATCH 8/9] feat: only start waiting if inactive --- crates/cdk/src/mint/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/cdk/src/mint/mod.rs b/crates/cdk/src/mint/mod.rs index 55d7c3249..0699cf7f7 100644 --- a/crates/cdk/src/mint/mod.rs +++ b/crates/cdk/src/mint/mod.rs @@ -196,6 +196,7 @@ impl Mint { let shutdown = Arc::clone(&shutdown); let key = *key; join_set.spawn(async move { + if !ln.is_wait_invoice_active() { loop { tokio::select! { _ = shutdown.notified() => { @@ -218,6 +219,7 @@ impl Mint { } } } + } } } }); From 260f262c395056ae243ccc8cfa0e9de793c5fd20 Mon Sep 17 00:00:00 2001 From: ok300 <106775972+ok300@users.noreply.github.com> Date: Sun, 6 Oct 2024 22:12:24 +0200 Subject: [PATCH 9/9] CI yml: add missing commas in build-args list --- .github/workflows/ci.yml | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6394a6abf..cd3956f8c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -56,7 +56,7 @@ jobs: -p cdk-lnbits, -p cdk-fake-wallet, --bin cdk-cli, - --bin cdk-mintd + --bin cdk-mintd, ] steps: - name: checkout @@ -85,9 +85,9 @@ jobs: ] database: [ - REDB, - SQLITE, - MEMORY + REDB, + SQLITE, + MEMORY, ] steps: - name: checkout @@ -116,7 +116,7 @@ jobs: [ REDB, SQLITE, - MEMORY + MEMORY, ] steps: - name: checkout @@ -146,9 +146,9 @@ jobs: -p cdk-axum, -p cdk-strike, -p cdk-lnbits, - -p cdk-phoenixd - -p cdk-fake-wallet - -p cdk-cln + -p cdk-phoenixd, + -p cdk-fake-wallet, + -p cdk-cln, ] steps: - name: checkout @@ -170,8 +170,8 @@ jobs: matrix: build-args: [ - -p cdk-sqlite - -p cdk-redb + -p cdk-sqlite, + -p cdk-redb, ] steps: - name: checkout @@ -199,7 +199,7 @@ jobs: -p cdk, -p cdk --no-default-features, -p cdk --no-default-features --features wallet, - -p cdk-js + -p cdk-js, ] steps: - name: checkout