Skip to content

Commit

Permalink
feat: wait any invoice for bolt12
Browse files Browse the repository at this point in the history
  • Loading branch information
thesimplekid committed Nov 17, 2024
1 parent c4ac11d commit a4789ea
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 34 deletions.
137 changes: 133 additions & 4 deletions crates/cdk-cln/src/bolt12.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::pin::Pin;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use cdk::amount::{amount_for_offer, to_unit, Amount};
Expand All @@ -12,26 +15,152 @@ use cdk::mint;
use cdk::mint::types::PaymentRequest;
use cdk::nuts::{CurrencyUnit, MeltQuoteBolt12Request, MeltQuoteState};
use cdk::util::{hex, unix_time};
use cln_rpc::model::requests::{FetchinvoiceRequest, OfferRequest, PayRequest};
use cln_rpc::model::responses::PayStatus;
use cln_rpc::model::requests::{
FetchinvoiceRequest, OfferRequest, PayRequest, WaitanyinvoiceRequest,
};
use cln_rpc::model::responses::{PayStatus, WaitanyinvoiceResponse, WaitanyinvoiceStatus};
use cln_rpc::model::Request;
use cln_rpc::primitives::Amount as CLN_Amount;
use futures::Stream;
use futures::{Stream, StreamExt};
use lightning::offers::invoice::Bolt12Invoice;
use lightning::offers::offer::Offer;
use uuid::Uuid;

use super::{Cln, Error};
use crate::fetch_invoice_by_payment_hash;

#[async_trait]
impl MintBolt12Lightning for Cln {
type Err = cdk_lightning::Error;

/// Is wait invoice active
fn is_wait_invoice_active(&self) -> bool {
self.wait_invoice_is_active.load(Ordering::SeqCst)
}

/// Cancel wait invoice
fn cancel_wait_invoice(&self) {
self.wait_invoice_cancel_token.cancel()
}

/// Listen for bolt12 offers to be paid
async fn wait_any_offer(
&self,
) -> Result<Pin<Box<dyn Stream<Item = WaitInvoiceResponse> + Send>>, Self::Err> {
todo!()
let last_pay_index = self.get_last_pay_index().await?;
let cln_client = cln_rpc::ClnRpc::new(&self.rpc_socket).await?;

let stream = futures::stream::unfold(
(
cln_client,
last_pay_index,
self.wait_invoice_cancel_token.clone(),
Arc::clone(&self.bolt12_wait_invoice_is_active),
),
|(mut cln_client, mut last_pay_idx, cancel_token, is_active)| async move {
// Set the stream as active
is_active.store(true, Ordering::SeqCst);

loop {
tokio::select! {
_ = cancel_token.cancelled() => {
// Set the stream as inactive
is_active.store(false, Ordering::SeqCst);
// End the stream
return None;
}
result = cln_client.call(cln_rpc::Request::WaitAnyInvoice(WaitanyinvoiceRequest {
timeout: None,
lastpay_index: last_pay_idx,
})) => {
match result {
Ok(invoice) => {

// Try to convert the invoice to WaitanyinvoiceResponse
let wait_any_response_result: Result<WaitanyinvoiceResponse, _> =
invoice.try_into();

let wait_any_response = match wait_any_response_result {
Ok(response) => response,
Err(e) => {
tracing::warn!(
"Failed to parse WaitAnyInvoice response: {:?}",
e
);
// Continue to the next iteration without panicking
continue;
}
};

// Check the status of the invoice
// We only want to yield invoices that have been paid
match wait_any_response.status {
WaitanyinvoiceStatus::PAID => (),
WaitanyinvoiceStatus::EXPIRED => continue,
}

last_pay_idx = wait_any_response.pay_index;

let payment_hash = wait_any_response.payment_hash.to_string();


// TODO: Handle unit conversion
let amount_msats = wait_any_response.amount_received_msat.expect("status is paid there should be an amount");
let amount_sats = amount_msats.msat() / 1000;

let request_lookup_id = match wait_any_response.bolt12 {
// If it is a bolt12 payment we need to get the offer_id as this is what we use as the request look up.
// Since this is not returned in the wait any response,
// we need to do a second query for it.
Some(_) => {
match fetch_invoice_by_payment_hash(
&mut cln_client,
&payment_hash,
)
.await
{
Ok(Some(invoice)) => {
if let Some(local_offer_id) = invoice.local_offer_id {
local_offer_id.to_string()
} else {
continue;
}
}
Ok(None) => continue,
Err(e) => {
tracing::warn!(
"Error fetching invoice by payment hash: {e}"
);
continue;
}
}
}
None => payment_hash.clone(),
};

let response = WaitInvoiceResponse {
request_lookup_id,
payment_amount: amount_sats.into(),
unit: CurrencyUnit::Sat,
payment_id: payment_hash
};

break Some((response, (cln_client, last_pay_idx, cancel_token, is_active)));
}
Err(e) => {
tracing::warn!("Error fetching invoice: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
}
}
}
}
},
)
.boxed();

Ok(stream)
}

async fn get_bolt12_payment_quote(
Expand Down
33 changes: 3 additions & 30 deletions crates/cdk-cln/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub struct Cln {
bolt12_melt: bool,
wait_invoice_cancel_token: CancellationToken,
wait_invoice_is_active: Arc<AtomicBool>,
bolt12_wait_invoice_is_active: Arc<AtomicBool>,
}

impl Cln {
Expand All @@ -69,6 +70,7 @@ impl Cln {
bolt12_melt,
wait_invoice_cancel_token: CancellationToken::new(),
wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
bolt12_wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
})
}
}
Expand Down Expand Up @@ -163,38 +165,9 @@ impl MintLightning for Cln {
let amount_msats = wait_any_response.amount_received_msat.expect("status is paid there should be an amount");
let amount_sats = amount_msats.msat() / 1000;

let request_lookup_id = match wait_any_response.bolt12 {
// If it is a bolt12 payment we need to get the offer_id as this is what we use as the request look up.
// Since this is not returned in the wait any response,
// we need to do a second query for it.
Some(_) => {
match fetch_invoice_by_payment_hash(
&mut cln_client,
&payment_hash,
)
.await
{
Ok(Some(invoice)) => {
if let Some(local_offer_id) = invoice.local_offer_id {
local_offer_id.to_string()
} else {
continue;
}
}
Ok(None) => continue,
Err(e) => {
tracing::warn!(
"Error fetching invoice by payment hash: {e}"
);
continue;
}
}
}
None => payment_hash.clone(),
};

let response = WaitInvoiceResponse {
request_lookup_id,
request_lookup_id: payment_hash.clone(),
payment_amount: amount_sats.into(),
unit: CurrencyUnit::Sat,
payment_id: payment_hash
Expand Down
10 changes: 10 additions & 0 deletions crates/cdk-phoenixd/src/bolt12.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ use crate::Phoenixd;
impl MintBolt12Lightning for Phoenixd {
type Err = cdk_lightning::Error;

fn is_wait_invoice_active(&self) -> bool {
// Paying to PHD bolt12 offer is not supported so this can never be active
false
}

fn cancel_wait_invoice(&self) {
// Paying to PHD bolt12 offer is not supported so there is nothing to cancel
()
}

async fn get_bolt12_payment_quote(
&self,
melt_quote_request: &MeltQuoteBolt12Request,
Expand Down
6 changes: 6 additions & 0 deletions crates/cdk/src/cdk_lightning/bolt12.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ pub trait MintBolt12Lightning {
/// Mint Lightning Error
type Err: Into<Error> + From<Error>;

/// Is wait invoice active
fn is_wait_invoice_active(&self) -> bool;

/// Cancel wait invoice
fn cancel_wait_invoice(&self);

/// Listen for bolt12 offers to be paid
async fn wait_any_offer(
&self,
Expand Down
1 change: 1 addition & 0 deletions crates/cdk/src/mint/mint_nut04.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ impl Mint {

let amount_paid = quote.amount_paid + payment_amount;

// Since this is the first time we've seen this payment we add it to seen payment.
payment_ids.push(payment_id);

let quote = MintQuote {
Expand Down
58 changes: 58 additions & 0 deletions crates/cdk/src/mint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,64 @@ impl Mint {
Ok(())
}

/// Wait for any offer to be paid
/// For each backend starts a task that waits for any offers to be paid
/// Once invoice is paid mint quote status is updated
#[allow(clippy::incompatible_msrv)]
// Clippy thinks select is not stable but it compiles fine on MSRV (1.63.0)
pub async fn wait_for_paid_offers(&self, shutdown: Arc<Notify>) -> Result<(), Error> {
let mint_arc = Arc::new(self.clone());

let mut join_set = JoinSet::new();

for (key, bolt12) in self.bolt12_backends.iter() {
if !bolt12.is_wait_invoice_active() {
let mint = Arc::clone(&mint_arc);
let bolt12 = Arc::clone(bolt12);
let shutdown = Arc::clone(&shutdown);
let key = key.clone();
join_set.spawn(async move {
if !bolt12.is_wait_invoice_active() {
loop {
tokio::select! {
_ = shutdown.notified() => {
tracing::info!("Shutdown signal received, stopping task for {:?}", key);
bolt12.cancel_wait_invoice();
break;
}
result = bolt12.wait_any_offer() => {
match result {
Ok(mut stream) => {
while let Some(wait_invoice_response) = stream.next().await {
if let Err(err) = mint.pay_mint_quote_for_request_id(wait_invoice_response).await {
tracing::warn!("{:?}", err);
}
}
}
Err(err) => {
tracing::warn!("Could not get invoice stream for {:?}: {}",key, err);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
}
}
}
});
}
}

// Spawn a task to manage the JoinSet
while let Some(result) = join_set.join_next().await {
match result {
Ok(_) => tracing::info!("A task completed successfully."),
Err(err) => tracing::warn!("A task failed: {:?}", err),
}
}

Ok(())
}

/// Fee required for proof set
#[instrument(skip_all)]
pub async fn get_proofs_fee(&self, proofs: &Proofs) -> Result<Amount, Error> {
Expand Down

0 comments on commit a4789ea

Please sign in to comment.