From c6abf09027b1a68757c2707619f967b24d7bb7fd Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 14 Jan 2025 11:15:21 +0100 Subject: [PATCH] `liquidity`: Allow setting `process_events` callback in `c_bindings` To trigger message processing, we previously had the user set a callback to `PeerManager::process_events` via an `Fn()` callback. This is however not supported by `c_bindings`. Here, we therefore introduce as `ProcessMesssagesCallback` trait that can be used via `LiquidityManager::set_process_msgs_callback_fn`, which is exposed in `c_bindings`. --- lightning-liquidity/src/manager.rs | 73 +++++------------------- lightning-liquidity/src/message_queue.rs | 38 +++++++----- lightning-liquidity/tests/common/mod.rs | 5 +- 3 files changed, 42 insertions(+), 74 deletions(-) diff --git a/lightning-liquidity/src/manager.rs b/lightning-liquidity/src/manager.rs index 1e467c302de..6b6b146af8a 100644 --- a/lightning-liquidity/src/manager.rs +++ b/lightning-liquidity/src/manager.rs @@ -7,7 +7,7 @@ use crate::lsps0::ser::{ LSPS_MESSAGE_TYPE_ID, }; use crate::lsps0::service::LSPS0ServiceHandler; -use crate::message_queue::MessageQueue; +use crate::message_queue::{MessageQueue, ProcessMessagesCallback}; use crate::lsps1::client::{LSPS1ClientConfig, LSPS1ClientHandler}; use crate::lsps1::msgs::LSPS1Message; @@ -17,7 +17,7 @@ use crate::lsps1::service::{LSPS1ServiceConfig, LSPS1ServiceHandler}; use crate::lsps2::client::{LSPS2ClientConfig, LSPS2ClientHandler}; use crate::lsps2::msgs::LSPS2Message; use crate::lsps2::service::{LSPS2ServiceConfig, LSPS2ServiceHandler}; -use crate::prelude::{new_hash_map, new_hash_set, HashMap, HashSet, ToString, Vec}; +use crate::prelude::{new_hash_map, new_hash_set, Box, HashMap, HashSet, ToString, Vec}; use crate::sync::{Arc, Mutex, RwLock}; use lightning::chain::{self, BestBlock, Confirm, Filter, Listen}; @@ -310,74 +310,27 @@ where { /// let process_msgs_pm = Arc::clone(&my_peer_manager); /// let process_msgs_callback = move || process_msgs_pm.process_events(); /// - /// my_liquidity_manager.set_process_msgs_callback(process_msgs_callback); + /// my_liquidity_manager.set_process_msgs_callback(Box::new(process_msgs_callback)); /// # } /// ``` /// /// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events - #[cfg(feature = "std")] - pub fn set_process_msgs_callback(&self, callback: impl Fn() + Send + Sync + 'static) { - self.pending_messages.set_process_msgs_callback(callback) + pub fn set_process_msgs_callback(&self, callback: Box) { + self.pending_messages.set_process_msgs_callback(callback); } /// Allows to set a callback that will be called after new messages are pushed to the message /// queue. /// - /// Usually, you'll want to use this to call [`PeerManager::process_events`] to clear the - /// message queue. For example: + /// C bindings don't (currently) know how to map `Box`, and while it could add the + /// following wrapper, doing it in the bindings is currently much more work than simply doing it + /// here. /// - /// ``` - /// # use lightning::io; - /// # use lightning_liquidity::LiquidityManager; - /// # use std::sync::{Arc, RwLock}; - /// # use std::sync::atomic::{AtomicBool, Ordering}; - /// # use std::time::SystemTime; - /// # struct MyStore {} - /// # impl lightning::util::persist::KVStore for MyStore { - /// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result> { Ok(Vec::new()) } - /// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) } - /// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) } - /// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { Ok(Vec::new()) } - /// # } - /// # struct MyEntropySource {} - /// # impl lightning::sign::EntropySource for MyEntropySource { - /// # fn get_secure_random_bytes(&self) -> [u8; 32] { [0u8; 32] } - /// # } - /// # struct MyEventHandler {} - /// # impl MyEventHandler { - /// # async fn handle_event(&self, _: lightning::events::Event) {} - /// # } - /// # #[derive(Eq, PartialEq, Clone, Hash)] - /// # struct MySocketDescriptor {} - /// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor { - /// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 } - /// # fn disconnect_socket(&mut self) {} - /// # } - /// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface; - /// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator; - /// # type MyNodeSigner = dyn lightning::sign::NodeSigner; - /// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup; - /// # type MyFilter = dyn lightning::chain::Filter; - /// # type MyLogger = dyn lightning::util::logger::Logger; - /// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; - /// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager, MyLogger>; - /// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph>; - /// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync, Arc, Arc>; - /// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager; - /// # type MyScorer = RwLock, Arc>>; - /// # type MyLiquidityManager = LiquidityManager, Arc, Arc>; - /// # fn setup_background_processing(my_persister: Arc, my_event_handler: Arc, my_chain_monitor: Arc, my_channel_manager: Arc, my_logger: Arc, my_peer_manager: Arc, my_liquidity_manager: Arc) { - /// let process_msgs_pm = Arc::clone(&my_peer_manager); - /// let process_msgs_callback = move || process_msgs_pm.process_events(); - /// - /// my_liquidity_manager.set_process_msgs_callback(process_msgs_callback); - /// # } - /// ``` - /// - /// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events - #[cfg(not(feature = "std"))] - pub fn set_process_msgs_callback(&self, callback: impl Fn() + 'static) { - self.pending_messages.set_process_msgs_callback(callback) + /// Hence we simply allow setting a callback function that will be set via + /// [`Self::set_process_msgs_callback`] internally. + #[cfg(c_bindings)] + pub fn set_process_msgs_callback_fn(&self, callback: F) { + self.set_process_msgs_callback(Box::new(callback)); } /// Blocks the current thread until next event is ready and returns it. diff --git a/lightning-liquidity/src/message_queue.rs b/lightning-liquidity/src/message_queue.rs index 89dab8a318e..7b61a87bcd4 100644 --- a/lightning-liquidity/src/message_queue.rs +++ b/lightning-liquidity/src/message_queue.rs @@ -11,10 +11,7 @@ use bitcoin::secp256k1::PublicKey; /// [`LiquidityManager`]: crate::LiquidityManager pub struct MessageQueue { queue: Mutex>, - #[cfg(feature = "std")] - process_msgs_callback: RwLock>>, - #[cfg(not(feature = "std"))] - process_msgs_callback: RwLock>>, + process_msgs_callback: RwLock>>, } impl MessageQueue { @@ -24,14 +21,8 @@ impl MessageQueue { Self { queue, process_msgs_callback } } - #[cfg(feature = "std")] - pub(crate) fn set_process_msgs_callback(&self, callback: impl Fn() + Send + Sync + 'static) { - *self.process_msgs_callback.write().unwrap() = Some(Box::new(callback)); - } - - #[cfg(not(feature = "std"))] - pub(crate) fn set_process_msgs_callback(&self, callback: impl Fn() + 'static) { - *self.process_msgs_callback.write().unwrap() = Some(Box::new(callback)); + pub(crate) fn set_process_msgs_callback(&self, callback: Box) { + *self.process_msgs_callback.write().unwrap() = Some(callback); } pub(crate) fn get_and_clear_pending_msgs(&self) -> Vec<(PublicKey, LSPSMessage)> { @@ -45,7 +36,28 @@ impl MessageQueue { } if let Some(process_msgs_callback) = self.process_msgs_callback.read().unwrap().as_ref() { - (process_msgs_callback)() + process_msgs_callback.call() } } } + +macro_rules! define_callback { ($($bounds: path),*) => { +/// A callback which will be called to trigger network message processing. +/// +/// Usually, this should call [`PeerManager::process_events`]. +/// +/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events +pub trait ProcessMessagesCallback : $($bounds +)* { + /// The method which is called. + fn call(&self); +} + +impl ProcessMessagesCallback for F { + fn call(&self) { (self)(); } +} +} } + +#[cfg(feature = "std")] +define_callback!(Send, Sync); +#[cfg(not(feature = "std"))] +define_callback!(); diff --git a/lightning-liquidity/tests/common/mod.rs b/lightning-liquidity/tests/common/mod.rs index 8b8507a9f14..faf4cc5a68a 100644 --- a/lightning-liquidity/tests/common/mod.rs +++ b/lightning-liquidity/tests/common/mod.rs @@ -478,7 +478,10 @@ pub(crate) fn create_liquidity_node( let process_msgs_flag = Arc::clone(&check_msgs_processed); let process_msgs_callback = move || process_msgs_flag.store(true, Ordering::Release); - liquidity_manager.set_process_msgs_callback(process_msgs_callback); + #[cfg(not(c_bindings))] + liquidity_manager.set_process_msgs_callback(Box::new(process_msgs_callback)); + #[cfg(c_bindings)] + liquidity_manager.set_process_msgs_callback_fn(process_msgs_callback); Node { channel_manager,