diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index 4d6e0dfa83..55a5071b6e 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use bdk_chain::collections::btree_map; +use bdk_chain::spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult}; use bdk_chain::{ bitcoin::{BlockHash, OutPoint, ScriptBuf, TxOut, Txid}, collections::BTreeMap, @@ -45,22 +46,20 @@ pub trait EsploraAsyncExt { ) -> Result; /// Full scan the keychain scripts specified with the blockchain (via an Esplora client) and - /// returns a [`TxGraph`] and a map of last active indices. - /// - /// * `keychain_spks`: keychains that we want to scan transactions for + /// returns a [`TxGraph`] and a map of keychain last active indices. /// /// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in /// parallel. async fn full_scan( &self, - keychain_spks: BTreeMap< + request: FullScanRequest< K, impl IntoIterator + Send> + Send, >, stop_gap: usize, parallel_requests: usize, - ) -> Result<(TxGraph, BTreeMap), Error>; + ) -> Result, Error>; /// Sync a set of scripts with the blockchain (via an Esplora client) for the data /// specified and return a [`TxGraph`]. @@ -76,11 +75,9 @@ pub trait EsploraAsyncExt { /// [`full_scan`]: EsploraAsyncExt::full_scan async fn sync( &self, - misc_spks: impl IntoIterator + Send> + Send, - txids: impl IntoIterator + Send> + Send, - outpoints: impl IntoIterator + Send> + Send, + request: SyncRequest, parallel_requests: usize, - ) -> Result, Error>; + ) -> Result; } #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] @@ -151,23 +148,22 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { async fn full_scan( &self, - keychain_spks: BTreeMap< + mut request: FullScanRequest< K, impl IntoIterator + Send> + Send, >, stop_gap: usize, parallel_requests: usize, - ) -> Result<(TxGraph, BTreeMap), Error> { + ) -> Result, Error> { type TxsOfSpkIndex = (u32, Vec); let parallel_requests = Ord::max(parallel_requests, 1); - let mut graph = TxGraph::::default(); - let mut last_active_indexes = BTreeMap::::new(); + let mut graph_update = TxGraph::::default(); + let mut last_active_indices = BTreeMap::::new(); - for (keychain, spks) in keychain_spks { + for (keychain, spks) in request.take_spks_by_keychain() { let mut spks = spks.into_iter(); let mut last_index = Option::::None; let mut last_active_index = Option::::None; - loop { let handles = spks .by_ref() @@ -200,9 +196,9 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { last_active_index = Some(index); } for tx in txs { - let _ = graph.insert_tx(tx.to_tx()); + let _ = graph_update.insert_tx(tx.to_tx()); if let Some(anchor) = anchor_from_status(&tx.status) { - let _ = graph.insert_anchor(tx.txid, anchor); + let _ = graph_update.insert_anchor(tx.txid, anchor); } let previous_outputs = tx.vin.iter().filter_map(|vin| { @@ -220,7 +216,7 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { }); for (outpoint, txout) in previous_outputs { - let _ = graph.insert_txout(outpoint, txout); + let _ = graph_update.insert_txout(outpoint, txout); } } } @@ -237,42 +233,51 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { } if let Some(last_active_index) = last_active_index { - last_active_indexes.insert(keychain, last_active_index); + last_active_indices.insert(keychain, last_active_index); } } - Ok((graph, last_active_indexes)) + // new tx graph transactions determine possible missing blockchain heights + let missing_heights = graph_update.anchor_heights(request.chain_tip.height()); + // get blockchain update from original request checkpoint and missing heights + let chain_update = self + .update_local_chain(request.chain_tip.clone(), missing_heights) + .await?; + + Ok(FullScanResult { + graph_update, + chain_update, + last_active_indices, + }) } async fn sync( &self, - misc_spks: impl IntoIterator + Send> + Send, - txids: impl IntoIterator + Send> + Send, - outpoints: impl IntoIterator + Send> + Send, + mut request: SyncRequest, parallel_requests: usize, - ) -> Result, Error> { - let mut graph = self - .full_scan( - [( - (), - misc_spks - .into_iter() - .enumerate() - .map(|(i, spk)| (i as u32, spk)), - )] - .into(), - usize::MAX, - parallel_requests, - ) + ) -> Result { + let mut full_scan_request = FullScanRequest::new(request.chain_tip.clone()); + let spks = [( + 0, + request + .take_spks() + .enumerate() + .map(|(i, spk)| (i as u32, spk)), + )] + .into(); + full_scan_request.add_spks_by_keychain(spks); + + let mut graph_update = self + .full_scan(full_scan_request, usize::MAX, parallel_requests) .await - .map(|(g, _)| g)?; + .map(|result| result.graph_update)?; - let mut txids = txids.into_iter(); + let mut txids = request.take_txids(); loop { let handles = txids .by_ref() .take(parallel_requests) - .filter(|&txid| graph.get_tx(txid).is_none()) + .filter(|&txid| graph_update.get_tx(txid).is_none()) .map(|txid| { let client = self.clone(); async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) } @@ -285,36 +290,52 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { for (txid, status) in handles.try_collect::>().await? { if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(txid, anchor); + let _ = graph_update.insert_anchor(txid, anchor); } } } - for op in outpoints.into_iter() { - if graph.get_tx(op.txid).is_none() { + for op in request.take_outpoints() { + if graph_update.get_tx(op.txid).is_none() { if let Some(tx) = self.get_tx(&op.txid).await? { - let _ = graph.insert_tx(tx); + let _ = graph_update.insert_tx(tx); } let status = self.get_tx_status(&op.txid).await?; if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(op.txid, anchor); + let _ = graph_update.insert_anchor(op.txid, anchor); } } if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _).await? { if let Some(txid) = op_status.txid { - if graph.get_tx(txid).is_none() { + if graph_update.get_tx(txid).is_none() { if let Some(tx) = self.get_tx(&txid).await? { - let _ = graph.insert_tx(tx); + let _ = graph_update.insert_tx(tx); } let status = self.get_tx_status(&txid).await?; if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(txid, anchor); + let _ = graph_update.insert_anchor(txid, anchor); } } } } } - Ok(graph) + + // Now that we're done updating the `IndexedTxGraph`, it's time to update the `LocalChain`! We + // want the `LocalChain` to have data about all the anchors in the `TxGraph` - for this reason, + // we want to retrieve the blocks at the heights of the newly added anchors that are missing from + // our view of the chain. + + // new tx graph transactions determine possible missing blockchain heights + let missing_heights = graph_update.anchor_heights(request.chain_tip.height()); + // get blockchain update from original request checkpoint and missing heights + let chain_update = self + .update_local_chain(request.chain_tip.clone(), missing_heights) + .await?; + + Ok(SyncResult { + graph_update, + chain_update, + }) } } diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 993e33ac0b..a0263f4e83 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -2,6 +2,7 @@ use std::thread::JoinHandle; use bdk_chain::collections::btree_map; use bdk_chain::collections::BTreeMap; +use bdk_chain::spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult}; use bdk_chain::{ bitcoin::{BlockHash, OutPoint, ScriptBuf, TxOut, Txid}, local_chain::{self, CheckPoint}, @@ -50,12 +51,15 @@ pub trait EsploraExt { /// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in /// parallel. - fn full_scan( + fn full_scan( &self, - keychain_spks: BTreeMap>, + request: FullScanRequest< + K, + impl IntoIterator + Send> + Send, + >, stop_gap: usize, parallel_requests: usize, - ) -> Result<(TxGraph, BTreeMap), Error>; + ) -> Result, Error>; /// Sync a set of scripts with the blockchain (via an Esplora client) for the data /// specified and return a [`TxGraph`]. @@ -69,13 +73,7 @@ pub trait EsploraExt { /// may include scripts that have been used, use [`full_scan`] with the keychain. /// /// [`full_scan`]: EsploraExt::full_scan - fn sync( - &self, - misc_spks: impl IntoIterator, - txids: impl IntoIterator, - outpoints: impl IntoIterator, - parallel_requests: usize, - ) -> Result, Error>; + fn sync(&self, request: SyncRequest, parallel_requests: usize) -> Result; } impl EsploraExt for esplora_client::BlockingClient { @@ -139,22 +137,24 @@ impl EsploraExt for esplora_client::BlockingClient { }) } - fn full_scan( + fn full_scan( &self, - keychain_spks: BTreeMap>, + mut request: FullScanRequest< + K, + impl IntoIterator + Send> + Send, + >, stop_gap: usize, parallel_requests: usize, - ) -> Result<(TxGraph, BTreeMap), Error> { + ) -> Result, Error> { type TxsOfSpkIndex = (u32, Vec); let parallel_requests = Ord::max(parallel_requests, 1); - let mut graph = TxGraph::::default(); - let mut last_active_indexes = BTreeMap::::new(); + let mut graph_update = TxGraph::::default(); + let mut last_active_indices = BTreeMap::::new(); - for (keychain, spks) in keychain_spks { + for (keychain, spks) in request.take_spks_by_keychain() { let mut spks = spks.into_iter(); let mut last_index = Option::::None; let mut last_active_index = Option::::None; - loop { let handles = spks .by_ref() @@ -190,9 +190,9 @@ impl EsploraExt for esplora_client::BlockingClient { last_active_index = Some(index); } for tx in txs { - let _ = graph.insert_tx(tx.to_tx()); + let _ = graph_update.insert_tx(tx.to_tx()); if let Some(anchor) = anchor_from_status(&tx.status) { - let _ = graph.insert_anchor(tx.txid, anchor); + let _ = graph_update.insert_anchor(tx.txid, anchor); } let previous_outputs = tx.vin.iter().filter_map(|vin| { @@ -210,7 +210,7 @@ impl EsploraExt for esplora_client::BlockingClient { }); for (outpoint, txout) in previous_outputs { - let _ = graph.insert_txout(outpoint, txout); + let _ = graph_update.insert_txout(outpoint, txout); } } } @@ -227,41 +227,53 @@ impl EsploraExt for esplora_client::BlockingClient { } if let Some(last_active_index) = last_active_index { - last_active_indexes.insert(keychain, last_active_index); + last_active_indices.insert(keychain, last_active_index); } } - Ok((graph, last_active_indexes)) + // Now that we're done updating the `IndexedTxGraph`, it's time to update the `LocalChain`! We + // want the `LocalChain` to have data about all the anchors in the `TxGraph` - for this reason, + // we want to retrieve the blocks at the heights of the newly added anchors that are missing from + // our view of the chain. + + // new tx graph transactions determine possible missing blockchain heights + let missing_heights = graph_update.anchor_heights(request.chain_tip.height()); // chain anchor heights + // get blockchain update from original request checkpoint and possible missing heights + let chain_update = self.update_local_chain(request.chain_tip, missing_heights)?; + + Ok(FullScanResult { + graph_update, + chain_update, + last_active_indices, + }) } fn sync( &self, - misc_spks: impl IntoIterator, - txids: impl IntoIterator, - outpoints: impl IntoIterator, + mut request: SyncRequest, parallel_requests: usize, - ) -> Result, Error> { - let mut graph = self - .full_scan( - [( - (), - misc_spks - .into_iter() - .enumerate() - .map(|(i, spk)| (i as u32, spk)), - )] - .into(), - usize::MAX, - parallel_requests, - ) - .map(|(g, _)| g)?; - - let mut txids = txids.into_iter(); + ) -> Result { + let mut full_scan_request = FullScanRequest::new(request.chain_tip.clone()); + let spks = [( + 0, + request + .take_spks() + .enumerate() + .map(|(i, spk)| (i as u32, spk)), + )] + .into(); + full_scan_request.add_spks_by_keychain(spks); + + let mut graph_update = self + .full_scan(full_scan_request, usize::MAX, parallel_requests) + .map(|result| result.graph_update)?; + + let mut txids = request.take_txids(); loop { let handles = txids .by_ref() .take(parallel_requests) - .filter(|&txid| graph.get_tx(txid).is_none()) + .filter(|&txid| graph_update.get_tx(txid).is_none()) .map(|txid| { std::thread::spawn({ let client = self.clone(); @@ -282,36 +294,45 @@ impl EsploraExt for esplora_client::BlockingClient { for handle in handles { let (txid, status) = handle.join().expect("thread must not panic")?; if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(txid, anchor); + let _ = graph_update.insert_anchor(txid, anchor); } } } - for op in outpoints { - if graph.get_tx(op.txid).is_none() { + for op in request.take_outpoints() { + if graph_update.get_tx(op.txid).is_none() { if let Some(tx) = self.get_tx(&op.txid)? { - let _ = graph.insert_tx(tx); + let _ = graph_update.insert_tx(tx); } let status = self.get_tx_status(&op.txid)?; if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(op.txid, anchor); + let _ = graph_update.insert_anchor(op.txid, anchor); } } if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _)? { if let Some(txid) = op_status.txid { - if graph.get_tx(txid).is_none() { + if graph_update.get_tx(txid).is_none() { if let Some(tx) = self.get_tx(&txid)? { - let _ = graph.insert_tx(tx); + let _ = graph_update.insert_tx(tx); } let status = self.get_tx_status(&txid)?; if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(txid, anchor); + let _ = graph_update.insert_anchor(txid, anchor); } } } } } - Ok(graph) + + // new tx graph transactions determine possible missing blockchain heights + let missing_heights = graph_update.anchor_heights(request.chain_tip.height()); // chain anchor heights + // get blockchain update from original request checkpoint and possible missing heights + let chain_update = self.update_local_chain(request.chain_tip, missing_heights)?; + + Ok(SyncResult { + graph_update, + chain_update, + }) } } diff --git a/crates/esplora/tests/async_ext.rs b/crates/esplora/tests/async_ext.rs index baae1d11b0..5a0bd67883 100644 --- a/crates/esplora/tests/async_ext.rs +++ b/crates/esplora/tests/async_ext.rs @@ -1,3 +1,5 @@ +use bdk_chain::bitcoin::constants::genesis_block; +use bdk_chain::bitcoin::Network::Testnet; use bdk_esplora::EsploraAsyncExt; use electrsd::bitcoind::bitcoincore_rpc::RpcApi; use electrsd::bitcoind::{self, anyhow, BitcoinD}; @@ -9,6 +11,8 @@ use std::thread::sleep; use std::time::Duration; use bdk_chain::bitcoin::{Address, Amount, BlockHash, Txid}; +use bdk_chain::local_chain::LocalChain; +use bdk_chain::spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult}; struct TestEnv { bitcoind: BitcoinD, @@ -98,16 +102,15 @@ pub async fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { while env.client.get_height().await.unwrap() < 102 { sleep(Duration::from_millis(10)) } + let genesis_hash = genesis_block(Testnet).block_hash(); + let (local_chain, _change_set) = LocalChain::from_genesis_hash(genesis_hash); + let mut request = SyncRequest::new(local_chain.tip()); + request.add_spks(misc_spks); - let graph_update = env - .client - .sync( - misc_spks.into_iter(), - vec![].into_iter(), - vec![].into_iter(), - 1, - ) - .await?; + let SyncResult { + graph_update, + chain_update: _, + } = env.client.sync(request, 1).await?; // Check to see if we have the floating txouts available from our two created transactions' // previous outputs in order to calculate transaction fees. @@ -188,12 +191,25 @@ pub async fn test_async_update_tx_graph_gap_limit() -> anyhow::Result<()> { // A scan with a gap limit of 2 won't find the transaction, but a scan with a gap limit of 3 // will. - let (graph_update, active_indices) = env.client.full_scan(keychains.clone(), 2, 1).await?; + let genesis_hash = genesis_block(Testnet).block_hash(); + let (local_chain, _change_set) = LocalChain::from_genesis_hash(genesis_hash); + let mut request = FullScanRequest::new(local_chain.tip()); + request.add_spks_by_keychain(keychains.clone()); + + let FullScanResult { + graph_update, + chain_update: _, + last_active_indices, + } = env.client.full_scan(request.clone(), 2, 1).await?; assert!(graph_update.full_txs().next().is_none()); - assert!(active_indices.is_empty()); - let (graph_update, active_indices) = env.client.full_scan(keychains.clone(), 3, 1).await?; + assert!(last_active_indices.is_empty()); + let FullScanResult { + graph_update, + chain_update: _, + last_active_indices, + } = env.client.full_scan(request.clone(), 3, 1).await?; assert_eq!(graph_update.full_txs().next().unwrap().txid, txid_4th_addr); - assert_eq!(active_indices[&0], 3); + assert_eq!(last_active_indices[&0], 3); // Now receive a coin on the last address. let txid_last_addr = env.bitcoind.client.send_to_address( @@ -213,16 +229,29 @@ pub async fn test_async_update_tx_graph_gap_limit() -> anyhow::Result<()> { // A scan with gap limit 4 won't find the second transaction, but a scan with gap limit 5 will. // The last active indice won't be updated in the first case but will in the second one. - let (graph_update, active_indices) = env.client.full_scan(keychains.clone(), 4, 1).await?; + let genesis_hash = genesis_block(Testnet).block_hash(); + let (local_chain, _change_set) = LocalChain::from_genesis_hash(genesis_hash); + let mut request = FullScanRequest::new(local_chain.tip()); + request.add_spks_by_keychain(keychains); + + let FullScanResult { + graph_update, + chain_update: _, + last_active_indices, + } = env.client.full_scan(request.clone(), 4, 1).await?; let txs: HashSet<_> = graph_update.full_txs().map(|tx| tx.txid).collect(); assert_eq!(txs.len(), 1); assert!(txs.contains(&txid_4th_addr)); - assert_eq!(active_indices[&0], 3); - let (graph_update, active_indices) = env.client.full_scan(keychains, 5, 1).await?; + assert_eq!(last_active_indices[&0], 3); + let FullScanResult { + graph_update, + chain_update: _, + last_active_indices, + } = env.client.full_scan(request, 5, 1).await?; let txs: HashSet<_> = graph_update.full_txs().map(|tx| tx.txid).collect(); assert_eq!(txs.len(), 2); assert!(txs.contains(&txid_4th_addr) && txs.contains(&txid_last_addr)); - assert_eq!(active_indices[&0], 9); + assert_eq!(last_active_indices[&0], 9); Ok(()) } diff --git a/crates/esplora/tests/blocking_ext.rs b/crates/esplora/tests/blocking_ext.rs index 54c367e76c..4eae6f728c 100644 --- a/crates/esplora/tests/blocking_ext.rs +++ b/crates/esplora/tests/blocking_ext.rs @@ -1,3 +1,5 @@ +use bdk_chain::bitcoin::constants::genesis_block; +use bdk_chain::bitcoin::Network::Testnet; use bdk_chain::local_chain::LocalChain; use bdk_chain::BlockId; use bdk_esplora::EsploraExt; @@ -11,6 +13,7 @@ use std::thread::sleep; use std::time::Duration; use bdk_chain::bitcoin::{Address, Amount, BlockHash, Txid}; +use bdk_chain::spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult}; macro_rules! h { ($index:literal) => {{ @@ -129,12 +132,15 @@ pub fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { sleep(Duration::from_millis(10)) } - let graph_update = env.client.sync( - misc_spks.into_iter(), - vec![].into_iter(), - vec![].into_iter(), - 1, - )?; + let genesis_hash = genesis_block(Testnet).block_hash(); + let (local_chain, _change_set) = LocalChain::from_genesis_hash(genesis_hash); + let mut request = SyncRequest::new(local_chain.tip()); + request.add_spks(misc_spks); + + let SyncResult { + graph_update, + chain_update: _, + } = env.client.sync(request, 1)?; // Check to see if we have the floating txouts available from our two created transactions' // previous outputs in order to calculate transaction fees. @@ -216,12 +222,25 @@ pub fn test_update_tx_graph_gap_limit() -> anyhow::Result<()> { // A scan with a gap limit of 2 won't find the transaction, but a scan with a gap limit of 3 // will. - let (graph_update, active_indices) = env.client.full_scan(keychains.clone(), 2, 1)?; + let genesis_hash = genesis_block(Testnet).block_hash(); + let (local_chain, _change_set) = LocalChain::from_genesis_hash(genesis_hash); + let mut request = FullScanRequest::new(local_chain.tip()); + request.add_spks_by_keychain(keychains.clone()); + + let FullScanResult { + graph_update, + chain_update: _, + last_active_indices, + } = env.client.full_scan(request.clone(), 2, 1)?; assert!(graph_update.full_txs().next().is_none()); - assert!(active_indices.is_empty()); - let (graph_update, active_indices) = env.client.full_scan(keychains.clone(), 3, 1)?; + assert!(last_active_indices.is_empty()); + let FullScanResult { + graph_update, + chain_update: _, + last_active_indices, + } = env.client.full_scan(request.clone(), 3, 1)?; assert_eq!(graph_update.full_txs().next().unwrap().txid, txid_4th_addr); - assert_eq!(active_indices[&0], 3); + assert_eq!(last_active_indices[&0], 3); // Now receive a coin on the last address. let txid_last_addr = env.bitcoind.client.send_to_address( @@ -241,16 +260,29 @@ pub fn test_update_tx_graph_gap_limit() -> anyhow::Result<()> { // A scan with gap limit 4 won't find the second transaction, but a scan with gap limit 5 will. // The last active indice won't be updated in the first case but will in the second one. - let (graph_update, active_indices) = env.client.full_scan(keychains.clone(), 4, 1)?; + let genesis_hash = genesis_block(Testnet).block_hash(); + let (local_chain, _change_set) = LocalChain::from_genesis_hash(genesis_hash); + let mut request = FullScanRequest::new(local_chain.tip()); + request.add_spks_by_keychain(keychains); + + let FullScanResult { + graph_update, + chain_update: _, + last_active_indices, + } = env.client.full_scan(request.clone(), 4, 1)?; let txs: HashSet<_> = graph_update.full_txs().map(|tx| tx.txid).collect(); assert_eq!(txs.len(), 1); assert!(txs.contains(&txid_4th_addr)); - assert_eq!(active_indices[&0], 3); - let (graph_update, active_indices) = env.client.full_scan(keychains, 5, 1)?; + assert_eq!(last_active_indices[&0], 3); + let FullScanResult { + graph_update, + chain_update: _, + last_active_indices, + } = env.client.full_scan(request.clone(), 5, 1)?; let txs: HashSet<_> = graph_update.full_txs().map(|tx| tx.txid).collect(); assert_eq!(txs.len(), 2); assert!(txs.contains(&txid_4th_addr) && txs.contains(&txid_last_addr)); - assert_eq!(active_indices[&0], 9); + assert_eq!(last_active_indices[&0], 9); Ok(()) }