From bddb7259fb86ece378bfe13dda68ec013d841a7a Mon Sep 17 00:00:00 2001 From: John Cantrell Date: Fri, 23 Sep 2022 16:18:29 -0400 Subject: [PATCH] offer chain push instead of constant polling as option --- senseicore/src/chain/manager.rs | 18 ++++++++++++++++-- senseicore/src/config.rs | 2 ++ senseicore/src/services/admin.rs | 6 ++++++ src/http/admin.rs | 18 ++++++++++++++++++ src/main.rs | 5 +++++ 5 files changed, 47 insertions(+), 2 deletions(-) diff --git a/senseicore/src/chain/manager.rs b/senseicore/src/chain/manager.rs index c2d5355..a357b69 100644 --- a/senseicore/src/chain/manager.rs +++ b/senseicore/src/chain/manager.rs @@ -1,6 +1,6 @@ use std::{ sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, }, time::Duration, @@ -33,6 +33,7 @@ pub struct SenseiChainManager { pub broadcaster: Arc, poller_paused: Arc, poller_running: Arc, + chain_update_available: Arc, poller_handle: Mutex>>, } @@ -48,8 +49,11 @@ impl SenseiChainManager { let listener_poller = listener.clone(); let poller_paused = Arc::new(AtomicBool::new(false)); let poller_running = Arc::new(AtomicBool::new(true)); + let chain_update_available = Arc::new(AtomicUsize::new(0)); + let chain_update_available_poller = chain_update_available.clone(); let poller_paused_poller = poller_paused.clone(); let poller_running_poller = poller_running.clone(); + let poller_handle = tokio::spawn(async move { let mut cache = UnboundedCache::new(); let chain_tip = init::validate_best_block_header(block_source_poller.clone()) @@ -59,8 +63,13 @@ impl SenseiChainManager { let mut spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, listener_poller); while poller_running_poller.load(Ordering::Relaxed) { - if !poller_paused_poller.load(Ordering::Relaxed) { + let updates_available = chain_update_available_poller.load(Ordering::Relaxed) > 0; + let paused = poller_paused_poller.load(Ordering::Relaxed); + if (config.poll_for_chain_updates || updates_available) && !paused { let _tip = spv_client.poll_best_tip().await.unwrap(); + if updates_available { + chain_update_available_poller.fetch_sub(1, Ordering::Relaxed); + } } tokio::time::sleep(Duration::from_secs(1)).await; } @@ -71,6 +80,7 @@ impl SenseiChainManager { listener, poller_paused, poller_running, + chain_update_available, block_source, fee_estimator: Arc::new(SenseiFeeEstimator { fee_estimator }), broadcaster, @@ -78,6 +88,10 @@ impl SenseiChainManager { }) } + pub fn chain_updated(&self) { + self.chain_update_available.fetch_add(1, Ordering::Relaxed); + } + pub async fn stop(&self) { self.poller_running.store(false, Ordering::Relaxed); let handle = self.poller_handle.lock().await.take().unwrap(); diff --git a/senseicore/src/config.rs b/senseicore/src/config.rs index 568dd77..8f107f9 100644 --- a/senseicore/src/config.rs +++ b/senseicore/src/config.rs @@ -36,6 +36,7 @@ pub struct SenseiConfig { pub http_notifier_url: Option, pub http_notifier_token: Option, pub region: Option, + pub poll_for_chain_updates: bool, } impl Default for SenseiConfig { @@ -63,6 +64,7 @@ impl Default for SenseiConfig { http_notifier_url: None, http_notifier_token: None, region: None, + poll_for_chain_updates: true, } } } diff --git a/senseicore/src/services/admin.rs b/senseicore/src/services/admin.rs index f54dab2..d52e134 100644 --- a/senseicore/src/services/admin.rs +++ b/senseicore/src/services/admin.rs @@ -135,6 +135,7 @@ pub enum AdminRequest { msg_hex: String, }, GetNetworkGraph {}, + ChainUpdated {}, } #[derive(Serialize, Debug)] @@ -196,6 +197,7 @@ pub enum AdminResponse { nodes: Vec, channels: Vec, }, + ChainUpdated {}, Error(Error), } @@ -603,6 +605,10 @@ impl AdminService { let _res = self.p2p.p2p_gossip.handle_channel_update(&msg); Ok(AdminResponse::GossipChannelUpdate {}) } + AdminRequest::ChainUpdated {} => { + self.chain_manager.chain_updated(); + Ok(AdminResponse::ChainUpdated {}) + } AdminRequest::GetNetworkGraph {} => { let graph = self.p2p.network_graph.read_only(); let channels = graph.channels(); diff --git a/src/http/admin.rs b/src/http/admin.rs index 66dad31..aba321a 100644 --- a/src/http/admin.rs +++ b/src/http/admin.rs @@ -345,6 +345,7 @@ pub fn add_routes(router: Router) -> Router { .route("/v1/login", post(login_admin)) .route("/v1/logout", post(logout)) .route("/v1/peers/connect", post(connect_gossip_peer)) + .route("/v1/chain/updated", post(chain_updated)) .route("/v1/ldk/network/route", post(find_route)) .route("/v1/ldk/network/path/successful", post(path_successful)) .route("/v1/ldk/network/path/failed", post(path_failed)) @@ -405,6 +406,23 @@ pub async fn connect_gossip_peer( } } +pub async fn chain_updated( + Extension(admin_service): Extension>, + cookies: Cookies, + AuthHeader { macaroon: _, token }: AuthHeader, +) -> Result, StatusCode> { + let authenticated = authenticate_request(&admin_service, "chain", &cookies, token).await?; + + if authenticated { + match admin_service.call(AdminRequest::ChainUpdated {}).await { + Ok(response) => Ok(Json(response)), + Err(_err) => Err(StatusCode::UNAUTHORIZED), + } + } else { + Err(StatusCode::UNAUTHORIZED) + } +} + pub async fn find_route( Extension(admin_service): Extension>, cookies: Cookies, diff --git a/src/main.rs b/src/main.rs index 74b03c6..b581199 100644 --- a/src/main.rs +++ b/src/main.rs @@ -109,6 +109,8 @@ struct SenseiArgs { http_notifier_token: Option, #[clap(long, env = "REGION")] region: Option, + #[clap(long, env = "POLL_FOR_CHAIN_UPDATES")] + poll_for_chain_updates: Option, } pub type AdminRequestResponse = (AdminRequest, Sender); @@ -198,6 +200,9 @@ fn main() { if let Some(region) = args.region { config.region = Some(region) } + if let Some(poll_for_chain_updates) = args.poll_for_chain_updates { + config.poll_for_chain_updates = poll_for_chain_updates + } if !config.database_url.starts_with("postgres:") && !config.database_url.starts_with("mysql:") { let sqlite_path = format!("{}/{}/{}", sensei_dir, config.network, config.database_url);