Skip to content

Commit

Permalink
Implement LiquidityManager event handling
Browse files Browse the repository at this point in the history
  • Loading branch information
tnull committed Jan 9, 2024
1 parent 0ed98a0 commit a4c0b1e
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 22 deletions.
1 change: 1 addition & 0 deletions bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ enum NodeError {
"InvalidNetwork",
"DuplicatePayment",
"InsufficientFunds",
"LiquiditySourceUnavailable",
};

[Error]
Expand Down
5 changes: 5 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ pub enum Error {
DuplicatePayment,
/// The available funds are insufficient to complete the given operation.
InsufficientFunds,
/// The given operation failed due to the required liquidity source being unavailable.
LiquiditySourceUnavailable,
}

impl fmt::Display for Error {
Expand Down Expand Up @@ -106,6 +108,9 @@ impl fmt::Display for Error {
Self::InsufficientFunds => {
write!(f, "The available funds are insufficient to complete the given operation.")
}
Self::LiquiditySourceUnavailable => {
write!(f, "The given operation failed due to the required liquidity source being unavailable.")
}
}
}
}
Expand Down
157 changes: 135 additions & 22 deletions src/liquidity.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
use crate::logger::{log_error, Logger};
use crate::types::{ChannelManager, KeysManager, LiquidityManager, PeerManager};
use crate::Config;

use lightning::ln::features::{InitFeatures, NodeFeatures};
use lightning::ln::msgs::SocketAddress;
use lightning::ln::peer_handler::CustomMessageHandler;
use lightning::ln::wire::CustomMessageReader;
use lightning::util::logger::Logger;
use lightning::util::persist::KVStore;
use lightning_liquidity::events::Event;
use lightning_liquidity::lsps0::msgs::RawLSPSMessage;
use lightning_liquidity::lsps2::event::LSPS2ClientEvent;
use lightning_liquidity::lsps2::msgs::OpeningFeeParams;

use bitcoin::secp256k1::PublicKey;

use tokio::sync::oneshot;

use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

pub(crate) enum LiquiditySource<K: KVStore + Sync + Send + 'static, L: Deref>
where
Expand All @@ -25,6 +29,8 @@ where
address: SocketAddress,
node_id: PublicKey,
token: Option<String>,
pending_fee_requests: Mutex<HashMap<u128, oneshot::Sender<LSPS2FeeResponse>>>,
pending_buy_requests: Mutex<HashMap<u128, oneshot::Sender<LSPS2BuyResponse>>>,
channel_manager: Arc<ChannelManager<K>>,
keys_manager: Arc<KeysManager>,
liquidity_manager: Arc<LiquidityManager<K>>,
Expand All @@ -42,10 +48,14 @@ where
channel_manager: Arc<ChannelManager<K>>, keys_manager: Arc<KeysManager>,
liquidity_manager: Arc<LiquidityManager<K>>, config: Arc<Config>, logger: L,
) -> Self {
let pending_fee_requests = Mutex::new(HashMap::new());
let pending_buy_requests = Mutex::new(HashMap::new());
Self::LSPS2 {
address,
node_id,
token,
pending_fee_requests,
pending_buy_requests,
channel_manager,
keys_manager,
liquidity_manager,
Expand All @@ -71,27 +81,114 @@ where
pub(crate) async fn handle_next_event(&self) {
match self {
Self::None => {}
Self::LSPS2 { liquidity_manager, .. } => {
match liquidity_manager.next_event_async().await {
Event::LSPS2Client(LSPS2ClientEvent::GetInfoResponse {
jit_channel_id: _,
counterparty_node_id: _,
opening_fee_params_menu: _,
min_payment_size_msat: _,
max_payment_size_msat: _,
user_channel_id: _,
}) => {}
Event::LSPS2Client(LSPS2ClientEvent::InvoiceGenerationReady {
counterparty_node_id: _,
intercept_scid: _,
cltv_expiry_delta: _,
payment_size_msat: _,
client_trusts_lsp: _,
user_channel_id: _,
}) => {}
_ => {}
Self::LSPS2 {
liquidity_manager,
node_id,
pending_fee_requests,
pending_buy_requests,
logger,
..
} => match liquidity_manager.next_event_async().await {
Event::LSPS2Client(LSPS2ClientEvent::GetInfoResponse {
jit_channel_id,
counterparty_node_id,
opening_fee_params_menu,
min_payment_size_msat,
max_payment_size_msat,
user_channel_id,
}) => {
if &counterparty_node_id != node_id {
debug_assert!(
false,
"Received response from unexpected LSP counterparty. This should never happen."
);
log_error!(
logger,
"Received response from unexpected LSP counterparty. This should never happen."
);
return;
}

if let Some(sender) =
pending_fee_requests.lock().unwrap().remove(&user_channel_id)
{
let response = LSPS2FeeResponse {
jit_channel_id,
opening_fee_params_menu,
min_payment_size_msat,
max_payment_size_msat,
user_channel_id,
};

match sender.send(response) {
Ok(()) => (),
Err(e) => {
log_error!(
logger,
"Failed to handle response from liquidity service: {:?}",
e
);
}
}
} else {
debug_assert!(
false,
"Received response from liquidity service for unknown request."
);
log_error!(
logger,
"Received response from liquidity service for unknown request."
);
}
}
}
Event::LSPS2Client(LSPS2ClientEvent::InvoiceGenerationReady {
counterparty_node_id,
intercept_scid,
cltv_expiry_delta,
user_channel_id,
..
}) => {
if &counterparty_node_id != node_id {
debug_assert!(
false,
"Received response from unexpected LSP counterparty. This should never happen."
);
log_error!(
logger,
"Received response from unexpected LSP counterparty. This should never happen."
);
return;
}

if let Some(sender) =
pending_buy_requests.lock().unwrap().remove(&user_channel_id)
{
let response =
LSPS2BuyResponse { intercept_scid, cltv_expiry_delta, user_channel_id };

match sender.send(response) {
Ok(()) => (),
Err(e) => {
log_error!(
logger,
"Failed to handle response from liquidity service: {:?}",
e
);
}
}
} else {
debug_assert!(
false,
"Received response from liquidity service for unknown request."
);
log_error!(
logger,
"Received response from liquidity service for unknown request."
);
}
}
_ => {}
},
}
}
}
Expand Down Expand Up @@ -150,3 +247,19 @@ where
}
}
}

#[derive(Debug, Clone)]
pub(crate) struct LSPS2FeeResponse {
jit_channel_id: u128,
opening_fee_params_menu: Vec<OpeningFeeParams>,
min_payment_size_msat: u64,
max_payment_size_msat: u64,
user_channel_id: u128,
}

#[derive(Debug, Clone)]
pub(crate) struct LSPS2BuyResponse {
intercept_scid: u64,
cltv_expiry_delta: u32,
user_channel_id: u128,
}

0 comments on commit a4c0b1e

Please sign in to comment.