diff --git a/crates/cdk-cln/src/bolt12.rs b/crates/cdk-cln/src/bolt12.rs index 7e91aa38..37f9c8dc 100644 --- a/crates/cdk-cln/src/bolt12.rs +++ b/crates/cdk-cln/src/bolt12.rs @@ -1,5 +1,8 @@ use std::pin::Pin; use std::str::FromStr; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; use cdk::amount::{amount_for_offer, to_unit, Amount}; @@ -12,26 +15,152 @@ use cdk::mint; use cdk::mint::types::PaymentRequest; use cdk::nuts::{CurrencyUnit, MeltQuoteBolt12Request, MeltQuoteState}; use cdk::util::{hex, unix_time}; -use cln_rpc::model::requests::{FetchinvoiceRequest, OfferRequest, PayRequest}; -use cln_rpc::model::responses::PayStatus; +use cln_rpc::model::requests::{ + FetchinvoiceRequest, OfferRequest, PayRequest, WaitanyinvoiceRequest, +}; +use cln_rpc::model::responses::{PayStatus, WaitanyinvoiceResponse, WaitanyinvoiceStatus}; use cln_rpc::model::Request; use cln_rpc::primitives::Amount as CLN_Amount; -use futures::Stream; +use futures::{Stream, StreamExt}; use lightning::offers::invoice::Bolt12Invoice; use lightning::offers::offer::Offer; use uuid::Uuid; use super::{Cln, Error}; +use crate::fetch_invoice_by_payment_hash; #[async_trait] impl MintBolt12Lightning for Cln { type Err = cdk_lightning::Error; + /// Is wait invoice active + fn is_wait_invoice_active(&self) -> bool { + self.wait_invoice_is_active.load(Ordering::SeqCst) + } + + /// Cancel wait invoice + fn cancel_wait_invoice(&self) { + self.wait_invoice_cancel_token.cancel() + } + /// Listen for bolt12 offers to be paid async fn wait_any_offer( &self, ) -> Result + Send>>, Self::Err> { - todo!() + let last_pay_index = self.get_last_pay_index().await?; + let cln_client = cln_rpc::ClnRpc::new(&self.rpc_socket).await?; + + let stream = futures::stream::unfold( + ( + cln_client, + last_pay_index, + self.wait_invoice_cancel_token.clone(), + Arc::clone(&self.bolt12_wait_invoice_is_active), + ), + |(mut cln_client, mut last_pay_idx, cancel_token, is_active)| async move { + // Set the stream as active + is_active.store(true, Ordering::SeqCst); + + loop { + tokio::select! { + _ = cancel_token.cancelled() => { + // Set the stream as inactive + is_active.store(false, Ordering::SeqCst); + // End the stream + return None; + } + result = cln_client.call(cln_rpc::Request::WaitAnyInvoice(WaitanyinvoiceRequest { + timeout: None, + lastpay_index: last_pay_idx, + })) => { + match result { + Ok(invoice) => { + + // Try to convert the invoice to WaitanyinvoiceResponse + let wait_any_response_result: Result = + invoice.try_into(); + + let wait_any_response = match wait_any_response_result { + Ok(response) => response, + Err(e) => { + tracing::warn!( + "Failed to parse WaitAnyInvoice response: {:?}", + e + ); + // Continue to the next iteration without panicking + continue; + } + }; + + // Check the status of the invoice + // We only want to yield invoices that have been paid + match wait_any_response.status { + WaitanyinvoiceStatus::PAID => (), + WaitanyinvoiceStatus::EXPIRED => continue, + } + + last_pay_idx = wait_any_response.pay_index; + + let payment_hash = wait_any_response.payment_hash.to_string(); + + + // TODO: Handle unit conversion + let amount_msats = wait_any_response.amount_received_msat.expect("status is paid there should be an amount"); + let amount_sats = amount_msats.msat() / 1000; + + let request_lookup_id = match wait_any_response.bolt12 { + // If it is a bolt12 payment we need to get the offer_id as this is what we use as the request look up. + // Since this is not returned in the wait any response, + // we need to do a second query for it. + Some(_) => { + match fetch_invoice_by_payment_hash( + &mut cln_client, + &payment_hash, + ) + .await + { + Ok(Some(invoice)) => { + if let Some(local_offer_id) = invoice.local_offer_id { + local_offer_id.to_string() + } else { + continue; + } + } + Ok(None) => continue, + Err(e) => { + tracing::warn!( + "Error fetching invoice by payment hash: {e}" + ); + continue; + } + } + } + None => payment_hash.clone(), + }; + + let response = WaitInvoiceResponse { + request_lookup_id, + payment_amount: amount_sats.into(), + unit: CurrencyUnit::Sat, + payment_id: payment_hash + }; + + break Some((response, (cln_client, last_pay_idx, cancel_token, is_active))); + } + Err(e) => { + tracing::warn!("Error fetching invoice: {e}"); + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + } + } + } + } + } + }, + ) + .boxed(); + + Ok(stream) } async fn get_bolt12_payment_quote( diff --git a/crates/cdk-cln/src/lib.rs b/crates/cdk-cln/src/lib.rs index 708b01d3..e1d6061e 100644 --- a/crates/cdk-cln/src/lib.rs +++ b/crates/cdk-cln/src/lib.rs @@ -49,6 +49,7 @@ pub struct Cln { bolt12_melt: bool, wait_invoice_cancel_token: CancellationToken, wait_invoice_is_active: Arc, + bolt12_wait_invoice_is_active: Arc, } impl Cln { @@ -69,6 +70,7 @@ impl Cln { bolt12_melt, wait_invoice_cancel_token: CancellationToken::new(), wait_invoice_is_active: Arc::new(AtomicBool::new(false)), + bolt12_wait_invoice_is_active: Arc::new(AtomicBool::new(false)), }) } } @@ -163,38 +165,9 @@ impl MintLightning for Cln { let amount_msats = wait_any_response.amount_received_msat.expect("status is paid there should be an amount"); let amount_sats = amount_msats.msat() / 1000; - let request_lookup_id = match wait_any_response.bolt12 { - // If it is a bolt12 payment we need to get the offer_id as this is what we use as the request look up. - // Since this is not returned in the wait any response, - // we need to do a second query for it. - Some(_) => { - match fetch_invoice_by_payment_hash( - &mut cln_client, - &payment_hash, - ) - .await - { - Ok(Some(invoice)) => { - if let Some(local_offer_id) = invoice.local_offer_id { - local_offer_id.to_string() - } else { - continue; - } - } - Ok(None) => continue, - Err(e) => { - tracing::warn!( - "Error fetching invoice by payment hash: {e}" - ); - continue; - } - } - } - None => payment_hash.clone(), - }; let response = WaitInvoiceResponse { - request_lookup_id, + request_lookup_id: payment_hash.clone(), payment_amount: amount_sats.into(), unit: CurrencyUnit::Sat, payment_id: payment_hash diff --git a/crates/cdk-phoenixd/src/bolt12.rs b/crates/cdk-phoenixd/src/bolt12.rs index 34cd0f81..93d54915 100644 --- a/crates/cdk-phoenixd/src/bolt12.rs +++ b/crates/cdk-phoenixd/src/bolt12.rs @@ -23,6 +23,16 @@ use crate::Phoenixd; impl MintBolt12Lightning for Phoenixd { type Err = cdk_lightning::Error; + fn is_wait_invoice_active(&self) -> bool { + // Paying to PHD bolt12 offer is not supported so this can never be active + false + } + + fn cancel_wait_invoice(&self) { + // Paying to PHD bolt12 offer is not supported so there is nothing to cancel + () + } + async fn get_bolt12_payment_quote( &self, melt_quote_request: &MeltQuoteBolt12Request, diff --git a/crates/cdk/src/cdk_lightning/bolt12.rs b/crates/cdk/src/cdk_lightning/bolt12.rs index a855a245..06e21def 100644 --- a/crates/cdk/src/cdk_lightning/bolt12.rs +++ b/crates/cdk/src/cdk_lightning/bolt12.rs @@ -18,6 +18,12 @@ pub trait MintBolt12Lightning { /// Mint Lightning Error type Err: Into + From; + /// Is wait invoice active + fn is_wait_invoice_active(&self) -> bool; + + /// Cancel wait invoice + fn cancel_wait_invoice(&self); + /// Listen for bolt12 offers to be paid async fn wait_any_offer( &self, diff --git a/crates/cdk/src/mint/mint_nut04.rs b/crates/cdk/src/mint/mint_nut04.rs index 3d529455..551500ca 100644 --- a/crates/cdk/src/mint/mint_nut04.rs +++ b/crates/cdk/src/mint/mint_nut04.rs @@ -260,6 +260,7 @@ impl Mint { let amount_paid = quote.amount_paid + payment_amount; + // Since this is the first time we've seen this payment we add it to seen payment. payment_ids.push(payment_id); let quote = MintQuote { diff --git a/crates/cdk/src/mint/mod.rs b/crates/cdk/src/mint/mod.rs index 26f294b5..a7bb5e57 100644 --- a/crates/cdk/src/mint/mod.rs +++ b/crates/cdk/src/mint/mod.rs @@ -264,6 +264,64 @@ impl Mint { Ok(()) } + /// Wait for any offer to be paid + /// For each backend starts a task that waits for any offers to be paid + /// Once invoice is paid mint quote status is updated + #[allow(clippy::incompatible_msrv)] + // Clippy thinks select is not stable but it compiles fine on MSRV (1.63.0) + pub async fn wait_for_paid_offers(&self, shutdown: Arc) -> Result<(), Error> { + let mint_arc = Arc::new(self.clone()); + + let mut join_set = JoinSet::new(); + + for (key, bolt12) in self.bolt12_backends.iter() { + if !bolt12.is_wait_invoice_active() { + let mint = Arc::clone(&mint_arc); + let bolt12 = Arc::clone(bolt12); + let shutdown = Arc::clone(&shutdown); + let key = key.clone(); + join_set.spawn(async move { + if !bolt12.is_wait_invoice_active() { + loop { + tokio::select! { + _ = shutdown.notified() => { + tracing::info!("Shutdown signal received, stopping task for {:?}", key); + bolt12.cancel_wait_invoice(); + break; + } + result = bolt12.wait_any_offer() => { + match result { + Ok(mut stream) => { + while let Some(wait_invoice_response) = stream.next().await { + if let Err(err) = mint.pay_mint_quote_for_request_id(wait_invoice_response).await { + tracing::warn!("{:?}", err); + } + } + } + Err(err) => { + tracing::warn!("Could not get invoice stream for {:?}: {}",key, err); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + } + } + } + } + } + }); + } + } + + // Spawn a task to manage the JoinSet + while let Some(result) = join_set.join_next().await { + match result { + Ok(_) => tracing::info!("A task completed successfully."), + Err(err) => tracing::warn!("A task failed: {:?}", err), + } + } + + Ok(()) + } + /// Fee required for proof set #[instrument(skip_all)] pub async fn get_proofs_fee(&self, proofs: &Proofs) -> Result {