diff --git a/src/builder.rs b/src/builder.rs index 733a99960..7376a7997 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -973,6 +973,12 @@ fn build_with_store_internal( liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::clone(&peer_manager))); + gossip_source.set_gossip_verifier( + Arc::clone(&chain_source), + Arc::clone(&peer_manager), + Arc::clone(&runtime), + ); + let connection_manager = Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger))); diff --git a/src/chain/bitcoind_rpc.rs b/src/chain/bitcoind_rpc.rs index 6e7360601..ccebcf779 100644 --- a/src/chain/bitcoind_rpc.rs +++ b/src/chain/bitcoind_rpc.rs @@ -48,6 +48,10 @@ impl BitcoindRpcClient { Self { rpc_client, latest_mempool_timestamp } } + pub(crate) fn rpc_client(&self) -> Arc { + Arc::clone(&self.rpc_client) + } + pub(crate) async fn broadcast_transaction(&self, tx: &Transaction) -> std::io::Result { let tx_serialized = bitcoin::consensus::encode::serialize_hex(tx); let tx_json = serde_json::json!(tx_serialized); diff --git a/src/chain/mod.rs b/src/chain/mod.rs index af77e6bee..3670369a6 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -33,6 +33,7 @@ use lightning_transaction_sync::EsploraSyncClient; use lightning_block_sync::init::{synchronize_listeners, validate_best_block_header}; use lightning_block_sync::poll::{ChainPoller, ChainTip, ValidatedBlockHeader}; +use lightning_block_sync::rpc::RpcClient; use lightning_block_sync::SpvClient; use bdk_esplora::EsploraAsyncExt; @@ -192,6 +193,13 @@ impl ChainSource { } } + pub(crate) fn as_utxo_source(&self) -> Option> { + match self { + Self::BitcoindRpc { bitcoind_rpc_client, .. } => Some(bitcoind_rpc_client.rpc_client()), + _ => None, + } + } + pub(crate) async fn continuously_sync_wallets( &self, mut stop_sync_receiver: tokio::sync::watch::Receiver<()>, channel_manager: Arc, chain_monitor: Arc, diff --git a/src/gossip.rs b/src/gossip.rs index 450b5b5ee..82e803b5e 100644 --- a/src/gossip.rs +++ b/src/gossip.rs @@ -5,20 +5,23 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +use crate::chain::ChainSource; use crate::config::RGS_SYNC_TIMEOUT_SECS; -use crate::logger::{log_trace, FilesystemLogger, Logger}; -use crate::types::{GossipSync, Graph, P2PGossipSync, RapidGossipSync}; +use crate::logger::{log_error, log_trace, FilesystemLogger, Logger}; +use crate::types::{GossipSync, Graph, P2PGossipSync, PeerManager, RapidGossipSync}; use crate::Error; -use lightning::routing::utxo::UtxoLookup; +use lightning_block_sync::gossip::{FutureSpawner, GossipVerifier}; +use std::future::Future; use std::sync::atomic::{AtomicU32, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::Duration; pub(crate) enum GossipSource { P2PNetwork { gossip_sync: Arc, + logger: Arc, }, RapidGossipSync { gossip_sync: Arc, @@ -30,12 +33,8 @@ pub(crate) enum GossipSource { impl GossipSource { pub fn new_p2p(network_graph: Arc, logger: Arc) -> Self { - let gossip_sync = Arc::new(P2PGossipSync::new( - network_graph, - None::>, - logger, - )); - Self::P2PNetwork { gossip_sync } + let gossip_sync = Arc::new(P2PGossipSync::new(network_graph, None, Arc::clone(&logger))); + Self::P2PNetwork { gossip_sync, logger } } pub fn new_rgs( @@ -58,9 +57,30 @@ impl GossipSource { } } + pub(crate) fn set_gossip_verifier( + &self, chain_source: Arc, peer_manager: Arc, + runtime: Arc>>>, + ) { + match self { + _ => (), + Self::P2PNetwork { gossip_sync, logger } => { + if let Some(utxo_source) = chain_source.as_utxo_source() { + let spawner = RuntimeSpawner::new(Arc::clone(&runtime), Arc::clone(&logger)); + let gossip_verifier = Arc::new(GossipVerifier::new( + utxo_source, + spawner, + Arc::clone(gossip_sync), + peer_manager, + )); + gossip_sync.add_utxo_lookup(Some(gossip_verifier)); + } + }, + } + } + pub async fn update_rgs_snapshot(&self) -> Result { match self { - Self::P2PNetwork { gossip_sync: _ } => Ok(0), + Self::P2PNetwork { gossip_sync: _, .. } => Ok(0), Self::RapidGossipSync { gossip_sync, server_url, latest_sync_timestamp, logger } => { let query_timestamp = latest_sync_timestamp.load(Ordering::Acquire); let query_url = format!("{}/{}", server_url, query_timestamp); @@ -101,3 +121,30 @@ impl GossipSource { } } } + +pub(crate) struct RuntimeSpawner { + runtime: Arc>>>, + logger: Arc, +} + +impl RuntimeSpawner { + pub(crate) fn new( + runtime: Arc>>>, logger: Arc, + ) -> Self { + Self { runtime, logger } + } +} + +impl FutureSpawner for RuntimeSpawner { + fn spawn + Send + 'static>(&self, future: T) { + let rt_lock = self.runtime.read().unwrap(); + if rt_lock.is_none() { + log_error!(self.logger, "Tried spawing a future while the runtime wasn't available. This should never happen."); + debug_assert!(false, "Tried spawing a future while the runtime wasn't available. This should never happen."); + return; + } + + let runtime = rt_lock.as_ref().unwrap(); + runtime.spawn(future); + } +}