Merge pull request #121 from L2-Technology/rapid-gossip-sync
add support for rapid gossip sync
johncantrell97 authored Sep 28, 2022
2 parents 3cb1345 + c5df758 commit bf6f29c
Showing 6 changed files with 255 additions and 106 deletions.
30 changes: 30 additions & 0 deletions senseicore/src/config.rs
@@ -13,6 +13,13 @@ use bitcoin::Network;
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Clone)]
pub enum P2PConfig {
    Local,
    Remote(String, String),
    RapidGossipSync(String),
}

#[derive(Clone, Serialize, Deserialize)]
pub struct SenseiConfig {
    #[serde(skip)]
@@ -37,6 +44,7 @@ pub struct SenseiConfig {
    pub http_notifier_token: Option<String>,
    pub region: Option<String>,
    pub poll_for_chain_updates: bool,
    pub rapid_gossip_sync_server_host: Option<String>,
}

impl Default for SenseiConfig {
@@ -65,6 +73,7 @@ impl Default for SenseiConfig {
            http_notifier_token: None,
            region: None,
            poll_for_chain_updates: true,
            rapid_gossip_sync_server_host: None,
        }
    }
}
@@ -116,4 +125,25 @@ impl SenseiConfig {
        )
        .expect("failed to write config");
    }

    pub fn get_p2p_config(&self) -> P2PConfig {
        if self.remote_p2p_configured() {
            P2PConfig::Remote(
                self.remote_p2p_host.as_ref().unwrap().clone(),
                self.remote_p2p_token.as_ref().unwrap().clone(),
            )
        } else if self.rapid_gossip_sync_configured() {
            P2PConfig::RapidGossipSync(self.rapid_gossip_sync_server_host.as_ref().unwrap().clone())
        } else {
            P2PConfig::Local
        }
    }

    pub fn remote_p2p_configured(&self) -> bool {
        self.remote_p2p_host.is_some() && self.remote_p2p_token.is_some()
    }

    pub fn rapid_gossip_sync_configured(&self) -> bool {
        self.rapid_gossip_sync_server_host.is_some()
    }
}
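
A minimal sketch of how a caller might branch on the new P2PConfig returned by get_p2p_config(). The describe_p2p_mode function is hypothetical and the senseicore::config module path is assumed from the file layout; the real call sites are not part of this diff. Note the precedence encoded above: a remote p2p service wins over rapid gossip sync, which wins over local gossip.

// Sketch only: `config` is an already-loaded SenseiConfig.
use senseicore::config::{P2PConfig, SenseiConfig};

fn describe_p2p_mode(config: &SenseiConfig) -> String {
    match config.get_p2p_config() {
        // remote_p2p_host and remote_p2p_token are both set.
        P2PConfig::Remote(host, _token) => format!("using remote p2p service at {}", host),
        // rapid_gossip_sync_server_host is set (and no remote p2p service is configured).
        P2PConfig::RapidGossipSync(server_host) => {
            format!("pulling gossip snapshots from {}", server_host)
        }
        // Neither is configured: run a local gossip handler.
        P2PConfig::Local => "running local p2p gossip".to_string(),
    }
}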
76 changes: 69 additions & 7 deletions senseicore/src/p2p/background_processor.rs
@@ -6,51 +6,63 @@ use std::time::{Duration, Instant};
use crate::node::{NetworkGraph, RoutingPeerManager};
use crate::persist::SenseiPersister;

use lightning_rapid_gossip_sync::RapidGossipSync;

use super::router::AnyScorer;

const PING_TIMER: u64 = 10;
/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
const SCORER_PERSIST_TIMER: u64 = 30;
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
const SCORER_PERSIST_TIMER: u64 = 30;
const RAPID_GOSSIP_SYNC_TIMER: u64 = 60 * 60;
const FIRST_RAPID_GOSSIP_SYNC_TIMER: u64 = 0;

pub struct BackgroundProcessor {
    peer_manager: Arc<RoutingPeerManager>,
    peer_manager: Option<Arc<RoutingPeerManager>>,
    scorer: Arc<Mutex<AnyScorer>>,
    network_graph: Arc<NetworkGraph>,
    persister: Arc<SenseiPersister>,
    stop_signal: Arc<AtomicBool>,
    rapid_gossip_sync_server_host: Option<String>,
}

impl BackgroundProcessor {
    pub fn new(
        peer_manager: Arc<RoutingPeerManager>,
        peer_manager: Option<Arc<RoutingPeerManager>>,
        scorer: Arc<Mutex<AnyScorer>>,
        network_graph: Arc<NetworkGraph>,
        persister: Arc<SenseiPersister>,
        stop_signal: Arc<AtomicBool>,
        rapid_gossip_sync_server_host: Option<String>,
    ) -> Self {
        Self {
            peer_manager,
            scorer,
            network_graph,
            persister,
            stop_signal,
            rapid_gossip_sync_server_host,
        }
    }

    pub async fn process(&self) {
        let mut last_prune_call = Instant::now();
        let mut last_scorer_persist_call = Instant::now();
        let mut last_ping_call = Instant::now();
        let mut last_rgs_sync_call = Instant::now();
        let mut have_pruned = false;
        let mut have_rapid_gossip_synced = false;
        let mut interval = tokio::time::interval(Duration::from_millis(50));
        loop {
            interval.tick().await;
            if last_ping_call.elapsed().as_secs() > PING_TIMER {
                self.peer_manager.process_events();
                self.peer_manager.timer_tick_occurred();
                last_ping_call = Instant::now();

            if let Some(peer_manager) = &self.peer_manager {
                if last_ping_call.elapsed().as_secs() > PING_TIMER {
                    peer_manager.process_events();
                    peer_manager.timer_tick_occurred();
                    last_ping_call = Instant::now();
                }
            }

            // Note that we want to run a graph prune once not long after startup before
@@ -73,6 +85,56 @@ impl BackgroundProcessor {
                have_pruned = true;
            }

            if let Some(rapid_gossip_sync_server_host) = &self.rapid_gossip_sync_server_host {
                if last_rgs_sync_call.elapsed().as_secs()
                    > if have_rapid_gossip_synced {
                        RAPID_GOSSIP_SYNC_TIMER
                    } else {
                        FIRST_RAPID_GOSSIP_SYNC_TIMER
                    }
                {
                    let rapid_sync = RapidGossipSync::new(self.network_graph.clone());
                    let last_rapid_gossip_sync_timestamp = self
                        .network_graph
                        .get_last_rapid_gossip_sync_timestamp()
                        .unwrap_or(0);
                    let rapid_gossip_sync_uri = format!(
                        "{}/snapshot/{}",
                        rapid_gossip_sync_server_host, last_rapid_gossip_sync_timestamp
                    );
                    let update_data = match reqwest::get(&rapid_gossip_sync_uri).await {
                        Ok(response) => {
                            match response.bytes().await {
                                Ok(bytes) => Some(bytes.to_vec()),
                                Err(e) => {
                                    eprintln!("failed to convert rapid gossip sync response to bytes: {:?}", e);
                                    None
                                }
                            }
                        }
                        Err(e) => {
                            eprintln!(
                                "failed to fetch rapid gossip sync update at {} with error: {:?}",
                                rapid_gossip_sync_uri, e
                            );
                            None
                        }
                    };

                    if let Some(update_data) = update_data {
                        if let Err(e) = rapid_sync.update_network_graph(&update_data[..]) {
                            eprintln!(
                                "failed to update network graph with rapid gossip sync data: {:?}",
                                e
                            );
                        }
                    }

                    last_rgs_sync_call = Instant::now();
                    have_rapid_gossip_synced = true;
                }
            }

            if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
                let scorer = self.scorer.lock().unwrap();
                if let AnyScorer::Local(scorer) = scorer.deref() {
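
The processor above pulls compact graph snapshots from "{rapid_gossip_sync_server_host}/snapshot/{last_sync_timestamp}", where the timestamp comes from the network graph itself, so the server can return only data newer than the node's last sync. A condensed sketch of that fetch-and-apply cycle follows; it assumes the same NetworkGraph alias from crate::node and the reqwest and lightning_rapid_gossip_sync dependencies already used in this file, and the sync_graph_once helper itself is hypothetical.

// Sketch only: one rapid-gossip-sync round, outside the background loop.
use std::sync::Arc;

use lightning_rapid_gossip_sync::RapidGossipSync;

use crate::node::NetworkGraph;

async fn sync_graph_once(
    network_graph: Arc<NetworkGraph>,
    server_host: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    // Ask only for updates newer than what the graph already contains.
    let last_sync = network_graph
        .get_last_rapid_gossip_sync_timestamp()
        .unwrap_or(0);
    let uri = format!("{}/snapshot/{}", server_host, last_sync);

    // Download the snapshot and hand the raw bytes to the rapid gossip syncer.
    let update_data = reqwest::get(&uri).await?.bytes().await?.to_vec();
    let rapid_sync = RapidGossipSync::new(network_graph);
    rapid_sync
        .update_network_graph(&update_data[..])
        .map_err(|e| format!("rapid gossip sync failed: {:?}", e))?;
    Ok(())
}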
34 changes: 34 additions & 0 deletions senseicore/src/p2p/bubble_gossip_route_handler.rs
@@ -12,6 +12,7 @@ use super::router::RemoteSenseiInfo;
pub enum AnyP2PGossipHandler {
    Remote(RemoteGossipMessageHandler),
    Local(NetworkGraphMessageHandler),
    None,
}

impl AnyP2PGossipHandler {
@@ -29,6 +30,9 @@ impl MessageSendEventsProvider for AnyP2PGossipHandler {
            AnyP2PGossipHandler::Local(local_handler) => {
                local_handler.get_and_clear_pending_msg_events()
            }
            AnyP2PGossipHandler::None => {
                vec![]
            }
        }
    }
}
@@ -41,6 +45,9 @@ impl RoutingMessageHandler for AnyP2PGossipHandler {
        match self {
            AnyP2PGossipHandler::Remote(handler) => handler.handle_node_announcement(msg),
            AnyP2PGossipHandler::Local(handler) => handler.handle_node_announcement(msg),
            AnyP2PGossipHandler::None => {
                panic!("handle_node_announcement called without a P2P Gossip Handler")
            }
        }
    }

@@ -51,6 +58,9 @@ impl RoutingMessageHandler for AnyP2PGossipHandler {
        match self {
            AnyP2PGossipHandler::Remote(handler) => handler.handle_channel_announcement(msg),
            AnyP2PGossipHandler::Local(handler) => handler.handle_channel_announcement(msg),
            AnyP2PGossipHandler::None => {
                panic!("handle_channel_announcement called without a P2P Gossip Handler")
            }
        }
    }

@@ -61,6 +71,9 @@ impl RoutingMessageHandler for AnyP2PGossipHandler {
        match self {
            AnyP2PGossipHandler::Remote(handler) => handler.handle_channel_update(msg),
            AnyP2PGossipHandler::Local(handler) => handler.handle_channel_update(msg),
            AnyP2PGossipHandler::None => {
                panic!("handle_channel_update called without a P2P Gossip Handler")
            }
        }
    }

@@ -80,6 +93,9 @@ impl RoutingMessageHandler for AnyP2PGossipHandler {
            AnyP2PGossipHandler::Local(handler) => {
                handler.get_next_channel_announcements(starting_point, batch_amount)
            }
            AnyP2PGossipHandler::None => {
                panic!("get_next_channel_announcements called without a P2P Gossip Handler")
            }
        }
    }

@@ -95,6 +111,9 @@ impl RoutingMessageHandler for AnyP2PGossipHandler {
            AnyP2PGossipHandler::Local(handler) => {
                handler.get_next_node_announcements(starting_point, batch_amount)
            }
            AnyP2PGossipHandler::None => {
                panic!("get_next_node_announcements called without a P2P Gossip Handler")
            }
        }
    }

@@ -106,6 +125,9 @@ impl RoutingMessageHandler for AnyP2PGossipHandler {
        match self {
            AnyP2PGossipHandler::Remote(handler) => handler.peer_connected(their_node_id, init),
            AnyP2PGossipHandler::Local(handler) => handler.peer_connected(their_node_id, init),
            AnyP2PGossipHandler::None => {
                panic!("peer_connected called without a P2P Gossip Handler")
            }
        }
    }

@@ -121,6 +143,9 @@ impl RoutingMessageHandler for AnyP2PGossipHandler {
            AnyP2PGossipHandler::Local(handler) => {
                handler.handle_reply_channel_range(their_node_id, msg)
            }
            AnyP2PGossipHandler::None => {
                panic!("handle_reply_channel_range called without a P2P Gossip Handler")
            }
        }
    }

@@ -136,6 +161,9 @@ impl RoutingMessageHandler for AnyP2PGossipHandler {
            AnyP2PGossipHandler::Local(handler) => {
                handler.handle_reply_short_channel_ids_end(their_node_id, msg)
            }
            AnyP2PGossipHandler::None => {
                panic!("handle_reply_short_channel_ids_end called without a P2P Gossip Handler")
            }
        }
    }

@@ -151,6 +179,9 @@ impl RoutingMessageHandler for AnyP2PGossipHandler {
            AnyP2PGossipHandler::Local(handler) => {
                handler.handle_query_channel_range(their_node_id, msg)
            }
            AnyP2PGossipHandler::None => {
                panic!("handle_query_channel_range called without a P2P Gossip Handler")
            }
        }
    }

@@ -166,6 +197,9 @@ impl RoutingMessageHandler for AnyP2PGossipHandler {
            AnyP2PGossipHandler::Local(handler) => {
                handler.handle_query_short_channel_ids(their_node_id, msg)
            }
            AnyP2PGossipHandler::None => {
                panic!("handle_query_short_channel_ids called without a P2P Gossip Handler")
            }
        }
    }
}
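
The new None variant lets a node run with no P2P gossip handler at all, which is the intended state when gossip comes from a rapid-gossip-sync server rather than from peers: draining pending message events stays harmless (an empty Vec), while the routing callbacks are treated as unreachable and panic. A rough sketch of how the variant might be chosen from the P2PConfig added in config.rs is below; the select_gossip_handler helper and its closure parameters are hypothetical stand-ins for the real constructor wiring, which lives in files outside this diff, and the module paths are assumed from the file layout.

// Sketch only: map the configured P2P mode onto a gossip handler variant.
use senseicore::config::P2PConfig;
use senseicore::p2p::bubble_gossip_route_handler::AnyP2PGossipHandler;

fn select_gossip_handler<R, L>(
    p2p_config: &P2PConfig,
    build_remote: R,
    build_local: L,
) -> AnyP2PGossipHandler
where
    R: FnOnce(&str, &str) -> AnyP2PGossipHandler,
    L: FnOnce() -> AnyP2PGossipHandler,
{
    match p2p_config {
        // A remote sensei instance relays gossip on this node's behalf.
        P2PConfig::Remote(host, token) => build_remote(host, token),
        // Rapid gossip sync replaces live P2P gossip entirely, so no handler
        // is attached and the None variant's callbacks should never be hit.
        P2PConfig::RapidGossipSync(_server_host) => AnyP2PGossipHandler::None,
        // Default: validate and relay gossip with a local handler.
        P2PConfig::Local => build_local(),
    }
}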