Skip to content

Commit

Permalink
Merge pull request #249 from rustaceanrob/fetch-range-1-2
Browse files Browse the repository at this point in the history
Fetch a range of headers from client
  • Loading branch information
rustaceanrob authored Jan 7, 2025
2 parents 3806b1e + f6c24f4 commit bc866ea
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 3 deletions.
18 changes: 18 additions & 0 deletions src/chain/chain.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
extern crate alloc;
use std::{
collections::{BTreeMap, HashSet},
ops::Range,
sync::Arc,
};

Expand Down Expand Up @@ -894,6 +895,23 @@ impl<H: HeaderStore> Chain<H> {
}
}

pub(crate) async fn fetch_header_range(
&self,
range: Range<u32>,
) -> Result<BTreeMap<u32, Header>, HeaderPersistenceError<H::Error>> {
let mut db = self.db.lock().await;
let range_opt = db.load(range).await;
if range_opt.is_err() {
self.dialog
.send_warning(Warning::FailedPersistance {
warning: "Unexpected error fetching a range of headers from the header store"
.to_string(),
})
.await;
}
range_opt.map_err(HeaderPersistenceError::Database)
}

// Reset the compact filter queue because we received a new block
pub(crate) fn clear_compact_filter_queue(&mut self) {
self.cf_header_chain.clear_queue();
Expand Down
39 changes: 37 additions & 2 deletions src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bitcoin::BlockHash;
#[cfg(not(feature = "filter-control"))]
use bitcoin::ScriptBuf;
use bitcoin::Transaction;
use std::time::Duration;
use std::{collections::BTreeMap, ops::Range, time::Duration};
use tokio::sync::mpsc;
pub use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
Expand All @@ -16,7 +16,7 @@ use crate::{Event, Log, TrustedPeer, TxBroadcast};
use super::{error::FetchBlockError, messages::BlockRequest, BlockReceiver, IndexedBlock};
use super::{
error::{ClientError, FetchHeaderError},
messages::{ClientMessage, HeaderRequest},
messages::{BatchHeaderRequest, ClientMessage, HeaderRequest},
};

/// A [`Client`] allows for communication with a running node.
Expand Down Expand Up @@ -74,6 +74,10 @@ impl EventSender {
/// # Errors
///
/// If the node has already stopped running.
///
/// # Panics
///
/// When called within an asynchronus context (e.g `tokio::main`).
pub fn shutdown_blocking(&self) -> Result<(), ClientError> {
self.ntx
.blocking_send(ClientMessage::Shutdown)
Expand Down Expand Up @@ -119,6 +123,10 @@ impl EventSender {
/// # Errors
///
/// If the node has stopped running.
///
/// # Panics
///
/// When called within an asynchronus context (e.g `tokio::main`).
pub fn broadcast_tx_blocking(&self, tx: TxBroadcast) -> Result<(), ClientError> {
self.ntx
.blocking_send(ClientMessage::Broadcast(tx))
Expand All @@ -145,6 +153,10 @@ impl EventSender {
/// # Errors
///
/// If the node has stopped running.
///
/// # Panics
///
/// When called within an asynchronus context (e.g `tokio::main`).
#[cfg(not(feature = "filter-control"))]
pub fn add_script_blocking(&self, script: impl Into<ScriptBuf>) -> Result<(), ClientError> {
self.ntx
Expand Down Expand Up @@ -182,6 +194,10 @@ impl EventSender {
/// # Errors
///
/// If the node has stopped running.
///
/// # Panics
///
/// When called within an asynchronus context (e.g `tokio::main`).
pub fn get_header_blocking(&self, height: u32) -> Result<Header, FetchHeaderError> {
let (tx, rx) = tokio::sync::oneshot::channel::<Result<Header, FetchHeaderError>>();
let message = HeaderRequest::new(tx, height);
Expand All @@ -192,6 +208,25 @@ impl EventSender {
.map_err(|_| FetchHeaderError::RecvError)?
}

/// Get a range of headers by the specified range.
///
/// # Errors
///
/// If the node has stopped running.
pub async fn get_header_range(
&self,
range: Range<u32>,
) -> Result<BTreeMap<u32, Header>, FetchHeaderError> {
let (tx, rx) =
tokio::sync::oneshot::channel::<Result<BTreeMap<u32, Header>, FetchHeaderError>>();
let message = BatchHeaderRequest::new(tx, range);
self.ntx
.send(ClientMessage::GetHeaderBatch(message))
.await
.map_err(|_| FetchHeaderError::SendError)?;
rx.await.map_err(|_| FetchHeaderError::RecvError)?
}

/// Request a block be fetched. Note that this method will request a block
/// from a connected peer's inventory, and may take an indefinite amount of
/// time, until a peer responds.
Expand Down
19 changes: 18 additions & 1 deletion src/core/messages.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::BTreeMap, time::Duration};
use std::{collections::BTreeMap, ops::Range, time::Duration};

#[cfg(feature = "filter-control")]
use bitcoin::BlockHash;
Expand Down Expand Up @@ -158,6 +158,8 @@ pub(crate) enum ClientMessage {
AddPeer(TrustedPeer),
/// Request a header from a specified height.
GetHeader(HeaderRequest),
/// Request a range of headers.
GetHeaderBatch(BatchHeaderRequest),
}

type HeaderSender = tokio::sync::oneshot::Sender<Result<Header, FetchHeaderError>>;
Expand All @@ -174,6 +176,21 @@ impl HeaderRequest {
}
}

type BatchHeaderSender =
tokio::sync::oneshot::Sender<Result<BTreeMap<u32, Header>, FetchHeaderError>>;

#[derive(Debug)]
pub(crate) struct BatchHeaderRequest {
pub(crate) oneshot: BatchHeaderSender,
pub(crate) range: Range<u32>,
}

impl BatchHeaderRequest {
pub(crate) fn new(oneshot: BatchHeaderSender, range: Range<u32>) -> Self {
Self { oneshot, range }
}
}

pub(crate) type BlockSender = tokio::sync::oneshot::Sender<Result<IndexedBlock, FetchBlockError>>;

#[cfg(feature = "filter-control")]
Expand Down
8 changes: 8 additions & 0 deletions src/core/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,14 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
if send_result.is_err() {
self.dialog.send_warning(Warning::ChannelDropped).await
};
},
ClientMessage::GetHeaderBatch(request) => {
let chain = self.chain.lock().await;
let range_opt = chain.fetch_header_range(request.range).await.map_err(|e| FetchHeaderError::DatabaseOptFailed { error: e.to_string() });
let send_result = request.oneshot.send(range_opt);
if send_result.is_err() {
self.dialog.send_warning(Warning::ChannelDropped).await
};
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions tests/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ async fn test_long_chain() {
event_rx: mut channel,
} = client;
sync_assert(&best, &mut channel, &mut log).await;
let batch = sender.get_header_range(10_000..10_002).await.unwrap();
assert!(batch.is_empty());
sender.shutdown().await.unwrap();
rpc.stop().unwrap();
}
Expand Down Expand Up @@ -304,6 +306,8 @@ async fn test_sql_reorg() {
event_rx: mut channel,
} = client;
sync_assert(&best, &mut channel, &mut log).await;
let batch = sender.get_header_range(0..10).await.unwrap();
assert!(!batch.is_empty());
sender.shutdown().await.unwrap();
// Reorganize the blocks
let old_best = best;
Expand Down

0 comments on commit bc866ea

Please sign in to comment.