From 33bb4a9d7d1860686ca3a9740d98cfa697e181be Mon Sep 17 00:00:00 2001 From: Rob N Date: Thu, 2 Jan 2025 10:58:03 -1000 Subject: [PATCH 1/2] doc(client): mention `panic` condition for blocking calls --- src/core/client.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/core/client.rs b/src/core/client.rs index d3d5fd8..333579d 100644 --- a/src/core/client.rs +++ b/src/core/client.rs @@ -74,6 +74,10 @@ impl EventSender { /// # Errors /// /// If the node has already stopped running. + /// + /// # Panics + /// + /// When called within an asynchronus context (e.g `tokio::main`). pub fn shutdown_blocking(&self) -> Result<(), ClientError> { self.ntx .blocking_send(ClientMessage::Shutdown) @@ -119,6 +123,10 @@ impl EventSender { /// # Errors /// /// If the node has stopped running. + /// + /// # Panics + /// + /// When called within an asynchronus context (e.g `tokio::main`). pub fn broadcast_tx_blocking(&self, tx: TxBroadcast) -> Result<(), ClientError> { self.ntx .blocking_send(ClientMessage::Broadcast(tx)) @@ -145,6 +153,10 @@ impl EventSender { /// # Errors /// /// If the node has stopped running. + /// + /// # Panics + /// + /// When called within an asynchronus context (e.g `tokio::main`). #[cfg(not(feature = "filter-control"))] pub fn add_script_blocking(&self, script: impl Into) -> Result<(), ClientError> { self.ntx @@ -182,6 +194,10 @@ impl EventSender { /// # Errors /// /// If the node has stopped running. + /// + /// # Panics + /// + /// When called within an asynchronus context (e.g `tokio::main`). pub fn get_header_blocking(&self, height: u32) -> Result { let (tx, rx) = tokio::sync::oneshot::channel::>(); let message = HeaderRequest::new(tx, height); From f6c24f4182fa1eb073eb0835d47a2d1580c92c09 Mon Sep 17 00:00:00 2001 From: Rob N Date: Fri, 3 Jan 2025 08:51:22 -1000 Subject: [PATCH 2/2] feat(client): request range of headers from persistence `RangeBounds` is not object safe, so it may not be used to request a range of heights. --- src/chain/chain.rs | 18 ++++++++++++++++++ src/core/client.rs | 23 +++++++++++++++++++++-- src/core/messages.rs | 19 ++++++++++++++++++- src/core/node.rs | 8 ++++++++ tests/core.rs | 4 ++++ 5 files changed, 69 insertions(+), 3 deletions(-) diff --git a/src/chain/chain.rs b/src/chain/chain.rs index 463ba30..8ae42b1 100644 --- a/src/chain/chain.rs +++ b/src/chain/chain.rs @@ -1,6 +1,7 @@ extern crate alloc; use std::{ collections::{BTreeMap, HashSet}, + ops::Range, sync::Arc, }; @@ -894,6 +895,23 @@ impl Chain { } } + pub(crate) async fn fetch_header_range( + &self, + range: Range, + ) -> Result, HeaderPersistenceError> { + let mut db = self.db.lock().await; + let range_opt = db.load(range).await; + if range_opt.is_err() { + self.dialog + .send_warning(Warning::FailedPersistance { + warning: "Unexpected error fetching a range of headers from the header store" + .to_string(), + }) + .await; + } + range_opt.map_err(HeaderPersistenceError::Database) + } + // Reset the compact filter queue because we received a new block pub(crate) fn clear_compact_filter_queue(&mut self) { self.cf_header_chain.clear_queue(); diff --git a/src/core/client.rs b/src/core/client.rs index 333579d..27f7510 100644 --- a/src/core/client.rs +++ b/src/core/client.rs @@ -4,7 +4,7 @@ use bitcoin::BlockHash; #[cfg(not(feature = "filter-control"))] use bitcoin::ScriptBuf; use bitcoin::Transaction; -use std::time::Duration; +use std::{collections::BTreeMap, ops::Range, time::Duration}; use tokio::sync::mpsc; pub use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Sender; @@ -16,7 +16,7 @@ use crate::{Event, Log, TrustedPeer, TxBroadcast}; use super::{error::FetchBlockError, messages::BlockRequest, BlockReceiver, IndexedBlock}; use super::{ error::{ClientError, FetchHeaderError}, - messages::{ClientMessage, HeaderRequest}, + messages::{BatchHeaderRequest, ClientMessage, HeaderRequest}, }; /// A [`Client`] allows for communication with a running node. @@ -208,6 +208,25 @@ impl EventSender { .map_err(|_| FetchHeaderError::RecvError)? } + /// Get a range of headers by the specified range. + /// + /// # Errors + /// + /// If the node has stopped running. + pub async fn get_header_range( + &self, + range: Range, + ) -> Result, FetchHeaderError> { + let (tx, rx) = + tokio::sync::oneshot::channel::, FetchHeaderError>>(); + let message = BatchHeaderRequest::new(tx, range); + self.ntx + .send(ClientMessage::GetHeaderBatch(message)) + .await + .map_err(|_| FetchHeaderError::SendError)?; + rx.await.map_err(|_| FetchHeaderError::RecvError)? + } + /// Request a block be fetched. Note that this method will request a block /// from a connected peer's inventory, and may take an indefinite amount of /// time, until a peer responds. diff --git a/src/core/messages.rs b/src/core/messages.rs index cb937bc..df92601 100644 --- a/src/core/messages.rs +++ b/src/core/messages.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, time::Duration}; +use std::{collections::BTreeMap, ops::Range, time::Duration}; #[cfg(feature = "filter-control")] use bitcoin::BlockHash; @@ -158,6 +158,8 @@ pub(crate) enum ClientMessage { AddPeer(TrustedPeer), /// Request a header from a specified height. GetHeader(HeaderRequest), + /// Request a range of headers. + GetHeaderBatch(BatchHeaderRequest), } type HeaderSender = tokio::sync::oneshot::Sender>; @@ -174,6 +176,21 @@ impl HeaderRequest { } } +type BatchHeaderSender = + tokio::sync::oneshot::Sender, FetchHeaderError>>; + +#[derive(Debug)] +pub(crate) struct BatchHeaderRequest { + pub(crate) oneshot: BatchHeaderSender, + pub(crate) range: Range, +} + +impl BatchHeaderRequest { + pub(crate) fn new(oneshot: BatchHeaderSender, range: Range) -> Self { + Self { oneshot, range } + } +} + pub(crate) type BlockSender = tokio::sync::oneshot::Sender>; #[cfg(feature = "filter-control")] diff --git a/src/core/node.rs b/src/core/node.rs index 5a99f11..8dcdc22 100644 --- a/src/core/node.rs +++ b/src/core/node.rs @@ -324,6 +324,14 @@ impl Node { if send_result.is_err() { self.dialog.send_warning(Warning::ChannelDropped).await }; + }, + ClientMessage::GetHeaderBatch(request) => { + let chain = self.chain.lock().await; + let range_opt = chain.fetch_header_range(request.range).await.map_err(|e| FetchHeaderError::DatabaseOptFailed { error: e.to_string() }); + let send_result = request.oneshot.send(range_opt); + if send_result.is_err() { + self.dialog.send_warning(Warning::ChannelDropped).await + }; } } } diff --git a/tests/core.rs b/tests/core.rs index 3944e5d..ebc6b97 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -274,6 +274,8 @@ async fn test_long_chain() { event_rx: mut channel, } = client; sync_assert(&best, &mut channel, &mut log).await; + let batch = sender.get_header_range(10_000..10_002).await.unwrap(); + assert!(batch.is_empty()); sender.shutdown().await.unwrap(); rpc.stop().unwrap(); } @@ -304,6 +306,8 @@ async fn test_sql_reorg() { event_rx: mut channel, } = client; sync_assert(&best, &mut channel, &mut log).await; + let batch = sender.get_header_range(0..10).await.unwrap(); + assert!(!batch.is_empty()); sender.shutdown().await.unwrap(); // Reorganize the blocks let old_best = best;