From 0fa68a6fa813c7829a73ca73e75d9544f51175d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Wed, 6 Mar 2024 13:04:12 +0800 Subject: [PATCH 1/8] feat(chain): add `query` and `query_from` methods to `CheckPoint` These methods allow us to query for checkpoints contained within the linked list by height. This is useful to determine checkpoints to fetch for chain sources without having to refer back to the `LocalChain`. Currently this is not implemented efficiently, but in the future, we will change `CheckPoint` to use a skip list structure. --- crates/bdk/src/wallet/mod.rs | 11 +- crates/bitcoind_rpc/tests/test_emitter.rs | 18 +- crates/chain/src/local_chain.rs | 185 ++++++++++++-------- crates/chain/src/tx_graph.rs | 10 +- crates/chain/tests/test_indexed_tx_graph.rs | 13 +- crates/chain/tests/test_local_chain.rs | 46 +++++ crates/esplora/tests/blocking_ext.rs | 4 +- 7 files changed, 182 insertions(+), 105 deletions(-) diff --git a/crates/bdk/src/wallet/mod.rs b/crates/bdk/src/wallet/mod.rs index 4eb686eb4..6bd9d9b34 100644 --- a/crates/bdk/src/wallet/mod.rs +++ b/crates/bdk/src/wallet/mod.rs @@ -1128,18 +1128,13 @@ impl Wallet { // anchor tx to checkpoint with lowest height that is >= position's height let anchor = self .chain - .blocks() - .range(height..) 
- .next() + .query_from(height) .ok_or(InsertTxError::ConfirmationHeightCannotBeGreaterThanTip { tip_height: self.chain.tip().height(), tx_height: height, }) - .map(|(&anchor_height, &hash)| ConfirmationTimeHeightAnchor { - anchor_block: BlockId { - height: anchor_height, - hash, - }, + .map(|anchor_cp| ConfirmationTimeHeightAnchor { + anchor_block: anchor_cp.block_id(), confirmation_height: height, confirmation_time: time, })?; diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 2161db0df..97946da99 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -57,12 +57,15 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> { } assert_eq!( - local_chain.blocks(), - &exp_hashes + local_chain + .iter_checkpoints() + .map(|cp| (cp.height(), cp.hash())) + .collect::>(), + exp_hashes .iter() .enumerate() .map(|(i, hash)| (i as u32, *hash)) - .collect(), + .collect::>(), "final local_chain state is unexpected", ); @@ -110,12 +113,15 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> { } assert_eq!( - local_chain.blocks(), - &exp_hashes + local_chain + .iter_checkpoints() + .map(|cp| (cp.height(), cp.hash())) + .collect::>(), + exp_hashes .iter() .enumerate() .map(|(i, hash)| (i as u32, *hash)) - .collect(), + .collect::>(), "final local_chain state is unexpected after reorg", ); diff --git a/crates/chain/src/local_chain.rs b/crates/chain/src/local_chain.rs index 9be62dee3..f90b28e85 100644 --- a/crates/chain/src/local_chain.rs +++ b/crates/chain/src/local_chain.rs @@ -5,6 +5,7 @@ use core::convert::Infallible; use crate::collections::BTreeMap; use crate::{BlockId, ChainOracle}; use alloc::sync::Arc; +use alloc::vec::Vec; use bitcoin::block::Header; use bitcoin::BlockHash; @@ -148,6 +149,23 @@ impl CheckPoint { pub fn iter(&self) -> CheckPointIter { self.clone().into_iter() } + + /// Query for checkpoint at `height`. 
+ /// + /// Returns `None` if checkpoint at `height` does not exist`. + pub fn query(&self, height: u32) -> Option { + self.iter() + // optimization to avoid iterating the entire chain if we do not get a direct hit + .take_while(|cp| cp.height() >= height) + .find(|cp| cp.height() == height) + } + + /// Query for checkpoint that is greater or equal to `height`. + /// + /// Returns `None` if no checkpoints has a height equal or greater than `height`. + pub fn query_from(&self, height: u32) -> Option { + self.iter().take_while(|cp| cp.height() >= height).last() + } } /// Iterates over checkpoints backwards. @@ -205,18 +223,28 @@ pub struct Update { #[derive(Debug, Clone)] pub struct LocalChain { tip: CheckPoint, - index: BTreeMap, } impl PartialEq for LocalChain { fn eq(&self, other: &Self) -> bool { - self.index == other.index + self.iter_checkpoints() + .map(|cp| cp.block_id()) + .collect::>() + == other + .iter_checkpoints() + .map(|cp| cp.block_id()) + .collect::>() } } +// TODO: Figure out whether we can get rid of this impl From for BTreeMap { fn from(value: LocalChain) -> Self { - value.index + value + .tip + .iter() + .map(|cp| (cp.height(), cp.hash())) + .collect() } } @@ -228,18 +256,16 @@ impl ChainOracle for LocalChain { block: BlockId, chain_tip: BlockId, ) -> Result, Self::Error> { - if block.height > chain_tip.height { - return Ok(None); + let chain_tip_cp = match self.tip.query(chain_tip.height) { + // we can only determine whether `block` is in chain of `chain_tip` if `chain_tip` can + // be identified in chain + Some(cp) if cp.hash() == chain_tip.hash => cp, + _ => return Ok(None), + }; + match chain_tip_cp.query(block.height) { + Some(cp) => Ok(Some(cp.hash() == block.hash)), + None => Ok(None), } - Ok( - match ( - self.index.get(&block.height), - self.index.get(&chain_tip.height), - ) { - (Some(cp), Some(tip_cp)) => Some(*cp == block.hash && *tip_cp == chain_tip.hash), - _ => None, - }, - ) } fn get_chain_tip(&self) -> Result { @@ -250,7 +276,7 @@ 
impl ChainOracle for LocalChain { impl LocalChain { /// Get the genesis hash. pub fn genesis_hash(&self) -> BlockHash { - self.index.get(&0).copied().expect("must have genesis hash") + self.tip.query(0).expect("genesis must exist").hash() } /// Construct [`LocalChain`] from genesis `hash`. @@ -259,7 +285,6 @@ impl LocalChain { let height = 0; let chain = Self { tip: CheckPoint::new(BlockId { height, hash }), - index: core::iter::once((height, hash)).collect(), }; let changeset = chain.initial_changeset(); (chain, changeset) @@ -276,7 +301,6 @@ impl LocalChain { let (mut chain, _) = Self::from_genesis_hash(genesis_hash); chain.apply_changeset(&changeset)?; - debug_assert!(chain._check_index_is_consistent_with_tip()); debug_assert!(chain._check_changeset_is_applied(&changeset)); Ok(chain) @@ -284,18 +308,11 @@ impl LocalChain { /// Construct a [`LocalChain`] from a given `checkpoint` tip. pub fn from_tip(tip: CheckPoint) -> Result { - let mut chain = Self { - tip, - index: BTreeMap::new(), - }; - chain.reindex(0); - - if chain.index.get(&0).copied().is_none() { + let genesis_cp = tip.iter().last().expect("must have at least one element"); + if genesis_cp.height() != 0 { return Err(MissingGenesisError); } - - debug_assert!(chain._check_index_is_consistent_with_tip()); - Ok(chain) + Ok(Self { tip }) } /// Constructs a [`LocalChain`] from a [`BTreeMap`] of height to [`BlockHash`]. @@ -303,12 +320,11 @@ impl LocalChain { /// The [`BTreeMap`] enforces the height order. However, the caller must ensure the blocks are /// all of the same chain. 
pub fn from_blocks(blocks: BTreeMap) -> Result { - if !blocks.contains_key(&0) { + if blocks.get(&0).is_none() { return Err(MissingGenesisError); } let mut tip: Option = None; - for block in &blocks { match tip { Some(curr) => { @@ -321,13 +337,9 @@ impl LocalChain { } } - let chain = Self { - index: blocks, + Ok(Self { tip: tip.expect("already checked to have genesis"), - }; - - debug_assert!(chain._check_index_is_consistent_with_tip()); - Ok(chain) + }) } /// Get the highest checkpoint. @@ -494,9 +506,7 @@ impl LocalChain { None => LocalChain::from_blocks(extension)?.tip(), }; self.tip = new_tip; - self.reindex(start_height); - debug_assert!(self._check_index_is_consistent_with_tip()); debug_assert!(self._check_changeset_is_applied(changeset)); } @@ -509,16 +519,16 @@ impl LocalChain { /// /// Replacing the block hash of an existing checkpoint will result in an error. pub fn insert_block(&mut self, block_id: BlockId) -> Result { - if let Some(&original_hash) = self.index.get(&block_id.height) { + if let Some(original_cp) = self.tip.query(block_id.height) { + let original_hash = original_cp.hash(); if original_hash != block_id.hash { return Err(AlterCheckPointError { height: block_id.height, original_hash, update_hash: Some(block_id.hash), }); - } else { - return Ok(ChangeSet::default()); } + return Ok(ChangeSet::default()); } let mut changeset = ChangeSet::default(); @@ -542,33 +552,41 @@ impl LocalChain { /// This will fail with [`MissingGenesisError`] if the caller attempts to disconnect from the /// genesis block. pub fn disconnect_from(&mut self, block_id: BlockId) -> Result { - if self.index.get(&block_id.height) != Some(&block_id.hash) { - return Ok(ChangeSet::default()); - } - - let changeset = self - .index - .range(block_id.height..) 
- .map(|(&height, _)| (height, None)) - .collect::(); - self.apply_changeset(&changeset).map(|_| changeset) - } - - /// Reindex the heights in the chain from (and including) `from` height - fn reindex(&mut self, from: u32) { - let _ = self.index.split_off(&from); - for cp in self.iter_checkpoints() { - if cp.height() < from { + let mut remove_from = Option::::None; + let mut changeset = ChangeSet::default(); + for cp in self.tip().iter() { + let cp_id = cp.block_id(); + if cp_id.height < block_id.height { break; } - self.index.insert(cp.height(), cp.hash()); + changeset.insert(cp_id.height, None); + if cp_id == block_id { + remove_from = Some(cp); + } } + self.tip = match remove_from.map(|cp| cp.prev()) { + // The checkpoint below the earliest checkpoint to remove will be the new tip. + Some(Some(new_tip)) => new_tip, + // If there is no checkpoint below the earliest checkpoint to remove, it means the + // "earliest checkpoint to remove" is the genesis block. We disallow removing the + // genesis block. + Some(None) => return Err(MissingGenesisError), + // If there is nothing to remove, we return an empty changeset. + None => return Ok(ChangeSet::default()), + }; + Ok(changeset) } /// Derives an initial [`ChangeSet`], meaning that it can be applied to an empty chain to /// recover the current chain. pub fn initial_changeset(&self) -> ChangeSet { - self.index.iter().map(|(k, v)| (*k, Some(*v))).collect() + self.tip + .iter() + .map(|cp| { + let block_id = cp.block_id(); + (block_id.height, Some(block_id.hash)) + }) + .collect() } /// Iterate over checkpoints in descending height order. @@ -578,28 +596,43 @@ impl LocalChain { } } - /// Get a reference to the internal index mapping the height to block hash. 
- pub fn blocks(&self) -> &BTreeMap { - &self.index - } - - fn _check_index_is_consistent_with_tip(&self) -> bool { - let tip_history = self - .tip - .iter() - .map(|cp| (cp.height(), cp.hash())) - .collect::>(); - self.index == tip_history - } - fn _check_changeset_is_applied(&self, changeset: &ChangeSet) -> bool { - for (height, exp_hash) in changeset { - if self.index.get(height) != exp_hash.as_ref() { - return false; + let mut curr_cp = self.tip.clone(); + for (height, exp_hash) in changeset.iter().rev() { + match curr_cp.query(*height) { + Some(query_cp) => { + if query_cp.height() != *height || Some(query_cp.hash()) != *exp_hash { + return false; + } + curr_cp = query_cp; + } + None => { + if exp_hash.is_some() { + return false; + } + } } } true } + + /// Query for checkpoint at given `height` (if it exists). + /// + /// This is a shorthand for calling [`CheckPoint::query`] on the [`tip`]. + /// + /// [`tip`]: LocalChain::tip + pub fn query(&self, height: u32) -> Option { + self.tip.query(height) + } + + /// Query for checkpoint that is greater or equal to `height`. + /// + /// This is a shorthand for calling [`CheckPoint::query_from`] on the [`tip`]. + /// + /// [`tip`]: LocalChain::tip + pub fn query_from(&self, height: u32) -> Option { + self.tip.query_from(height) + } } /// An error which occurs when a [`LocalChain`] is constructed without a genesis checkpoint. 
diff --git a/crates/chain/src/tx_graph.rs b/crates/chain/src/tx_graph.rs index 30d020ecb..f80a20713 100644 --- a/crates/chain/src/tx_graph.rs +++ b/crates/chain/src/tx_graph.rs @@ -725,13 +725,13 @@ impl TxGraph { }; let mut has_missing_height = false; for anchor_block in tx_anchors.iter().map(Anchor::anchor_block) { - match chain.blocks().get(&anchor_block.height) { + match chain.query(anchor_block.height) { None => { has_missing_height = true; continue; } - Some(chain_hash) => { - if chain_hash == &anchor_block.hash { + Some(chain_cp) => { + if chain_cp.hash() == anchor_block.hash { return true; } } @@ -749,7 +749,7 @@ impl TxGraph { .filter_map(move |(a, _)| { let anchor_block = a.anchor_block(); if Some(anchor_block.height) != last_height_emitted - && !chain.blocks().contains_key(&anchor_block.height) + && chain.query(anchor_block.height).is_none() { last_height_emitted = Some(anchor_block.height); Some(anchor_block.height) @@ -1299,7 +1299,7 @@ impl ChangeSet { A: Anchor, { self.anchor_heights() - .filter(move |height| !local_chain.blocks().contains_key(height)) + .filter(move |&height| local_chain.query(height).is_none()) } } diff --git a/crates/chain/tests/test_indexed_tx_graph.rs b/crates/chain/tests/test_indexed_tx_graph.rs index 3fcaf2d19..0fd2a71b8 100644 --- a/crates/chain/tests/test_indexed_tx_graph.rs +++ b/crates/chain/tests/test_indexed_tx_graph.rs @@ -7,7 +7,7 @@ use bdk_chain::{ indexed_tx_graph::{self, IndexedTxGraph}, keychain::{self, Balance, KeychainTxOutIndex}, local_chain::LocalChain, - tx_graph, BlockId, ChainPosition, ConfirmationHeightAnchor, + tx_graph, ChainPosition, ConfirmationHeightAnchor, }; use bitcoin::{secp256k1::Secp256k1, OutPoint, Script, ScriptBuf, Transaction, TxIn, TxOut}; use miniscript::Descriptor; @@ -212,10 +212,8 @@ fn test_list_owned_txouts() { ( *tx, local_chain - .blocks() - .get(&height) - .cloned() - .map(|hash| BlockId { height, hash }) + .query(height) + .map(|cp| cp.block_id()) .map(|anchor_block| 
ConfirmationHeightAnchor { anchor_block, confirmation_height: anchor_block.height, @@ -230,9 +228,8 @@ fn test_list_owned_txouts() { |height: u32, graph: &IndexedTxGraph>| { let chain_tip = local_chain - .blocks() - .get(&height) - .map(|&hash| BlockId { height, hash }) + .query(height) + .map(|cp| cp.block_id()) .unwrap_or_else(|| panic!("block must exist at {}", height)); let txouts = graph .graph() diff --git a/crates/chain/tests/test_local_chain.rs b/crates/chain/tests/test_local_chain.rs index c1a1cd7f9..1a1371a54 100644 --- a/crates/chain/tests/test_local_chain.rs +++ b/crates/chain/tests/test_local_chain.rs @@ -528,6 +528,52 @@ fn checkpoint_from_block_ids() { } } +#[test] +fn checkpoint_query() { + struct TestCase { + chain: LocalChain, + /// The heights we want to call [`CheckPoint::query`] with, represented as an inclusive + /// range. + /// + /// If a [`CheckPoint`] exists at that height, we expect [`CheckPoint::query`] to return + /// it. If not, [`CheckPoint::query`] should return `None`. 
+ query_range: (u32, u32), + } + + let test_cases = [ + TestCase { + chain: local_chain![(0, h!("_")), (1, h!("A"))], + query_range: (0, 2), + }, + TestCase { + chain: local_chain![(0, h!("_")), (2, h!("B")), (3, h!("C"))], + query_range: (0, 3), + }, + ]; + + for t in test_cases.into_iter() { + let tip = t.chain.tip(); + for h in t.query_range.0..=t.query_range.1 { + let query_result = tip.query(h); + + // perform an exhausitive search for the checkpoint at height `h` + let exp_hash = t + .chain + .iter_checkpoints() + .find(|cp| cp.height() == h) + .map(|cp| cp.hash()); + + match query_result { + Some(cp) => { + assert_eq!(Some(cp.hash()), exp_hash); + assert_eq!(cp.height(), h); + } + None => assert!(exp_hash.is_none()), + } + } + } +} + #[test] fn local_chain_apply_header_connected_to() { fn header_from_prev_blockhash(prev_blockhash: BlockHash) -> Header { diff --git a/crates/esplora/tests/blocking_ext.rs b/crates/esplora/tests/blocking_ext.rs index 9e39a93c9..304d36065 100644 --- a/crates/esplora/tests/blocking_ext.rs +++ b/crates/esplora/tests/blocking_ext.rs @@ -360,8 +360,8 @@ fn update_local_chain() -> anyhow::Result<()> { for height in t.request_heights { let exp_blockhash = blocks.get(height).expect("block must exist in bitcoind"); assert_eq!( - chain.blocks().get(height), - Some(exp_blockhash), + chain.query(*height).map(|cp| cp.hash()), + Some(*exp_blockhash), "[{}:{}] block {}:{} must exist in final chain", i, t.name, From 8d8287d15f84980211f6c4dcc803874545f87cf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Mon, 25 Mar 2024 12:30:31 +0800 Subject: [PATCH 2/8] feat(chain): impl `PartialEq` on `CheckPoint` We impl `PartialEq` on `CheckPoint` instead of directly on `LocalChain`. We also made the implementation more efficient. 
--- crates/chain/src/local_chain.rs | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/crates/chain/src/local_chain.rs b/crates/chain/src/local_chain.rs index f90b28e85..62a28a368 100644 --- a/crates/chain/src/local_chain.rs +++ b/crates/chain/src/local_chain.rs @@ -5,7 +5,6 @@ use core::convert::Infallible; use crate::collections::BTreeMap; use crate::{BlockId, ChainOracle}; use alloc::sync::Arc; -use alloc::vec::Vec; use bitcoin::block::Header; use bitcoin::BlockHash; @@ -35,6 +34,14 @@ struct CPInner { prev: Option>, } +impl PartialEq for CheckPoint { + fn eq(&self, other: &Self) -> bool { + let self_cps = self.iter().map(|cp| cp.block_id()); + let other_cps = other.iter().map(|cp| cp.block_id()); + self_cps.eq(other_cps) + } +} + impl CheckPoint { /// Construct a new base block at the front of a linked list. pub fn new(block: BlockId) -> Self { @@ -206,7 +213,7 @@ impl IntoIterator for CheckPoint { /// Script-pubkey based syncing mechanisms may not introduce transactions in a chronological order /// so some updates require introducing older blocks (to anchor older transactions). For /// script-pubkey based syncing, `introduce_older_blocks` would typically be `true`. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Update { /// The update chain's new tip. pub tip: CheckPoint, @@ -220,23 +227,11 @@ pub struct Update { } /// This is a local implementation of [`ChainOracle`]. 
-#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct LocalChain { tip: CheckPoint, } -impl PartialEq for LocalChain { - fn eq(&self, other: &Self) -> bool { - self.iter_checkpoints() - .map(|cp| cp.block_id()) - .collect::>() - == other - .iter_checkpoints() - .map(|cp| cp.block_id()) - .collect::>() - } -} - // TODO: Figure out whether we can get rid of this impl From for BTreeMap { fn from(value: LocalChain) -> Self { From bcb297bd64f562980a1f7fbf8cba134ca23e443d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Mon, 25 Mar 2024 13:44:01 +0800 Subject: [PATCH 3/8] feat(testenv): add `make_checkpoint_tip` This creates a checkpoint linked list which contains all blocks. --- crates/testenv/src/lib.rs | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/crates/testenv/src/lib.rs b/crates/testenv/src/lib.rs index b836387c1..1c6f2de92 100644 --- a/crates/testenv/src/lib.rs +++ b/crates/testenv/src/lib.rs @@ -1,7 +1,11 @@ -use bdk_chain::bitcoin::{ - address::NetworkChecked, block::Header, hash_types::TxMerkleNode, hashes::Hash, - secp256k1::rand::random, Address, Amount, Block, BlockHash, CompactTarget, ScriptBuf, - ScriptHash, Transaction, TxIn, TxOut, Txid, +use bdk_chain::{ + bitcoin::{ + address::NetworkChecked, block::Header, hash_types::TxMerkleNode, hashes::Hash, + secp256k1::rand::random, Address, Amount, Block, BlockHash, CompactTarget, ScriptBuf, + ScriptHash, Transaction, TxIn, TxOut, Txid, + }, + local_chain::CheckPoint, + BlockId, }; use bitcoincore_rpc::{ bitcoincore_rpc_json::{GetBlockTemplateModes, GetBlockTemplateRules}, @@ -234,6 +238,18 @@ impl TestEnv { .send_to_address(address, amount, None, None, None, None, None, None)?; Ok(txid) } + + /// Create a checkpoint linked list of all the blocks in the chain. 
+ pub fn make_checkpoint_tip(&self) -> CheckPoint { + CheckPoint::from_block_ids((0_u32..).map_while(|height| { + self.bitcoind + .client + .get_block_hash(height as u64) + .ok() + .map(|hash| BlockId { height, hash }) + })) + .expect("must craft tip") + } } #[cfg(test)] From 581fdb64ab164211fdc4cc121efbffd7618fc1e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Mon, 25 Mar 2024 13:39:21 +0800 Subject: [PATCH 4/8] feat(esplora)!: remove `EsploraExt::update_local_chain` Previously, we would update the `TxGraph` and `KeychainTxOutIndex` first, then create a second update for `LocalChain`. This required locking the receiving structures 3 times (instead of twice, which is optimal). This PR eliminates this requirement by making use of the new `query` method of `CheckPoint`. Examples are also updated to use the new API. --- crates/chain/src/local_chain.rs | 2 +- crates/esplora/src/async_ext.rs | 528 ++++++++++------- crates/esplora/src/blocking_ext.rs | 552 +++++++++++------- crates/esplora/src/lib.rs | 22 +- crates/esplora/tests/async_ext.rs | 70 ++- crates/esplora/tests/blocking_ext.rs | 107 +++- example-crates/example_esplora/src/main.rs | 82 ++- .../wallet_esplora_async/src/main.rs | 12 +- .../wallet_esplora_blocking/src/main.rs | 24 +- 9 files changed, 851 insertions(+), 548 deletions(-) diff --git a/crates/chain/src/local_chain.rs b/crates/chain/src/local_chain.rs index 62a28a368..aad281b1e 100644 --- a/crates/chain/src/local_chain.rs +++ b/crates/chain/src/local_chain.rs @@ -159,7 +159,7 @@ impl CheckPoint { /// Query for checkpoint at `height`. /// - /// Returns `None` if checkpoint at `height` does not exist`. + /// Returns `None` if checkpoint at `height` does not exist. 
pub fn query(&self, height: u32) -> Option { self.iter() // optimization to avoid iterating the entire chain if we do not get a direct hit diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index 9e25eedfb..7924a8863 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -1,5 +1,8 @@ +use std::collections::BTreeSet; + use async_trait::async_trait; use bdk_chain::collections::btree_map; +use bdk_chain::Anchor; use bdk_chain::{ bitcoin::{BlockHash, OutPoint, ScriptBuf, TxOut, Txid}, collections::BTreeMap, @@ -9,7 +12,7 @@ use bdk_chain::{ use esplora_client::TxStatus; use futures::{stream::FuturesOrdered, TryStreamExt}; -use crate::anchor_from_status; +use crate::{anchor_from_status, FullScanUpdate, SyncUpdate}; /// [`esplora_client::Error`] type Error = Box; @@ -22,36 +25,15 @@ type Error = Box; #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait)] pub trait EsploraAsyncExt { - /// Prepare a [`LocalChain`] update with blocks fetched from Esplora. - /// - /// * `local_tip` is the previous tip of [`LocalChain::tip`]. - /// * `request_heights` is the block heights that we are interested in fetching from Esplora. - /// - /// The result of this method can be applied to [`LocalChain::apply_update`]. - /// - /// ## Consistency - /// - /// The chain update returned is guaranteed to be consistent as long as there is not a *large* re-org - /// during the call. The size of re-org we can tollerate is server dependent but will be at - /// least 10. 
- /// - /// [`LocalChain`]: bdk_chain::local_chain::LocalChain - /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip - /// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update - async fn update_local_chain( - &self, - local_tip: CheckPoint, - request_heights: impl IntoIterator + Send> + Send, - ) -> Result; - - /// Full scan the keychain scripts specified with the blockchain (via an Esplora client) and - /// returns a [`TxGraph`] and a map of last active indices. + /// Scan keychain scripts for transactions against Esplora, returning an update that can be + /// applied to the receiving structures. /// + /// * `local_tip`: the previously seen tip from [`LocalChain::tip`]. /// * `keychain_spks`: keychains that we want to scan transactions for /// - /// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated - /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in - /// parallel. + /// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no + /// associated transactions. `parallel_requests` specifies the max number of HTTP requests to + /// make in parallel. /// /// ## Note /// @@ -65,19 +47,23 @@ pub trait EsploraAsyncExt { /// and [Sparrow](https://www.sparrowwallet.com/docs/faq.html#ive-restored-my-wallet-but-some-of-my-funds-are-missing). /// /// A `stop_gap` of 0 will be treated as a `stop_gap` of 1. + /// + /// [`LocalChain::tip`]: local_chain::LocalChain::tip async fn full_scan( &self, + local_tip: CheckPoint, keychain_spks: BTreeMap< K, impl IntoIterator + Send> + Send, >, stop_gap: usize, parallel_requests: usize, - ) -> Result<(TxGraph, BTreeMap), Error>; + ) -> Result, Error>; /// Sync a set of scripts with the blockchain (via an Esplora client) for the data /// specified and return a [`TxGraph`]. /// + /// * `local_tip`: the previously seen tip from [`LocalChain::tip`]. 
/// * `misc_spks`: scripts that we want to sync transactions for /// * `txids`: transactions for which we want updated [`ConfirmationTimeHeightAnchor`]s /// * `outpoints`: transactions associated with these outpoints (residing, spending) that we @@ -86,210 +72,216 @@ pub trait EsploraAsyncExt { /// If the scripts to sync are unknown, such as when restoring or importing a keychain that /// may include scripts that have been used, use [`full_scan`] with the keychain. /// + /// [`LocalChain::tip`]: local_chain::LocalChain::tip /// [`full_scan`]: EsploraAsyncExt::full_scan async fn sync( &self, + local_tip: CheckPoint, misc_spks: impl IntoIterator + Send> + Send, txids: impl IntoIterator + Send> + Send, outpoints: impl IntoIterator + Send> + Send, parallel_requests: usize, - ) -> Result, Error>; + ) -> Result; } #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait)] impl EsploraAsyncExt for esplora_client::AsyncClient { - async fn update_local_chain( - &self, - local_tip: CheckPoint, - request_heights: impl IntoIterator + Send> + Send, - ) -> Result { - // Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are - // consistent. - let mut fetched_blocks = self - .get_blocks(None) - .await? - .into_iter() - .map(|b| (b.time.height, b.id)) - .collect::>(); - let new_tip_height = fetched_blocks - .keys() - .last() - .copied() - .expect("must have atleast one block"); - - // Fetch blocks of heights that the caller is interested in, skipping blocks that are - // already fetched when constructing `fetched_blocks`. 
- for height in request_heights { - // do not fetch blocks higher than remote tip - if height > new_tip_height { - continue; - } - // only fetch what is missing - if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) { - // ❗The return value of `get_block_hash` is not strictly guaranteed to be consistent - // with the chain at the time of `get_blocks` above (there could have been a deep - // re-org). Since `get_blocks` returns 10 (or so) blocks we are assuming that it's - // not possible to have a re-org deeper than that. - entry.insert(self.get_block_hash(height).await?); - } - } - - // Ensure `fetched_blocks` can create an update that connects with the original chain by - // finding a "Point of Agreement". - for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) { - if height > new_tip_height { - continue; - } - - let fetched_hash = match fetched_blocks.entry(height) { - btree_map::Entry::Occupied(entry) => *entry.get(), - btree_map::Entry::Vacant(entry) => { - *entry.insert(self.get_block_hash(height).await?) - } - }; - - // We have found point of agreement so the update will connect! 
- if fetched_hash == local_hash { - break; - } - } - - Ok(local_chain::Update { - tip: CheckPoint::from_block_ids(fetched_blocks.into_iter().map(BlockId::from)) - .expect("must be in height order"), - introduce_older_blocks: true, - }) - } - async fn full_scan( &self, + local_tip: CheckPoint, keychain_spks: BTreeMap< K, impl IntoIterator + Send> + Send, >, stop_gap: usize, parallel_requests: usize, - ) -> Result<(TxGraph, BTreeMap), Error> { - type TxsOfSpkIndex = (u32, Vec); - let parallel_requests = Ord::max(parallel_requests, 1); - let mut graph = TxGraph::::default(); - let mut last_active_indexes = BTreeMap::::new(); - let stop_gap = Ord::max(stop_gap, 1); - - for (keychain, spks) in keychain_spks { - let mut spks = spks.into_iter(); - let mut last_index = Option::::None; - let mut last_active_index = Option::::None; - - loop { - let handles = spks - .by_ref() - .take(parallel_requests) - .map(|(spk_index, spk)| { - let client = self.clone(); - async move { - let mut last_seen = None; - let mut spk_txs = Vec::new(); - loop { - let txs = client.scripthash_txs(&spk, last_seen).await?; - let tx_count = txs.len(); - last_seen = txs.last().map(|tx| tx.txid); - spk_txs.extend(txs); - if tx_count < 25 { - break Result::<_, Error>::Ok((spk_index, spk_txs)); - } - } - } - }) - .collect::>(); + ) -> Result, Error> { + let update_blocks = init_chain_update(self, &local_tip).await?; + let (tx_graph, last_active_indices) = + full_scan_for_index_and_graph(self, keychain_spks, stop_gap, parallel_requests).await?; + let local_chain = + finalize_chain_update(self, &local_tip, tx_graph.all_anchors(), update_blocks).await?; + Ok(FullScanUpdate { + local_chain, + tx_graph, + last_active_indices, + }) + } - if handles.is_empty() { - break; - } + async fn sync( + &self, + local_tip: CheckPoint, + misc_spks: impl IntoIterator + Send> + Send, + txids: impl IntoIterator + Send> + Send, + outpoints: impl IntoIterator + Send> + Send, + parallel_requests: usize, + ) -> Result { + let 
update_blocks = init_chain_update(self, &local_tip).await?; + let tx_graph = + sync_for_index_and_graph(self, misc_spks, txids, outpoints, parallel_requests).await?; + let local_chain = + finalize_chain_update(self, &local_tip, tx_graph.all_anchors(), update_blocks).await?; + Ok(SyncUpdate { + tx_graph, + local_chain, + }) + } +} - for (index, txs) in handles.try_collect::>().await? { - last_index = Some(index); - if !txs.is_empty() { - last_active_index = Some(index); - } - for tx in txs { - let _ = graph.insert_tx(tx.to_tx()); - if let Some(anchor) = anchor_from_status(&tx.status) { - let _ = graph.insert_anchor(tx.txid, anchor); - } +/// Create the initial chain update. +/// +/// This atomically fetches the latest blocks from Esplora and additional blocks to ensure the +/// update can connect to the `start_tip`. +/// +/// We want to do this before fetching transactions and anchors as we cannot fetch latest blocks and +/// transactions atomically, and the checkpoint tip is used to determine last-scanned block (for +/// block-based chain-sources). Therefore it's better to be conservative when setting the tip (use +/// an earlier tip rather than a later tip) otherwise the caller may accidentally skip blocks when +/// alternating between chain-sources. +#[doc(hidden)] +pub async fn init_chain_update( + client: &esplora_client::AsyncClient, + local_tip: &CheckPoint, +) -> Result, Error> { + // Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are + // consistent. + let mut fetched_blocks = client + .get_blocks(None) + .await? 
+ .into_iter() + .map(|b| (b.time.height, b.id)) + .collect::>(); + let new_tip_height = fetched_blocks + .keys() + .last() + .copied() + .expect("must atleast have one block"); - let previous_outputs = tx.vin.iter().filter_map(|vin| { - let prevout = vin.prevout.as_ref()?; - Some(( - OutPoint { - txid: vin.txid, - vout: vin.vout, - }, - TxOut { - script_pubkey: prevout.scriptpubkey.clone(), - value: prevout.value, - }, - )) - }); - - for (outpoint, txout) in previous_outputs { - let _ = graph.insert_txout(outpoint, txout); - } - } - } + // Ensure `fetched_blocks` can create an update that connects with the original chain by + // finding a "Point of Agreement". + for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) { + if height > new_tip_height { + continue; + } - let last_index = last_index.expect("Must be set since handles wasn't empty."); - let gap_limit_reached = if let Some(i) = last_active_index { - last_index >= i.saturating_add(stop_gap as u32) - } else { - last_index + 1 >= stop_gap as u32 - }; - if gap_limit_reached { - break; - } + let fetched_hash = match fetched_blocks.entry(height) { + btree_map::Entry::Occupied(entry) => *entry.get(), + btree_map::Entry::Vacant(entry) => *entry.insert(client.get_block_hash(height).await?), + }; + + // We have found point of agreement so the update will connect! + if fetched_hash == local_hash { + break; + } + } + + Ok(fetched_blocks) +} + +/// Fetches missing checkpoints and finalizes the [`local_chain::Update`]. +/// +/// A checkpoint is considered "missing" if an anchor (of `anchors`) points to a height without an +/// existing checkpoint/block under `local_tip` or `update_blocks`. 
+#[doc(hidden)] +pub async fn finalize_chain_update( + client: &esplora_client::AsyncClient, + local_tip: &CheckPoint, + anchors: &BTreeSet<(A, Txid)>, + mut update_blocks: BTreeMap, +) -> Result { + let update_tip_height = update_blocks + .keys() + .last() + .copied() + .expect("must atleast have one block"); + + // We want to have a corresponding checkpoint per height. We iterate the heights of anchors + // backwards, comparing it against our `local_tip`'s chain and our current set of + // `update_blocks` to see if a corresponding checkpoint already exists. + let anchor_heights = anchors + .iter() + .rev() + .map(|(a, _)| a.anchor_block().height) + // filter out heights that surpass the update tip + .filter(|h| *h <= update_tip_height) + // filter out duplicate heights + .filter({ + let mut prev_height = Option::::None; + move |h| match prev_height.replace(*h) { + None => true, + Some(prev_h) => prev_h != *h, } + }); - if let Some(last_active_index) = last_active_index { - last_active_indexes.insert(keychain, last_active_index); + // We keep track of a checkpoint node of `local_tip` to make traversing the linked-list of + // checkpoints more efficient. 
+ let mut curr_cp = local_tip.clone(); + + for h in anchor_heights { + if let Some(cp) = curr_cp.query_from(h) { + curr_cp = cp.clone(); + if cp.height() == h { + continue; } } - - Ok((graph, last_active_indexes)) + if let btree_map::Entry::Vacant(entry) = update_blocks.entry(h) { + entry.insert(client.get_block_hash(h).await?); + } } - async fn sync( - &self, - misc_spks: impl IntoIterator + Send> + Send, - txids: impl IntoIterator + Send> + Send, - outpoints: impl IntoIterator + Send> + Send, - parallel_requests: usize, - ) -> Result, Error> { - let mut graph = self - .full_scan( - [( - (), - misc_spks - .into_iter() - .enumerate() - .map(|(i, spk)| (i as u32, spk)), - )] - .into(), - usize::MAX, - parallel_requests, - ) - .await - .map(|(g, _)| g)?; - - let mut txids = txids.into_iter(); + Ok(local_chain::Update { + tip: CheckPoint::from_block_ids( + update_blocks + .into_iter() + .map(|(height, hash)| BlockId { height, hash }), + ) + .expect("must be in order"), + introduce_older_blocks: true, + }) +} + +/// This performs a full scan to get an update for the [`TxGraph`] and +/// [`KeychainTxOutIndex`](bdk_chain::keychain::KeychainTxOutIndex). 
+#[doc(hidden)] +pub async fn full_scan_for_index_and_graph( + client: &esplora_client::AsyncClient, + keychain_spks: BTreeMap< + K, + impl IntoIterator + Send> + Send, + >, + stop_gap: usize, + parallel_requests: usize, +) -> Result<(TxGraph, BTreeMap), Error> { + type TxsOfSpkIndex = (u32, Vec); + let parallel_requests = Ord::max(parallel_requests, 1); + let mut graph = TxGraph::::default(); + let mut last_active_indexes = BTreeMap::::new(); + + for (keychain, spks) in keychain_spks { + let mut spks = spks.into_iter(); + let mut last_index = Option::::None; + let mut last_active_index = Option::::None; + loop { - let handles = txids + let handles = spks .by_ref() .take(parallel_requests) - .filter(|&txid| graph.get_tx(txid).is_none()) - .map(|txid| { - let client = self.clone(); - async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) } + .map(|(spk_index, spk)| { + let client = client.clone(); + async move { + let mut last_seen = None; + let mut spk_txs = Vec::new(); + loop { + let txs = client.scripthash_txs(&spk, last_seen).await?; + let tx_count = txs.len(); + last_seen = txs.last().map(|tx| tx.txid); + spk_txs.extend(txs); + if tx_count < 25 { + break Result::<_, Error>::Ok((spk_index, spk_txs)); + } + } + } }) .collect::>(); @@ -297,38 +289,128 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { break; } - for (txid, status) in handles.try_collect::>().await? { - if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(txid, anchor); + for (index, txs) in handles.try_collect::>().await? 
{ + last_index = Some(index); + if !txs.is_empty() { + last_active_index = Some(index); + } + for tx in txs { + let _ = graph.insert_tx(tx.to_tx()); + if let Some(anchor) = anchor_from_status(&tx.status) { + let _ = graph.insert_anchor(tx.txid, anchor); + } + + let previous_outputs = tx.vin.iter().filter_map(|vin| { + let prevout = vin.prevout.as_ref()?; + Some(( + OutPoint { + txid: vin.txid, + vout: vin.vout, + }, + TxOut { + script_pubkey: prevout.scriptpubkey.clone(), + value: prevout.value, + }, + )) + }); + + for (outpoint, txout) in previous_outputs { + let _ = graph.insert_txout(outpoint, txout); + } } } + + let last_index = last_index.expect("Must be set since handles wasn't empty."); + let gap_limit_reached = if let Some(i) = last_active_index { + last_index >= i.saturating_add(stop_gap as u32) + } else { + last_index + 1 >= stop_gap as u32 + }; + if gap_limit_reached { + break; + } } - for op in outpoints.into_iter() { - if graph.get_tx(op.txid).is_none() { - if let Some(tx) = self.get_tx(&op.txid).await? 
{ - let _ = graph.insert_tx(tx); - } - let status = self.get_tx_status(&op.txid).await?; - if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(op.txid, anchor); - } + if let Some(last_active_index) = last_active_index { + last_active_indexes.insert(keychain, last_active_index); + } + } + + Ok((graph, last_active_indexes)) +} + +#[doc(hidden)] +pub async fn sync_for_index_and_graph( + client: &esplora_client::AsyncClient, + misc_spks: impl IntoIterator + Send> + Send, + txids: impl IntoIterator + Send> + Send, + outpoints: impl IntoIterator + Send> + Send, + parallel_requests: usize, +) -> Result, Error> { + let mut graph = full_scan_for_index_and_graph( + client, + [( + (), + misc_spks + .into_iter() + .enumerate() + .map(|(i, spk)| (i as u32, spk)), + )] + .into(), + usize::MAX, + parallel_requests, + ) + .await + .map(|(g, _)| g)?; + + let mut txids = txids.into_iter(); + loop { + let handles = txids + .by_ref() + .take(parallel_requests) + .filter(|&txid| graph.get_tx(txid).is_none()) + .map(|txid| { + let client = client.clone(); + async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) } + }) + .collect::>(); + + if handles.is_empty() { + break; + } + + for (txid, status) in handles.try_collect::>().await? { + if let Some(anchor) = anchor_from_status(&status) { + let _ = graph.insert_anchor(txid, anchor); } + } + } - if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _).await? { - if let Some(txid) = op_status.txid { - if graph.get_tx(txid).is_none() { - if let Some(tx) = self.get_tx(&txid).await? { - let _ = graph.insert_tx(tx); - } - let status = self.get_tx_status(&txid).await?; - if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(txid, anchor); - } + for op in outpoints.into_iter() { + if graph.get_tx(op.txid).is_none() { + if let Some(tx) = client.get_tx(&op.txid).await? 
{ + let _ = graph.insert_tx(tx); + } + let status = client.get_tx_status(&op.txid).await?; + if let Some(anchor) = anchor_from_status(&status) { + let _ = graph.insert_anchor(op.txid, anchor); + } + } + + if let Some(op_status) = client.get_output_status(&op.txid, op.vout as _).await? { + if let Some(txid) = op_status.txid { + if graph.get_tx(txid).is_none() { + if let Some(tx) = client.get_tx(&txid).await? { + let _ = graph.insert_tx(tx); + } + let status = client.get_tx_status(&txid).await?; + if let Some(anchor) = anchor_from_status(&status) { + let _ = graph.insert_anchor(txid, anchor); } } } } - Ok(graph) } + + Ok(graph) } diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 9cd11e819..56267cca5 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -1,7 +1,10 @@ +use std::collections::BTreeSet; use std::thread::JoinHandle; +use std::usize; use bdk_chain::collections::btree_map; use bdk_chain::collections::BTreeMap; +use bdk_chain::Anchor; use bdk_chain::{ bitcoin::{BlockHash, OutPoint, ScriptBuf, TxOut, Txid}, local_chain::{self, CheckPoint}, @@ -10,9 +13,11 @@ use bdk_chain::{ use esplora_client::TxStatus; use crate::anchor_from_status; +use crate::FullScanUpdate; +use crate::SyncUpdate; /// [`esplora_client::Error`] -type Error = Box; +pub type Error = Box; /// Trait to extend the functionality of [`esplora_client::BlockingClient`]. /// @@ -20,36 +25,15 @@ type Error = Box; /// /// [crate-level documentation]: crate pub trait EsploraExt { - /// Prepare a [`LocalChain`] update with blocks fetched from Esplora. - /// - /// * `local_tip` is the previous tip of [`LocalChain::tip`]. - /// * `request_heights` is the block heights that we are interested in fetching from Esplora. - /// - /// The result of this method can be applied to [`LocalChain::apply_update`]. 
- /// - /// ## Consistency - /// - /// The chain update returned is guaranteed to be consistent as long as there is not a *large* re-org - /// during the call. The size of re-org we can tollerate is server dependent but will be at - /// least 10. - /// - /// [`LocalChain`]: bdk_chain::local_chain::LocalChain - /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip - /// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update - fn update_local_chain( - &self, - local_tip: CheckPoint, - request_heights: impl IntoIterator, - ) -> Result; - - /// Full scan the keychain scripts specified with the blockchain (via an Esplora client) and - /// returns a [`TxGraph`] and a map of last active indices. + /// Scan keychain scripts for transactions against Esplora, returning an update that can be + /// applied to the receiving structures. /// + /// * `local_tip`: the previously seen tip from [`LocalChain::tip`]. /// * `keychain_spks`: keychains that we want to scan transactions for /// - /// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated - /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in - /// parallel. + /// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no + /// associated transactions. `parallel_requests` specifies the max number of HTTP requests to + /// make in parallel. /// /// ## Note /// @@ -63,16 +47,20 @@ pub trait EsploraExt { /// and [Sparrow](https://www.sparrowwallet.com/docs/faq.html#ive-restored-my-wallet-but-some-of-my-funds-are-missing). /// /// A `stop_gap` of 0 will be treated as a `stop_gap` of 1. 
+ /// + /// [`LocalChain::tip`]: local_chain::LocalChain::tip fn full_scan( &self, + local_tip: CheckPoint, keychain_spks: BTreeMap>, stop_gap: usize, parallel_requests: usize, - ) -> Result<(TxGraph, BTreeMap), Error>; + ) -> Result, Error>; /// Sync a set of scripts with the blockchain (via an Esplora client) for the data /// specified and return a [`TxGraph`]. /// + /// * `local_tip`: the previously seen tip from [`LocalChain::tip`]. /// * `misc_spks`: scripts that we want to sync transactions for /// * `txids`: transactions for which we want updated [`ConfirmationTimeHeightAnchor`]s /// * `outpoints`: transactions associated with these outpoints (residing, spending) that we @@ -81,251 +69,365 @@ pub trait EsploraExt { /// If the scripts to sync are unknown, such as when restoring or importing a keychain that /// may include scripts that have been used, use [`full_scan`] with the keychain. /// + /// [`LocalChain::tip`]: local_chain::LocalChain::tip /// [`full_scan`]: EsploraExt::full_scan fn sync( &self, + local_tip: CheckPoint, misc_spks: impl IntoIterator, txids: impl IntoIterator, outpoints: impl IntoIterator, parallel_requests: usize, - ) -> Result, Error>; + ) -> Result; } impl EsploraExt for esplora_client::BlockingClient { - fn update_local_chain( + fn full_scan( &self, local_tip: CheckPoint, - request_heights: impl IntoIterator, - ) -> Result { - // Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are - // consistent. - let mut fetched_blocks = self - .get_blocks(None)? - .into_iter() - .map(|b| (b.time.height, b.id)) - .collect::>(); - let new_tip_height = fetched_blocks - .keys() - .last() - .copied() - .expect("must atleast have one block"); - - // Fetch blocks of heights that the caller is interested in, skipping blocks that are - // already fetched when constructing `fetched_blocks`. 
- for height in request_heights { - // do not fetch blocks higher than remote tip - if height > new_tip_height { - continue; - } - // only fetch what is missing - if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) { - // ❗The return value of `get_block_hash` is not strictly guaranteed to be consistent - // with the chain at the time of `get_blocks` above (there could have been a deep - // re-org). Since `get_blocks` returns 10 (or so) blocks we are assuming that it's - // not possible to have a re-org deeper than that. - entry.insert(self.get_block_hash(height)?); - } - } - - // Ensure `fetched_blocks` can create an update that connects with the original chain by - // finding a "Point of Agreement". - for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) { - if height > new_tip_height { - continue; - } - - let fetched_hash = match fetched_blocks.entry(height) { - btree_map::Entry::Occupied(entry) => *entry.get(), - btree_map::Entry::Vacant(entry) => *entry.insert(self.get_block_hash(height)?), - }; - - // We have found point of agreement so the update will connect! 
- if fetched_hash == local_hash { - break; - } - } - - Ok(local_chain::Update { - tip: CheckPoint::from_block_ids(fetched_blocks.into_iter().map(BlockId::from)) - .expect("must be in height order"), - introduce_older_blocks: true, + keychain_spks: BTreeMap>, + stop_gap: usize, + parallel_requests: usize, + ) -> Result, Error> { + let update_blocks = init_chain_update_blocking(self, &local_tip)?; + let (tx_graph, last_active_indices) = full_scan_for_index_and_graph_blocking( + self, + keychain_spks, + stop_gap, + parallel_requests, + )?; + let local_chain = finalize_chain_update_blocking( + self, + &local_tip, + tx_graph.all_anchors(), + update_blocks, + )?; + Ok(FullScanUpdate { + local_chain, + tx_graph, + last_active_indices, }) } - fn full_scan( + fn sync( &self, - keychain_spks: BTreeMap>, - stop_gap: usize, + local_tip: CheckPoint, + misc_spks: impl IntoIterator, + txids: impl IntoIterator, + outpoints: impl IntoIterator, parallel_requests: usize, - ) -> Result<(TxGraph, BTreeMap), Error> { - type TxsOfSpkIndex = (u32, Vec); - let parallel_requests = Ord::max(parallel_requests, 1); - let mut graph = TxGraph::::default(); - let mut last_active_indexes = BTreeMap::::new(); - let stop_gap = Ord::max(stop_gap, 1); - - for (keychain, spks) in keychain_spks { - let mut spks = spks.into_iter(); - let mut last_index = Option::::None; - let mut last_active_index = Option::::None; - - loop { - let handles = spks - .by_ref() - .take(parallel_requests) - .map(|(spk_index, spk)| { - std::thread::spawn({ - let client = self.clone(); - move || -> Result { - let mut last_seen = None; - let mut spk_txs = Vec::new(); - loop { - let txs = client.scripthash_txs(&spk, last_seen)?; - let tx_count = txs.len(); - last_seen = txs.last().map(|tx| tx.txid); - spk_txs.extend(txs); - if tx_count < 25 { - break Ok((spk_index, spk_txs)); - } - } - } - }) - }) - .collect::>>>(); + ) -> Result { + let update_blocks = init_chain_update_blocking(self, &local_tip)?; + let tx_graph = 
sync_for_index_and_graph_blocking( + self, + misc_spks, + txids, + outpoints, + parallel_requests, + )?; + let local_chain = finalize_chain_update_blocking( + self, + &local_tip, + tx_graph.all_anchors(), + update_blocks, + )?; + Ok(SyncUpdate { + local_chain, + tx_graph, + }) + } +} - if handles.is_empty() { - break; - } +/// Create the initial chain update. +/// +/// This atomically fetches the latest blocks from Esplora and additional blocks to ensure the +/// update can connect to the `start_tip`. +/// +/// We want to do this before fetching transactions and anchors as we cannot fetch latest blocks and +/// transactions atomically, and the checkpoint tip is used to determine last-scanned block (for +/// block-based chain-sources). Therefore it's better to be conservative when setting the tip (use +/// an earlier tip rather than a later tip) otherwise the caller may accidentally skip blocks when +/// alternating between chain-sources. +#[doc(hidden)] +pub fn init_chain_update_blocking( + client: &esplora_client::BlockingClient, + local_tip: &CheckPoint, +) -> Result, Error> { + // Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are + // consistent. + let mut fetched_blocks = client + .get_blocks(None)? + .into_iter() + .map(|b| (b.time.height, b.id)) + .collect::>(); + let new_tip_height = fetched_blocks + .keys() + .last() + .copied() + .expect("must atleast have one block"); - for handle in handles { - let (index, txs) = handle.join().expect("thread must not panic")?; - last_index = Some(index); - if !txs.is_empty() { - last_active_index = Some(index); - } - for tx in txs { - let _ = graph.insert_tx(tx.to_tx()); - if let Some(anchor) = anchor_from_status(&tx.status) { - let _ = graph.insert_anchor(tx.txid, anchor); - } + // Ensure `fetched_blocks` can create an update that connects with the original chain by + // finding a "Point of Agreement". 
+ for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) { + if height > new_tip_height { + continue; + } - let previous_outputs = tx.vin.iter().filter_map(|vin| { - let prevout = vin.prevout.as_ref()?; - Some(( - OutPoint { - txid: vin.txid, - vout: vin.vout, - }, - TxOut { - script_pubkey: prevout.scriptpubkey.clone(), - value: prevout.value, - }, - )) - }); - - for (outpoint, txout) in previous_outputs { - let _ = graph.insert_txout(outpoint, txout); - } - } - } + let fetched_hash = match fetched_blocks.entry(height) { + btree_map::Entry::Occupied(entry) => *entry.get(), + btree_map::Entry::Vacant(entry) => *entry.insert(client.get_block_hash(height)?), + }; - let last_index = last_index.expect("Must be set since handles wasn't empty."); - let gap_limit_reached = if let Some(i) = last_active_index { - last_index >= i.saturating_add(stop_gap as u32) - } else { - last_index + 1 >= stop_gap as u32 - }; - if gap_limit_reached { - break; - } + // We have found point of agreement so the update will connect! + if fetched_hash == local_hash { + break; + } + } + + Ok(fetched_blocks) +} + +/// Fetches missing checkpoints and finalizes the [`local_chain::Update`]. +/// +/// A checkpoint is considered "missing" if an anchor (of `anchors`) points to a height without an +/// existing checkpoint/block under `local_tip` or `update_blocks`. +#[doc(hidden)] +pub fn finalize_chain_update_blocking( + client: &esplora_client::BlockingClient, + local_tip: &CheckPoint, + anchors: &BTreeSet<(A, Txid)>, + mut update_blocks: BTreeMap, +) -> Result { + let update_tip_height = update_blocks + .keys() + .last() + .copied() + .expect("must atleast have one block"); + + // We want to have a corresponding checkpoint per height. We iterate the heights of anchors + // backwards, comparing it against our `local_tip`'s chain and our current set of + // `update_blocks` to see if a corresponding checkpoint already exists. 
+ let anchor_heights = anchors + .iter() + .rev() + .map(|(a, _)| a.anchor_block().height) + // filter out heights that surpass the update tip + .filter(|h| *h <= update_tip_height) + // filter out duplicate heights + .filter({ + let mut prev_height = Option::::None; + move |h| match prev_height.replace(*h) { + None => true, + Some(prev_h) => prev_h != *h, } + }); + + // We keep track of a checkpoint node of `local_tip` to make traversing the linked-list of + // checkpoints more efficient. + let mut curr_cp = local_tip.clone(); - if let Some(last_active_index) = last_active_index { - last_active_indexes.insert(keychain, last_active_index); + for h in anchor_heights { + if let Some(cp) = curr_cp.query_from(h) { + curr_cp = cp.clone(); + if cp.height() == h { + continue; } } - - Ok((graph, last_active_indexes)) + if let btree_map::Entry::Vacant(entry) = update_blocks.entry(h) { + entry.insert(client.get_block_hash(h)?); + } } - fn sync( - &self, - misc_spks: impl IntoIterator, - txids: impl IntoIterator, - outpoints: impl IntoIterator, - parallel_requests: usize, - ) -> Result, Error> { - let mut graph = self - .full_scan( - [( - (), - misc_spks - .into_iter() - .enumerate() - .map(|(i, spk)| (i as u32, spk)), - )] - .into(), - usize::MAX, - parallel_requests, - ) - .map(|(g, _)| g)?; - - let mut txids = txids.into_iter(); + Ok(local_chain::Update { + tip: CheckPoint::from_block_ids( + update_blocks + .into_iter() + .map(|(height, hash)| BlockId { height, hash }), + ) + .expect("must be in order"), + introduce_older_blocks: true, + }) +} + +/// This performs a full scan to get an update for the [`TxGraph`] and +/// [`KeychainTxOutIndex`](bdk_chain::keychain::KeychainTxOutIndex). 
+#[doc(hidden)] +pub fn full_scan_for_index_and_graph_blocking( + client: &esplora_client::BlockingClient, + keychain_spks: BTreeMap>, + stop_gap: usize, + parallel_requests: usize, +) -> Result<(TxGraph, BTreeMap), Error> { + type TxsOfSpkIndex = (u32, Vec); + let parallel_requests = Ord::max(parallel_requests, 1); + let mut tx_graph = TxGraph::::default(); + let mut last_active_indices = BTreeMap::::new(); + + for (keychain, spks) in keychain_spks { + let mut spks = spks.into_iter(); + let mut last_index = Option::::None; + let mut last_active_index = Option::::None; + loop { - let handles = txids + let handles = spks .by_ref() .take(parallel_requests) - .filter(|&txid| graph.get_tx(txid).is_none()) - .map(|txid| { + .map(|(spk_index, spk)| { std::thread::spawn({ - let client = self.clone(); - move || { - client - .get_tx_status(&txid) - .map_err(Box::new) - .map(|s| (txid, s)) + let client = client.clone(); + move || -> Result { + let mut last_seen = None; + let mut spk_txs = Vec::new(); + loop { + let txs = client.scripthash_txs(&spk, last_seen)?; + let tx_count = txs.len(); + last_seen = txs.last().map(|tx| tx.txid); + spk_txs.extend(txs); + if tx_count < 25 { + break Ok((spk_index, spk_txs)); + } + } } }) }) - .collect::>>>(); + .collect::>>>(); if handles.is_empty() { break; } for handle in handles { - let (txid, status) = handle.join().expect("thread must not panic")?; - if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(txid, anchor); + let (index, txs) = handle.join().expect("thread must not panic")?; + last_index = Some(index); + if !txs.is_empty() { + last_active_index = Some(index); + } + for tx in txs { + let _ = tx_graph.insert_tx(tx.to_tx()); + if let Some(anchor) = anchor_from_status(&tx.status) { + let _ = tx_graph.insert_anchor(tx.txid, anchor); + } + + let previous_outputs = tx.vin.iter().filter_map(|vin| { + let prevout = vin.prevout.as_ref()?; + Some(( + OutPoint { + txid: vin.txid, + vout: vin.vout, + }, + TxOut 
{ + script_pubkey: prevout.scriptpubkey.clone(), + value: prevout.value, + }, + )) + }); + + for (outpoint, txout) in previous_outputs { + let _ = tx_graph.insert_txout(outpoint, txout); + } } } + + let last_index = last_index.expect("Must be set since handles wasn't empty."); + let gap_limit_reached = if let Some(i) = last_active_index { + last_index >= i.saturating_add(stop_gap as u32) + } else { + last_index + 1 >= stop_gap as u32 + }; + if gap_limit_reached { + break; + } } - for op in outpoints { - if graph.get_tx(op.txid).is_none() { - if let Some(tx) = self.get_tx(&op.txid)? { - let _ = graph.insert_tx(tx); - } - let status = self.get_tx_status(&op.txid)?; - if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(op.txid, anchor); - } + if let Some(last_active_index) = last_active_index { + last_active_indices.insert(keychain, last_active_index); + } + } + + Ok((tx_graph, last_active_indices)) +} + +#[doc(hidden)] +pub fn sync_for_index_and_graph_blocking( + client: &esplora_client::BlockingClient, + misc_spks: impl IntoIterator, + txids: impl IntoIterator, + outpoints: impl IntoIterator, + parallel_requests: usize, +) -> Result, Error> { + let (mut tx_graph, _) = full_scan_for_index_and_graph_blocking( + client, + { + let mut keychains = BTreeMap::new(); + keychains.insert( + (), + misc_spks + .into_iter() + .enumerate() + .map(|(i, spk)| (i as u32, spk)), + ); + keychains + }, + usize::MAX, + parallel_requests, + )?; + + let mut txids = txids.into_iter(); + loop { + let handles = txids + .by_ref() + .take(parallel_requests) + .filter(|&txid| tx_graph.get_tx(txid).is_none()) + .map(|txid| { + std::thread::spawn({ + let client = client.clone(); + move || { + client + .get_tx_status(&txid) + .map_err(Box::new) + .map(|s| (txid, s)) + } + }) + }) + .collect::>>>(); + + if handles.is_empty() { + break; + } + + for handle in handles { + let (txid, status) = handle.join().expect("thread must not panic")?; + if let Some(anchor) = 
anchor_from_status(&status) { + let _ = tx_graph.insert_anchor(txid, anchor); } + } + } - if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _)? { - if let Some(txid) = op_status.txid { - if graph.get_tx(txid).is_none() { - if let Some(tx) = self.get_tx(&txid)? { - let _ = graph.insert_tx(tx); - } - let status = self.get_tx_status(&txid)?; - if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(txid, anchor); - } + for op in outpoints { + if tx_graph.get_tx(op.txid).is_none() { + if let Some(tx) = client.get_tx(&op.txid)? { + let _ = tx_graph.insert_tx(tx); + } + let status = client.get_tx_status(&op.txid)?; + if let Some(anchor) = anchor_from_status(&status) { + let _ = tx_graph.insert_anchor(op.txid, anchor); + } + } + + if let Some(op_status) = client.get_output_status(&op.txid, op.vout as _)? { + if let Some(txid) = op_status.txid { + if tx_graph.get_tx(txid).is_none() { + if let Some(tx) = client.get_tx(&txid)? { + let _ = tx_graph.insert_tx(tx); + } + let status = client.get_tx_status(&txid)?; + if let Some(anchor) = anchor_from_status(&status) { + let _ = tx_graph.insert_anchor(txid, anchor); } } } } - Ok(graph) } + + Ok(tx_graph) } diff --git a/crates/esplora/src/lib.rs b/crates/esplora/src/lib.rs index 535167ff2..c422a0833 100644 --- a/crates/esplora/src/lib.rs +++ b/crates/esplora/src/lib.rs @@ -16,7 +16,9 @@ //! [`TxGraph`]: bdk_chain::tx_graph::TxGraph //! [`example_esplora`]: https://github.com/bitcoindevkit/bdk/tree/master/example-crates/example_esplora -use bdk_chain::{BlockId, ConfirmationTimeHeightAnchor}; +use std::collections::BTreeMap; + +use bdk_chain::{local_chain, BlockId, ConfirmationTimeHeightAnchor, TxGraph}; use esplora_client::TxStatus; pub use esplora_client; @@ -48,3 +50,21 @@ fn anchor_from_status(status: &TxStatus) -> Option None } } + +/// Update returns from a full scan. +pub struct FullScanUpdate { + /// The update to apply to the receiving [`LocalChain`](local_chain::LocalChain). 
+ pub local_chain: local_chain::Update, + /// The update to apply to the receiving [`TxGraph`]. + pub tx_graph: TxGraph, + /// Last active indices for the corresponding keychains (`K`). + pub last_active_indices: BTreeMap, +} + +/// Update returned from a sync. +pub struct SyncUpdate { + /// The update to apply to the receiving [`LocalChain`](local_chain::LocalChain). + pub local_chain: local_chain::Update, + /// The update to apply to the receiving [`TxGraph`]. + pub tx_graph: TxGraph, +} diff --git a/crates/esplora/tests/async_ext.rs b/crates/esplora/tests/async_ext.rs index c71c214e9..5946bb4d8 100644 --- a/crates/esplora/tests/async_ext.rs +++ b/crates/esplora/tests/async_ext.rs @@ -2,7 +2,7 @@ use bdk_esplora::EsploraAsyncExt; use electrsd::bitcoind::anyhow; use electrsd::bitcoind::bitcoincore_rpc::RpcApi; use esplora_client::{self, Builder}; -use std::collections::{BTreeMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::str::FromStr; use std::thread::sleep; use std::time::Duration; @@ -52,8 +52,12 @@ pub async fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { sleep(Duration::from_millis(10)) } - let graph_update = client + // use a full checkpoint linked list (since this is not what we are testing) + let cp_tip = env.make_checkpoint_tip(); + + let sync_update = client .sync( + cp_tip.clone(), misc_spks.into_iter(), vec![].into_iter(), vec![].into_iter(), @@ -61,6 +65,24 @@ pub async fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { ) .await?; + assert!( + { + let update_cps = sync_update + .local_chain + .tip + .iter() + .map(|cp| cp.block_id()) + .collect::>(); + let superset_cps = cp_tip + .iter() + .map(|cp| cp.block_id()) + .collect::>(); + superset_cps.is_superset(&update_cps) + }, + "update should not alter original checkpoint tip since we already started with all checkpoints", + ); + + let graph_update = sync_update.tx_graph; // Check to see if we have the floating txouts available from our 
two created transactions' // previous outputs in order to calculate transaction fees. for tx in graph_update.full_txs() { @@ -140,14 +162,24 @@ pub async fn test_async_update_tx_graph_stop_gap() -> anyhow::Result<()> { sleep(Duration::from_millis(10)) } + // use a full checkpoint linked list (since this is not what we are testing) + let cp_tip = env.make_checkpoint_tip(); + // A scan with a gap limit of 3 won't find the transaction, but a scan with a gap limit of 4 // will. - let (graph_update, active_indices) = client.full_scan(keychains.clone(), 3, 1).await?; - assert!(graph_update.full_txs().next().is_none()); - assert!(active_indices.is_empty()); - let (graph_update, active_indices) = client.full_scan(keychains.clone(), 4, 1).await?; - assert_eq!(graph_update.full_txs().next().unwrap().txid, txid_4th_addr); - assert_eq!(active_indices[&0], 3); + let full_scan_update = client + .full_scan(cp_tip.clone(), keychains.clone(), 3, 1) + .await?; + assert!(full_scan_update.tx_graph.full_txs().next().is_none()); + assert!(full_scan_update.last_active_indices.is_empty()); + let full_scan_update = client + .full_scan(cp_tip.clone(), keychains.clone(), 4, 1) + .await?; + assert_eq!( + full_scan_update.tx_graph.full_txs().next().unwrap().txid, + txid_4th_addr + ); + assert_eq!(full_scan_update.last_active_indices[&0], 3); // Now receive a coin on the last address. let txid_last_addr = env.bitcoind.client.send_to_address( @@ -167,16 +199,26 @@ pub async fn test_async_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will. // The last active indice won't be updated in the first case but will in the second one. 
- let (graph_update, active_indices) = client.full_scan(keychains.clone(), 5, 1).await?; - let txs: HashSet<_> = graph_update.full_txs().map(|tx| tx.txid).collect(); + let full_scan_update = client + .full_scan(cp_tip.clone(), keychains.clone(), 5, 1) + .await?; + let txs: HashSet<_> = full_scan_update + .tx_graph + .full_txs() + .map(|tx| tx.txid) + .collect(); assert_eq!(txs.len(), 1); assert!(txs.contains(&txid_4th_addr)); - assert_eq!(active_indices[&0], 3); - let (graph_update, active_indices) = client.full_scan(keychains, 6, 1).await?; - let txs: HashSet<_> = graph_update.full_txs().map(|tx| tx.txid).collect(); + assert_eq!(full_scan_update.last_active_indices[&0], 3); + let full_scan_update = client.full_scan(cp_tip, keychains, 6, 1).await?; + let txs: HashSet<_> = full_scan_update + .tx_graph + .full_txs() + .map(|tx| tx.txid) + .collect(); assert_eq!(txs.len(), 2); assert!(txs.contains(&txid_4th_addr) && txs.contains(&txid_last_addr)); - assert_eq!(active_indices[&0], 9); + assert_eq!(full_scan_update.last_active_indices[&0], 9); Ok(()) } diff --git a/crates/esplora/tests/blocking_ext.rs b/crates/esplora/tests/blocking_ext.rs index 304d36065..ebbd8d000 100644 --- a/crates/esplora/tests/blocking_ext.rs +++ b/crates/esplora/tests/blocking_ext.rs @@ -3,7 +3,7 @@ use bdk_chain::BlockId; use bdk_esplora::EsploraExt; use electrsd::bitcoind::anyhow; use electrsd::bitcoind::bitcoincore_rpc::RpcApi; -use esplora_client::{self, Builder}; +use esplora_client::{self, BlockHash, Builder}; use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::str::FromStr; use std::thread::sleep; @@ -68,13 +68,35 @@ pub fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { sleep(Duration::from_millis(10)) } - let graph_update = client.sync( + // use a full checkpoint linked list (since this is not what we are testing) + let cp_tip = env.make_checkpoint_tip(); + + let sync_update = client.sync( + cp_tip.clone(), misc_spks.into_iter(), vec![].into_iter(), 
vec![].into_iter(), 1, )?; + assert!( + { + let update_cps = sync_update + .local_chain + .tip + .iter() + .map(|cp| cp.block_id()) + .collect::>(); + let superset_cps = cp_tip + .iter() + .map(|cp| cp.block_id()) + .collect::>(); + superset_cps.is_superset(&update_cps) + }, + "update should not alter original checkpoint tip since we already started with all checkpoints", + ); + + let graph_update = sync_update.tx_graph; // Check to see if we have the floating txouts available from our two created transactions' // previous outputs in order to calculate transaction fees. for tx in graph_update.full_txs() { @@ -155,14 +177,20 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> { sleep(Duration::from_millis(10)) } + // use a full checkpoint linked list (since this is not what we are testing) + let cp_tip = env.make_checkpoint_tip(); + // A scan with a stop_gap of 3 won't find the transaction, but a scan with a gap limit of 4 // will. - let (graph_update, active_indices) = client.full_scan(keychains.clone(), 3, 1)?; - assert!(graph_update.full_txs().next().is_none()); - assert!(active_indices.is_empty()); - let (graph_update, active_indices) = client.full_scan(keychains.clone(), 4, 1)?; - assert_eq!(graph_update.full_txs().next().unwrap().txid, txid_4th_addr); - assert_eq!(active_indices[&0], 3); + let full_scan_update = client.full_scan(cp_tip.clone(), keychains.clone(), 3, 1)?; + assert!(full_scan_update.tx_graph.full_txs().next().is_none()); + assert!(full_scan_update.last_active_indices.is_empty()); + let full_scan_update = client.full_scan(cp_tip.clone(), keychains.clone(), 4, 1)?; + assert_eq!( + full_scan_update.tx_graph.full_txs().next().unwrap().txid, + txid_4th_addr + ); + assert_eq!(full_scan_update.last_active_indices[&0], 3); // Now receive a coin on the last address. 
let txid_last_addr = env.bitcoind.client.send_to_address( @@ -182,16 +210,24 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will. // The last active indice won't be updated in the first case but will in the second one. - let (graph_update, active_indices) = client.full_scan(keychains.clone(), 5, 1)?; - let txs: HashSet<_> = graph_update.full_txs().map(|tx| tx.txid).collect(); + let full_scan_update = client.full_scan(cp_tip.clone(), keychains.clone(), 5, 1)?; + let txs: HashSet<_> = full_scan_update + .tx_graph + .full_txs() + .map(|tx| tx.txid) + .collect(); assert_eq!(txs.len(), 1); assert!(txs.contains(&txid_4th_addr)); - assert_eq!(active_indices[&0], 3); - let (graph_update, active_indices) = client.full_scan(keychains, 6, 1)?; - let txs: HashSet<_> = graph_update.full_txs().map(|tx| tx.txid).collect(); + assert_eq!(full_scan_update.last_active_indices[&0], 3); + let full_scan_update = client.full_scan(cp_tip.clone(), keychains, 6, 1)?; + let txs: HashSet<_> = full_scan_update + .tx_graph + .full_txs() + .map(|tx| tx.txid) + .collect(); assert_eq!(txs.len(), 2); assert!(txs.contains(&txid_4th_addr) && txs.contains(&txid_last_addr)); - assert_eq!(active_indices[&0], 9); + assert_eq!(full_scan_update.last_active_indices[&0], 9); Ok(()) } @@ -317,14 +353,38 @@ fn update_local_chain() -> anyhow::Result<()> { for (i, t) in test_cases.into_iter().enumerate() { println!("Case {}: {}", i, t.name); let mut chain = t.chain; + let cp_tip = chain.tip(); - let update = client - .update_local_chain(chain.tip(), t.request_heights.iter().copied()) - .map_err(|err| { - anyhow::format_err!("[{}:{}] `update_local_chain` failed: {}", i, t.name, err) + let new_blocks = + bdk_esplora::init_chain_update_blocking(&client, &cp_tip).map_err(|err| { + anyhow::format_err!("[{}:{}] `init_chain_update` failed: {}", i, t.name, err) })?; - let update_blocks = update + let mock_anchors 
= t + .request_heights + .iter() + .map(|&h| { + let anchor_blockhash: BlockHash = bdk_chain::bitcoin::hashes::Hash::hash( + &format!("hash_at_height_{}", h).into_bytes(), + ); + let txid: Txid = bdk_chain::bitcoin::hashes::Hash::hash( + &format!("txid_at_height_{}", h).into_bytes(), + ); + let anchor = BlockId { + height: h, + hash: anchor_blockhash, + }; + (anchor, txid) + }) + .collect::>(); + + let chain_update = bdk_esplora::finalize_chain_update_blocking( + &client, + &cp_tip, + &mock_anchors, + new_blocks, + )?; + let update_blocks = chain_update .tip .iter() .map(|cp| cp.block_id()) @@ -346,14 +406,15 @@ fn update_local_chain() -> anyhow::Result<()> { ) .collect::>(); - assert_eq!( - update_blocks, exp_update_blocks, + assert!( + update_blocks.is_superset(&exp_update_blocks), "[{}:{}] unexpected update", - i, t.name + i, + t.name ); let _ = chain - .apply_update(update) + .apply_update(chain_update) .unwrap_or_else(|err| panic!("[{}:{}] update failed to apply: {}", i, t.name, err)); // all requested heights must exist in the final chain diff --git a/example-crates/example_esplora/src/main.rs b/example-crates/example_esplora/src/main.rs index e92205706..d3ec8bae4 100644 --- a/example-crates/example_esplora/src/main.rs +++ b/example-crates/example_esplora/src/main.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, BTreeSet}, + collections::BTreeMap, io::{self, Write}, sync::Mutex, }; @@ -60,6 +60,7 @@ enum EsploraCommands { esplora_args: EsploraArgs, }, } + impl EsploraCommands { fn esplora_args(&self) -> EsploraArgs { match self { @@ -149,20 +150,24 @@ fn main() -> anyhow::Result<()> { }; let client = esplora_cmd.esplora_args().client(args.network)?; - // Prepare the `IndexedTxGraph` update based on whether we are scanning or syncing. + // Prepare the `IndexedTxGraph` and `LocalChain` updates based on whether we are scanning or + // syncing. + // // Scanning: We are iterating through spks of all keychains and scanning for transactions for // each spk. 
We start with the lowest derivation index spk and stop scanning after `stop_gap` // number of consecutive spks have no transaction history. A Scan is done in situations of // wallet restoration. It is a special case. Applications should use "sync" style updates // after an initial scan. + // // Syncing: We only check for specified spks, utxos and txids to update their confirmation // status or fetch missing transactions. - let indexed_tx_graph_changeset = match &esplora_cmd { + let (local_chain_changeset, indexed_tx_graph_changeset) = match &esplora_cmd { EsploraCommands::Scan { stop_gap, scan_options, .. } => { + let local_tip = chain.lock().expect("mutex must not be poisoned").tip(); let keychain_spks = graph .lock() .expect("mutex must not be poisoned") @@ -189,19 +194,29 @@ fn main() -> anyhow::Result<()> { // is reached. It returns a `TxGraph` update (`graph_update`) and a structure that // represents the last active spk derivation indices of keychains // (`keychain_indices_update`). - let (graph_update, last_active_indices) = client - .full_scan(keychain_spks, *stop_gap, scan_options.parallel_requests) + let update = client + .full_scan( + local_tip, + keychain_spks, + *stop_gap, + scan_options.parallel_requests, + ) .context("scanning for transactions")?; let mut graph = graph.lock().expect("mutex must not be poisoned"); + let mut chain = chain.lock().expect("mutex must not be poisoned"); // Because we did a stop gap based scan we are likely to have some updates to our // deriviation indices. Usually before a scan you are on a fresh wallet with no // addresses derived so we need to derive up to last active addresses the scan found // before adding the transactions. 
- let (_, index_changeset) = graph.index.reveal_to_target_multi(&last_active_indices); - let mut indexed_tx_graph_changeset = graph.apply_update(graph_update); - indexed_tx_graph_changeset.append(index_changeset.into()); - indexed_tx_graph_changeset + (chain.apply_update(update.local_chain)?, { + let (_, index_changeset) = graph + .index + .reveal_to_target_multi(&update.last_active_indices); + let mut indexed_tx_graph_changeset = graph.apply_update(update.tx_graph); + indexed_tx_graph_changeset.append(index_changeset.into()); + indexed_tx_graph_changeset + }) } EsploraCommands::Sync { mut unused_spks, @@ -227,12 +242,13 @@ fn main() -> anyhow::Result<()> { let mut outpoints: Box> = Box::new(core::iter::empty()); let mut txids: Box> = Box::new(core::iter::empty()); + let local_tip = chain.lock().expect("mutex must not be poisoned").tip(); + // Get a short lock on the structures to get spks, utxos, and txs that we are interested // in. { let graph = graph.lock().unwrap(); let chain = chain.lock().unwrap(); - let chain_tip = chain.tip().block_id(); if *all_spks { let all_spks = graph @@ -272,7 +288,7 @@ fn main() -> anyhow::Result<()> { let init_outpoints = graph.index.outpoints().iter().cloned(); let utxos = graph .graph() - .filter_chain_unspents(&*chain, chain_tip, init_outpoints) + .filter_chain_unspents(&*chain, local_tip.block_id(), init_outpoints) .map(|(_, utxo)| utxo) .collect::>(); outpoints = Box::new( @@ -295,7 +311,7 @@ fn main() -> anyhow::Result<()> { // `EsploraExt::update_tx_graph_without_keychain`. 
let unconfirmed_txids = graph .graph() - .list_chain_txs(&*chain, chain_tip) + .list_chain_txs(&*chain, local_tip.block_id()) .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed()) .map(|canonical_tx| canonical_tx.tx_node.txid) .collect::>(); @@ -307,44 +323,26 @@ fn main() -> anyhow::Result<()> { } } - let graph_update = - client.sync(spks, txids, outpoints, scan_options.parallel_requests)?; + let update = client.sync( + local_tip, + spks, + txids, + outpoints, + scan_options.parallel_requests, + )?; - graph.lock().unwrap().apply_update(graph_update) + ( + chain.lock().unwrap().apply_update(update.local_chain)?, + graph.lock().unwrap().apply_update(update.tx_graph), + ) } }; println!(); - // Now that we're done updating the `IndexedTxGraph`, it's time to update the `LocalChain`! We - // want the `LocalChain` to have data about all the anchors in the `TxGraph` - for this reason, - // we want retrieve the blocks at the heights of the newly added anchors that are missing from - // our view of the chain. - let (missing_block_heights, tip) = { - let chain = &*chain.lock().unwrap(); - let missing_block_heights = indexed_tx_graph_changeset - .graph - .missing_heights_from(chain) - .collect::>(); - let tip = chain.tip(); - (missing_block_heights, tip) - }; - - println!("prev tip: {}", tip.height()); - println!("missing block heights: {:?}", missing_block_heights); - - // Here, we actually fetch the missing blocks and create a `local_chain::Update`. - let chain_changeset = { - let chain_update = client - .update_local_chain(tip, missing_block_heights) - .context("scanning for blocks")?; - println!("new tip: {}", chain_update.tip.height()); - chain.lock().unwrap().apply_update(chain_update)? 
- }; - // We persist the changes let mut db = db.lock().unwrap(); - db.stage((chain_changeset, indexed_tx_graph_changeset)); + db.stage((local_chain_changeset, indexed_tx_graph_changeset)); db.commit()?; Ok(()) } diff --git a/example-crates/wallet_esplora_async/src/main.rs b/example-crates/wallet_esplora_async/src/main.rs index 690cd87e2..50a8659e4 100644 --- a/example-crates/wallet_esplora_async/src/main.rs +++ b/example-crates/wallet_esplora_async/src/main.rs @@ -53,15 +53,13 @@ async fn main() -> Result<(), anyhow::Error> { (k, k_spks) }) .collect(); - let (update_graph, last_active_indices) = client - .full_scan(keychain_spks, STOP_GAP, PARALLEL_REQUESTS) + let update = client + .full_scan(prev_tip, keychain_spks, STOP_GAP, PARALLEL_REQUESTS) .await?; - let missing_heights = update_graph.missing_heights(wallet.local_chain()); - let chain_update = client.update_local_chain(prev_tip, missing_heights).await?; let update = Update { - last_active_indices, - graph: update_graph, - chain: Some(chain_update), + last_active_indices: update.last_active_indices, + graph: update.tx_graph, + chain: Some(update.local_chain), }; wallet.apply_update(update)?; wallet.commit()?; diff --git a/example-crates/wallet_esplora_blocking/src/main.rs b/example-crates/wallet_esplora_blocking/src/main.rs index 73bfdd559..026ce7345 100644 --- a/example-crates/wallet_esplora_blocking/src/main.rs +++ b/example-crates/wallet_esplora_blocking/src/main.rs @@ -36,7 +36,6 @@ fn main() -> Result<(), anyhow::Error> { let client = esplora_client::Builder::new("https://blockstream.info/testnet/api").build_blocking()?; - let prev_tip = wallet.latest_checkpoint(); let keychain_spks = wallet .all_unbounded_spk_iters() .into_iter() @@ -53,17 +52,18 @@ fn main() -> Result<(), anyhow::Error> { }) .collect(); - let (update_graph, last_active_indices) = - client.full_scan(keychain_spks, STOP_GAP, PARALLEL_REQUESTS)?; - let missing_heights = update_graph.missing_heights(wallet.local_chain()); - let 
chain_update = client.update_local_chain(prev_tip, missing_heights)?; - let update = Update { - last_active_indices, - graph: update_graph, - chain: Some(chain_update), - }; - - wallet.apply_update(update)?; + let update = client.full_scan( + wallet.latest_checkpoint(), + keychain_spks, + STOP_GAP, + PARALLEL_REQUESTS, + )?; + + wallet.apply_update(Update { + last_active_indices: update.last_active_indices, + graph: update.tx_graph, + chain: Some(update.local_chain), + })?; wallet.commit()?; println!(); From 88c7239edf3c1dea9d7fa81891e887c79bbaba22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Tue, 26 Mar 2024 15:11:43 +0800 Subject: [PATCH 5/8] chore(chain)!: rm `missing_heights` and `missing_heights_from` methods These methods are no longer needed as we can determine missing heights directly from the `CheckPoint` tip. --- crates/chain/src/tx_graph.rs | 87 +----------------- crates/chain/tests/test_tx_graph.rs | 133 ---------------------------- 2 files changed, 2 insertions(+), 218 deletions(-) diff --git a/crates/chain/src/tx_graph.rs b/crates/chain/src/tx_graph.rs index f80a20713..4a7538cab 100644 --- a/crates/chain/src/tx_graph.rs +++ b/crates/chain/src/tx_graph.rs @@ -89,8 +89,8 @@ //! [`insert_txout`]: TxGraph::insert_txout use crate::{ - collections::*, keychain::Balance, local_chain::LocalChain, Anchor, Append, BlockId, - ChainOracle, ChainPosition, FullTxOut, + collections::*, keychain::Balance, Anchor, Append, BlockId, ChainOracle, ChainPosition, + FullTxOut, }; use alloc::collections::vec_deque::VecDeque; use alloc::sync::Arc; @@ -696,69 +696,6 @@ impl TxGraph { } impl TxGraph { - /// Find missing block heights of `chain`. - /// - /// This works by scanning through anchors, and seeing whether the anchor block of the anchor - /// exists in the [`LocalChain`]. The returned iterator does not output duplicate heights. - pub fn missing_heights<'a>(&'a self, chain: &'a LocalChain) -> impl Iterator + 'a { - // Map of txids to skip. 
- // - // Usually, if a height of a tx anchor is missing from the chain, we would want to return - // this height in the iterator. The exception is when the tx is confirmed in chain. All the - // other missing-height anchors of this tx can be skipped. - // - // * Some(true) => skip all anchors of this txid - // * Some(false) => do not skip anchors of this txid - // * None => we do not know whether we can skip this txid - let mut txids_to_skip = HashMap::::new(); - - // Keeps track of the last height emitted so we don't double up. - let mut last_height_emitted = Option::::None; - - self.anchors - .iter() - .filter(move |(_, txid)| { - let skip = *txids_to_skip.entry(*txid).or_insert_with(|| { - let tx_anchors = match self.txs.get(txid) { - Some((_, anchors, _)) => anchors, - None => return true, - }; - let mut has_missing_height = false; - for anchor_block in tx_anchors.iter().map(Anchor::anchor_block) { - match chain.query(anchor_block.height) { - None => { - has_missing_height = true; - continue; - } - Some(chain_cp) => { - if chain_cp.hash() == anchor_block.hash { - return true; - } - } - } - } - !has_missing_height - }); - #[cfg(feature = "std")] - debug_assert!({ - println!("txid={} skip={}", txid, skip); - true - }); - !skip - }) - .filter_map(move |(a, _)| { - let anchor_block = a.anchor_block(); - if Some(anchor_block.height) != last_height_emitted - && chain.query(anchor_block.height).is_none() - { - last_height_emitted = Some(anchor_block.height); - Some(anchor_block.height) - } else { - None - } - }) - } - /// Get the position of the transaction in `chain` with tip `chain_tip`. /// /// Chain data is fetched from `chain`, a [`ChainOracle`] implementation. @@ -1267,8 +1204,6 @@ impl ChangeSet { /// /// This is useful if you want to find which heights you need to fetch data about in order to /// confirm or exclude these anchors. 
- /// - /// See also: [`TxGraph::missing_heights`] pub fn anchor_heights(&self) -> impl Iterator + '_ where A: Anchor, @@ -1283,24 +1218,6 @@ impl ChangeSet { !duplicate }) } - - /// Returns an iterator for the [`anchor_heights`] in this changeset that are not included in - /// `local_chain`. This tells you which heights you need to include in `local_chain` in order - /// for it to conclusively act as a [`ChainOracle`] for the transaction anchors this changeset - /// will add. - /// - /// [`ChainOracle`]: crate::ChainOracle - /// [`anchor_heights`]: Self::anchor_heights - pub fn missing_heights_from<'a>( - &'a self, - local_chain: &'a LocalChain, - ) -> impl Iterator + 'a - where - A: Anchor, - { - self.anchor_heights() - .filter(move |&height| local_chain.query(height).is_none()) - } } impl Append for ChangeSet { diff --git a/crates/chain/tests/test_tx_graph.rs b/crates/chain/tests/test_tx_graph.rs index 8b4674485..11ac8032a 100644 --- a/crates/chain/tests/test_tx_graph.rs +++ b/crates/chain/tests/test_tx_graph.rs @@ -1058,139 +1058,6 @@ fn test_changeset_last_seen_append() { } } -#[test] -fn test_missing_blocks() { - /// An anchor implementation for testing, made up of `(the_anchor_block, random_data)`. 
- #[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, core::hash::Hash)] - struct TestAnchor(BlockId); - - impl Anchor for TestAnchor { - fn anchor_block(&self) -> BlockId { - self.0 - } - } - - struct Scenario<'a> { - name: &'a str, - graph: TxGraph, - chain: LocalChain, - exp_heights: &'a [u32], - } - - const fn new_anchor(height: u32, hash: BlockHash) -> TestAnchor { - TestAnchor(BlockId { height, hash }) - } - - fn new_scenario<'a>( - name: &'a str, - graph_anchors: &'a [(Txid, TestAnchor)], - chain: &'a [(u32, BlockHash)], - exp_heights: &'a [u32], - ) -> Scenario<'a> { - Scenario { - name, - graph: { - let mut g = TxGraph::default(); - for (txid, anchor) in graph_anchors { - let _ = g.insert_anchor(*txid, anchor.clone()); - } - g - }, - chain: { - let (mut c, _) = LocalChain::from_genesis_hash(h!("genesis")); - for (height, hash) in chain { - let _ = c.insert_block(BlockId { - height: *height, - hash: *hash, - }); - } - c - }, - exp_heights, - } - } - - fn run(scenarios: &[Scenario]) { - for scenario in scenarios { - let Scenario { - name, - graph, - chain, - exp_heights, - } = scenario; - - let heights = graph.missing_heights(chain).collect::>(); - assert_eq!(&heights, exp_heights, "scenario: {}", name); - } - } - - run(&[ - new_scenario( - "2 txs with the same anchor (2:B) which is missing from chain", - &[ - (h!("tx_1"), new_anchor(2, h!("B"))), - (h!("tx_2"), new_anchor(2, h!("B"))), - ], - &[(1, h!("A")), (3, h!("C"))], - &[2], - ), - new_scenario( - "2 txs with different anchors at the same height, one of the anchors is missing", - &[ - (h!("tx_1"), new_anchor(2, h!("B1"))), - (h!("tx_2"), new_anchor(2, h!("B2"))), - ], - &[(1, h!("A")), (2, h!("B1"))], - &[], - ), - new_scenario( - "tx with 2 anchors of same height which are missing from the chain", - &[ - (h!("tx"), new_anchor(3, h!("C1"))), - (h!("tx"), new_anchor(3, h!("C2"))), - ], - &[(1, h!("A")), (4, h!("D"))], - &[3], - ), - new_scenario( - "tx with 2 anchors at the same height, chain has 
this height but does not match either anchor", - &[ - (h!("tx"), new_anchor(4, h!("D1"))), - (h!("tx"), new_anchor(4, h!("D2"))), - ], - &[(4, h!("D3")), (5, h!("E"))], - &[], - ), - new_scenario( - "tx with 2 anchors at different heights, one anchor exists in chain, should return nothing", - &[ - (h!("tx"), new_anchor(3, h!("C"))), - (h!("tx"), new_anchor(4, h!("D"))), - ], - &[(4, h!("D")), (5, h!("E"))], - &[], - ), - new_scenario( - "tx with 2 anchors at different heights, first height is already in chain with different hash, iterator should only return 2nd height", - &[ - (h!("tx"), new_anchor(5, h!("E1"))), - (h!("tx"), new_anchor(6, h!("F1"))), - ], - &[(4, h!("D")), (5, h!("E")), (7, h!("G"))], - &[6], - ), - new_scenario( - "tx with 2 anchors at different heights, neither height is in chain, both heights should be returned", - &[ - (h!("tx"), new_anchor(3, h!("C"))), - (h!("tx"), new_anchor(4, h!("D"))), - ], - &[(1, h!("A")), (2, h!("B"))], - &[3, 4], - ), - ]); -} - #[test] /// The `map_anchors` allow a caller to pass a function to reconstruct the [`TxGraph`] with any [`Anchor`], /// even though the function is non-deterministic. From 661385ea89205434a1ad431a76ff0de13c9a995a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Tue, 26 Mar 2024 20:12:51 +0800 Subject: [PATCH 6/8] feat(testenv): add `genesis_hash` method This gets the genesis hash of the env blockchain. --- crates/testenv/src/lib.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/testenv/src/lib.rs b/crates/testenv/src/lib.rs index 1c6f2de92..030878f46 100644 --- a/crates/testenv/src/lib.rs +++ b/crates/testenv/src/lib.rs @@ -250,6 +250,12 @@ impl TestEnv { })) .expect("must craft tip") } + + /// Get the genesis hash of the blockchain. 
+ pub fn genesis_hash(&self) -> anyhow::Result { + let hash = self.bitcoind.client.get_block_hash(0)?; + Ok(hash) + } } #[cfg(test)] From cf25c0372ab192690e23fb1df639ba98d340610e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Tue, 26 Mar 2024 20:29:20 +0800 Subject: [PATCH 7/8] test(esplora): add `test_finalize_chain_update` We ensure that calling `finalize_chain_update` does not result in a chain which removed previous heights and all anchor heights are included. --- crates/esplora/tests/async_ext.rs | 172 +++++++++++++++++++++++++++ crates/esplora/tests/blocking_ext.rs | 168 ++++++++++++++++++++++++++ 2 files changed, 340 insertions(+) diff --git a/crates/esplora/tests/async_ext.rs b/crates/esplora/tests/async_ext.rs index 5946bb4d8..e053ba72b 100644 --- a/crates/esplora/tests/async_ext.rs +++ b/crates/esplora/tests/async_ext.rs @@ -1,3 +1,6 @@ +use bdk_chain::bitcoin::hashes::Hash; +use bdk_chain::local_chain::LocalChain; +use bdk_chain::BlockId; use bdk_esplora::EsploraAsyncExt; use electrsd::bitcoind::anyhow; use electrsd::bitcoind::bitcoincore_rpc::RpcApi; @@ -10,6 +13,175 @@ use std::time::Duration; use bdk_chain::bitcoin::{Address, Amount, Txid}; use bdk_testenv::TestEnv; +macro_rules! h { + ($index:literal) => {{ + bdk_chain::bitcoin::hashes::Hash::hash($index.as_bytes()) + }}; +} + +/// Ensure that update does not remove heights (from original), and all anchor heights are included. +#[tokio::test] +pub async fn test_finalize_chain_update() -> anyhow::Result<()> { + struct TestCase<'a> { + name: &'a str, + /// Initial blockchain height to start the env with. + initial_env_height: u32, + /// Initial checkpoint heights to start with. + initial_cps: &'a [u32], + /// The final blockchain height of the env. + final_env_height: u32, + /// The anchors to test with: `(height, txid)`. Only the height is provided as we can fetch + /// the blockhash from the env. 
+ anchors: &'a [(u32, Txid)], + } + + let test_cases = [ + TestCase { + name: "chain_extends", + initial_env_height: 60, + initial_cps: &[59, 60], + final_env_height: 90, + anchors: &[], + }, + TestCase { + name: "introduce_older_heights", + initial_env_height: 50, + initial_cps: &[10, 15], + final_env_height: 50, + anchors: &[(11, h!("A")), (14, h!("B"))], + }, + TestCase { + name: "introduce_older_heights_after_chain_extends", + initial_env_height: 50, + initial_cps: &[10, 15], + final_env_height: 100, + anchors: &[(11, h!("A")), (14, h!("B"))], + }, + ]; + + for (i, t) in test_cases.into_iter().enumerate() { + println!("[{}] running test case: {}", i, t.name); + + let env = TestEnv::new()?; + let base_url = format!("http://{}", &env.electrsd.esplora_url.clone().unwrap()); + let client = Builder::new(base_url.as_str()).build_async()?; + + // set env to `initial_env_height` + if let Some(to_mine) = t + .initial_env_height + .checked_sub(env.make_checkpoint_tip().height()) + { + env.mine_blocks(to_mine as _, None)?; + } + while client.get_height().await? 
< t.initial_env_height { + std::thread::sleep(Duration::from_millis(10)); + } + + // craft initial `local_chain` + let local_chain = { + let (mut chain, _) = LocalChain::from_genesis_hash(env.genesis_hash()?); + let chain_tip = chain.tip(); + let update_blocks = bdk_esplora::init_chain_update(&client, &chain_tip).await?; + let update_anchors = t + .initial_cps + .iter() + .map(|&height| -> anyhow::Result<_> { + Ok(( + BlockId { + height, + hash: env.bitcoind.client.get_block_hash(height as _)?, + }, + Txid::all_zeros(), + )) + }) + .collect::>>()?; + let chain_update = bdk_esplora::finalize_chain_update( + &client, + &chain_tip, + &update_anchors, + update_blocks, + ) + .await?; + chain.apply_update(chain_update)?; + chain + }; + println!("local chain height: {}", local_chain.tip().height()); + + // extend env chain + if let Some(to_mine) = t + .final_env_height + .checked_sub(env.make_checkpoint_tip().height()) + { + env.mine_blocks(to_mine as _, None)?; + } + while client.get_height().await? < t.final_env_height { + std::thread::sleep(Duration::from_millis(10)); + } + + // craft update + let update = { + let local_tip = local_chain.tip(); + let update_blocks = bdk_esplora::init_chain_update(&client, &local_tip).await?; + let update_anchors = t + .anchors + .iter() + .map(|&(height, txid)| -> anyhow::Result<_> { + Ok(( + BlockId { + height, + hash: env.bitcoind.client.get_block_hash(height as _)?, + }, + txid, + )) + }) + .collect::>()?; + bdk_esplora::finalize_chain_update(&client, &local_tip, &update_anchors, update_blocks) + .await? 
+ }; + + // apply update + let mut updated_local_chain = local_chain.clone(); + updated_local_chain.apply_update(update)?; + println!( + "updated local chain height: {}", + updated_local_chain.tip().height() + ); + + assert!( + { + let initial_heights = local_chain + .iter_checkpoints() + .map(|cp| cp.height()) + .collect::>(); + let updated_heights = updated_local_chain + .iter_checkpoints() + .map(|cp| cp.height()) + .collect::>(); + updated_heights.is_superset(&initial_heights) + }, + "heights from the initial chain must all be in the updated chain", + ); + + assert!( + { + let exp_anchor_heights = t + .anchors + .iter() + .map(|(h, _)| *h) + .chain(t.initial_cps.iter().copied()) + .collect::>(); + let anchor_heights = updated_local_chain + .iter_checkpoints() + .map(|cp| cp.height()) + .collect::>(); + anchor_heights.is_superset(&exp_anchor_heights) + }, + "anchor heights must all be in updated chain", + ); + } + + Ok(()) +} #[tokio::test] pub async fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { let env = TestEnv::new()?; diff --git a/crates/esplora/tests/blocking_ext.rs b/crates/esplora/tests/blocking_ext.rs index ebbd8d000..2d54c9d43 100644 --- a/crates/esplora/tests/blocking_ext.rs +++ b/crates/esplora/tests/blocking_ext.rs @@ -1,3 +1,4 @@ +use bdk_chain::bitcoin::hashes::Hash; use bdk_chain::local_chain::LocalChain; use bdk_chain::BlockId; use bdk_esplora::EsploraExt; @@ -26,6 +27,173 @@ macro_rules! local_chain { }}; } +/// Ensure that update does not remove heights (from original), and all anchor heights are included. +#[test] +pub fn test_finalize_chain_update() -> anyhow::Result<()> { + struct TestCase<'a> { + name: &'a str, + /// Initial blockchain height to start the env with. + initial_env_height: u32, + /// Initial checkpoint heights to start with. + initial_cps: &'a [u32], + /// The final blockchain height of the env. + final_env_height: u32, + /// The anchors to test with: `(height, txid)`. 
Only the height is provided as we can fetch + /// the blockhash from the env. + anchors: &'a [(u32, Txid)], + } + + let test_cases = [ + TestCase { + name: "chain_extends", + initial_env_height: 60, + initial_cps: &[59, 60], + final_env_height: 90, + anchors: &[], + }, + TestCase { + name: "introduce_older_heights", + initial_env_height: 50, + initial_cps: &[10, 15], + final_env_height: 50, + anchors: &[(11, h!("A")), (14, h!("B"))], + }, + TestCase { + name: "introduce_older_heights_after_chain_extends", + initial_env_height: 50, + initial_cps: &[10, 15], + final_env_height: 100, + anchors: &[(11, h!("A")), (14, h!("B"))], + }, + ]; + + for (i, t) in test_cases.into_iter().enumerate() { + println!("[{}] running test case: {}", i, t.name); + + let env = TestEnv::new()?; + let base_url = format!("http://{}", &env.electrsd.esplora_url.clone().unwrap()); + let client = Builder::new(base_url.as_str()).build_blocking()?; + + // set env to `initial_env_height` + if let Some(to_mine) = t + .initial_env_height + .checked_sub(env.make_checkpoint_tip().height()) + { + env.mine_blocks(to_mine as _, None)?; + } + while client.get_height()? 
< t.initial_env_height { + std::thread::sleep(Duration::from_millis(10)); + } + + // craft initial `local_chain` + let local_chain = { + let (mut chain, _) = LocalChain::from_genesis_hash(env.genesis_hash()?); + let chain_tip = chain.tip(); + let update_blocks = bdk_esplora::init_chain_update_blocking(&client, &chain_tip)?; + let update_anchors = t + .initial_cps + .iter() + .map(|&height| -> anyhow::Result<_> { + Ok(( + BlockId { + height, + hash: env.bitcoind.client.get_block_hash(height as _)?, + }, + Txid::all_zeros(), + )) + }) + .collect::>>()?; + let chain_update = bdk_esplora::finalize_chain_update_blocking( + &client, + &chain_tip, + &update_anchors, + update_blocks, + )?; + chain.apply_update(chain_update)?; + chain + }; + println!("local chain height: {}", local_chain.tip().height()); + + // extend env chain + if let Some(to_mine) = t + .final_env_height + .checked_sub(env.make_checkpoint_tip().height()) + { + env.mine_blocks(to_mine as _, None)?; + } + while client.get_height()? < t.final_env_height { + std::thread::sleep(Duration::from_millis(10)); + } + + // craft update + let update = { + let local_tip = local_chain.tip(); + let update_blocks = bdk_esplora::init_chain_update_blocking(&client, &local_tip)?; + let update_anchors = t + .anchors + .iter() + .map(|&(height, txid)| -> anyhow::Result<_> { + Ok(( + BlockId { + height, + hash: env.bitcoind.client.get_block_hash(height as _)?, + }, + txid, + )) + }) + .collect::>()?; + bdk_esplora::finalize_chain_update_blocking( + &client, + &local_tip, + &update_anchors, + update_blocks, + )? 
+ }; + + // apply update + let mut updated_local_chain = local_chain.clone(); + updated_local_chain.apply_update(update)?; + println!( + "updated local chain height: {}", + updated_local_chain.tip().height() + ); + + assert!( + { + let initial_heights = local_chain + .iter_checkpoints() + .map(|cp| cp.height()) + .collect::>(); + let updated_heights = updated_local_chain + .iter_checkpoints() + .map(|cp| cp.height()) + .collect::>(); + updated_heights.is_superset(&initial_heights) + }, + "heights from the initial chain must all be in the updated chain", + ); + + assert!( + { + let exp_anchor_heights = t + .anchors + .iter() + .map(|(h, _)| *h) + .chain(t.initial_cps.iter().copied()) + .collect::>(); + let anchor_heights = updated_local_chain + .iter_checkpoints() + .map(|cp| cp.height()) + .collect::>(); + anchor_heights.is_superset(&exp_anchor_heights) + }, + "anchor heights must all be in updated chain", + ); + } + + Ok(()) +} + #[test] pub fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { let env = TestEnv::new()?; From f8aa68cc691c2fdaf82ea1d6666e1459895a8505 Mon Sep 17 00:00:00 2001 From: LLFourn Date: Tue, 2 Apr 2024 16:08:16 +1100 Subject: [PATCH 8/8] refactor(esplora): Simplify chain update logic I felt things didn't have to be so complicated. The chain could easily be updated in one function rather than spread across two. The logic needs at least one less loop. 
--- crates/chain/src/local_chain.rs | 38 +++++++ crates/esplora/src/blocking_ext.rs | 149 +++++++-------------------- crates/esplora/tests/blocking_ext.rs | 30 +----- 3 files changed, 83 insertions(+), 134 deletions(-) diff --git a/crates/chain/src/local_chain.rs b/crates/chain/src/local_chain.rs index aad281b1e..e123e20e8 100644 --- a/crates/chain/src/local_chain.rs +++ b/crates/chain/src/local_chain.rs @@ -173,6 +173,44 @@ impl CheckPoint { pub fn query_from(&self, height: u32) -> Option { self.iter().take_while(|cp| cp.height() >= height).last() } + + /// Inserts `block_id` at its height within the chain. + /// + /// The effect of `insert` depends on whether a height already exists. If it doesn't, the + /// `block_id` we inserted and all pre-existing blocks higher than it will be re-inserted after + /// it. If the height already existed and has a conflicting block hash then it will be purged + /// along with all blocks following it. The returned chain will have a tip of the `block_id` + /// passed in. Of course, if the `block_id` was already present then this just returns `self`. + #[must_use] + pub fn insert(self, block_id: BlockId) -> Self { + assert_ne!(block_id.height, 0, "cannot insert the genesis block"); + + let mut cp = self.clone(); + let mut tail = vec![]; + let base = loop { + if cp.height() == block_id.height { + if cp.hash() == block_id.hash { + return self; + } else { + // if we have a conflict we just return the inserted block because the tail is by + // implication invalid. + tail = vec![]; + break cp.prev().expect("can't be called on genesis block"); + } + } else if cp.height() < block_id.height { + break cp; + } else { + tail.push(cp.block_id()); + } + cp = cp.prev().expect("will break before genesis block"); + }; + + let tip = base + .extend(core::iter::once(block_id).chain(tail.into_iter().rev())) + .expect("tail is in order"); + + tip + } } /// Iterates over checkpoints backwards. 
diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 56267cca5..15b4c8c0f 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -2,11 +2,10 @@ use std::collections::BTreeSet; use std::thread::JoinHandle; use std::usize; -use bdk_chain::collections::btree_map; use bdk_chain::collections::BTreeMap; use bdk_chain::Anchor; use bdk_chain::{ - bitcoin::{BlockHash, OutPoint, ScriptBuf, TxOut, Txid}, + bitcoin::{OutPoint, ScriptBuf, TxOut, Txid}, local_chain::{self, CheckPoint}, BlockId, ConfirmationTimeHeightAnchor, TxGraph, }; @@ -89,19 +88,13 @@ impl EsploraExt for esplora_client::BlockingClient { stop_gap: usize, parallel_requests: usize, ) -> Result, Error> { - let update_blocks = init_chain_update_blocking(self, &local_tip)?; let (tx_graph, last_active_indices) = full_scan_for_index_and_graph_blocking( self, keychain_spks, stop_gap, parallel_requests, )?; - let local_chain = finalize_chain_update_blocking( - self, - &local_tip, - tx_graph.all_anchors(), - update_blocks, - )?; + let local_chain = chain_update_blocking(self, &local_tip, tx_graph.all_anchors())?; Ok(FullScanUpdate { local_chain, tx_graph, @@ -117,7 +110,6 @@ impl EsploraExt for esplora_client::BlockingClient { outpoints: impl IntoIterator, parallel_requests: usize, ) -> Result { - let update_blocks = init_chain_update_blocking(self, &local_tip)?; let tx_graph = sync_for_index_and_graph_blocking( self, misc_spks, @@ -125,12 +117,7 @@ impl EsploraExt for esplora_client::BlockingClient { outpoints, parallel_requests, )?; - let local_chain = finalize_chain_update_blocking( - self, - &local_tip, - tx_graph.all_anchors(), - update_blocks, - )?; + let local_chain = chain_update_blocking(self, &local_tip, tx_graph.all_anchors())?; Ok(SyncUpdate { local_chain, tx_graph, @@ -138,113 +125,57 @@ impl EsploraExt for esplora_client::BlockingClient { } } -/// Create the initial chain update. 
-/// -/// This atomically fetches the latest blocks from Esplora and additional blocks to ensure the -/// update can connect to the `start_tip`. -/// -/// We want to do this before fetching transactions and anchors as we cannot fetch latest blocks and -/// transactions atomically, and the checkpoint tip is used to determine last-scanned block (for -/// block-based chain-sources). Therefore it's better to be conservative when setting the tip (use -/// an earlier tip rather than a later tip) otherwise the caller may accidentally skip blocks when -/// alternating between chain-sources. +/// Builds a chain update from `local_tip` covering every anchor height and the most recent blocks. #[doc(hidden)] -pub fn init_chain_update_blocking( +pub fn chain_update_blocking( client: &esplora_client::BlockingClient, local_tip: &CheckPoint, -) -> Result, Error> { - // Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are - // consistent. - let mut fetched_blocks = client - .get_blocks(None)? - .into_iter() - .map(|b| (b.time.height, b.id)) - .collect::>(); - let new_tip_height = fetched_blocks - .keys() - .last() - .copied() - .expect("must atleast have one block"); - - // Ensure `fetched_blocks` can create an update that connects with the original chain by - // finding a "Point of Agreement". - for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) { - if height > new_tip_height { - continue; - } - - let fetched_hash = match fetched_blocks.entry(height) { - btree_map::Entry::Occupied(entry) => *entry.get(), - btree_map::Entry::Vacant(entry) => *entry.insert(client.get_block_hash(height)?), - }; + anchors: &BTreeSet<(A, Txid)>, +) -> Result { + let mut point_of_agreement = None; + let mut conflicts = vec![]; + for local_cp in local_tip.iter() { + let remote_hash = client.get_block_hash(local_cp.height())?; - // We have found point of agreement so the update will connect!
- if fetched_hash == local_hash { + if remote_hash == local_cp.hash() { + point_of_agreement = Some(local_cp.clone()); break; + } else { + // it is not strictly necessary to include all the conflicted heights (we do need the + // first one) but it seems prudent to make sure the updated chain's heights are a + // superset of the existing chain after update. + conflicts.push(BlockId { + height: local_cp.height(), + hash: remote_hash, + }); } } - Ok(fetched_blocks) -} + let mut tip = point_of_agreement.expect("remote esplora should have same genesis block"); -/// Fetches missing checkpoints and finalizes the [`local_chain::Update`]. -/// -/// A checkpoint is considered "missing" if an anchor (of `anchors`) points to a height without an -/// existing checkpoint/block under `local_tip` or `update_blocks`. -#[doc(hidden)] -pub fn finalize_chain_update_blocking( - client: &esplora_client::BlockingClient, - local_tip: &CheckPoint, - anchors: &BTreeSet<(A, Txid)>, - mut update_blocks: BTreeMap, -) -> Result { - let update_tip_height = update_blocks - .keys() - .last() - .copied() - .expect("must atleast have one block"); - - // We want to have a corresponding checkpoint per height. We iterate the heights of anchors - // backwards, comparing it against our `local_tip`'s chain and our current set of - // `update_blocks` to see if a corresponding checkpoint already exists. - let anchor_heights = anchors - .iter() - .rev() - .map(|(a, _)| a.anchor_block().height) - // filter out heights that surpass the update tip - .filter(|h| *h <= update_tip_height) - // filter out duplicate heights - .filter({ - let mut prev_height = Option::::None; - move |h| match prev_height.replace(*h) { - None => true, - Some(prev_h) => prev_h != *h, - } - }); + tip = tip + .extend(conflicts.into_iter().rev()) + .expect("evicted are in order"); - // We keep track of a checkpoint node of `local_tip` to make traversing the linked-list of - // checkpoints more efficient.
- let mut curr_cp = local_tip.clone(); - - for h in anchor_heights { - if let Some(cp) = curr_cp.query_from(h) { - curr_cp = cp.clone(); - if cp.height() == h { - continue; - } - } - if let btree_map::Entry::Vacant(entry) = update_blocks.entry(h) { - entry.insert(client.get_block_hash(h)?); + for anchor in anchors { + let height = anchor.0.anchor_block().height; + if tip.query(height).is_none() { + let hash = client.get_block_hash(height)?; + tip = tip.insert(BlockId { height, hash }); } } + // insert the most recent blocks in ascending height order so that a conflicting block only + // purges stale checkpoints and never the newer blocks returned by the same request. + let mut recent_blocks = client.get_blocks(None)?; + recent_blocks.sort_unstable_by_key(|block| block.time.height); + for block in recent_blocks { + let (height, hash) = (block.time.height, block.id); + tip = tip.insert(BlockId { height, hash }); + } + Ok(local_chain::Update { - tip: CheckPoint::from_block_ids( - update_blocks - .into_iter() - .map(|(height, hash)| BlockId { height, hash }), - ) - .expect("must be in order"), + tip, introduce_older_blocks: true, }) } diff --git a/crates/esplora/tests/blocking_ext.rs b/crates/esplora/tests/blocking_ext.rs index 2d54c9d43..d8658ed7b 100644 --- a/crates/esplora/tests/blocking_ext.rs +++ b/crates/esplora/tests/blocking_ext.rs @@ -89,7 +89,6 @@ pub fn test_finalize_chain_update() -> anyhow::Result<()> { let local_chain = { let (mut chain, _) = LocalChain::from_genesis_hash(env.genesis_hash()?); let chain_tip = chain.tip(); - let update_blocks = bdk_esplora::init_chain_update_blocking(&client, &chain_tip)?; let update_anchors = t .initial_cps .iter() @@ -103,12 +102,8 @@ pub fn test_finalize_chain_update() -> anyhow::Result<()> { )) }) .collect::>>()?; - let chain_update = bdk_esplora::finalize_chain_update_blocking( - &client, - &chain_tip, - &update_anchors, - update_blocks, - )?; + let chain_update = + bdk_esplora::chain_update_blocking(&client, &chain_tip, &update_anchors)?; chain.apply_update(chain_update)?; chain }; @@ -128,7 +123,6 @@ pub fn test_finalize_chain_update() -> anyhow::Result<()> { // craft update let update = { let local_tip = 
local_chain.tip(); - let update_blocks = bdk_esplora::init_chain_update_blocking(&client, &local_tip)?; let update_anchors = t .anchors .iter() @@ -142,12 +136,7 @@ pub fn test_finalize_chain_update() -> anyhow::Result<()> { )) }) .collect::>()?; - bdk_esplora::finalize_chain_update_blocking( - &client, - &local_tip, - &update_anchors, - update_blocks, - )? + bdk_esplora::chain_update_blocking(&client, &local_tip, &update_anchors)? }; // apply update @@ -523,11 +512,6 @@ fn update_local_chain() -> anyhow::Result<()> { let mut chain = t.chain; let cp_tip = chain.tip(); - let new_blocks = - bdk_esplora::init_chain_update_blocking(&client, &cp_tip).map_err(|err| { - anyhow::format_err!("[{}:{}] `init_chain_update` failed: {}", i, t.name, err) - })?; - let mock_anchors = t .request_heights .iter() @@ -546,12 +530,7 @@ fn update_local_chain() -> anyhow::Result<()> { }) .collect::>(); - let chain_update = bdk_esplora::finalize_chain_update_blocking( - &client, - &cp_tip, - &mock_anchors, - new_blocks, - )?; + let chain_update = bdk_esplora::chain_update_blocking(&client, &cp_tip, &mock_anchors)?; let update_blocks = chain_update .tip .iter() @@ -574,6 +553,7 @@ fn update_local_chain() -> anyhow::Result<()> { ) .collect::>(); + dbg!(&update_blocks, &exp_update_blocks); assert!( update_blocks.is_superset(&exp_update_blocks), "[{}:{}] unexpected update",