From 576f3af6dc0700c7c9f737c4fb1059b4e45f1116 Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Sat, 15 Jun 2024 14:38:17 +0800 Subject: [PATCH] refactor(electrum): implement merkle proofs WIP --- crates/electrum/src/bdk_electrum_client.rs | 412 +++++++------------- crates/electrum/tests/test_electrum.rs | 37 +- example-crates/example_electrum/src/main.rs | 13 +- example-crates/wallet_electrum/src/main.rs | 4 +- 4 files changed, 161 insertions(+), 305 deletions(-) diff --git a/crates/electrum/src/bdk_electrum_client.rs b/crates/electrum/src/bdk_electrum_client.rs index 80101f1673..5401d39f6a 100644 --- a/crates/electrum/src/bdk_electrum_client.rs +++ b/crates/electrum/src/bdk_electrum_client.rs @@ -1,14 +1,16 @@ use bdk_chain::{ - bitcoin::{OutPoint, ScriptBuf, Transaction, Txid}, - collections::{BTreeMap, HashMap, HashSet}, + bitcoin::{BlockHash, OutPoint, ScriptBuf, Transaction, Txid}, + collections::{BTreeMap, HashMap}, local_chain::CheckPoint, spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult}, tx_graph::TxGraph, - BlockId, ConfirmationHeightAnchor, ConfirmationTimeHeightAnchor, + Anchor, BlockId, ConfirmationTimeHeightAnchor, }; -use core::str::FromStr; use electrum_client::{ElectrumApi, Error, HeaderNotification}; -use std::sync::{Arc, Mutex}; +use std::{ + collections::BTreeSet, + sync::{Arc, Mutex}, +}; /// We include a chain suffix of a certain length for the purpose of robustness. const CHAIN_SUFFIX_LENGTH: u32 = 8; @@ -88,87 +90,32 @@ impl BdkElectrumClient { stop_gap: usize, batch_size: usize, fetch_prev_txouts: bool, - ) -> Result, Error> { - let mut request_spks = request.spks_by_keychain; - - // We keep track of already-scanned spks just in case a reorg happens and we need to do a - // rescan. We need to keep track of this as iterators in `keychain_spks` are "unbounded" so - // cannot be collected. In addition, we keep track of whether an spk has an active tx - // history for determining the `last_active_index`. - // * key: (keychain, spk_index) that identifies the spk. - // * val: (script_pubkey, has_tx_history). - let mut scanned_spks = BTreeMap::<(K, u32), (ScriptBuf, bool)>::new(); - - let update = loop { - let (tip, _) = construct_update_tip(&self.inner, request.chain_tip.clone())?; - let mut graph_update = TxGraph::::default(); - let cps = tip - .iter() - .take(10) - .map(|cp| (cp.height(), cp)) - .collect::>>(); - - if !request_spks.is_empty() { - if !scanned_spks.is_empty() { - scanned_spks.append( - &mut self.populate_with_spks( - &cps, - &mut graph_update, - &mut scanned_spks - .iter() - .map(|(i, (spk, _))| (i.clone(), spk.clone())), - stop_gap, - batch_size, - )?, - ); - } - for (keychain, keychain_spks) in &mut request_spks { - scanned_spks.extend( - self.populate_with_spks( - &cps, - &mut graph_update, - keychain_spks, - stop_gap, - batch_size, - )? - .into_iter() - .map(|(spk_i, spk)| ((keychain.clone(), spk_i), spk)), - ); - } - } - - // check for reorgs during scan process - let server_blockhash = self.inner.block_header(tip.height() as usize)?.block_hash(); - if tip.hash() != server_blockhash { - continue; // reorg + ) -> Result, Error> { + let (tip, latest_blocks) = + fetch_tip_and_latest_blocks(&self.inner, request.chain_tip.clone())?; + let mut graph_update = TxGraph::::default(); + let mut last_active_indices = BTreeMap::::new(); + + for (keychain, keychain_spks) in request.spks_by_keychain { + if let Some(last_active_index) = + self.populate_with_spks(&mut graph_update, keychain_spks, stop_gap, batch_size)? + { + last_active_indices.insert(keychain.clone(), last_active_index); } + } - // Fetch previous `TxOut`s for fee calculation if flag is enabled. - if fetch_prev_txouts { - self.fetch_prev_txout(&mut graph_update)?; - } + let chain_update = chain_update(tip, &latest_blocks, graph_update.all_anchors())?; - let chain_update = tip; - - let keychain_update = request_spks - .into_keys() - .filter_map(|k| { - scanned_spks - .range((k.clone(), u32::MIN)..=(k.clone(), u32::MAX)) - .rev() - .find(|(_, (_, active))| *active) - .map(|((_, i), _)| (k, *i)) - }) - .collect::>(); - - break FullScanResult { - graph_update, - chain_update, - last_active_indices: keychain_update, - }; - }; + // Fetch previous `TxOut`s for fee calculation if flag is enabled. + if fetch_prev_txouts { + self.fetch_prev_txout(&mut graph_update)?; + } - Ok(ElectrumFullScanResult(update)) + Ok(FullScanResult { + graph_update, + chain_update, + last_active_indices, + }) } /// Sync a set of scripts with the blockchain (via an Electrum client) for the data specified @@ -190,32 +137,31 @@ impl BdkElectrumClient { request: SyncRequest, batch_size: usize, fetch_prev_txouts: bool, - ) -> Result { + ) -> Result { let full_scan_req = FullScanRequest::from_chain_tip(request.chain_tip.clone()) .set_spks_for_keychain((), request.spks.enumerate().map(|(i, spk)| (i as u32, spk))); - let mut full_scan_res = self - .full_scan(full_scan_req, usize::MAX, batch_size, false)? - .with_confirmation_height_anchor(); + let mut full_scan_res = self.full_scan(full_scan_req, usize::MAX, batch_size, false)?; + let (tip, latest_blocks) = + fetch_tip_and_latest_blocks(&self.inner, request.chain_tip.clone())?; - let (tip, _) = construct_update_tip(&self.inner, request.chain_tip)?; - let cps = tip - .iter() - .take(10) - .map(|cp| (cp.height(), cp)) - .collect::>>(); + self.populate_with_txids(&mut full_scan_res.graph_update, request.txids)?; + self.populate_with_outpoints(&mut full_scan_res.graph_update, request.outpoints)?; - self.populate_with_txids(&cps, &mut full_scan_res.graph_update, request.txids)?; - self.populate_with_outpoints(&cps, &mut full_scan_res.graph_update, request.outpoints)?; + let chain_update = chain_update( + tip, + &latest_blocks, + full_scan_res.graph_update.all_anchors(), + )?; // Fetch previous `TxOut`s for fee calculation if flag is enabled. if fetch_prev_txouts { self.fetch_prev_txout(&mut full_scan_res.graph_update)?; } - Ok(ElectrumSyncResult(SyncResult { - chain_update: full_scan_res.chain_update, + Ok(SyncResult { + chain_update, graph_update: full_scan_res.graph_update, - })) + }) } /// Populate the `graph_update` with transactions/anchors associated with the given `spks`. @@ -227,70 +173,46 @@ impl BdkElectrumClient { /// Checkpoints (in `cps`) are used to create anchors. The `tx_cache` is self-explanatory. fn populate_with_spks( &self, - cps: &BTreeMap>, - graph_update: &mut TxGraph, - spks: &mut impl Iterator, + graph_update: &mut TxGraph, + mut spks: impl Iterator, stop_gap: usize, batch_size: usize, - ) -> Result, Error> { + ) -> Result, Error> { let mut unused_spk_count = 0_usize; - let mut scanned_spks = BTreeMap::new(); + let mut last_active_index = Option::::None; loop { let spks = (0..batch_size) .map_while(|_| spks.next()) .collect::>(); if spks.is_empty() { - return Ok(scanned_spks); + return Ok(last_active_index); } let spk_histories = self .inner .batch_script_get_history(spks.iter().map(|(_, s)| s.as_script()))?; - for ((spk_index, spk), spk_history) in spks.into_iter().zip(spk_histories) { + for ((spk_index, _spk), spk_history) in spks.into_iter().zip(spk_histories) { if spk_history.is_empty() { - scanned_spks.insert(spk_index, (spk, false)); unused_spk_count += 1; if unused_spk_count > stop_gap { - return Ok(scanned_spks); + return Ok(last_active_index); } continue; } else { - scanned_spks.insert(spk_index, (spk, true)); + last_active_index = Some(spk_index); unused_spk_count = 0; } for tx_res in spk_history { let _ = graph_update.insert_tx(self.fetch_tx(tx_res.tx_hash)?); - if let Some(anchor) = determine_tx_anchor(cps, tx_res.height, tx_res.tx_hash) { - let _ = graph_update.insert_anchor(tx_res.tx_hash, anchor); - } + self.validate_merkle_for_anchor(graph_update, tx_res.tx_hash, tx_res.height)?; } } } } - // Helper function which fetches the `TxOut`s of our relevant transactions' previous transactions, - // which we do not have by default. This data is needed to calculate the transaction fee. - fn fetch_prev_txout( - &self, - graph_update: &mut TxGraph, - ) -> Result<(), Error> { - let full_txs: Vec> = - graph_update.full_txs().map(|tx_node| tx_node.tx).collect(); - for tx in full_txs { - for vin in &tx.input { - let outpoint = vin.previous_output; - let vout = outpoint.vout; - let prev_tx = self.fetch_tx(outpoint.txid)?; - let txout = prev_tx.output[vout as usize].clone(); - let _ = graph_update.insert_txout(outpoint, txout); - } - } - Ok(()) - } - /// Populate the `graph_update` with associated transactions/anchors of `outpoints`. /// /// Transactions in which the outpoint resides, and transactions that spend from the outpoint are @@ -299,8 +221,7 @@ impl BdkElectrumClient { /// Checkpoints (in `cps`) are used to create anchors. The `tx_cache` is self-explanatory. fn populate_with_outpoints( &self, - cps: &BTreeMap>, - graph_update: &mut TxGraph, + graph_update: &mut TxGraph, outpoints: impl IntoIterator, ) -> Result<(), Error> { for outpoint in outpoints { @@ -324,9 +245,7 @@ impl BdkElectrumClient { if !has_residing && res.tx_hash == op_txid { has_residing = true; let _ = graph_update.insert_tx(Arc::clone(&op_tx)); - if let Some(anchor) = determine_tx_anchor(cps, res.height, res.tx_hash) { - let _ = graph_update.insert_anchor(res.tx_hash, anchor); - } + self.validate_merkle_for_anchor(graph_update, res.tx_hash, res.height)?; } if !has_spending && res.tx_hash != op_txid { @@ -340,9 +259,7 @@ impl BdkElectrumClient { continue; } let _ = graph_update.insert_tx(Arc::clone(&res_tx)); - if let Some(anchor) = determine_tx_anchor(cps, res.height, res.tx_hash) { - let _ = graph_update.insert_anchor(res.tx_hash, anchor); - } + self.validate_merkle_for_anchor(graph_update, res.tx_hash, res.height)?; } } } @@ -352,8 +269,7 @@ impl BdkElectrumClient { /// Populate the `graph_update` with transactions/anchors of the provided `txids`. fn populate_with_txids( &self, - cps: &BTreeMap>, - graph_update: &mut TxGraph, + graph_update: &mut TxGraph, txids: impl IntoIterator, ) -> Result<(), Error> { for txid in txids { @@ -371,122 +287,90 @@ impl BdkElectrumClient { // because of restrictions of the Electrum API, we have to use the `script_get_history` // call to get confirmation status of our transaction - let anchor = match self + if let Some(r) = self .inner .script_get_history(spk)? .into_iter() .find(|r| r.tx_hash == txid) { - Some(r) => determine_tx_anchor(cps, r.height, txid), - None => continue, - }; + self.validate_merkle_for_anchor(graph_update, txid, r.height)?; + } let _ = graph_update.insert_tx(tx); - if let Some(anchor) = anchor { - let _ = graph_update.insert_anchor(txid, anchor); - } } Ok(()) } -} -/// The result of [`BdkElectrumClient::full_scan`]. -/// -/// This can be transformed into a [`FullScanResult`] with either [`ConfirmationHeightAnchor`] or -/// [`ConfirmationTimeHeightAnchor`] anchor types. -pub struct ElectrumFullScanResult(FullScanResult); - -impl ElectrumFullScanResult { - /// Return [`FullScanResult`] with [`ConfirmationHeightAnchor`]. - pub fn with_confirmation_height_anchor( - self, - ) -> FullScanResult { - self.0 - } - - /// Return [`FullScanResult`] with [`ConfirmationTimeHeightAnchor`]. - /// - /// This requires additional calls to the Electrum server. - pub fn with_confirmation_time_height_anchor( - self, - client: &BdkElectrumClient, - ) -> Result, Error> { - let res = self.0; - Ok(FullScanResult { - graph_update: try_into_confirmation_time_result(res.graph_update, &client.inner)?, - chain_update: res.chain_update, - last_active_indices: res.last_active_indices, - }) - } -} - -/// The result of [`BdkElectrumClient::sync`]. -/// -/// This can be transformed into a [`SyncResult`] with either [`ConfirmationHeightAnchor`] or -/// [`ConfirmationTimeHeightAnchor`] anchor types. -pub struct ElectrumSyncResult(SyncResult); - -impl ElectrumSyncResult { - /// Return [`SyncResult`] with [`ConfirmationHeightAnchor`]. - pub fn with_confirmation_height_anchor(self) -> SyncResult { - self.0 + // Helper function which checks if a transaction is confirmed by validating the merkle proof. + // An anchor is inserted if the transaction is validated to be in a confirmed block. + fn validate_merkle_for_anchor( + &self, + graph_update: &mut TxGraph, + txid: Txid, + confirmation_height: i32, + ) -> Result<(), Error> { + if let Ok(merkle_res) = self + .inner + .transaction_get_merkle(&txid, confirmation_height as usize) + { + let header = self.inner.block_header(merkle_res.block_height)?; + let is_confirmed_tx = electrum_client::utils::validate_merkle_proof( + &txid, + &header.merkle_root, + &merkle_res, + ); + + if is_confirmed_tx { + let _ = graph_update.insert_anchor( + txid, + ConfirmationTimeHeightAnchor { + confirmation_height: merkle_res.block_height as u32, + confirmation_time: header.time as u64, + anchor_block: BlockId { + height: merkle_res.block_height as u32, + hash: header.block_hash(), + }, + }, + ); + } + } + Ok(()) } - /// Return [`SyncResult`] with [`ConfirmationTimeHeightAnchor`]. - /// - /// This requires additional calls to the Electrum server. - pub fn with_confirmation_time_height_anchor( - self, - client: &BdkElectrumClient, - ) -> Result, Error> { - let res = self.0; - Ok(SyncResult { - graph_update: try_into_confirmation_time_result(res.graph_update, &client.inner)?, - chain_update: res.chain_update, - }) + // Helper function which fetches the `TxOut`s of our relevant transactions' previous transactions, + // which we do not have by default. This data is needed to calculate the transaction fee. + fn fetch_prev_txout( + &self, + graph_update: &mut TxGraph, + ) -> Result<(), Error> { + let full_txs: Vec> = + graph_update.full_txs().map(|tx_node| tx_node.tx).collect(); + for tx in full_txs { + for vin in &tx.input { + let outpoint = vin.previous_output; + let vout = outpoint.vout; + let prev_tx = self.fetch_tx(outpoint.txid)?; + let txout = prev_tx.output[vout as usize].clone(); + let _ = graph_update.insert_txout(outpoint, txout); + } + } + Ok(()) } } -fn try_into_confirmation_time_result( - graph_update: TxGraph, - client: &impl ElectrumApi, -) -> Result, Error> { - let relevant_heights = graph_update - .all_anchors() - .iter() - .map(|(a, _)| a.confirmation_height) - .collect::>(); - - let height_to_time = relevant_heights - .clone() - .into_iter() - .zip( - client - .batch_block_header(relevant_heights)? - .into_iter() - .map(|bh| bh.time as u64), - ) - .collect::>(); - - Ok(graph_update.map_anchors(|a| ConfirmationTimeHeightAnchor { - anchor_block: a.anchor_block, - confirmation_height: a.confirmation_height, - confirmation_time: height_to_time[&a.confirmation_height], - })) -} - -/// Return a [`CheckPoint`] of the latest tip, that connects with `prev_tip`. -fn construct_update_tip( +/// Return a [`CheckPoint`] of the latest tip, that connects with `prev_tip`. The latest blocks are +/// fetched to construct anchor updates with the proper [`BlockHash`] in case of re-org. +fn fetch_tip_and_latest_blocks( client: &impl ElectrumApi, prev_tip: CheckPoint, -) -> Result<(CheckPoint, Option), Error> { +) -> Result<(CheckPoint, BTreeMap), Error> { let HeaderNotification { height, .. } = client.block_headers_subscribe()?; let new_tip_height = height as u32; // If electrum returns a tip height that is lower than our previous tip, then checkpoints do // not need updating. We just return the previous tip and use that as the point of agreement. if new_tip_height < prev_tip.height() { - return Ok((prev_tip.clone(), Some(prev_tip.height()))); + return Ok((prev_tip, BTreeMap::new())); } // Atomically fetch the latest `CHAIN_SUFFIX_LENGTH` count of blocks from Electrum. We use this @@ -529,6 +413,7 @@ fn construct_update_tip( let agreement_height = agreement_cp.as_ref().map(CheckPoint::height); let new_tip = new_blocks + .clone() .into_iter() // Prune `new_blocks` to only include blocks that are actually new. .filter(|(height, _)| Some(*height) > agreement_height) @@ -541,51 +426,28 @@ fn construct_update_tip( }) .expect("must have at least one checkpoint"); - Ok((new_tip, agreement_height)) + Ok((new_tip, new_blocks)) } -/// A [tx status] comprises of a concatenation of `tx_hash:height:`s. We transform a single one of -/// these concatenations into a [`ConfirmationHeightAnchor`] if possible. -/// -/// We use the lowest possible checkpoint as the anchor block (from `cps`). If an anchor block -/// cannot be found, or the transaction is unconfirmed, [`None`] is returned. -/// -/// [tx status](https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html#status) -fn determine_tx_anchor( - cps: &BTreeMap>, - raw_height: i32, - txid: Txid, -) -> Option { - // The electrum API has a weird quirk where an unconfirmed transaction is presented with a - // height of 0. To avoid invalid representation in our data structures, we manually set - // transactions residing in the genesis block to have height 0, then interpret a height of 0 as - // unconfirmed for all other transactions. - if txid - == Txid::from_str("4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b") - .expect("must deserialize genesis coinbase txid") - { - let anchor_block = cps.values().next()?.block_id(); - return Some(ConfirmationHeightAnchor { - anchor_block, - confirmation_height: 0, - }); - } - match raw_height { - h if h <= 0 => { - debug_assert!(h == 0 || h == -1, "unexpected height ({}) from electrum", h); - None - } - h => { - let h = h as u32; - let anchor_block = cps.range(h..).next().map(|(_, cp)| cp.block_id())?; - if h > anchor_block.height { - None - } else { - Some(ConfirmationHeightAnchor { - anchor_block, - confirmation_height: h, - }) - } +// Add a corresponding checkpoint per anchor height if it does not yet exist. Checkpoints should not +// surpass `latest_blocks`. +fn chain_update( + mut tip: CheckPoint, + latest_blocks: &BTreeMap, + anchors: &BTreeSet<(A, Txid)>, +) -> Result, Error> { + for anchor in anchors { + let height = anchor.0.anchor_block().height; + + // Checkpoint uses the `BlockHash` from `latest_blocks` so that the hash will be consistent + // in case of a re-org. + if tip.get(height).is_none() && height <= tip.height() { + let hash = match latest_blocks.get(&height) { + Some(&hash) => hash, + None => anchor.0.anchor_block().hash, + }; + tip = tip.insert(BlockId { hash, height }); } } + Ok(tip) } diff --git a/crates/electrum/tests/test_electrum.rs b/crates/electrum/tests/test_electrum.rs index c105ecca25..11582353c0 100644 --- a/crates/electrum/tests/test_electrum.rs +++ b/crates/electrum/tests/test_electrum.rs @@ -62,14 +62,11 @@ fn scan_detects_confirmed_tx() -> anyhow::Result<()> { // Sync up to tip. env.wait_until_electrum_sees_block()?; - let update = client - .sync( - SyncRequest::from_chain_tip(recv_chain.tip()) - .chain_spks(core::iter::once(spk_to_track)), - 5, - true, - )? - .with_confirmation_time_height_anchor(&client)?; + let update = client.sync( + SyncRequest::from_chain_tip(recv_chain.tip()).chain_spks(core::iter::once(spk_to_track)), + 5, + true, + )?; let _ = recv_chain .apply_update(update.chain_update) @@ -155,13 +152,11 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> { // Sync up to tip. env.wait_until_electrum_sees_block()?; - let update = client - .sync( - SyncRequest::from_chain_tip(recv_chain.tip()).chain_spks([spk_to_track.clone()]), - 5, - false, - )? - .with_confirmation_time_height_anchor(&client)?; + let update = client.sync( + SyncRequest::from_chain_tip(recv_chain.tip()).chain_spks([spk_to_track.clone()]), + 5, + false, + )?; let _ = recv_chain .apply_update(update.chain_update) @@ -186,13 +181,11 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> { env.reorg_empty_blocks(depth)?; env.wait_until_electrum_sees_block()?; - let update = client - .sync( - SyncRequest::from_chain_tip(recv_chain.tip()).chain_spks([spk_to_track.clone()]), - 5, - false, - )? - .with_confirmation_time_height_anchor(&client)?; + let update = client.sync( + SyncRequest::from_chain_tip(recv_chain.tip()).chain_spks([spk_to_track.clone()]), + 5, + false, + )?; let _ = recv_chain .apply_update(update.chain_update) diff --git a/example-crates/example_electrum/src/main.rs b/example-crates/example_electrum/src/main.rs index 8467d2699a..eee406399f 100644 --- a/example-crates/example_electrum/src/main.rs +++ b/example-crates/example_electrum/src/main.rs @@ -193,8 +193,7 @@ fn main() -> anyhow::Result<()> { let res = client .full_scan::<_>(request, stop_gap, scan_options.batch_size, false) - .context("scanning the blockchain")? - .with_confirmation_height_anchor(); + .context("scanning the blockchain")?; ( res.chain_update, res.graph_update, @@ -313,8 +312,7 @@ fn main() -> anyhow::Result<()> { let res = client .sync(request, scan_options.batch_size, false) - .context("scanning the blockchain")? - .with_confirmation_height_anchor(); + .context("scanning the blockchain")?; // drop lock on graph and chain drop((graph, chain)); @@ -341,7 +339,12 @@ fn main() -> anyhow::Result<()> { let (_, keychain_changeset) = graph.index.reveal_to_target_multi(&keychain_update); indexed_tx_graph_changeset.append(keychain_changeset.into()); } - indexed_tx_graph_changeset.append(graph.apply_update(graph_update)); + indexed_tx_graph_changeset.append(graph.apply_update(graph_update.map_anchors(|a| { + ConfirmationHeightAnchor { + confirmation_height: a.confirmation_height, + anchor_block: a.anchor_block, + } + }))); (chain_changeset, indexed_tx_graph_changeset) }; diff --git a/example-crates/wallet_electrum/src/main.rs b/example-crates/wallet_electrum/src/main.rs index e6c01c20be..75d13f1a8c 100644 --- a/example-crates/wallet_electrum/src/main.rs +++ b/example-crates/wallet_electrum/src/main.rs @@ -57,9 +57,7 @@ fn main() -> Result<(), anyhow::Error> { }) .inspect_spks_for_all_keychains(|_, _, _| std::io::stdout().flush().expect("must flush")); - let mut update = client - .full_scan(request, STOP_GAP, BATCH_SIZE, false)? - .with_confirmation_time_height_anchor(&client)?; + let mut update = client.full_scan(request, STOP_GAP, BATCH_SIZE, false)?; let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs(); let _ = update.graph_update.update_last_seen_unconfirmed(now);