From a4c0b1e19e1ef232a5d63285732e5d80d45f9e0e Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 8 Jan 2024 08:38:58 +0100 Subject: [PATCH] Implement `LiquidityManager` event handling --- bindings/ldk_node.udl | 1 + src/error.rs | 5 ++ src/liquidity.rs | 157 ++++++++++++++++++++++++++++++++++++------ 3 files changed, 141 insertions(+), 22 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 5ebe2b4c4..087bc1c17 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -128,6 +128,7 @@ enum NodeError { "InvalidNetwork", "DuplicatePayment", "InsufficientFunds", + "LiquiditySourceUnavailable", }; [Error] diff --git a/src/error.rs b/src/error.rs index 6f68dbad3..115b4c570 100644 --- a/src/error.rs +++ b/src/error.rs @@ -63,6 +63,8 @@ pub enum Error { DuplicatePayment, /// The available funds are insufficient to complete the given operation. InsufficientFunds, + /// The given operation failed due to the required liquidity source being unavailable. + LiquiditySourceUnavailable, } impl fmt::Display for Error { @@ -106,6 +108,9 @@ impl fmt::Display for Error { Self::InsufficientFunds => { write!(f, "The available funds are insufficient to complete the given operation.") } + Self::LiquiditySourceUnavailable => { + write!(f, "The given operation failed due to the required liquidity source being unavailable.") + } } } } diff --git a/src/liquidity.rs b/src/liquidity.rs index baeb0047a..2cbcc15cd 100644 --- a/src/liquidity.rs +++ b/src/liquidity.rs @@ -1,3 +1,4 @@ +use crate::logger::{log_error, Logger}; use crate::types::{ChannelManager, KeysManager, LiquidityManager, PeerManager}; use crate::Config; @@ -5,16 +6,19 @@ use lightning::ln::features::{InitFeatures, NodeFeatures}; use lightning::ln::msgs::SocketAddress; use lightning::ln::peer_handler::CustomMessageHandler; use lightning::ln::wire::CustomMessageReader; -use lightning::util::logger::Logger; use lightning::util::persist::KVStore; use lightning_liquidity::events::Event; use lightning_liquidity::lsps0::msgs::RawLSPSMessage; use lightning_liquidity::lsps2::event::LSPS2ClientEvent; +use lightning_liquidity::lsps2::msgs::OpeningFeeParams; use bitcoin::secp256k1::PublicKey; +use tokio::sync::oneshot; + +use std::collections::HashMap; use std::ops::Deref; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; pub(crate) enum LiquiditySource where @@ -25,6 +29,8 @@ where address: SocketAddress, node_id: PublicKey, token: Option, + pending_fee_requests: Mutex>>, + pending_buy_requests: Mutex>>, channel_manager: Arc>, keys_manager: Arc, liquidity_manager: Arc>, @@ -42,10 +48,14 @@ where channel_manager: Arc>, keys_manager: Arc, liquidity_manager: Arc>, config: Arc, logger: L, ) -> Self { + let pending_fee_requests = Mutex::new(HashMap::new()); + let pending_buy_requests = Mutex::new(HashMap::new()); Self::LSPS2 { address, node_id, token, + pending_fee_requests, + pending_buy_requests, channel_manager, keys_manager, liquidity_manager, @@ -71,27 +81,114 @@ where pub(crate) async fn handle_next_event(&self) { match self { Self::None => {} - Self::LSPS2 { liquidity_manager, .. } => { - match liquidity_manager.next_event_async().await { - Event::LSPS2Client(LSPS2ClientEvent::GetInfoResponse { - jit_channel_id: _, - counterparty_node_id: _, - opening_fee_params_menu: _, - min_payment_size_msat: _, - max_payment_size_msat: _, - user_channel_id: _, - }) => {} - Event::LSPS2Client(LSPS2ClientEvent::InvoiceGenerationReady { - counterparty_node_id: _, - intercept_scid: _, - cltv_expiry_delta: _, - payment_size_msat: _, - client_trusts_lsp: _, - user_channel_id: _, - }) => {} - _ => {} + Self::LSPS2 { + liquidity_manager, + node_id, + pending_fee_requests, + pending_buy_requests, + logger, + .. + } => match liquidity_manager.next_event_async().await { + Event::LSPS2Client(LSPS2ClientEvent::GetInfoResponse { + jit_channel_id, + counterparty_node_id, + opening_fee_params_menu, + min_payment_size_msat, + max_payment_size_msat, + user_channel_id, + }) => { + if &counterparty_node_id != node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + if let Some(sender) = + pending_fee_requests.lock().unwrap().remove(&user_channel_id) + { + let response = LSPS2FeeResponse { + jit_channel_id, + opening_fee_params_menu, + min_payment_size_msat, + max_payment_size_msat, + user_channel_id, + }; + + match sender.send(response) { + Ok(()) => (), + Err(e) => { + log_error!( + logger, + "Failed to handle response from liquidity service: {:?}", + e + ); + } + } + } else { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + logger, + "Received response from liquidity service for unknown request." + ); + } } - } + Event::LSPS2Client(LSPS2ClientEvent::InvoiceGenerationReady { + counterparty_node_id, + intercept_scid, + cltv_expiry_delta, + user_channel_id, + .. + }) => { + if &counterparty_node_id != node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + if let Some(sender) = + pending_buy_requests.lock().unwrap().remove(&user_channel_id) + { + let response = + LSPS2BuyResponse { intercept_scid, cltv_expiry_delta, user_channel_id }; + + match sender.send(response) { + Ok(()) => (), + Err(e) => { + log_error!( + logger, + "Failed to handle response from liquidity service: {:?}", + e + ); + } + } + } else { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + logger, + "Received response from liquidity service for unknown request." + ); + } + } + _ => {} + }, } } } @@ -150,3 +247,19 @@ where } } } + +#[derive(Debug, Clone)] +pub(crate) struct LSPS2FeeResponse { + jit_channel_id: u128, + opening_fee_params_menu: Vec, + min_payment_size_msat: u64, + max_payment_size_msat: u64, + user_channel_id: u128, +} + +#[derive(Debug, Clone)] +pub(crate) struct LSPS2BuyResponse { + intercept_scid: u64, + cltv_expiry_delta: u32, + user_channel_id: u128, +}