diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index 3b0f075eba..af0a67c229 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -4,40 +4,159 @@ #![warn(missing_docs)] use bdk_chain::{ - bitcoin::{Block, Transaction, Txid}, + bitcoin::{Block, Transaction}, + indexed_tx_graph::Indexer, local_chain::CheckPoint, - BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, TxGraph, + Append, BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, TxGraph, }; pub use bitcoincore_rpc; use bitcoincore_rpc::{bitcoincore_rpc_json::GetBlockResult, Client, RpcApi}; -use std::collections::HashSet; +use std::fmt::Debug; -/// An update emitted from [`BitcoindRpcEmitter`]. This can either be of a block or a subset of +/// An update emitted from [`Emitter`]. This can either be of a block or a subset of /// mempool transactions. #[derive(Debug, Clone)] -pub enum BitcoindRpcUpdate { +pub enum EmittedUpdate { /// An emitted block. - Block { - /// The checkpoint constructed from the block's height/hash and connected to the previous - /// block. - cp: CheckPoint, - /// The actual block of the blockchain. - block: Box, - }, + Block(EmittedBlock), /// An emitted subset of mempool transactions. /// - /// [`BitcoindRpcEmitter`] attempts to avoid re-emitting transactions. - Mempool { - /// The checkpoint of the last-seen tip. - cp: CheckPoint, - /// Subset of mempool transactions. - txs: Vec<(Transaction, u64)>, - }, + /// [`Emitter`] attempts to avoid re-emitting transactions. + Mempool(EmittedMempool), } -/// A closure that transforms a [`BitcoindRpcUpdate`] into a [`ConfirmationHeightAnchor`]. +impl EmittedUpdate { + /// Returns whether the update is of a subset of the mempool. + pub fn is_mempool(&self) -> bool { + matches!(self, Self::Mempool { .. }) + } + + /// Returns whether the update is of a block. + pub fn is_block(&self) -> bool { + matches!(self, Self::Block { .. }) + } + + /// Get the emission's checkpoint. + pub fn checkpoint(&self) -> CheckPoint { + match self { + EmittedUpdate::Block(e) => e.checkpoint(), + EmittedUpdate::Mempool(e) => e.checkpoint(), + } + } + + /// Transforms the emitted update into a [`TxGraph`] update. + /// + /// The `tx_filter` parameter takes in a closure that filters out irrelevant transactions so + /// they do not get included in the [`TxGraph`] update. We have provided two closures; + /// [`empty_filter`] and [`indexer_filter`] for this purpose. + /// + /// The `anchor_map` parameter takes in a closure that creates anchors of a specific type. + /// [`confirmation_height_anchor`] and [`confirmation_time_anchor`] are avaliable to create + /// updates with [`ConfirmationHeightAnchor`] and [`ConfirmationTimeAnchor`] respectively. + pub fn into_tx_graph_update(self, tx_filter: F, anchor_map: M) -> TxGraph + where + F: FnMut(&Transaction) -> bool, + M: Fn(&CheckPoint, &Block, usize) -> A, + A: Clone + Ord + PartialOrd, + { + match self { + EmittedUpdate::Block(e) => e.into_tx_graph_update(tx_filter, anchor_map), + EmittedUpdate::Mempool(e) => e.into_tx_graph_update(tx_filter), + } + } +} + +/// An emitted block. +#[derive(Debug, Clone)] +pub struct EmittedBlock { + /// The checkpoint constructed from the block's height/hash and connected to the previous block. + pub cp: CheckPoint, + /// The actual block of the chain. + pub block: Block, +} + +impl EmittedBlock { + /// Get the emission's checkpoint. + pub fn checkpoint(&self) -> CheckPoint { + self.cp.clone() + } + + /// Transforms the emitted update into a [`TxGraph`] update. + pub fn into_tx_graph_update(self, mut tx_filter: F, anchor_map: M) -> TxGraph + where + F: FnMut(&Transaction) -> bool, + M: Fn(&CheckPoint, &Block, usize) -> A, + A: Clone + Ord + PartialOrd, + { + let mut tx_graph = TxGraph::default(); + let tx_iter = self + .block + .txdata + .iter() + .enumerate() + .filter(move |(_, tx)| tx_filter(tx)); + for (tx_pos, tx) in tx_iter { + let txid = tx.txid(); + let _ = tx_graph.insert_anchor(txid, anchor_map(&self.cp, &self.block, tx_pos)); + let _ = tx_graph.insert_tx(tx.clone()); + } + tx_graph + } +} + +/// An emitted subset of mempool transactions. +#[derive(Debug, Clone)] +pub struct EmittedMempool { + /// The checkpoint of the last-seen tip. + pub cp: CheckPoint, + /// Subset of mempool transactions. + pub txs: Vec<(Transaction, u64)>, +} + +impl EmittedMempool { + /// Get the emission's checkpoint. + pub fn checkpoint(&self) -> CheckPoint { + self.cp.clone() + } + + /// Transforms the emitted mempool into a [`TxGraph`] update. + pub fn into_tx_graph_update(self, mut tx_filter: F) -> TxGraph + where + F: FnMut(&Transaction) -> bool, + A: Clone + Ord + PartialOrd, + { + let mut tx_graph = TxGraph::default(); + let tx_iter = self.txs.into_iter().filter(move |(tx, _)| tx_filter(tx)); + for (tx, seen_at) in tx_iter { + let _ = tx_graph.insert_seen_at(tx.txid(), seen_at); + let _ = tx_graph.insert_tx(tx); + } + tx_graph + } +} + +/// Creates a closure that filters transactions based on an [`Indexer`] implementation. +pub fn indexer_filter<'i, I: Indexer>( + indexer: &'i mut I, + changeset: &'i mut I::Additions, +) -> impl FnMut(&Transaction) -> bool + 'i +where + I::Additions: bdk_chain::Append, +{ + |tx| { + changeset.append(indexer.index_tx(tx)); + indexer.is_tx_relevant(tx) + } +} + +/// Returns an empty filter-closure. +pub fn empty_filter() -> impl FnMut(&Transaction) -> bool { + |_| true +} + +/// A closure that transforms a [`EmittedUpdate`] into a [`ConfirmationHeightAnchor`]. /// -/// This is to be used as an input to [`BitcoindRpcUpdate::into_update`]. +/// This is to be used as an input to [`EmittedUpdate::into_tx_graph_update`]. pub fn confirmation_height_anchor( cp: &CheckPoint, _block: &Block, @@ -50,9 +169,9 @@ pub fn confirmation_height_anchor( } } -/// A closure that transforms a [`BitcoindRpcUpdate`] into a [`ConfirmationTimeAnchor`]. +/// A closure that transforms a [`EmittedUpdate`] into a [`ConfirmationTimeAnchor`]. /// -/// This is to be used as an input to [`BitcoindRpcUpdate::into_update`]. +/// This is to be used as an input to [`EmittedUpdate::into_tx_graph_update`]. pub fn confirmation_time_anchor( cp: &CheckPoint, block: &Block, @@ -66,66 +185,21 @@ pub fn confirmation_time_anchor( } } -impl BitcoindRpcUpdate { - /// Returns whether the update is of a subset of the mempool. - pub fn is_mempool(&self) -> bool { - matches!(self, Self::Mempool { .. }) - } - - /// Returns whether the update is of a block. - pub fn is_block(&self) -> bool { - matches!(self, Self::Block { .. }) - } - - /// Transforms the [`BitcoindRpcUpdate`] into a [`TxGraph`] update. - /// - /// The [`CheckPoint`] tip is also returned. This is the block height and hash that the - /// [`TxGraph`] update is created from. - /// - /// The `anchor` parameter specifies the anchor type of the update. - /// [`confirmation_height_anchor`] and [`confirmation_time_anchor`] are avaliable to create - /// updates with [`ConfirmationHeightAnchor`] and [`ConfirmationTimeAnchor`] respectively. - pub fn into_update(self, anchor: F) -> (CheckPoint, TxGraph) - where - A: Clone + Ord + PartialOrd, - F: Fn(&CheckPoint, &Block, usize) -> A, - { - let mut tx_graph = TxGraph::default(); - match self { - BitcoindRpcUpdate::Block { cp, block } => { - for (tx_pos, tx) in block.txdata.iter().enumerate() { - let txid = tx.txid(); - let _ = tx_graph.insert_anchor(txid, anchor(&cp, &block, tx_pos)); - let _ = tx_graph.insert_tx(tx.clone()); - } - (cp, tx_graph) - } - BitcoindRpcUpdate::Mempool { cp, txs } => { - for (tx, seen_at) in txs { - let _ = tx_graph.insert_seen_at(tx.txid(), seen_at); - let _ = tx_graph.insert_tx(tx); - } - (cp, tx_graph) - } - } - } -} - /// A structure that emits updates for [`bdk_chain`] structures, sourcing blockchain data from /// [`bitcoincore_rpc::Client`]. /// -/// Updates are of type [`BitcoindRpcUpdate`], where each update can either be of a whole block, or -/// a subset of the mempool. +/// Updates are of type [`Emitter`], where each update can either be of a whole block, or a subset +/// of the mempool. /// -/// A [`BitcoindRpcEmitter`] emits updates starting from the `fallback_height` provided in [`new`], -/// or if `last_cp` is provided, we start from the height above the agreed-upon blockhash (between +/// A [`Emitter`] emits updates starting from the `fallback_height` provided in [`new`], or if +/// `last_cp` is provided, we start from the height above the agreed-upon blockhash (between /// `last_cp` and the state of `bitcoind`). Blocks are emitted in sequence (ascending order), and /// the mempool contents emitted if the last emission is the chain tip. /// /// # [`Iterator`] implementation /// -/// [`BitcoindRpcEmitter`] implements [`Iterator`] in a way such that even after [`Iterator::next`] -/// returns [`None`], subsequent calls may resume returning [`Some`]. +/// [`Emitter`] implements [`Iterator`] in a way such that even after [`Iterator::next`] returns +/// [`None`], subsequent calls may resume returning [`Some`]. /// /// Returning [`None`] means that the previous call to [`next`] is the mempool. This is useful if /// the caller wishes to update once. @@ -142,38 +216,47 @@ impl BitcoindRpcUpdate { /// } /// ``` /// -/// Alternatively, if the caller wishes to keep [`BitcoindRpcEmitter`] in a dedicated update-thread, -/// the caller can continue to poll [`next`] (potentially with a delay). +/// Alternatively, if the caller wishes to keep [`Emitter`] in a dedicated update-thread, the caller +/// can continue to poll [`next`] (potentially with a delay). /// -/// [`new`]: BitcoindRpcEmitter::new +/// [`new`]: Emitter::new /// [`next`]: Iterator::next -pub struct BitcoindRpcEmitter<'a> { +pub struct Emitter<'a> { client: &'a Client, fallback_height: u32, last_cp: Option, last_info: Option, - - seen_txids: HashSet, - last_emission_was_mempool: bool, } -impl<'a> Iterator for BitcoindRpcEmitter<'a> { - /// Represents an emitted item. - type Item = Result; +// impl<'a> Iterator for BitcoindRpcEmitter<'a> { +// /// Represents an emitted item. +// type Item = Result; - fn next(&mut self) -> Option { - if self.last_emission_was_mempool { - self.last_emission_was_mempool = false; - None - } else { - Some(self.next_update()) +// fn next(&mut self) -> Option { +// if self.last_emission_was_mempool { +// self.last_emission_was_mempool = false; +// None +// } else { +// Some(self.next_update()) +// } +// } +// } + +impl<'a> IntoIterator for Emitter<'a> { + type Item = as Iterator>::Item; + type IntoIter = Iter<'a>; + + fn into_iter(self) -> Self::IntoIter { + Iter { + emitter: self, + last_emission_was_mempool: false, } } } -impl<'a> BitcoindRpcEmitter<'a> { - /// Constructs a new [`BitcoindRpcEmitter`] with the provided [`bitcoincore_rpc::Client`]. +impl<'a> Emitter<'a> { + /// Constructs a new [`Emitter`] with the provided [`bitcoincore_rpc::Client`]. /// /// * `fallback_height` is the block height to start from if `last_cp` is not provided, or a /// point of agreement is not found. @@ -184,131 +267,142 @@ impl<'a> BitcoindRpcEmitter<'a> { fallback_height, last_cp, last_info: None, - seen_txids: HashSet::new(), - last_emission_was_mempool: false, } } - /// Continuously poll [`bitcoincore_rpc::Client`] until an update is found. - pub fn next_update(&mut self) -> Result { - loop { - match self.poll()? { - Some(item) => return Ok(item), - None => continue, - }; - } + /// Emits the whole mempool contents. + pub fn emit_mempool(&self) -> Result { + let txs = self + .client + .get_raw_mempool()? + .into_iter() + .map( + |txid| -> Result<(Transaction, u64), bitcoincore_rpc::Error> { + let first_seen = self + .client + .get_mempool_entry(&txid) + .map(|entry| entry.time)?; + let tx = self.client.get_raw_transaction(&txid, None)?; + Ok((tx, first_seen)) + }, + ) + .collect::, _>>()?; + let cp = match &self.last_cp { + Some(cp) => cp.clone(), + None => { + let hash = self.client.get_best_block_hash()?; + let height = self.client.get_block_info(&hash)?.height as u32; + CheckPoint::new(BlockId { height, hash }) + } + }; + Ok(EmittedMempool { cp, txs }) } - /// Performs a single round of polling [`bitcoincore_rpc::Client`] and updating the internal - /// state. This returns [`Ok(Some(BitcoindRpcUpdate))`] if an update is found. - pub fn poll(&mut self) -> Result, bitcoincore_rpc::Error> { - let client = self.client; - self.last_emission_was_mempool = false; - - match (&mut self.last_cp, &mut self.last_info) { - // If `last_cp` and `last_info` are both none, we need to emit from the - // `fallback_height`. `last_cp` and `last_info` will both be updated to the emitted - // block. - (last_cp @ None, last_info @ None) => { - let info = - client.get_block_info(&client.get_block_hash(self.fallback_height as _)?)?; - let block = self.client.get_block(&info.hash)?; - let cp = CheckPoint::new(BlockId { - height: info.height as _, - hash: info.hash, - }); - *last_cp = Some(cp.clone()); - *last_info = Some(info); - Ok(Some(BitcoindRpcUpdate::Block { - cp, - block: Box::new(block), - })) - } - // If `last_cp` exists, but `last_info` does not, it means we have not fetched a - // block from the client yet, but we have a previous checkpoint which we can use to - // find the point of agreement with. - // - // We don't emit in this match case. Instead, we set the state to either: - // * { last_cp: Some, last_info: Some } : When we find a point of agreement. - // * { last_cp: None, last_indo: None } : When we cannot find a point of agreement. - (last_cp @ Some(_), last_info @ None) => { - for cp in last_cp.clone().iter().flat_map(CheckPoint::iter) { - let cp_block = cp.block_id(); - - let info = client.get_block_info(&cp_block.hash)?; - if info.confirmations < 0 { - // block is not in the main chain - continue; - } - // agreement found - *last_cp = Some(cp); - *last_info = Some(info); - return Ok(None); - } + /// Emits the next block (if any). + pub fn emit_block(&mut self) -> Result, bitcoincore_rpc::Error> { + enum PollResponse { + /// A new block that is in chain is found. Congratulations! + Block { + cp: CheckPoint, + info: GetBlockResult, + }, + /// This either signals that we have reached the tip, or that the blocks ahead are not + /// in the best chain. In either case, we need to find the agreement point again. + NoMoreBlocks, + /// We have exhausted the local checkpoint history and there is no agreement point. We + /// should emit from the fallback height for the next round. + AgreementPointNotFound, + /// We have found an agreement point! Do not emit this one, emit the one higher. + AgreementPointFound { + cp: CheckPoint, + info: GetBlockResult, + }, + } - // no point of agreement found, next call will emit block @ fallback height - *last_cp = None; - *last_info = None; - Ok(None) - } - // If `last_cp` and `last_info` is both `Some`, we either emit a block at - // `last_info.nextblockhash` (if it exists), or we emit a subset of the mempool. - (Some(last_cp), last_info @ Some(_)) => { - // find next block - match last_info.as_ref().unwrap().nextblockhash { - Some(next_hash) => { - let info = self.client.get_block_info(&next_hash)?; + fn poll(emitter: &mut Emitter) -> Result { + let client = emitter.client; + match (&mut emitter.last_cp, &mut emitter.last_info) { + (None, None) => { + let info = client + .get_block_info(&client.get_block_hash(emitter.fallback_height as _)?)?; + let cp = CheckPoint::new(BlockId { + height: info.height as _, + hash: info.hash, + }); + Ok(PollResponse::Block { cp, info }) + } + (Some(last_cp), None) => { + for cp in last_cp.iter() { + let cp_block = cp.block_id(); + let info = client.get_block_info(&cp_block.hash)?; if info.confirmations < 0 { - *last_info = None; - return Ok(None); + // block is not in the main chain + continue; } - - let block = self.client.get_block(&info.hash)?; - let cp = last_cp - .clone() - .push(BlockId { - height: info.height as _, - hash: info.hash, - }) - .expect("must extend from checkpoint"); - - *last_cp = cp.clone(); - *last_info = Some(info); - - Ok(Some(BitcoindRpcUpdate::Block { - cp, - block: Box::new(block), - })) + // agreement point found + return Ok(PollResponse::AgreementPointFound { cp, info }); } - None => { - let mempool_txs = client - .get_raw_mempool()? - .into_iter() - .filter(|&txid| self.seen_txids.insert(txid)) - .map( - |txid| -> Result<(Transaction, u64), bitcoincore_rpc::Error> { - let first_seen = - client.get_mempool_entry(&txid).map(|entry| entry.time)?; - let tx = client.get_raw_transaction(&txid, None)?; - Ok((tx, first_seen)) - }, - ) - .collect::, _>>()?; - - // After a mempool emission, we want to find the point of agreement in - // the next round. - *last_info = None; - - self.last_emission_was_mempool = true; - Ok(Some(BitcoindRpcUpdate::Mempool { - txs: mempool_txs, - cp: last_cp.clone(), - })) + // no agreement point found + Ok(PollResponse::AgreementPointNotFound) + } + (Some(last_cp), Some(last_info)) => { + let next_hash = match last_info.nextblockhash { + None => return Ok(PollResponse::NoMoreBlocks), + Some(next_hash) => next_hash, + }; + let info = client.get_block_info(&next_hash)?; + if info.confirmations < 0 { + return Ok(PollResponse::NoMoreBlocks); } + let cp = last_cp + .clone() + .push(BlockId { + height: info.height as _, + hash: info.hash, + }) + .expect("must extend from checkpoint"); + Ok(PollResponse::Block { cp, info }) + } + (None, Some(last_info)) => unreachable!( + "info cannot exist without checkpoint: info={:#?}", + last_info + ), + } + } + + loop { + match poll(self)? { + PollResponse::Block { cp, info } => { + let block = self.client.get_block(&info.hash)?; + self.last_cp = Some(cp.clone()); + self.last_info = Some(info); + return Ok(Some(EmittedBlock { cp, block })); + } + PollResponse::NoMoreBlocks => { + // we have reached the tip, try find agreement point in next round + self.last_info = None; + return Ok(None); + } + PollResponse::AgreementPointNotFound => { + self.last_cp = None; + self.last_info = None; + continue; + } + PollResponse::AgreementPointFound { cp, info } => { + self.last_cp = Some(cp); + self.last_info = Some(info); + continue; } } - (None, Some(info)) => unreachable!("got info with no checkpoint? info={:#?}", info), + } + } + + /// Continuously poll [`bitcoincore_rpc::Client`] until an update is found. + pub fn next_update(&mut self) -> Result { + match self.emit_block()? { + Some(emitted_block) => Ok(EmittedUpdate::Block(emitted_block)), + None => self.emit_mempool().map(EmittedUpdate::Mempool), } } } @@ -317,7 +411,7 @@ impl<'a> BitcoindRpcEmitter<'a> { pub trait BitcoindRpcErrorExt { /// Returns whether the error is a "not found" error. /// - /// This is useful since [`BitcoindRpcEmitter`] emits [`Result<_, bitcoincore_rpc::Error>`]s as + /// This is useful since [`Emitter`] emits [`Result<_, bitcoincore_rpc::Error>`]s as /// [`Iterator::Item`]. fn is_not_found_error(&self) -> bool; } @@ -332,3 +426,22 @@ impl BitcoindRpcErrorExt for bitcoincore_rpc::Error { } } } + +/// Iterates (TODO) +pub struct Iter<'a> { + emitter: Emitter<'a>, + last_emission_was_mempool: bool, +} + +impl<'a> Iterator for Iter<'a> { + type Item = Result; + + fn next(&mut self) -> Option { + if self.last_emission_was_mempool { + self.last_emission_was_mempool = false; + None + } else { + Some(self.emitter.next_update()) + } + } +} diff --git a/example-crates/example_rpc/src/main.rs b/example-crates/example_rpc/src/main.rs index 21cca8b704..024d212f68 100644 --- a/example-crates/example_rpc/src/main.rs +++ b/example-crates/example_rpc/src/main.rs @@ -10,13 +10,14 @@ use std::{ use bdk_bitcoind_rpc::{ bitcoincore_rpc::{Auth, Client, RpcApi}, - confirmation_time_anchor, BitcoindRpcEmitter, BitcoindRpcUpdate, + EmittedUpdate, Emitter, }; use bdk_chain::{ bitcoin::{Address, Transaction}, - keychain::LocalChangeSet, + indexed_tx_graph::IndexedAdditions, + keychain::{DerivationAdditions, LocalChangeSet}, local_chain::{self, LocalChain}, - BlockId, ConfirmationTimeAnchor, IndexedTxGraph, + Append, BlockId, ConfirmationTimeAnchor, IndexedTxGraph, }; use example_cli::{ anyhow, @@ -150,13 +151,13 @@ fn main() -> anyhow::Result<()> { } => { graph.lock().unwrap().index.set_lookahead_for_all(lookahead); - let (chan, recv) = sync_channel::<(BitcoindRpcUpdate, u32)>(CHANNEL_BOUND); + let (chan, recv) = sync_channel::<(EmittedUpdate, u32)>(CHANNEL_BOUND); let prev_cp = chain.lock().unwrap().tip(); let join_handle = std::thread::spawn(move || -> anyhow::Result<()> { let mut tip_height = Option::::None; - for item in BitcoindRpcEmitter::new(&rpc_client, fallback_height, prev_cp) { + for item in Emitter::new(&rpc_client, fallback_height, prev_cp).into_iter() { let item = item?; let is_block = !item.is_mempool(); let is_mempool = item.is_mempool(); @@ -185,19 +186,28 @@ fn main() -> anyhow::Result<()> { for (item, tip_height) in recv { let is_mempool = item.is_mempool(); - let (tip, graph_update) = item.into_update(confirmation_time_anchor); + // item.into_tx_graph_update(tx_filter, anchor_map) + // let (tip, graph_update) = item.into_update(confirmation_time_anchor); + let tip = item.checkpoint(); let current_height = tip.height(); let db_changeset = { let mut chain = chain.lock().unwrap(); let mut graph = graph.lock().unwrap(); + let mut index_additions = DerivationAdditions::default(); + let graph_update = item.into_tx_graph_update( + bdk_bitcoind_rpc::indexer_filter(&mut graph.index, &mut index_additions), + bdk_bitcoind_rpc::confirmation_time_anchor, + ); + let chain_changeset = chain.apply_update(local_chain::Update { tip, introduce_older_blocks: false, })?; - let indexed_additions = graph.prune_and_apply_update(graph_update); + let mut indexed_additions = graph.prune_and_apply_update(graph_update); + indexed_additions.append(IndexedAdditions::from(index_additions)); ChangeSet { indexed_additions,