Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into wallet-melt-amount-…
Browse files Browse the repository at this point in the history
…swap
  • Loading branch information
davidcaseria committed Oct 7, 2024
2 parents 8bf14ca + 260f262 commit 3bad82e
Show file tree
Hide file tree
Showing 16 changed files with 393 additions and 85 deletions.
22 changes: 11 additions & 11 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
-p cdk-lnbits,
-p cdk-fake-wallet,
--bin cdk-cli,
--bin cdk-mintd
--bin cdk-mintd,
]
steps:
- name: checkout
Expand Down Expand Up @@ -85,9 +85,9 @@ jobs:
]
database:
[
REDB,
SQLITE,
MEMORY
REDB,
SQLITE,
MEMORY,
]
steps:
- name: checkout
Expand Down Expand Up @@ -116,7 +116,7 @@ jobs:
[
REDB,
SQLITE,
MEMORY
MEMORY,
]
steps:
- name: checkout
Expand Down Expand Up @@ -146,9 +146,9 @@ jobs:
-p cdk-axum,
-p cdk-strike,
-p cdk-lnbits,
-p cdk-phoenixd
-p cdk-fake-wallet
-p cdk-cln
-p cdk-phoenixd,
-p cdk-fake-wallet,
-p cdk-cln,
]
steps:
- name: checkout
Expand All @@ -170,8 +170,8 @@ jobs:
matrix:
build-args:
[
-p cdk-sqlite
-p cdk-redb
-p cdk-sqlite,
-p cdk-redb,
]
steps:
- name: checkout
Expand Down Expand Up @@ -199,7 +199,7 @@ jobs:
-p cdk,
-p cdk --no-default-features,
-p cdk --no-default-features --features wallet,
-p cdk-js
-p cdk-js,
]
steps:
- name: checkout
Expand Down
2 changes: 1 addition & 1 deletion crates/cdk-axum/src/router_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ macro_rules! post_cache_wrapper {

post_cache_wrapper!(post_swap, SwapRequest, SwapResponse);
post_cache_wrapper!(post_mint_bolt11, MintBolt11Request, MintBolt11Response);
post_cache_wrapper!(post_melt_bolt11, MeltBolt11Request, MeltBolt11Response);
post_cache_wrapper!(post_melt_bolt11, MeltBolt11Request, MeltQuoteBolt11Response);

pub async fn get_keys(State(state): State<MintState>) -> Result<Json<KeysResponse>, Response> {
let pubkeys = state.mint.pubkeys().await.map_err(|err| {
Expand Down
1 change: 1 addition & 0 deletions crates/cdk-cln/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ cdk = { path = "../cdk", version = "0.4.0", default-features = false, features =
cln-rpc = "0.2.0"
futures = { version = "0.3.28", default-features = false }
tokio = { version = "1", default-features = false }
tokio-util = { version = "0.7.11", default-features = false }
tracing = { version = "0.1", default-features = false, features = ["attributes", "log"] }
thiserror = "1"
uuid = { version = "1", features = ["v4"] }
165 changes: 141 additions & 24 deletions crates/cdk-cln/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use std::path::PathBuf;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -25,13 +26,15 @@ use cln_rpc::model::requests::{
InvoiceRequest, ListinvoicesRequest, ListpaysRequest, PayRequest, WaitanyinvoiceRequest,
};
use cln_rpc::model::responses::{
ListinvoicesInvoicesStatus, ListpaysPaysStatus, PayStatus, WaitanyinvoiceResponse,
ListinvoicesInvoices, ListinvoicesInvoicesStatus, ListpaysPaysStatus, PayStatus,
WaitanyinvoiceResponse, WaitanyinvoiceStatus,
};
use cln_rpc::model::Request;
use cln_rpc::primitives::{Amount as CLN_Amount, AmountOrAny};
use error::Error;
use futures::{Stream, StreamExt};
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

pub mod error;
Expand All @@ -44,6 +47,8 @@ pub struct Cln {
fee_reserve: FeeReserve,
mint_settings: MintMethodSettings,
melt_settings: MeltMethodSettings,
wait_invoice_cancel_token: CancellationToken,
wait_invoice_is_active: Arc<AtomicBool>,
}

impl Cln {
Expand All @@ -62,6 +67,8 @@ impl Cln {
fee_reserve,
mint_settings,
melt_settings,
wait_invoice_cancel_token: CancellationToken::new(),
wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
})
}
}
Expand All @@ -80,43 +87,123 @@ impl MintLightning for Cln {
}
}

/// Is wait invoice active
fn is_wait_invoice_active(&self) -> bool {
self.wait_invoice_is_active.load(Ordering::SeqCst)
}

/// Cancel wait invoice
fn cancel_wait_invoice(&self) {
self.wait_invoice_cancel_token.cancel()
}

#[allow(clippy::incompatible_msrv)]
// Clippy thinks select is not stable but it compiles fine on MSRV (1.63.0)
async fn wait_any_invoice(
&self,
) -> Result<Pin<Box<dyn Stream<Item = String> + Send>>, Self::Err> {
let last_pay_index = self.get_last_pay_index().await?;
let cln_client = cln_rpc::ClnRpc::new(&self.rpc_socket).await?;

Ok(futures::stream::unfold(
(cln_client, last_pay_index),
|(mut cln_client, mut last_pay_idx)| async move {
let stream = futures::stream::unfold(
(
cln_client,
last_pay_index,
self.wait_invoice_cancel_token.clone(),
Arc::clone(&self.wait_invoice_is_active),
),
|(mut cln_client, mut last_pay_idx, cancel_token, is_active)| async move {
// Set the stream as active
is_active.store(true, Ordering::SeqCst);

loop {
let invoice_res = cln_client
.call(cln_rpc::Request::WaitAnyInvoice(WaitanyinvoiceRequest {
tokio::select! {
_ = cancel_token.cancelled() => {
// Set the stream as inactive
is_active.store(false, Ordering::SeqCst);
// End the stream
return None;
}
result = cln_client.call(cln_rpc::Request::WaitAnyInvoice(WaitanyinvoiceRequest {
timeout: None,
lastpay_index: last_pay_idx,
}))
.await;

let invoice: WaitanyinvoiceResponse = match invoice_res {
Ok(invoice) => invoice,
Err(e) => {
tracing::warn!("Error fetching invoice: {e}");
// Let's not spam CLN with requests on failure
tokio::time::sleep(Duration::from_secs(1)).await;
// Retry same request
continue;
})) => {
match result {
Ok(invoice) => {

// Try to convert the invoice to WaitanyinvoiceResponse
let wait_any_response_result: Result<WaitanyinvoiceResponse, _> =
invoice.try_into();

let wait_any_response = match wait_any_response_result {
Ok(response) => response,
Err(e) => {
tracing::warn!(
"Failed to parse WaitAnyInvoice response: {:?}",
e
);
// Continue to the next iteration without panicking
continue;
}
};

// Check the status of the invoice
// We only want to yield invoices that have been paid
match wait_any_response.status {
WaitanyinvoiceStatus::PAID => (),
WaitanyinvoiceStatus::EXPIRED => continue,
}

last_pay_idx = wait_any_response.pay_index;

let payment_hash = wait_any_response.payment_hash.to_string();

let request_look_up = match wait_any_response.bolt12 {
// If it is a bolt12 payment we need to get the offer_id as this is what we use as the request look up.
// Since this is not returned in the wait any response,
// we need to do a second query for it.
Some(_) => {
match fetch_invoice_by_payment_hash(
&mut cln_client,
&payment_hash,
)
.await
{
Ok(Some(invoice)) => {
if let Some(local_offer_id) = invoice.local_offer_id {
local_offer_id.to_string()
} else {
continue;
}
}
Ok(None) => continue,
Err(e) => {
tracing::warn!(
"Error fetching invoice by payment hash: {e}"
);
continue;
}
}
}
None => payment_hash,
};

return Some((request_look_up, (cln_client, last_pay_idx, cancel_token, is_active)));
}
Err(e) => {
tracing::warn!("Error fetching invoice: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
}
}
}
.try_into()
.expect("Wrong response from CLN");

last_pay_idx = invoice.pay_index;

break Some((invoice.payment_hash.to_string(), (cln_client, last_pay_idx)));
}
},
)
.boxed())
.boxed();

Ok(stream)
}

async fn get_payment_quote(
Expand Down Expand Up @@ -425,3 +512,33 @@ fn cln_pays_status_to_mint_state(status: ListpaysPaysStatus) -> MeltQuoteState {
ListpaysPaysStatus::FAILED => MeltQuoteState::Failed,
}
}

async fn fetch_invoice_by_payment_hash(
cln_client: &mut cln_rpc::ClnRpc,
payment_hash: &str,
) -> Result<Option<ListinvoicesInvoices>, Error> {
match cln_client
.call(cln_rpc::Request::ListInvoices(ListinvoicesRequest {
payment_hash: Some(payment_hash.to_string()),
index: None,
invstring: None,
label: None,
limit: None,
offer_id: None,
start: None,
}))
.await
{
Ok(cln_rpc::Response::ListInvoices(invoice_response)) => {
Ok(invoice_response.invoices.first().cloned())
}
Ok(_) => {
tracing::warn!("CLN returned an unexpected response type");
Err(Error::WrongClnResponse)
}
Err(e) => {
tracing::warn!("Error fetching invoice: {e}");
Err(Error::from(e))
}
}
}
1 change: 1 addition & 0 deletions crates/cdk-fake-wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ bitcoin = { version = "0.32.2", default-features = false }
cdk = { path = "../cdk", version = "0.4.0", default-features = false, features = ["mint"] }
futures = { version = "0.3.28", default-features = false }
tokio = { version = "1", default-features = false }
tokio-util = { version = "0.7.11", default-features = false }
tracing = { version = "0.1", default-features = false, features = ["attributes", "log"] }
thiserror = "1"
serde = "1"
Expand Down
14 changes: 14 additions & 0 deletions crates/cdk-fake-wallet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use async_trait::async_trait;
Expand All @@ -33,6 +34,7 @@ use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use tokio::time;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;

pub mod error;

Expand All @@ -47,6 +49,8 @@ pub struct FakeWallet {
payment_states: Arc<Mutex<HashMap<String, MeltQuoteState>>>,
failed_payment_check: Arc<Mutex<HashSet<String>>>,
payment_delay: u64,
wait_invoice_cancel_token: CancellationToken,
wait_invoice_is_active: Arc<AtomicBool>,
}

impl FakeWallet {
Expand All @@ -70,6 +74,8 @@ impl FakeWallet {
payment_states: Arc::new(Mutex::new(payment_states)),
failed_payment_check: Arc::new(Mutex::new(fail_payment_check)),
payment_delay,
wait_invoice_cancel_token: CancellationToken::new(),
wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
}
}
}
Expand Down Expand Up @@ -112,6 +118,14 @@ impl MintLightning for FakeWallet {
}
}

fn is_wait_invoice_active(&self) -> bool {
self.wait_invoice_is_active.load(Ordering::SeqCst)
}

fn cancel_wait_invoice(&self) {
self.wait_invoice_cancel_token.cancel()
}

async fn wait_any_invoice(
&self,
) -> Result<Pin<Box<dyn Stream<Item = String> + Send>>, Self::Err> {
Expand Down
1 change: 1 addition & 0 deletions crates/cdk-lnbits/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ bitcoin = { version = "0.32.2", default-features = false }
cdk = { path = "../cdk", version = "0.4.0", default-features = false, features = ["mint"] }
futures = { version = "0.3.28", default-features = false }
tokio = { version = "1", default-features = false }
tokio-util = { version = "0.7.11", default-features = false }
tracing = { version = "0.1", default-features = false, features = ["attributes", "log"] }
thiserror = "1"
# lnbits-rs = "0.2.0"
Expand Down
Loading

0 comments on commit 3bad82e

Please sign in to comment.