From f795a43cc72fdb4ef26ca349c4cb4f4bfd3b90b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Wed, 4 Oct 2023 16:45:57 +0800 Subject: [PATCH 01/14] feat(example_cli): allow chain specific args in examples So you can pass in the esplora/electrum/bitcoind_rpc server details in the example. Co-authored-by: LLFourn --- example-crates/example_cli/src/lib.rs | 435 +++++++++----------- example-crates/example_electrum/src/main.rs | 67 ++- example-crates/example_esplora/src/main.rs | 52 ++- 3 files changed, 280 insertions(+), 274 deletions(-) diff --git a/example-crates/example_cli/src/lib.rs b/example-crates/example_cli/src/lib.rs index c9459c353..1982c30c6 100644 --- a/example-crates/example_cli/src/lib.rs +++ b/example-crates/example_cli/src/lib.rs @@ -34,7 +34,7 @@ pub type Database<'m, C> = Persist, C>; #[derive(Parser)] #[clap(author, version, about, long_about = None)] #[clap(propagate_version = true)] -pub struct Args { +pub struct Args { #[clap(env = "DESCRIPTOR")] pub descriptor: String, #[clap(env = "CHANGE_DESCRIPTOR")] @@ -50,14 +50,14 @@ pub struct Args { pub cp_limit: usize, #[clap(subcommand)] - pub command: Commands, + pub command: Commands, } #[allow(clippy::almost_swapped)] #[derive(Subcommand, Debug, Clone)] -pub enum Commands { +pub enum Commands { #[clap(flatten)] - ChainSpecific(S), + ChainSpecific(CS), /// Address generation and inspection. Address { #[clap(subcommand)] @@ -77,6 +77,8 @@ pub enum Commands { address: Address, #[clap(short, default_value = "bnb")] coin_select: CoinSelectionAlgo, + #[clap(flatten)] + chain_specfic: S, }, } @@ -183,225 +185,6 @@ impl core::fmt::Display for Keychain { } } -pub fn run_address_cmd( - graph: &mut KeychainTxGraph, - db: &Mutex>, - network: Network, - cmd: AddressCmd, -) -> anyhow::Result<()> -where - C: Default + Append + DeserializeOwned + Serialize + From>, -{ - let index = &mut graph.index; - - match cmd { - AddressCmd::Next | AddressCmd::New => { - let spk_chooser = match cmd { - AddressCmd::Next => KeychainTxOutIndex::next_unused_spk, - AddressCmd::New => KeychainTxOutIndex::reveal_next_spk, - _ => unreachable!("only these two variants exist in match arm"), - }; - - let ((spk_i, spk), index_changeset) = spk_chooser(index, &Keychain::External); - let db = &mut *db.lock().unwrap(); - db.stage(C::from(( - local_chain::ChangeSet::default(), - indexed_tx_graph::ChangeSet::from(index_changeset), - ))); - db.commit()?; - let addr = Address::from_script(spk, network).context("failed to derive address")?; - println!("[address @ {}] {}", spk_i, addr); - Ok(()) - } - AddressCmd::Index => { - for (keychain, derivation_index) in index.last_revealed_indices() { - println!("{:?}: {}", keychain, derivation_index); - } - Ok(()) - } - AddressCmd::List { change } => { - let target_keychain = match change { - true => Keychain::Internal, - false => Keychain::External, - }; - for (spk_i, spk) in index.revealed_spks_of_keychain(&target_keychain) { - let address = Address::from_script(spk, network) - .expect("should always be able to derive address"); - println!( - "{:?} {} used:{}", - spk_i, - address, - index.is_used(&(target_keychain, spk_i)) - ); - } - Ok(()) - } - } -} - -pub fn run_balance_cmd( - graph: &KeychainTxGraph, - chain: &O, -) -> Result<(), O::Error> { - fn print_balances<'a>(title_str: &'a str, items: impl IntoIterator) { - println!("{}:", title_str); - for (name, amount) in items.into_iter() { - println!(" {:<10} {:>12} sats", name, amount) - } - } - - let balance = graph.graph().try_balance( - chain, - 
chain.get_chain_tip()?.unwrap_or_default(), - graph.index.outpoints().iter().cloned(), - |(k, _), _| k == &Keychain::Internal, - )?; - - let confirmed_total = balance.confirmed + balance.immature; - let unconfirmed_total = balance.untrusted_pending + balance.trusted_pending; - - print_balances( - "confirmed", - [ - ("total", confirmed_total), - ("spendable", balance.confirmed), - ("immature", balance.immature), - ], - ); - print_balances( - "unconfirmed", - [ - ("total", unconfirmed_total), - ("trusted", balance.trusted_pending), - ("untrusted", balance.untrusted_pending), - ], - ); - - Ok(()) -} - -pub fn run_txo_cmd( - graph: &KeychainTxGraph, - chain: &O, - network: Network, - cmd: TxOutCmd, -) -> anyhow::Result<()> -where - O::Error: std::error::Error + Send + Sync + 'static, -{ - let chain_tip = chain.get_chain_tip()?.unwrap_or_default(); - let outpoints = graph.index.outpoints().iter().cloned(); - - match cmd { - TxOutCmd::List { - spent, - unspent, - confirmed, - unconfirmed, - } => { - let txouts = graph - .graph() - .try_filter_chain_txouts(chain, chain_tip, outpoints) - .filter(|r| match r { - Ok((_, full_txo)) => match (spent, unspent) { - (true, false) => full_txo.spent_by.is_some(), - (false, true) => full_txo.spent_by.is_none(), - _ => true, - }, - // always keep errored items - Err(_) => true, - }) - .filter(|r| match r { - Ok((_, full_txo)) => match (confirmed, unconfirmed) { - (true, false) => full_txo.chain_position.is_confirmed(), - (false, true) => !full_txo.chain_position.is_confirmed(), - _ => true, - }, - // always keep errored items - Err(_) => true, - }) - .collect::, _>>()?; - - for (spk_i, full_txo) in txouts { - let addr = Address::from_script(&full_txo.txout.script_pubkey, network)?; - println!( - "{:?} {} {} {} spent:{:?}", - spk_i, full_txo.txout.value, full_txo.outpoint, addr, full_txo.spent_by - ) - } - Ok(()) - } - } -} - -#[allow(clippy::too_many_arguments)] -pub fn run_send_cmd( - graph: &Mutex>, - db: &Mutex>, - chain: &O, - keymap: &HashMap, - cs_algorithm: CoinSelectionAlgo, - address: Address, - value: u64, - broadcast: impl FnOnce(&Transaction) -> anyhow::Result<()>, -) -> anyhow::Result<()> -where - O::Error: std::error::Error + Send + Sync + 'static, - C: Default + Append + DeserializeOwned + Serialize + From>, -{ - let (transaction, change_index) = { - let graph = &mut *graph.lock().unwrap(); - // take mutable ref to construct tx -- it is only open for a short time while building it. - let (tx, change_info) = create_tx(graph, chain, keymap, cs_algorithm, address, value)?; - - if let Some((index_changeset, (change_keychain, index))) = change_info { - // We must first persist to disk the fact that we've got a new address from the - // change keychain so future scans will find the tx we're about to broadcast. - // If we're unable to persist this, then we don't want to broadcast. - { - let db = &mut *db.lock().unwrap(); - db.stage(C::from(( - local_chain::ChangeSet::default(), - indexed_tx_graph::ChangeSet::from(index_changeset), - ))); - db.commit()?; - } - - // We don't want other callers/threads to use this address while we're using it - // but we also don't want to scan the tx we just created because it's not - // technically in the blockchain yet. 
- graph.index.mark_used(&change_keychain, index); - (tx, Some((change_keychain, index))) - } else { - (tx, None) - } - }; - - match (broadcast)(&transaction) { - Ok(_) => { - println!("Broadcasted Tx : {}", transaction.txid()); - - let keychain_changeset = graph.lock().unwrap().insert_tx(&transaction, None, None); - - // We know the tx is at least unconfirmed now. Note if persisting here fails, - // it's not a big deal since we can always find it again form - // blockchain. - db.lock().unwrap().stage(C::from(( - local_chain::ChangeSet::default(), - keychain_changeset, - ))); - Ok(()) - } - Err(e) => { - if let Some((keychain, index)) = change_index { - // We failed to broadcast, so allow our change address to be used in the future - graph.lock().unwrap().index.unmark_used(&keychain, index); - } - Err(e) - } - } -} - #[allow(clippy::type_complexity)] pub fn create_tx( graph: &mut KeychainTxGraph, @@ -647,14 +430,14 @@ pub fn planned_utxos( +pub fn handle_commands( graph: &Mutex>, db: &Mutex>, chain: &Mutex, keymap: &HashMap, network: Network, - broadcast: impl FnOnce(&Transaction) -> anyhow::Result<()>, - cmd: Commands, + broadcast: impl FnOnce(S, &Transaction) -> anyhow::Result<()>, + cmd: Commands, ) -> anyhow::Result<()> where O::Error: std::error::Error + Send + Sync + 'static, @@ -664,45 +447,213 @@ where Commands::ChainSpecific(_) => unreachable!("example code should handle this!"), Commands::Address { addr_cmd } => { let graph = &mut *graph.lock().unwrap(); - run_address_cmd(graph, db, network, addr_cmd) + let index = &mut graph.index; + + match addr_cmd { + AddressCmd::Next | AddressCmd::New => { + let spk_chooser = match addr_cmd { + AddressCmd::Next => KeychainTxOutIndex::next_unused_spk, + AddressCmd::New => KeychainTxOutIndex::reveal_next_spk, + _ => unreachable!("only these two variants exist in match arm"), + }; + + let ((spk_i, spk), index_changeset) = spk_chooser(index, &Keychain::External); + let db = &mut *db.lock().unwrap(); + db.stage(C::from(( + local_chain::ChangeSet::default(), + indexed_tx_graph::ChangeSet::from(index_changeset), + ))); + db.commit()?; + let addr = + Address::from_script(spk, network).context("failed to derive address")?; + println!("[address @ {}] {}", spk_i, addr); + Ok(()) + } + AddressCmd::Index => { + for (keychain, derivation_index) in index.last_revealed_indices() { + println!("{:?}: {}", keychain, derivation_index); + } + Ok(()) + } + AddressCmd::List { change } => { + let target_keychain = match change { + true => Keychain::Internal, + false => Keychain::External, + }; + for (spk_i, spk) in index.revealed_spks_of_keychain(&target_keychain) { + let address = Address::from_script(spk, network) + .expect("should always be able to derive address"); + println!( + "{:?} {} used:{}", + spk_i, + address, + index.is_used(&(target_keychain, spk_i)) + ); + } + Ok(()) + } + } } Commands::Balance => { let graph = &*graph.lock().unwrap(); let chain = &*chain.lock().unwrap(); - run_balance_cmd(graph, chain).map_err(anyhow::Error::from) + fn print_balances<'a>( + title_str: &'a str, + items: impl IntoIterator, + ) { + println!("{}:", title_str); + for (name, amount) in items.into_iter() { + println!(" {:<10} {:>12} sats", name, amount) + } + } + + let balance = graph.graph().try_balance( + chain, + chain.get_chain_tip()?.unwrap_or_default(), + graph.index.outpoints().iter().cloned(), + |(k, _), _| k == &Keychain::Internal, + )?; + + let confirmed_total = balance.confirmed + balance.immature; + let unconfirmed_total = balance.untrusted_pending + 
balance.trusted_pending; + + print_balances( + "confirmed", + [ + ("total", confirmed_total), + ("spendable", balance.confirmed), + ("immature", balance.immature), + ], + ); + print_balances( + "unconfirmed", + [ + ("total", unconfirmed_total), + ("trusted", balance.trusted_pending), + ("untrusted", balance.untrusted_pending), + ], + ); + + Ok(()) } Commands::TxOut { txout_cmd } => { let graph = &*graph.lock().unwrap(); let chain = &*chain.lock().unwrap(); - run_txo_cmd(graph, chain, network, txout_cmd) + let chain_tip = chain.get_chain_tip()?.unwrap_or_default(); + let outpoints = graph.index.outpoints().iter().cloned(); + + match txout_cmd { + TxOutCmd::List { + spent, + unspent, + confirmed, + unconfirmed, + } => { + let txouts = graph + .graph() + .try_filter_chain_txouts(chain, chain_tip, outpoints) + .filter(|r| match r { + Ok((_, full_txo)) => match (spent, unspent) { + (true, false) => full_txo.spent_by.is_some(), + (false, true) => full_txo.spent_by.is_none(), + _ => true, + }, + // always keep errored items + Err(_) => true, + }) + .filter(|r| match r { + Ok((_, full_txo)) => match (confirmed, unconfirmed) { + (true, false) => full_txo.chain_position.is_confirmed(), + (false, true) => !full_txo.chain_position.is_confirmed(), + _ => true, + }, + // always keep errored items + Err(_) => true, + }) + .collect::, _>>()?; + + for (spk_i, full_txo) in txouts { + let addr = Address::from_script(&full_txo.txout.script_pubkey, network)?; + println!( + "{:?} {} {} {} spent:{:?}", + spk_i, full_txo.txout.value, full_txo.outpoint, addr, full_txo.spent_by + ) + } + Ok(()) + } + } } Commands::Send { value, address, coin_select, + chain_specfic, } => { let chain = &*chain.lock().unwrap(); let address = address.require_network(network)?; - run_send_cmd( - graph, - db, - chain, - keymap, - coin_select, - address, - value, - broadcast, - ) + let (transaction, change_index) = { + let graph = &mut *graph.lock().unwrap(); + // take mutable ref to construct tx -- it is only open for a short time while building it. + let (tx, change_info) = + create_tx(graph, chain, keymap, coin_select, address, value)?; + + if let Some((index_changeset, (change_keychain, index))) = change_info { + // We must first persist to disk the fact that we've got a new address from the + // change keychain so future scans will find the tx we're about to broadcast. + // If we're unable to persist this, then we don't want to broadcast. + { + let db = &mut *db.lock().unwrap(); + db.stage(C::from(( + local_chain::ChangeSet::default(), + indexed_tx_graph::ChangeSet::from(index_changeset), + ))); + db.commit()?; + } + + // We don't want other callers/threads to use this address while we're using it + // but we also don't want to scan the tx we just created because it's not + // technically in the blockchain yet. + graph.index.mark_used(&change_keychain, index); + (tx, Some((change_keychain, index))) + } else { + (tx, None) + } + }; + + match (broadcast)(chain_specfic, &transaction) { + Ok(_) => { + println!("Broadcasted Tx : {}", transaction.txid()); + + let keychain_changeset = + graph.lock().unwrap().insert_tx(&transaction, None, None); + + // We know the tx is at least unconfirmed now. Note if persisting here fails, + // it's not a big deal since we can always find it again form + // blockchain. 
+ db.lock().unwrap().stage(C::from(( + local_chain::ChangeSet::default(), + keychain_changeset, + ))); + Ok(()) + } + Err(e) => { + if let Some((keychain, index)) = change_index { + // We failed to broadcast, so allow our change address to be used in the future + graph.lock().unwrap().index.unmark_used(&keychain, index); + } + Err(e) + } + } } } } #[allow(clippy::type_complexity)] -pub fn init<'m, S: clap::Subcommand, C>( +pub fn init<'m, CS: clap::Subcommand, S: clap::Args, C>( db_magic: &'m [u8], db_default_path: &str, ) -> anyhow::Result<( - Args, + Args, KeyMap, KeychainTxOutIndex, Mutex>, @@ -714,7 +665,7 @@ where if std::env::var("BDK_DB_PATH").is_err() { std::env::set_var("BDK_DB_PATH", db_default_path); } - let args = Args::::parse(); + let args = Args::::parse(); let secp = Secp256k1::default(); let mut index = KeychainTxOutIndex::::default(); diff --git a/example-crates/example_electrum/src/main.rs b/example-crates/example_electrum/src/main.rs index a05e85c57..be5ffc7bc 100644 --- a/example-crates/example_electrum/src/main.rs +++ b/example-crates/example_electrum/src/main.rs @@ -12,7 +12,7 @@ use bdk_chain::{ Append, ConfirmationHeightAnchor, }; use bdk_electrum::{ - electrum_client::{self, ElectrumApi}, + electrum_client::{self, Client, ElectrumApi}, ElectrumExt, ElectrumUpdate, }; use example_cli::{ @@ -33,6 +33,8 @@ enum ElectrumCommands { stop_gap: usize, #[clap(flatten)] scan_options: ScanOptions, + #[clap(flatten)] + electrum_args: ElectrumArgs, }, /// Scans particular addresses using the electrum API. Sync { @@ -50,9 +52,44 @@ enum ElectrumCommands { unconfirmed: bool, #[clap(flatten)] scan_options: ScanOptions, + #[clap(flatten)] + electrum_args: ElectrumArgs, }, } +impl ElectrumCommands { + fn electrum_args(&self) -> ElectrumArgs { + match self { + ElectrumCommands::Scan { electrum_args, .. } => electrum_args.clone(), + ElectrumCommands::Sync { electrum_args, .. } => electrum_args.clone(), + } + } +} + +#[derive(clap::Args, Debug, Clone)] +pub struct ElectrumArgs { + /// The electrum url to use to connect to. If not provided it will use a default electrum server + /// for your chosen network. + electrum_url: Option, +} + +impl ElectrumArgs { + pub fn client(&self, network: Network) -> anyhow::Result { + let electrum_url = self.electrum_url.as_deref().unwrap_or(match network { + Network::Bitcoin => "ssl://electrum.blockstream.info:50002", + Network::Testnet => "ssl://electrum.blockstream.info:60002", + Network::Regtest => "tcp://localhost:60401", + Network::Signet => "tcp://signet-electrumx.wakiyamap.dev:50001", + _ => panic!("Unknown network"), + }); + let config = electrum_client::Config::builder() + .validate_domain(matches!(network, Network::Bitcoin)) + .build(); + + Ok(electrum_client::Client::from_config(electrum_url, config)?) + } +} + #[derive(Parser, Debug, Clone, PartialEq)] pub struct ScanOptions { /// Set batch size for each script_history call to electrum client. 
@@ -67,7 +104,7 @@ type ChangeSet = ( fn main() -> anyhow::Result<()> { let (args, keymap, index, db, (disk_local_chain, disk_tx_graph)) = - example_cli::init::(DB_MAGIC, DB_PATH)?; + example_cli::init::(DB_MAGIC, DB_PATH)?; let graph = Mutex::new({ let mut graph = IndexedTxGraph::new(index); @@ -77,19 +114,6 @@ fn main() -> anyhow::Result<()> { let chain = Mutex::new(LocalChain::from_changeset(disk_local_chain)); - let electrum_url = match args.network { - Network::Bitcoin => "ssl://electrum.blockstream.info:50002", - Network::Testnet => "ssl://electrum.blockstream.info:60002", - Network::Regtest => "tcp://localhost:60401", - Network::Signet => "tcp://signet-electrumx.wakiyamap.dev:50001", - _ => panic!("Unknown network"), - }; - let config = electrum_client::Config::builder() - .validate_domain(matches!(args.network, Network::Bitcoin)) - .build(); - - let client = electrum_client::Client::from_config(electrum_url, config)?; - let electrum_cmd = match &args.command { example_cli::Commands::ChainSpecific(electrum_cmd) => electrum_cmd, general_cmd => { @@ -99,11 +123,10 @@ fn main() -> anyhow::Result<()> { &chain, &keymap, args.network, - |tx| { - client - .transaction_broadcast(tx) - .map(|_| ()) - .map_err(anyhow::Error::from) + |electrum_args, tx| { + let client = electrum_args.client(args.network)?; + client.transaction_broadcast(tx)?; + Ok(()) }, general_cmd.clone(), ); @@ -113,10 +136,13 @@ fn main() -> anyhow::Result<()> { } }; + let client = electrum_cmd.electrum_args().client(args.network)?; + let response = match electrum_cmd.clone() { ElectrumCommands::Scan { stop_gap, scan_options, + .. } => { let (keychain_spks, tip) = { let graph = &*graph.lock().unwrap(); @@ -162,6 +188,7 @@ fn main() -> anyhow::Result<()> { mut utxos, mut unconfirmed, scan_options, + .. } => { // Get a short lock on the tracker to get the spks we're interested in let graph = graph.lock().unwrap(); diff --git a/example-crates/example_esplora/src/main.rs b/example-crates/example_esplora/src/main.rs index 5791fe61a..d2ba62d0b 100644 --- a/example-crates/example_esplora/src/main.rs +++ b/example-crates/example_esplora/src/main.rs @@ -37,6 +37,8 @@ enum EsploraCommands { stop_gap: usize, #[clap(flatten)] scan_options: ScanOptions, + #[clap(flatten)] + esplora_args: EsploraArgs, }, /// Scan for particular addresses and unconfirmed transactions using the esplora API. Sync { @@ -54,8 +56,40 @@ enum EsploraCommands { unconfirmed: bool, #[clap(flatten)] scan_options: ScanOptions, + #[clap(flatten)] + esplora_args: EsploraArgs, }, } +impl EsploraCommands { + fn esplora_args(&self) -> EsploraArgs { + match self { + EsploraCommands::Scan { esplora_args, .. } => esplora_args.clone(), + EsploraCommands::Sync { esplora_args, .. } => esplora_args.clone(), + } + } +} + +#[derive(clap::Args, Debug, Clone)] +pub struct EsploraArgs { + /// The esplora url endpoint to connect to e.g. 
`` + /// If not provided it'll be set to a default for the network provided + esplora_url: Option, +} + +impl EsploraArgs { + pub fn client(&self, network: Network) -> anyhow::Result { + let esplora_url = self.esplora_url.as_deref().unwrap_or(match network { + Network::Bitcoin => "https://blockstream.info/api", + Network::Testnet => "https://blockstream.info/testnet/api", + Network::Regtest => "http://localhost:3002", + Network::Signet => "https://mempool.space/signet/api", + _ => panic!("unsupported network"), + }); + + let client = esplora_client::Builder::new(esplora_url).build_blocking()?; + Ok(client) + } +} #[derive(Parser, Debug, Clone, PartialEq)] pub struct ScanOptions { @@ -66,7 +100,7 @@ pub struct ScanOptions { fn main() -> anyhow::Result<()> { let (args, keymap, index, db, init_changeset) = - example_cli::init::(DB_MAGIC, DB_PATH)?; + example_cli::init::(DB_MAGIC, DB_PATH)?; let (init_chain_changeset, init_indexed_tx_graph_changeset) = init_changeset; @@ -84,16 +118,6 @@ fn main() -> anyhow::Result<()> { chain }); - let esplora_url = match args.network { - Network::Bitcoin => "https://blockstream.info/api", - Network::Testnet => "https://blockstream.info/testnet/api", - Network::Regtest => "http://localhost:3002", - Network::Signet => "https://mempool.space/signet/api", - _ => panic!("unsupported network"), - }; - - let client = esplora_client::Builder::new(esplora_url).build_blocking()?; - let esplora_cmd = match &args.command { // These are commands that are handled by this example (sync, scan). example_cli::Commands::ChainSpecific(esplora_cmd) => esplora_cmd, @@ -105,7 +129,8 @@ fn main() -> anyhow::Result<()> { &chain, &keymap, args.network, - |tx| { + |esplora_args, tx| { + let client = esplora_args.client(args.network)?; client .broadcast(tx) .map(|_| ()) @@ -119,6 +144,7 @@ fn main() -> anyhow::Result<()> { } }; + let client = esplora_cmd.esplora_args().client(args.network)?; // Prepare the `IndexedTxGraph` update based on whether we are scanning or syncing. // Scanning: We are iterating through spks of all keychains and scanning for transactions for // each spk. We start with the lowest derivation index spk and stop scanning after `stop_gap` @@ -131,6 +157,7 @@ fn main() -> anyhow::Result<()> { EsploraCommands::Scan { stop_gap, scan_options, + .. } => { let keychain_spks = graph .lock() @@ -184,6 +211,7 @@ fn main() -> anyhow::Result<()> { mut utxos, mut unconfirmed, scan_options, + .. } => { if !(*all_spks || unused_spks || utxos || unconfirmed) { // If nothing is specifically selected, we select everything (except all spks). From b3db5ca9df4302fddcd5474569849371f2be7e2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Wed, 4 Oct 2023 16:54:28 +0800 Subject: [PATCH 02/14] feat(chain): add `AnchorFromBlockPosition` trait This is useful for block-by-block chain sources. We can determine the tx's anchor based on the block, block height and tx position in the block. --- crates/chain/src/chain_data.rs | 28 +++++++++++++++++++++++++++- crates/chain/src/tx_data_traits.rs | 9 ++++++++- 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/crates/chain/src/chain_data.rs b/crates/chain/src/chain_data.rs index 550854298..537f0bf7e 100644 --- a/crates/chain/src/chain_data.rs +++ b/crates/chain/src/chain_data.rs @@ -1,6 +1,6 @@ use bitcoin::{hashes::Hash, BlockHash, OutPoint, TxOut, Txid}; -use crate::{Anchor, COINBASE_MATURITY}; +use crate::{Anchor, AnchorFromBlockPosition, COINBASE_MATURITY}; /// Represents the observed position of some chain data. 
/// @@ -109,6 +109,12 @@ impl Anchor for BlockId { } } +impl AnchorFromBlockPosition for BlockId { + fn from_block_position(_block: &bitcoin::Block, block_id: BlockId, _tx_pos: usize) -> Self { + block_id + } +} + impl Default for BlockId { fn default() -> Self { Self { @@ -168,6 +174,15 @@ impl Anchor for ConfirmationHeightAnchor { } } +impl AnchorFromBlockPosition for ConfirmationHeightAnchor { + fn from_block_position(_block: &bitcoin::Block, block_id: BlockId, _tx_pos: usize) -> Self { + Self { + anchor_block: block_id, + confirmation_height: block_id.height, + } + } +} + /// An [`Anchor`] implementation that also records the exact confirmation time and height of the /// transaction. /// @@ -196,6 +211,17 @@ impl Anchor for ConfirmationTimeAnchor { self.confirmation_height } } + +impl AnchorFromBlockPosition for ConfirmationTimeAnchor { + fn from_block_position(block: &bitcoin::Block, block_id: BlockId, _tx_pos: usize) -> Self { + Self { + anchor_block: block_id, + confirmation_height: block_id.height, + confirmation_time: block.header.time as _, + } + } +} + /// A `TxOut` with as much data as we can retrieve about it #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct FullTxOut { diff --git a/crates/chain/src/tx_data_traits.rs b/crates/chain/src/tx_data_traits.rs index 274bae36e..c957a3e57 100644 --- a/crates/chain/src/tx_data_traits.rs +++ b/crates/chain/src/tx_data_traits.rs @@ -76,12 +76,19 @@ pub trait Anchor: core::fmt::Debug + Clone + Eq + PartialOrd + Ord + core::hash: } } -impl Anchor for &'static A { +impl<'a, A: Anchor> Anchor for &'a A { fn anchor_block(&self) -> BlockId { ::anchor_block(self) } } +/// An [`Anchor`] that can be constructed from a given block, block height and transaction position +/// within the block. +pub trait AnchorFromBlockPosition: Anchor { + /// Construct the anchor from a given `block`, block height and `tx_pos` within the block. + fn from_block_position(block: &bitcoin::Block, block_id: BlockId, tx_pos: usize) -> Self; +} + /// Trait that makes an object appendable. pub trait Append { /// Append another object of the same type onto `self`. From 43bc813c6498b5b021d70a6127ff7e57d7337813 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Wed, 4 Oct 2023 16:57:19 +0800 Subject: [PATCH 03/14] chain: add helper methods on `CheckPoint` * `CheckPoint::from_header` allows us to construct a checkpoint from block header. * `CheckPoint::into_update` transforms the cp into a `local_chain::Update`. --- crates/chain/src/local_chain.rs | 35 +++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/crates/chain/src/local_chain.rs b/crates/chain/src/local_chain.rs index d6cb20aa2..094b77424 100644 --- a/crates/chain/src/local_chain.rs +++ b/crates/chain/src/local_chain.rs @@ -39,6 +39,41 @@ impl CheckPoint { Self(Arc::new(CPInner { block, prev: None })) } + /// Construct a checkpoint from the given `header` and block `height`. + /// + /// If `header` is of the genesis block, the checkpoint won't have a [`prev`] node. Otherwise, + /// we return a checkpoint linked with the previous block. 
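+ ///
+ /// A minimal illustrative sketch (the mainnet genesis header is used here purely as an
+ /// example input; any header/height pair works the same way):
+ ///
+ /// ```
+ /// # use bdk_chain::local_chain::CheckPoint;
+ /// # use bdk_chain::bitcoin::{blockdata::constants::genesis_block, Network};
+ /// let genesis_header = genesis_block(Network::Bitcoin).header;
+ /// // A height-0 checkpoint has no `prev` node.
+ /// let cp = CheckPoint::from_header(&genesis_header, 0);
+ /// assert!(cp.prev().is_none());
+ /// // The checkpoint can be converted directly into a `local_chain::Update`.
+ /// let update = cp.into_update(false);
+ /// assert_eq!(update.tip.block_id().height, 0);
+ /// ```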
+ /// + /// [`prev`]: CheckPoint::prev + pub fn from_header(header: &bitcoin::block::Header, height: u32) -> Self { + let hash = header.block_hash(); + let this_block_id = BlockId { height, hash }; + + let prev_height = match height.checked_sub(1) { + Some(h) => h, + None => return Self::new(this_block_id), + }; + + let prev_block_id = BlockId { + height: prev_height, + hash: header.prev_blockhash, + }; + + CheckPoint::new(prev_block_id) + .push(this_block_id) + .expect("must construct checkpoint") + } + + /// Convenience method to convert the [`CheckPoint`] into an [`Update`]. + /// + /// For more information, refer to [`Update`]. + pub fn into_update(self, introduce_older_blocks: bool) -> Update { + Update { + tip: self, + introduce_older_blocks, + } + } + /// Puts another checkpoint onto the linked list representing the blockchain. /// /// Returns an `Err(self)` if the block you are pushing on is not at a greater height that the one you From 240657b1674ad901c77090ee1fa96dc0d71e91d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Wed, 4 Oct 2023 17:10:46 +0800 Subject: [PATCH 04/14] chain: add batch-insert methods for `IndexedTxGraph` --- crates/chain/src/indexed_tx_graph.rs | 166 +++++++++++++++++--- crates/chain/tests/test_indexed_tx_graph.rs | 13 +- 2 files changed, 148 insertions(+), 31 deletions(-) diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index 4df7e85e6..065e0892a 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -3,12 +3,12 @@ //! This is essentially a [`TxGraph`] combined with an indexer. use alloc::vec::Vec; -use bitcoin::{OutPoint, Transaction, TxOut}; +use bitcoin::{Block, OutPoint, Transaction, TxOut}; use crate::{ keychain, tx_graph::{self, TxGraph}, - Anchor, Append, + Anchor, AnchorFromBlockPosition, Append, BlockId, }; /// A struct that combines [`TxGraph`] and an [`Indexer`] implementation. @@ -126,17 +126,13 @@ where self.apply_update(update) } - /// Insert relevant transactions from the given `txs` iterator. + /// Batch insert transactions, filtering out those that are irrelevant. /// /// Relevancy is determined by the [`Indexer::is_tx_relevant`] implementation of `I`. Irrelevant /// transactions in `txs` will be ignored. `txs` do not need to be in topological order. - /// - /// `anchors` can be provided to anchor the transactions to blocks. `seen_at` is a unix - /// timestamp of when the transactions are last seen. - pub fn insert_relevant_txs<'t>( + pub fn batch_insert_relevant<'t>( &mut self, - txs: impl IntoIterator)>, - seen_at: Option, + txs: impl IntoIterator>>, ) -> ChangeSet { // The algorithm below allows for non-topologically ordered transactions by using two loops. // This is achieved by: @@ -145,27 +141,149 @@ where // 2. decide whether to insert them into the graph depending on whether `is_tx_relevant` // returns true or not. (in a second loop). let mut changeset = ChangeSet::::default(); - let mut transactions = Vec::new(); - for (tx, anchors) in txs.into_iter() { + + let txs = txs + .into_iter() + .inspect(|(tx, _, _)| changeset.indexer.append(self.index.index_tx(tx))) + .collect::>(); + + for (tx, anchors, seen_at) in txs { + if self.index.is_tx_relevant(tx) { + changeset.append(self.insert_tx(tx, anchors, seen_at)); + } + } + + changeset + } + + /// Batch insert transactions. + /// + /// All transactions in `txs` will be inserted. To filter out irrelevant transactions, use + /// [`batch_insert_relevant`] instead. 
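+ ///
+ /// A compact sketch of the expected input shape (illustrative only; it inserts a dummy
+ /// transaction with no inputs, no anchors, and a made-up last-seen timestamp):
+ ///
+ /// ```
+ /// # use bdk_chain::{BlockId, IndexedTxGraph, SpkTxOutIndex};
+ /// # use bdk_chain::bitcoin::{absolute::LockTime, ScriptBuf, Transaction, TxOut};
+ /// let mut graph = IndexedTxGraph::<BlockId, SpkTxOutIndex<()>>::new(SpkTxOutIndex::default());
+ /// let tx = Transaction {
+ ///     version: 1,
+ ///     lock_time: LockTime::from_height(0).expect("valid height"),
+ ///     input: vec![],
+ ///     output: vec![TxOut { value: 10_000, script_pubkey: ScriptBuf::new() }],
+ /// };
+ /// // Each item is `(&tx, anchors, last_seen)`; here: no anchors, last seen at t=21.
+ /// let changeset = graph.batch_insert([(&tx, None::<BlockId>, Some(21))]);
+ /// assert!(changeset.graph.txs.contains(&tx));
+ /// ```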
+ /// + /// [`batch_insert_relevant`]: IndexedTxGraph::batch_insert_relevant + pub fn batch_insert<'t>( + &mut self, + txs: impl IntoIterator>>, + ) -> ChangeSet { + let mut changeset = ChangeSet::::default(); + for (tx, anchors, seen_at) in txs { changeset.indexer.append(self.index.index_tx(tx)); - transactions.push((tx, anchors)); + changeset.append(self.insert_tx(tx, anchors, seen_at)); } - changeset.append( - transactions - .into_iter() - .filter_map(|(tx, anchors)| match self.index.is_tx_relevant(tx) { - true => Some(self.insert_tx(tx, anchors, seen_at)), - false => None, - }) - .fold(Default::default(), |mut acc, other| { - acc.append(other); - acc - }), - ); changeset } + + /// Batch insert unconfirmed transactions, filtering out those that are irrelevant. + /// + /// Relevancy is determined by the internal [`Indexer::is_tx_relevant`] implementation of `I`. + /// Irrelevant tansactions in `txs` will be ignored. + /// + /// Items of `txs` are tuples containing the transaction and an optional *last seen* timestamp. + /// The *last seen* communicates when the transaction is last seen in the mempool which is used + /// for conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details). + pub fn batch_insert_relevant_unconfirmed<'t>( + &mut self, + unconfirmed_txs: impl IntoIterator)>, + ) -> ChangeSet { + self.batch_insert_relevant( + unconfirmed_txs + .into_iter() + .map(|(tx, last_seen)| (tx, core::iter::empty(), last_seen)), + ) + } + + /// Batch insert unconfirmed transactions. + /// + /// Items of `txs` are tuples containing the transaction and an optional *last seen* timestamp. + /// The *last seen* communicates when the transaction is last seen in the mempool which is used + /// for conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details). + /// + /// To filter out irrelevant transactions, use [`batch_insert_relevant_unconfirmed`] instead. + /// + /// [`batch_insert_relevant_unconfirmed`]: IndexedTxGraph::batch_insert_relevant_unconfirmed + pub fn batch_insert_unconfirmed<'t>( + &mut self, + unconfirmed_txs: impl IntoIterator)>, + ) -> ChangeSet { + self.batch_insert( + unconfirmed_txs + .into_iter() + .map(|(tx, last_seen)| (tx, core::iter::empty(), last_seen)), + ) + } +} + +/// Methods are available if the anchor (`A`) implements [`AnchorFromBlockPosition`]. +impl IndexedTxGraph +where + I::ChangeSet: Default + Append, + A: AnchorFromBlockPosition, +{ + /// Batch insert all transactions of the given `block` of `height`, filtering out those that are + /// irrelevant. + /// + /// Each inserted transaction's anchor will be constructed from + /// [`AnchorFromBlockPosition::from_block_position`]. + /// + /// Relevancy is determined by the internal [`Indexer::is_tx_relevant`] implementation of `I`. + /// Irrelevant tansactions in `txs` will be ignored. + pub fn apply_block_relevant( + &mut self, + block: Block, + height: u32, + ) -> ChangeSet { + let block_id = BlockId { + hash: block.block_hash(), + height, + }; + let txs = block.txdata.iter().enumerate().map(|(tx_pos, tx)| { + ( + tx, + core::iter::once(A::from_block_position(&block, block_id, tx_pos)), + None, + ) + }); + self.batch_insert_relevant(txs) + } + + /// Batch insert all transactions of the given `block` of `height`. + /// + /// Each inserted transaction's anchor will be constructed from + /// [`AnchorFromBlockPosition::from_block_position`]. + /// + /// To only insert relevant transactions, use [`apply_block_relevant`] instead. 
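+ ///
+ /// A minimal sketch, assuming a `SpkTxOutIndex<()>` indexer and using the regtest genesis
+ /// block as the applied block:
+ ///
+ /// ```
+ /// # use bdk_chain::{BlockId, IndexedTxGraph, SpkTxOutIndex};
+ /// # use bdk_chain::bitcoin::{blockdata::constants::genesis_block, Network};
+ /// let mut graph = IndexedTxGraph::<BlockId, SpkTxOutIndex<()>>::new(SpkTxOutIndex::default());
+ /// let block = genesis_block(Network::Regtest);
+ /// let anchor = BlockId { height: 0, hash: block.block_hash() };
+ /// // Every transaction in the block (here, just the coinbase) is inserted and anchored.
+ /// let changeset = graph.apply_block(block, 0);
+ /// assert!(changeset.graph.anchors.iter().any(|(a, _)| *a == anchor));
+ /// ```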
+ /// + /// [`apply_block_relevant`]: IndexedTxGraph::apply_block_relevant + pub fn apply_block(&mut self, block: Block, height: u32) -> ChangeSet { + let block_id = BlockId { + hash: block.block_hash(), + height, + }; + let txs = block.txdata.iter().enumerate().map(|(tx_pos, tx)| { + ( + tx, + core::iter::once(A::from_block_position(&block, block_id, tx_pos)), + None, + ) + }); + self.batch_insert(txs) + } } +/// A tuple of a transaction, and associated metadata, that are to be inserted into [`IndexedTxGraph`]. +/// +/// This tuple contains fields in the following order: +/// * A reference to the transaction. +/// * A collection of [`Anchor`]s. +/// * An optional last-seen timestamp. +/// +/// This is used as a input item of [`batch_insert_relevant`] and [`batch_insert`]. +/// +/// [`batch_insert_relevant`]: IndexedTxGraph::batch_insert_relevant +/// [`batch_insert`]: IndexedTxGraph::batch_insert +pub type InsertTxItem<'t, A> = (&'t Transaction, A, Option); + /// A structure that represents changes to an [`IndexedTxGraph`]. #[derive(Clone, Debug, PartialEq)] #[cfg_attr( diff --git a/crates/chain/tests/test_indexed_tx_graph.rs b/crates/chain/tests/test_indexed_tx_graph.rs index 84506ec11..5f95e111d 100644 --- a/crates/chain/tests/test_indexed_tx_graph.rs +++ b/crates/chain/tests/test_indexed_tx_graph.rs @@ -74,7 +74,7 @@ fn insert_relevant_txs() { }; assert_eq!( - graph.insert_relevant_txs(txs.iter().map(|tx| (tx, None)), None), + graph.batch_insert_relevant(txs.iter().map(|tx| (tx, None, None))), changeset, ); @@ -211,8 +211,8 @@ fn test_list_owned_txouts() { // Insert transactions into graph with respective anchors // For unconfirmed txs we pass in `None`. - let _ = graph.insert_relevant_txs( - [&tx1, &tx2, &tx3, &tx6].iter().enumerate().map(|(i, tx)| { + let _ = + graph.batch_insert_relevant([&tx1, &tx2, &tx3, &tx6].iter().enumerate().map(|(i, tx)| { let height = i as u32; ( *tx, @@ -225,12 +225,11 @@ fn test_list_owned_txouts() { anchor_block, confirmation_height: anchor_block.height, }), + None, ) - }), - None, - ); + })); - let _ = graph.insert_relevant_txs([&tx4, &tx5].iter().map(|tx| (*tx, None)), Some(100)); + let _ = graph.batch_insert_relevant([&tx4, &tx5].iter().map(|tx| (*tx, None, Some(100)))); // A helper lambda to extract and filter data from the graph. 
let fetch = From bb7424d11d3a4bd837ddde0f42f9abd93d56aee1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Wed, 4 Oct 2023 18:11:17 +0800 Subject: [PATCH 05/14] feat(bitcoind_rpc): introduce `bitcoind_rpc` crate --- Cargo.toml | 1 + crates/bitcoind_rpc/Cargo.toml | 21 +++ crates/bitcoind_rpc/src/lib.rs | 243 +++++++++++++++++++++++++++++++++ 3 files changed, 265 insertions(+) create mode 100644 crates/bitcoind_rpc/Cargo.toml create mode 100644 crates/bitcoind_rpc/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index b20ef222d..a5058ebc4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "crates/file_store", "crates/electrum", "crates/esplora", + "crates/bitcoind_rpc", "example-crates/example_cli", "example-crates/example_electrum", "example-crates/example_esplora", diff --git a/crates/bitcoind_rpc/Cargo.toml b/crates/bitcoind_rpc/Cargo.toml new file mode 100644 index 000000000..eeb9de581 --- /dev/null +++ b/crates/bitcoind_rpc/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "bdk_bitcoind_rpc" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +# For no-std, remember to enable the bitcoin/no-std feature +bitcoin = { version = "0.30", default-features = false } +bitcoincore-rpc = { version = "0.17" } + +[dev-dependencies] +bdk_chain = { path = "../chain", version = "0.5", features = ["serde", "miniscript"] } +bitcoind = { version = "0.33", features = ["25_0"] } +anyhow = { version = "1" } + +[features] +default = ["std"] +std = ["bitcoin/std"] +serde = ["bitcoin/serde"] diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs new file mode 100644 index 000000000..a4b28c8e8 --- /dev/null +++ b/crates/bitcoind_rpc/src/lib.rs @@ -0,0 +1,243 @@ +//! This crate is used for emitting blockchain data from the `bitcoind` RPC interface (excluding the +//! RPC wallet API). +//! +//! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`]. +//! +//! To only get block updates (exclude mempool transactions), the caller can use +//! [`Emitter::next_block`] or/and [`Emitter::next_header`] until it returns `Ok(None)` (which means +//! the chain tip is reached). A separate method, [`Emitter::mempool`] can be used to emit the whole +//! mempool. +#![warn(missing_docs)] + +use std::collections::BTreeMap; + +use bitcoin::{block::Header, Block, BlockHash, Transaction}; +pub use bitcoincore_rpc; +use bitcoincore_rpc::bitcoincore_rpc_json; + +/// A structure that emits data sourced from [`bitcoincore_rpc::Client`]. +/// +/// Refer to [module-level documentation] for more. +/// +/// [module-level documentation]: crate +pub struct Emitter<'c, C> { + client: &'c C, + start_height: u32, + + emitted_blocks: BTreeMap, + last_block: Option, + + /// The latest first-seen epoch of emitted mempool transactions. This is used to determine + /// whether a mempool transaction is already emitted. + last_mempool_time: usize, + + /// The last emitted block during our last mempool emission. This is used to determine whether + /// there has been a reorg since our last mempool emission. + last_mempool_tip: Option<(u32, BlockHash)>, +} + +impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { + /// Constructs a new [`Emitter`] with the provided [`bitcoincore_rpc::Client`]. + /// + /// `start_height` is the block height to start emitting blocks from. 
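+ ///
+ /// A usage sketch (illustrative only; it assumes a reachable bitcoind RPC endpoint and
+ /// placeholder credentials):
+ ///
+ /// ```no_run
+ /// # use bdk_bitcoind_rpc::{bitcoincore_rpc, Emitter};
+ /// # fn main() -> Result<(), bitcoincore_rpc::Error> {
+ /// let client = bitcoincore_rpc::Client::new(
+ ///     "http://127.0.0.1:8332",
+ ///     bitcoincore_rpc::Auth::UserPass("rpc_user".into(), "rpc_pass".into()),
+ /// )?;
+ /// // Emit blocks starting from genesis until the chain tip is reached.
+ /// let mut emitter = Emitter::new(&client, 0);
+ /// while let Some((height, block)) = emitter.next_block()? {
+ ///     println!("block {}: {}", height, block.block_hash());
+ /// }
+ /// // Then emit whatever is sitting in the mempool.
+ /// for (tx, first_seen) in emitter.mempool()? {
+ ///     println!("mempool tx {} (first seen at {})", tx.txid(), first_seen);
+ /// }
+ /// # Ok(())
+ /// # }
+ /// ```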
+ pub fn new(client: &'c C, start_height: u32) -> Self { + Self { + client, + start_height, + emitted_blocks: BTreeMap::new(), + last_block: None, + last_mempool_time: 0, + last_mempool_tip: None, + } + } + + /// Emit mempool transactions, alongside their first-seen unix timestamps. + /// + /// Ideally, this method would only emit the same transaction once. However, if the receiver + /// filters transactions based on whether it alters the output set of tracked script pubkeys, + /// there are situations where we would want to re-emit. For example, if an emitted mempool + /// transaction spends a tracked UTXO which is confirmed at height `h`, but the receiver has + /// only seen up to block of height `h-1`, we want to re-emit this transaction until the + /// receiver has seen the block at height `h`. + /// + /// In other words, we want to re-emit a transaction if we cannot guarantee it's ancestors are + /// already emitted. + pub fn mempool(&mut self) -> Result, bitcoincore_rpc::Error> { + let client = self.client; + + let prev_mempool_tip = match self.last_mempool_tip { + // use 'avoid-re-emission' logic if there is no reorg + Some((height, hash)) if self.emitted_blocks.get(&height) == Some(&hash) => height, + _ => 0, + }; + + let prev_mempool_time = self.last_mempool_time; + let mut latest_time = prev_mempool_time; + + let txs_to_emit = client + .get_raw_mempool_verbose()? + .into_iter() + .filter_map({ + let latest_time = &mut latest_time; + move |(txid, tx_entry)| -> Option> { + let tx_time = tx_entry.time as usize; + if tx_time > *latest_time { + *latest_time = tx_time; + } + + // Avoid emitting transactions that are already emitted if we can guarantee + // blocks containing ancestors are already emitted. The bitcoind rpc interface + // provides us with the block height that the tx is introduced to the mempool. + // If we have already emitted the block of height, we can assume that all + // ancestor txs have been processed by the receiver. + let is_already_emitted = tx_time <= prev_mempool_time; + let is_within_height = tx_entry.height <= prev_mempool_tip as _; + if is_already_emitted && is_within_height { + return None; + } + + let tx = match client.get_raw_transaction(&txid, None) { + Ok(tx) => tx, + // the tx is confirmed or evicted since `get_raw_mempool_verbose` + Err(err) if err.is_not_found_error() => return None, + Err(err) => return Some(Err(err)), + }; + + Some(Ok((tx, tx_time as u64))) + } + }) + .collect::, _>>()?; + + self.last_mempool_time = latest_time; + self.last_mempool_tip = self + .emitted_blocks + .iter() + .last() + .map(|(&height, &hash)| (height, hash)); + + Ok(txs_to_emit) + } + + /// Emit the next block height and header (if any). + pub fn next_header(&mut self) -> Result, bitcoincore_rpc::Error> { + poll(self, |hash| self.client.get_block_header(hash)) + } + + /// Emit the next block height and block (if any). + pub fn next_block(&mut self) -> Result, bitcoincore_rpc::Error> { + poll(self, |hash| self.client.get_block(hash)) + } +} + +enum PollResponse { + Block(bitcoincore_rpc_json::GetBlockResult), + NoMoreBlocks, + /// Fetched block is not in the best chain. 
+ BlockNotInBestChain, + AgreementFound(bitcoincore_rpc_json::GetBlockResult), + AgreementPointNotFound, +} + +fn poll_once(emitter: &Emitter) -> Result +where + C: bitcoincore_rpc::RpcApi, +{ + let client = emitter.client; + + if let Some(last_res) = &emitter.last_block { + assert!(!emitter.emitted_blocks.is_empty()); + + let next_hash = match last_res.nextblockhash { + None => return Ok(PollResponse::NoMoreBlocks), + Some(next_hash) => next_hash, + }; + + let res = client.get_block_info(&next_hash)?; + if res.confirmations < 0 { + return Ok(PollResponse::BlockNotInBestChain); + } + return Ok(PollResponse::Block(res)); + } + + if emitter.emitted_blocks.is_empty() { + let hash = client.get_block_hash(emitter.start_height as _)?; + + let res = client.get_block_info(&hash)?; + if res.confirmations < 0 { + return Ok(PollResponse::BlockNotInBestChain); + } + return Ok(PollResponse::Block(res)); + } + + for (&_, hash) in emitter.emitted_blocks.iter().rev() { + let res = client.get_block_info(hash)?; + if res.confirmations < 0 { + // block is not in best chain + continue; + } + + // agreement point found + return Ok(PollResponse::AgreementFound(res)); + } + + Ok(PollResponse::AgreementPointNotFound) +} + +fn poll( + emitter: &mut Emitter, + get_item: F, +) -> Result, bitcoincore_rpc::Error> +where + C: bitcoincore_rpc::RpcApi, + F: Fn(&BlockHash) -> Result, +{ + loop { + match poll_once(emitter)? { + PollResponse::Block(res) => { + let height = res.height as u32; + let item = get_item(&res.hash)?; + assert_eq!(emitter.emitted_blocks.insert(height, res.hash), None); + emitter.last_block = Some(res); + return Ok(Some((height, item))); + } + PollResponse::NoMoreBlocks => { + emitter.last_block = None; + return Ok(None); + } + PollResponse::BlockNotInBestChain => { + emitter.last_block = None; + continue; + } + PollResponse::AgreementFound(res) => { + emitter.emitted_blocks.split_off(&(res.height as u32 + 1)); + emitter.last_block = Some(res); + continue; + } + PollResponse::AgreementPointNotFound => { + emitter.emitted_blocks.clear(); + emitter.last_block = None; + continue; + } + } + } +} + +/// Extends [`bitcoincore_rpc::Error`]. +pub trait BitcoindRpcErrorExt { + /// Returns whether the error is a "not found" error. + /// + /// This is useful since [`Emitter`] emits [`Result<_, bitcoincore_rpc::Error>`]s as + /// [`Iterator::Item`]. 
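+ ///
+ /// An illustrative sketch of the intended use (assumes a connected `client` and some `txid`;
+ /// a missing transaction is treated as a non-fatal case rather than an error):
+ ///
+ /// ```no_run
+ /// # use bdk_bitcoind_rpc::{bitcoincore_rpc::{self, RpcApi}, BitcoindRpcErrorExt};
+ /// # fn check(client: &bitcoincore_rpc::Client, txid: &bitcoin::Txid) -> Result<(), bitcoincore_rpc::Error> {
+ /// match client.get_raw_transaction(txid, None) {
+ ///     Ok(tx) => println!("found tx: {}", tx.txid()),
+ ///     // The tx was confirmed or evicted in the meantime; skip it instead of failing.
+ ///     Err(err) if err.is_not_found_error() => println!("tx {} not found", txid),
+ ///     Err(err) => return Err(err),
+ /// }
+ /// # Ok(())
+ /// # }
+ /// ```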
+ fn is_not_found_error(&self) -> bool; +} + +impl BitcoindRpcErrorExt for bitcoincore_rpc::Error { + fn is_not_found_error(&self) -> bool { + if let bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(rpc_err)) = self + { + rpc_err.code == -5 + } else { + false + } + } +} From a73dac2d91b29c4ba05f606f81e511fbf1f9ec7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Wed, 4 Oct 2023 18:19:29 +0800 Subject: [PATCH 06/14] test(bitcoind_rpc): initial tests for `Emitter` --- crates/bitcoind_rpc/tests/test_emitter.rs | 759 ++++++++++++++++++++++ 1 file changed, 759 insertions(+) create mode 100644 crates/bitcoind_rpc/tests/test_emitter.rs diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs new file mode 100644 index 000000000..e6073eee7 --- /dev/null +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -0,0 +1,759 @@ +use std::collections::{BTreeMap, BTreeSet}; + +use bdk_bitcoind_rpc::Emitter; +use bdk_chain::{ + bitcoin::{Address, Amount, BlockHash, Txid}, + indexed_tx_graph::InsertTxItem, + keychain::Balance, + local_chain::{self, CheckPoint, LocalChain}, + Append, BlockId, IndexedTxGraph, SpkTxOutIndex, +}; +use bitcoin::{ + address::NetworkChecked, block::Header, hash_types::TxMerkleNode, hashes::Hash, + secp256k1::rand::random, Block, CompactTarget, OutPoint, ScriptBuf, ScriptHash, Transaction, + TxIn, TxOut, WScriptHash, +}; +use bitcoincore_rpc::{ + bitcoincore_rpc_json::{GetBlockTemplateModes, GetBlockTemplateRules}, + RpcApi, +}; + +struct TestEnv { + #[allow(dead_code)] + daemon: bitcoind::BitcoinD, + client: bitcoincore_rpc::Client, +} + +impl TestEnv { + fn new() -> anyhow::Result { + let daemon = match std::env::var_os("TEST_BITCOIND") { + Some(bitcoind_path) => bitcoind::BitcoinD::new(bitcoind_path), + None => bitcoind::BitcoinD::from_downloaded(), + }?; + let client = bitcoincore_rpc::Client::new( + &daemon.rpc_url(), + bitcoincore_rpc::Auth::CookieFile(daemon.params.cookie_file.clone()), + )?; + Ok(Self { daemon, client }) + } + + fn mine_blocks( + &self, + count: usize, + address: Option
, + ) -> anyhow::Result> { + let coinbase_address = match address { + Some(address) => address, + None => self.client.get_new_address(None, None)?.assume_checked(), + }; + let block_hashes = self + .client + .generate_to_address(count as _, &coinbase_address)?; + Ok(block_hashes) + } + + fn mine_empty_block(&self) -> anyhow::Result<(usize, BlockHash)> { + let bt = self.client.get_block_template( + GetBlockTemplateModes::Template, + &[GetBlockTemplateRules::SegWit], + &[], + )?; + + let txdata = vec![Transaction { + version: 1, + lock_time: bitcoin::absolute::LockTime::from_height(0)?, + input: vec![TxIn { + previous_output: bitcoin::OutPoint::default(), + script_sig: ScriptBuf::builder() + .push_int(bt.height as _) + // randomn number so that re-mining creates unique block + .push_int(random()) + .into_script(), + sequence: bitcoin::Sequence::default(), + witness: bitcoin::Witness::new(), + }], + output: vec![TxOut { + value: 0, + script_pubkey: ScriptBuf::new_p2sh(&ScriptHash::all_zeros()), + }], + }]; + + let bits: [u8; 4] = bt + .bits + .clone() + .try_into() + .expect("rpc provided us with invalid bits"); + + let mut block = Block { + header: Header { + version: bitcoin::block::Version::default(), + prev_blockhash: bt.previous_block_hash, + merkle_root: TxMerkleNode::all_zeros(), + time: Ord::max(bt.min_time, std::time::UNIX_EPOCH.elapsed()?.as_secs()) as u32, + bits: CompactTarget::from_consensus(u32::from_be_bytes(bits)), + nonce: 0, + }, + txdata, + }; + + block.header.merkle_root = block.compute_merkle_root().expect("must compute"); + + for nonce in 0..=u32::MAX { + block.header.nonce = nonce; + if block.header.target().is_met_by(block.block_hash()) { + break; + } + } + + self.client.submit_block(&block)?; + Ok((bt.height as usize, block.block_hash())) + } + + fn invalidate_blocks(&self, count: usize) -> anyhow::Result<()> { + let mut hash = self.client.get_best_block_hash()?; + for _ in 0..count { + let prev_hash = self.client.get_block_info(&hash)?.previousblockhash; + self.client.invalidate_block(&hash)?; + match prev_hash { + Some(prev_hash) => hash = prev_hash, + None => break, + } + } + Ok(()) + } + + fn reorg(&self, count: usize) -> anyhow::Result> { + let start_height = self.client.get_block_count()?; + self.invalidate_blocks(count)?; + + let res = self.mine_blocks(count, None); + assert_eq!( + self.client.get_block_count()?, + start_height, + "reorg should not result in height change" + ); + res + } + + fn reorg_empty_blocks(&self, count: usize) -> anyhow::Result> { + let start_height = self.client.get_block_count()?; + self.invalidate_blocks(count)?; + + let res = (0..count) + .map(|_| self.mine_empty_block()) + .collect::, _>>()?; + assert_eq!( + self.client.get_block_count()?, + start_height, + "reorg should not result in height change" + ); + Ok(res) + } + + fn send(&self, address: &Address, amount: Amount) -> anyhow::Result { + let txid = self + .client + .send_to_address(address, amount, None, None, None, None, None, None)?; + Ok(txid) + } +} + +fn block_to_chain_update(block: &bitcoin::Block, height: u32) -> local_chain::Update { + let this_id = BlockId { + height, + hash: block.block_hash(), + }; + let tip = if block.header.prev_blockhash == BlockHash::all_zeros() { + CheckPoint::new(this_id) + } else { + CheckPoint::new(BlockId { + height: height - 1, + hash: block.header.prev_blockhash, + }) + .extend(core::iter::once(this_id)) + .expect("must construct checkpoint") + }; + + local_chain::Update { + tip, + introduce_older_blocks: false, + } +} + +fn 
block_to_tx_graph_update( + block: &bitcoin::Block, + height: u32, +) -> impl Iterator>> { + let anchor = BlockId { + hash: block.block_hash(), + height, + }; + block.txdata.iter().map(move |tx| (tx, Some(anchor), None)) +} + +/// Ensure that blocks are emitted in order even after reorg. +/// +/// 1. Mine 101 blocks. +/// 2. Emit blocks from [`Emitter`] and update the [`LocalChain`]. +/// 3. Reorg highest 6 blocks. +/// 4. Emit blocks from [`Emitter`] and re-update the [`LocalChain`]. +#[test] +pub fn test_sync_local_chain() -> anyhow::Result<()> { + let env = TestEnv::new()?; + let mut local_chain = LocalChain::default(); + let mut emitter = Emitter::new(&env.client, 0); + + // mine some blocks and returned the actual block hashes + let exp_hashes = { + let mut hashes = vec![env.client.get_block_hash(0)?]; // include genesis block + hashes.extend(env.mine_blocks(101, None)?); + hashes + }; + + // see if the emitter outputs the right blocks + println!("first sync:"); + while let Some((height, block)) = emitter.next_block()? { + assert_eq!( + block.block_hash(), + exp_hashes[height as usize], + "emitted block hash is unexpected" + ); + + let chain_update = block_to_chain_update(&block, height); + assert_eq!( + local_chain.apply_update(chain_update)?, + BTreeMap::from([(height, Some(block.block_hash()))]), + "chain update changeset is unexpected", + ); + } + + assert_eq!( + local_chain.blocks(), + &exp_hashes + .iter() + .enumerate() + .map(|(i, hash)| (i as u32, *hash)) + .collect(), + "final local_chain state is unexpected", + ); + + // perform reorg + let reorged_blocks = env.reorg(6)?; + let exp_hashes = exp_hashes + .iter() + .take(exp_hashes.len() - reorged_blocks.len()) + .chain(&reorged_blocks) + .cloned() + .collect::>(); + + // see if the emitter outputs the right blocks + println!("after reorg:"); + let mut exp_height = exp_hashes.len() - reorged_blocks.len(); + while let Some((height, block)) = emitter.next_block()? { + assert_eq!( + height, exp_height as u32, + "emitted block has unexpected height" + ); + + assert_eq!( + block.block_hash(), + exp_hashes[height as usize], + "emitted block is unexpected" + ); + + let chain_update = block_to_chain_update(&block, height); + assert_eq!( + local_chain.apply_update(chain_update)?, + if exp_height == exp_hashes.len() - reorged_blocks.len() { + core::iter::once((height, Some(block.block_hash()))) + .chain((height + 1..exp_hashes.len() as u32).map(|h| (h, None))) + .collect::() + } else { + BTreeMap::from([(height, Some(block.block_hash()))]) + }, + "chain update changeset is unexpected", + ); + + exp_height += 1; + } + + assert_eq!( + local_chain.blocks(), + &exp_hashes + .iter() + .enumerate() + .map(|(i, hash)| (i as u32, *hash)) + .collect(), + "final local_chain state is unexpected after reorg", + ); + + Ok(()) +} + +/// Ensure that [`EmittedUpdate::into_tx_graph_update`] behaves appropriately for both mempool and +/// block updates. 
+/// +/// [`EmittedUpdate::into_tx_graph_update`]: bdk_bitcoind_rpc::EmittedUpdate::into_tx_graph_update +#[test] +fn test_into_tx_graph() -> anyhow::Result<()> { + let env = TestEnv::new()?; + + println!("getting new addresses!"); + let addr_0 = env.client.get_new_address(None, None)?.assume_checked(); + let addr_1 = env.client.get_new_address(None, None)?.assume_checked(); + let addr_2 = env.client.get_new_address(None, None)?.assume_checked(); + println!("got new addresses!"); + + println!("mining block!"); + env.mine_blocks(101, None)?; + println!("mined blocks!"); + + let mut chain = LocalChain::default(); + let mut indexed_tx_graph = IndexedTxGraph::::new({ + let mut index = SpkTxOutIndex::::default(); + index.insert_spk(0, addr_0.script_pubkey()); + index.insert_spk(1, addr_1.script_pubkey()); + index.insert_spk(2, addr_2.script_pubkey()); + index + }); + + let emitter = &mut Emitter::new(&env.client, 0); + + while let Some((height, block)) = emitter.next_block()? { + let _ = chain.apply_update(block_to_chain_update(&block, height))?; + let indexed_additions = + indexed_tx_graph.batch_insert_relevant(block_to_tx_graph_update(&block, height)); + assert!(indexed_additions.is_empty()); + } + + // send 3 txs to a tracked address, these txs will be in the mempool + let exp_txids = { + let mut txids = BTreeSet::new(); + for _ in 0..3 { + txids.insert(env.client.send_to_address( + &addr_0, + Amount::from_sat(10_000), + None, + None, + None, + None, + None, + None, + )?); + } + txids + }; + + // expect that the next block should be none and we should get 3 txs from mempool + { + // next block should be `None` + assert!(emitter.next_block()?.is_none()); + + let mempool_txs = emitter.mempool()?; + let indexed_additions = indexed_tx_graph + .batch_insert_unconfirmed(mempool_txs.iter().map(|(tx, time)| (tx, Some(*time)))); + assert_eq!( + indexed_additions + .graph + .txs + .iter() + .map(|tx| tx.txid()) + .collect::>(), + exp_txids, + "changeset should have the 3 mempool transactions", + ); + assert!(indexed_additions.graph.anchors.is_empty()); + } + + // mine a block that confirms the 3 txs + let exp_block_hash = env.mine_blocks(1, None)?[0]; + let exp_block_height = env.client.get_block_info(&exp_block_hash)?.height as u32; + let exp_anchors = exp_txids + .iter() + .map({ + let anchor = BlockId { + height: exp_block_height, + hash: exp_block_hash, + }; + move |&txid| (anchor, txid) + }) + .collect::>(); + + // must receive mined block which will confirm the transactions. + { + let (height, block) = emitter.next_block()?.expect("must get mined block"); + let _ = chain.apply_update(block_to_chain_update(&block, height))?; + let indexed_additions = + indexed_tx_graph.batch_insert_relevant(block_to_tx_graph_update(&block, height)); + assert!(indexed_additions.graph.txs.is_empty()); + assert!(indexed_additions.graph.txouts.is_empty()); + assert_eq!(indexed_additions.graph.anchors, exp_anchors); + } + + Ok(()) +} + +/// Ensure next block emitted after reorg is at reorg height. +/// +/// After a reorg, if the last-emitted block height is equal or greater than the reorg height, and +/// the fallback height is equal to or lower than the reorg height, the next block/header emission +/// should be at the reorg height. +/// +/// TODO: If the reorg height is lower than the fallback height, how do we find a block height to +/// emit that can connect with our receiver chain? 
+#[test] +fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> { + const EMITTER_START_HEIGHT: usize = 100; + const CHAIN_TIP_HEIGHT: usize = 110; + + let env = TestEnv::new()?; + let mut emitter = Emitter::new(&env.client, EMITTER_START_HEIGHT as _); + + env.mine_blocks(CHAIN_TIP_HEIGHT, None)?; + while emitter.next_header()?.is_some() {} + + for reorg_count in 1..=10 { + let replaced_blocks = env.reorg_empty_blocks(reorg_count)?; + let (height, next_header) = emitter.next_header()?.expect("must emit block after reorg"); + assert_eq!( + (height as usize, next_header.block_hash()), + replaced_blocks[0], + "block emitted after reorg should be at the reorg height" + ); + while emitter.next_header()?.is_some() {} + } + + Ok(()) +} + +fn process_block( + recv_chain: &mut LocalChain, + recv_graph: &mut IndexedTxGraph>, + block: Block, + block_height: u32, +) -> anyhow::Result<()> { + recv_chain + .apply_update(CheckPoint::from_header(&block.header, block_height).into_update(false))?; + let _ = recv_graph.apply_block(block, block_height); + Ok(()) +} + +fn sync_from_emitter( + recv_chain: &mut LocalChain, + recv_graph: &mut IndexedTxGraph>, + emitter: &mut Emitter, +) -> anyhow::Result<()> +where + C: bitcoincore_rpc::RpcApi, +{ + while let Some((height, block)) = emitter.next_block()? { + process_block(recv_chain, recv_graph, block, height)?; + } + Ok(()) +} + +fn get_balance( + recv_chain: &LocalChain, + recv_graph: &IndexedTxGraph>, +) -> anyhow::Result { + let chain_tip = recv_chain + .tip() + .map_or(BlockId::default(), |cp| cp.block_id()); + let outpoints = recv_graph.index.outpoints().clone(); + let balance = recv_graph + .graph() + .balance(recv_chain, chain_tip, outpoints, |_, _| true); + Ok(balance) +} + +/// If a block is reorged out, ensure that containing transactions that do not exist in the +/// replacement block(s) become unconfirmed. +#[test] +fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> { + const PREMINE_COUNT: usize = 101; + const ADDITIONAL_COUNT: usize = 11; + const SEND_AMOUNT: Amount = Amount::from_sat(10_000); + + let env = TestEnv::new()?; + let mut emitter = Emitter::new(&env.client, 0); + + // setup addresses + let addr_to_mine = env.client.get_new_address(None, None)?.assume_checked(); + let spk_to_track = ScriptBuf::new_v0_p2wsh(&WScriptHash::all_zeros()); + let addr_to_track = Address::from_script(&spk_to_track, bitcoin::Network::Regtest)?; + + // setup receiver + let mut recv_chain = LocalChain::default(); + let mut recv_graph = IndexedTxGraph::::new({ + let mut recv_index = SpkTxOutIndex::default(); + recv_index.insert_spk((), spk_to_track.clone()); + recv_index + }); + + // mine and sync receiver up to tip + env.mine_blocks(PREMINE_COUNT, Some(addr_to_mine))?; + + // create transactions that are tracked by our receiver + for _ in 0..ADDITIONAL_COUNT { + let txid = env.send(&addr_to_track, SEND_AMOUNT)?; + + // lock outputs that send to `addr_to_track` + let outpoints_to_lock = env + .client + .get_transaction(&txid, None)? + .transaction()? 
+ .output + .into_iter() + .enumerate() + .filter(|(_, txo)| txo.script_pubkey == spk_to_track) + .map(|(vout, _)| OutPoint::new(txid, vout as _)) + .collect::>(); + env.client.lock_unspent(&outpoints_to_lock)?; + + let _ = env.mine_blocks(1, None)?; + } + + // get emitter up to tip + sync_from_emitter(&mut recv_chain, &mut recv_graph, &mut emitter)?; + + assert_eq!( + get_balance(&recv_chain, &recv_graph)?, + Balance { + confirmed: SEND_AMOUNT.to_sat() * ADDITIONAL_COUNT as u64, + ..Balance::default() + }, + "initial balance must be correct", + ); + + // perform reorgs with different depths + for reorg_count in 1..=ADDITIONAL_COUNT { + env.reorg_empty_blocks(reorg_count)?; + sync_from_emitter(&mut recv_chain, &mut recv_graph, &mut emitter)?; + + assert_eq!( + get_balance(&recv_chain, &recv_graph)?, + Balance { + confirmed: SEND_AMOUNT.to_sat() * (ADDITIONAL_COUNT - reorg_count) as u64, + trusted_pending: SEND_AMOUNT.to_sat() * reorg_count as u64, + ..Balance::default() + }, + "reorg_count: {}", + reorg_count, + ); + } + + Ok(()) +} + +/// Ensure avoid-re-emission-logic is sound when [`Emitter`] is synced to tip. +/// +/// The receiver (bdk_chain structures) is synced to the chain tip, and there is txs in the mempool. +/// When we call Emitter::mempool multiple times, mempool txs should not be re-emitted, even if the +/// chain tip is extended. +#[test] +fn mempool_avoids_re_emission() -> anyhow::Result<()> { + const BLOCKS_TO_MINE: usize = 101; + const MEMPOOL_TX_COUNT: usize = 2; + + let env = TestEnv::new()?; + let mut emitter = Emitter::new(&env.client, 0); + + // mine blocks and sync up emitter + let addr = env.client.get_new_address(None, None)?.assume_checked(); + env.mine_blocks(BLOCKS_TO_MINE, Some(addr.clone()))?; + while emitter.next_header()?.is_some() {} + + // have some random txs in mempool + let exp_txids = (0..MEMPOOL_TX_COUNT) + .map(|_| env.send(&addr, Amount::from_sat(2100))) + .collect::, _>>()?; + + // the first emission should include all transactions + let emitted_txids = emitter + .mempool()? + .into_iter() + .map(|(tx, _)| tx.txid()) + .collect::>(); + assert_eq!( + emitted_txids, exp_txids, + "all mempool txs should be emitted" + ); + + // second emission should be empty + assert!( + emitter.mempool()?.is_empty(), + "second emission should be empty" + ); + + // mine empty blocks + sync up our emitter -> we should still not re-emit + for _ in 0..BLOCKS_TO_MINE { + env.mine_empty_block()?; + } + while emitter.next_header()?.is_some() {} + assert!( + emitter.mempool()?.is_empty(), + "third emission, after chain tip is extended, should also be empty" + ); + + Ok(()) +} + +/// Ensure mempool tx is still re-emitted if [`Emitter`] has not reached the tx's introduction +/// height. +/// +/// We introduce a mempool tx after each block, where blocks are empty (does not confirm previous +/// mempool txs). Then we emit blocks from [`Emitter`] (intertwining `mempool` calls). We check +/// that `mempool` should always re-emit txs that have introduced at a height greater than the last +/// emitted block height. 
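
Before the test below, a std-only sketch of the re-emission rule just described, with `u64` standing in for `Txid`; this models the expected behaviour, not the emitter's actual bookkeeping.

    use std::collections::{BTreeMap, BTreeSet};

    /// Which mempool txids must still be emitted, given each txid's introduction
    /// height and the height of the last block we emitted? A tx keeps being
    /// re-emitted while its introduction height is above the last emitted height.
    fn txids_to_emit(
        introductions: &BTreeMap<u64, u32>, // u64 stands in for Txid
        last_emitted_height: u32,
    ) -> BTreeSet<u64> {
        introductions
            .iter()
            .filter(|&(_, &intro_height)| intro_height > last_emitted_height)
            .map(|(&txid, _)| txid)
            .collect()
    }

    fn main() {
        let introductions: BTreeMap<u64, u32> =
            [(1, 101), (2, 102), (3, 103)].into_iter().collect();
        // only blocks up to height 101 have been emitted: txs 2 and 3 re-emit
        assert_eq!(txids_to_emit(&introductions, 101), BTreeSet::from([2, 3]));
        // once emission reaches height 103, nothing needs re-emitting
        assert!(txids_to_emit(&introductions, 103).is_empty());
    }
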
+#[test] +fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()> { + const PREMINE_COUNT: usize = 101; + const MEMPOOL_TX_COUNT: usize = 21; + + let env = TestEnv::new()?; + let mut emitter = Emitter::new(&env.client, 0); + + // mine blocks to get initial balance, sync emitter up to tip + let addr = env.client.get_new_address(None, None)?.assume_checked(); + env.mine_blocks(PREMINE_COUNT, Some(addr.clone()))?; + while emitter.next_header()?.is_some() {} + + // mine blocks to introduce txs to mempool at different heights + let tx_introductions = (0..MEMPOOL_TX_COUNT) + .map(|_| -> anyhow::Result<_> { + let (height, _) = env.mine_empty_block()?; + let txid = env.send(&addr, Amount::from_sat(2100))?; + Ok((height, txid)) + }) + .collect::>>()?; + + assert_eq!( + emitter + .mempool()? + .into_iter() + .map(|(tx, _)| tx.txid()) + .collect::>(), + tx_introductions.iter().map(|&(_, txid)| txid).collect(), + "first mempool emission should include all txs", + ); + assert_eq!( + emitter + .mempool()? + .into_iter() + .map(|(tx, _)| tx.txid()) + .collect::>(), + tx_introductions.iter().map(|&(_, txid)| txid).collect(), + "second mempool emission should still include all txs", + ); + + // At this point, the emitter has seen all mempool transactions. It should only re-emit those + // that have introduction heights less than the emitter's last-emitted block tip. + while let Some((height, _)) = emitter.next_header()? { + // We call `mempool()` twice. + // The second call (at height `h`) should skip the tx introduced at height `h`. + for try_index in 0..2 { + let exp_txids = tx_introductions + .range((height as usize + try_index, Txid::all_zeros())..) + .map(|&(_, txid)| txid) + .collect::>(); + let emitted_txids = emitter + .mempool()? + .into_iter() + .map(|(tx, _)| tx.txid()) + .collect::>(); + assert_eq!( + emitted_txids, exp_txids, + "\n emission {} (try {}) must only contain txs introduced at that height or lower: \n\t missing: {:?} \n\t extra: {:?}", + height, + try_index, + exp_txids + .difference(&emitted_txids) + .map(|txid| (txid, tx_introductions.iter().find_map(|(h, id)| if id == txid { Some(h) } else { None }).unwrap())) + .collect::>(), + emitted_txids + .difference(&exp_txids) + .map(|txid| (txid, tx_introductions.iter().find_map(|(h, id)| if id == txid { Some(h) } else { None }).unwrap())) + .collect::>(), + ); + } + } + + Ok(()) +} + +/// Ensure we force re-emit all mempool txs after reorg. +#[test] +fn mempool_during_reorg() -> anyhow::Result<()> { + const TIP_DIFF: usize = 10; + const PREMINE_COUNT: usize = 101; + + let env = TestEnv::new()?; + let mut emitter = Emitter::new(&env.client, 0); + + // mine blocks to get initial balance + let addr = env.client.get_new_address(None, None)?.assume_checked(); + env.mine_blocks(PREMINE_COUNT, Some(addr.clone()))?; + + // introduce mempool tx at each block extension + for _ in 0..TIP_DIFF { + env.mine_empty_block()?; + env.send(&addr, Amount::from_sat(2100))?; + } + + // perform reorgs at different heights + for reorg_count in 1..TIP_DIFF { + // sync emitter to tip + while emitter.next_header()?.is_some() {} + + println!("REORG COUNT: {}", reorg_count); + env.reorg_empty_blocks(reorg_count)?; + + // we recalculate this at every loop as reorgs may evict transactions from mempool + let tx_introductions = env + .client + .get_raw_mempool_verbose()? + .into_iter() + .map(|(txid, entry)| (txid, entry.height as usize)) + .collect::>(); + + if let Some((height, _)) = emitter.next_header()? 
{ + // the mempool emission (that follows the first block emission after reorg) should return + // the entire mempool contents + let mempool = emitter + .mempool()? + .into_iter() + .map(|(tx, _)| tx.txid()) + .collect::>(); + let exp_mempool = tx_introductions.keys().copied().collect::>(); + assert_eq!( + mempool, exp_mempool, + "the first mempool emission after reorg should include all mempool txs" + ); + + let mempool = emitter + .mempool()? + .into_iter() + .map(|(tx, _)| tx.txid()) + .collect::>(); + let exp_mempool = tx_introductions + .iter() + .filter(|&(_, &intro_height)| intro_height > (height as usize)) + .map(|(&txid, _)| txid) + .collect::>(); + assert_eq!( + mempool, exp_mempool, + "following mempool emissions after reorg should exclude mempool introduction heights <= last emitted block height: \n\t missing: {:?} \n\t extra: {:?}", + exp_mempool + .difference(&mempool) + .map(|txid| (txid, tx_introductions.get(txid).unwrap())) + .collect::>(), + mempool + .difference(&exp_mempool) + .map(|txid| (txid, tx_introductions.get(txid).unwrap())) + .collect::>(), + ); + } + } + + Ok(()) +} From 4f10463d9eaad9365b87dd99d49f0ddb8be673ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Wed, 4 Oct 2023 18:20:10 +0800 Subject: [PATCH 07/14] test(bitcoind_rpc): add no_agreement_point test Co-authored-by: Steve Myers --- crates/bitcoind_rpc/tests/test_emitter.rs | 53 +++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index e6073eee7..7a4b7e4d1 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -757,3 +757,56 @@ fn mempool_during_reorg() -> anyhow::Result<()> { Ok(()) } + +/// If blockchain re-org includes the start height, emit new start height block +/// +/// 1. mine 101 blocks +/// 2. emmit blocks 99a, 100a +/// 3. invalidate blocks 99a, 100a, 101a +/// 4. mine new blocks 99b, 100b, 101b +/// 5. emmit block 99b +/// +/// The block hash of 99b should be different than 99a, but their previous block hashes should +/// be the same. 
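
A self-contained sketch of the agreement-point search this scenario exercises (stand-in types, `[u8; 32]` in place of `BlockHash`): when no locally-known block survives in the node's best chain, the search yields `None` and emission restarts from the configured start height, as the test below checks.

    use std::collections::BTreeMap;

    /// Walk our locally-known blocks from the tip downwards and return the height
    /// of the highest block still present in the node's best chain. `None` means
    /// there is no agreement point.
    fn find_agreement_height(
        local: &BTreeMap<u32, [u8; 32]>,
        best_chain: &BTreeMap<u32, [u8; 32]>,
    ) -> Option<u32> {
        local
            .iter()
            .rev()
            .find(|(height, hash)| best_chain.get(*height) == Some(*hash))
            .map(|(&height, _)| height)
    }

    fn main() {
        // local view: blocks 99a and 100a
        let local: BTreeMap<u32, [u8; 32]> =
            [(99, [1u8; 32]), (100, [2u8; 32])].into_iter().collect();
        // node's best chain after the invalidations: 99b, 100b, 101b
        let best: BTreeMap<u32, [u8; 32]> =
            [(99, [7u8; 32]), (100, [8u8; 32]), (101, [9u8; 32])].into_iter().collect();
        // neither 99a nor 100a survived, so there is no agreement point
        assert_eq!(find_agreement_height(&local, &best), None);
    }
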
+#[test] +fn no_agreement_point() -> anyhow::Result<()> { + const PREMINE_COUNT: usize = 101; + + let env = TestEnv::new()?; + + // start height is 99 + let mut emitter = Emitter::new(&env.client, (PREMINE_COUNT - 2) as u32); + + // mine 101 blocks + env.mine_blocks(PREMINE_COUNT, None)?; + + // emit block 99a + let (_, block_header_99a) = emitter.next_header()?.expect("block 99a header"); + let block_hash_99a = block_header_99a.block_hash(); + let block_hash_98a = block_header_99a.prev_blockhash; + + // emit block 100a + let (_, block_header_100a) = emitter.next_header()?.expect("block 100a header"); + let block_hash_100a = block_header_100a.block_hash(); + + // get hash for block 101a + let block_hash_101a = env.client.get_block_hash(101)?; + + // invalidate blocks 99a, 100a, 101a + env.client.invalidate_block(&block_hash_99a)?; + env.client.invalidate_block(&block_hash_100a)?; + env.client.invalidate_block(&block_hash_101a)?; + + // mine new blocks 99b, 100b, 101b + env.mine_blocks(3, None)?; + + // emit block header 99b + let (_, block_header_99b) = emitter.next_header()?.expect("block 99b header"); + let block_hash_99b = block_header_99b.block_hash(); + let block_hash_98b = block_header_99b.prev_blockhash; + + assert_ne!(block_hash_99a, block_hash_99b); + assert_eq!(block_hash_98a, block_hash_98b); + + Ok(()) +} From 150d6f8ab6cd1eb1c9448d61e7bd71db0dd32a01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Wed, 4 Oct 2023 18:22:03 +0800 Subject: [PATCH 08/14] feat(example_bitcoind_rpc_polling): add example for RPC polling --- Cargo.toml | 1 + .../example_bitcoind_rpc_polling/Cargo.toml | 12 + .../example_bitcoind_rpc_polling/src/main.rs | 366 ++++++++++++++++++ 3 files changed, 379 insertions(+) create mode 100644 example-crates/example_bitcoind_rpc_polling/Cargo.toml create mode 100644 example-crates/example_bitcoind_rpc_polling/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index a5058ebc4..0e1efc902 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "example-crates/example_cli", "example-crates/example_electrum", "example-crates/example_esplora", + "example-crates/example_bitcoind_rpc_polling", "example-crates/wallet_electrum", "example-crates/wallet_esplora_blocking", "example-crates/wallet_esplora_async", diff --git a/example-crates/example_bitcoind_rpc_polling/Cargo.toml b/example-crates/example_bitcoind_rpc_polling/Cargo.toml new file mode 100644 index 000000000..6728bb13a --- /dev/null +++ b/example-crates/example_bitcoind_rpc_polling/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "example_bitcoind_rpc_polling" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bdk_chain = { path = "../../crates/chain", features = ["serde"] } +bdk_bitcoind_rpc = { path = "../../crates/bitcoind_rpc" } +example_cli = { path = "../example_cli" } +ctrlc = { version = "^2" } diff --git a/example-crates/example_bitcoind_rpc_polling/src/main.rs b/example-crates/example_bitcoind_rpc_polling/src/main.rs new file mode 100644 index 000000000..6fb557f7a --- /dev/null +++ b/example-crates/example_bitcoind_rpc_polling/src/main.rs @@ -0,0 +1,366 @@ +use std::{ + path::PathBuf, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }, + time::{Duration, Instant}, +}; + +use bdk_bitcoind_rpc::{ + bitcoincore_rpc::{Auth, Client, RpcApi}, + Emitter, +}; +use bdk_chain::{ + bitcoin::{Block, Transaction}, + indexed_tx_graph, keychain, + local_chain::{self, 
CheckPoint, LocalChain}, + ConfirmationTimeAnchor, IndexedTxGraph, +}; +use example_cli::{ + anyhow, + clap::{self, Args, Subcommand}, + Keychain, +}; + +const DB_MAGIC: &[u8] = b"bdk_example_rpc"; +const DB_PATH: &str = ".bdk_example_rpc.db"; + +const CHANNEL_BOUND: usize = 10; +/// The block depth which we assume no reorgs can happen at. +const ASSUME_FINAL_DEPTH: u32 = 6; +/// Delay for printing status to stdout. +const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6); +/// Delay between mempool emissions. +const MEMPOOL_EMIT_DELAY: Duration = Duration::from_secs(30); +/// Delay for commiting to persistance. +const DB_COMMIT_DELAY: Duration = Duration::from_secs(60); + +type ChangeSet = ( + local_chain::ChangeSet, + indexed_tx_graph::ChangeSet>, +); + +#[derive(Debug)] +enum Emission { + Block { height: u32, block: Block }, + Mempool(Vec<(Transaction, u64)>), + Tip(u32), +} + +#[derive(Args, Debug, Clone)] +struct RpcArgs { + /// RPC URL + #[clap(env = "RPC_URL", long, default_value = "127.0.0.1:8332")] + url: String, + /// RPC auth cookie file + #[clap(env = "RPC_COOKIE", long)] + rpc_cookie: Option, + /// RPC auth username + #[clap(env = "RPC_USER", long)] + rpc_user: Option, + /// RPC auth password + #[clap(env = "RPC_PASS", long)] + rpc_password: Option, + /// Starting block height to fallback to if no point of agreement if found + #[clap(env = "FALLBACK_HEIGHT", long, default_value = "0")] + fallback_height: u32, + /// The unused-scripts lookahead will be kept at this size + #[clap(long, default_value = "10")] + lookahead: u32, +} + +impl From for Auth { + fn from(args: RpcArgs) -> Self { + match (args.rpc_cookie, args.rpc_user, args.rpc_password) { + (None, None, None) => Self::None, + (Some(path), _, _) => Self::CookieFile(path), + (_, Some(user), Some(pass)) => Self::UserPass(user, pass), + (_, Some(_), None) => panic!("rpc auth: missing rpc_pass"), + (_, None, Some(_)) => panic!("rpc auth: missing rpc_user"), + } + } +} + +impl RpcArgs { + fn new_client(&self) -> anyhow::Result { + Ok(Client::new( + &self.url, + match (&self.rpc_cookie, &self.rpc_user, &self.rpc_password) { + (None, None, None) => Auth::None, + (Some(path), _, _) => Auth::CookieFile(path.clone()), + (_, Some(user), Some(pass)) => Auth::UserPass(user.clone(), pass.clone()), + (_, Some(_), None) => panic!("rpc auth: missing rpc_pass"), + (_, None, Some(_)) => panic!("rpc auth: missing rpc_user"), + }, + )?) 
+ } +} + +#[derive(Subcommand, Debug, Clone)] +enum RpcCommands { + /// Syncs local state with remote state via RPC (starting from last point of agreement) and + /// stores/indexes relevant transactions + Sync { + #[clap(flatten)] + rpc_args: RpcArgs, + }, + /// Sync by having the emitter logic in a separate thread + Live { + #[clap(flatten)] + rpc_args: RpcArgs, + }, +} + +fn main() -> anyhow::Result<()> { + let (args, keymap, index, db, init_changeset) = + example_cli::init::(DB_MAGIC, DB_PATH)?; + + let graph = Mutex::new({ + let mut graph = IndexedTxGraph::new(index); + graph.apply_changeset(init_changeset.1); + graph + }); + println!("loaded indexed tx graph from db"); + + let chain = Mutex::new(LocalChain::from_changeset(init_changeset.0)); + println!("loaded local chain from db"); + + let rpc_cmd = match args.command { + example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd, + general_cmd => { + let res = example_cli::handle_commands( + &graph, + &db, + &chain, + &keymap, + args.network, + |rpc_args, tx| { + let client = rpc_args.new_client()?; + client.send_raw_transaction(tx)?; + Ok(()) + }, + general_cmd, + ); + db.lock().unwrap().commit()?; + return res; + } + }; + + match rpc_cmd { + RpcCommands::Sync { rpc_args } => { + let RpcArgs { + fallback_height, + lookahead, + .. + } = rpc_args; + + let mut chain = chain.lock().unwrap(); + let mut graph = graph.lock().unwrap(); + let mut db = db.lock().unwrap(); + + graph.index.set_lookahead_for_all(lookahead); + // we start at a height lower than last-seen tip in case of reorgs + let start_height = chain.tip().as_ref().map_or(fallback_height, |cp| { + cp.height().saturating_sub(ASSUME_FINAL_DEPTH) + }); + + let rpc_client = rpc_args.new_client()?; + let mut emitter = Emitter::new(&rpc_client, start_height); + + let mut last_db_commit = Instant::now(); + let mut last_print = Instant::now(); + + while let Some((height, block)) = emitter.next_block()? { + let chain_update = + CheckPoint::from_header(&block.header, height).into_update(false); + let chain_changeset = chain.apply_update(chain_update)?; + let graph_changeset = graph.apply_block_relevant(block, height); + db.stage((chain_changeset, graph_changeset)); + + // commit staged db changes in intervals + if last_db_commit.elapsed() >= DB_COMMIT_DELAY { + last_db_commit = Instant::now(); + db.commit()?; + println!( + "commited to db (took {}s)", + last_db_commit.elapsed().as_secs_f32() + ); + } + + // print synced-to height and current balance in intervals + if last_print.elapsed() >= STDOUT_PRINT_DELAY { + last_print = Instant::now(); + if let Some(synced_to) = chain.tip() { + let balance = { + graph.graph().balance( + &*chain, + synced_to.block_id(), + graph.index.outpoints().iter().cloned(), + |(k, _), _| k == &Keychain::Internal, + ) + }; + println!( + "synced to {} @ {} | total: {} sats", + synced_to.hash(), + synced_to.height(), + balance.total() + ); + } + } + } + + // mempool + let mempool_txs = emitter.mempool()?; + let graph_changeset = graph + .batch_insert_unconfirmed(mempool_txs.iter().map(|(tx, time)| (tx, Some(*time)))); + db.stage((local_chain::ChangeSet::default(), graph_changeset)); + + // commit one last time! + db.commit()?; + } + RpcCommands::Live { rpc_args } => { + let RpcArgs { + fallback_height, + lookahead, + .. 
+ } = rpc_args; + let sigterm_flag = start_ctrlc_handler(); + + graph.lock().unwrap().index.set_lookahead_for_all(lookahead); + // we start at a height lower than last-seen tip in case of reorgs + let start_height = chain.lock().unwrap().tip().map_or(fallback_height, |cp| { + cp.height().saturating_sub(ASSUME_FINAL_DEPTH) + }); + + let (tx, rx) = std::sync::mpsc::sync_channel::(CHANNEL_BOUND); + let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> { + println!("emitter thread started..."); + + let rpc_client = rpc_args.new_client()?; + let mut emitter = Emitter::new(&rpc_client, start_height); + + let mut block_count = rpc_client.get_block_count()? as u32; + tx.send(Emission::Tip(block_count))?; + + loop { + match emitter.next_block()? { + Some((height, block)) => { + if sigterm_flag.load(Ordering::Acquire) { + break; + } + if height > block_count { + block_count = rpc_client.get_block_count()? as u32; + tx.send(Emission::Tip(block_count))?; + } + tx.send(Emission::Block { height, block })?; + } + None => { + if await_flag(&sigterm_flag, MEMPOOL_EMIT_DELAY) { + break; + } + println!("preparing mempool emission..."); + let now = Instant::now(); + tx.send(Emission::Mempool(emitter.mempool()?))?; + println!("mempool emission prepared in {}s", now.elapsed().as_secs()); + continue; + } + }; + } + + println!("emitter thread shutting down..."); + Ok(()) + }); + + let mut db = db.lock().unwrap(); + let mut graph = graph.lock().unwrap(); + let mut chain = chain.lock().unwrap(); + let mut tip_height = 0_u32; + + let mut last_db_commit = Instant::now(); + let mut last_print = Option::::None; + + for emission in rx { + let changeset = match emission { + Emission::Block { height, block } => { + let chain_update = + CheckPoint::from_header(&block.header, height).into_update(false); + let chain_changeset = chain.apply_update(chain_update)?; + let graph_changeset = graph.apply_block_relevant(block, height); + (chain_changeset, graph_changeset) + } + Emission::Mempool(mempool_txs) => { + let graph_changeset = graph.batch_insert_relevant_unconfirmed( + mempool_txs.iter().map(|(tx, time)| (tx, Some(*time))), + ); + (local_chain::ChangeSet::default(), graph_changeset) + } + Emission::Tip(h) => { + tip_height = h; + continue; + } + }; + + db.stage(changeset); + + if last_db_commit.elapsed() >= DB_COMMIT_DELAY { + last_db_commit = Instant::now(); + db.commit()?; + println!( + "commited to db (took {}s)", + last_db_commit.elapsed().as_secs_f32() + ); + } + + if last_print.map_or(Duration::MAX, |i| i.elapsed()) >= STDOUT_PRINT_DELAY { + last_print = Some(Instant::now()); + if let Some(synced_to) = chain.tip() { + let balance = { + graph.graph().balance( + &*chain, + synced_to.block_id(), + graph.index.outpoints().iter().cloned(), + |(k, _), _| k == &Keychain::Internal, + ) + }; + println!( + "synced to {} @ {} / {} | total: {} sats", + synced_to.hash(), + synced_to.height(), + tip_height, + balance.total() + ); + } + } + } + + emission_jh.join().expect("must join emitter thread")?; + } + } + + Ok(()) +} + +#[allow(dead_code)] +fn start_ctrlc_handler() -> Arc { + let flag = Arc::new(AtomicBool::new(false)); + let cloned_flag = flag.clone(); + + ctrlc::set_handler(move || cloned_flag.store(true, Ordering::Release)); + + flag +} + +#[allow(dead_code)] +fn await_flag(flag: &AtomicBool, duration: Duration) -> bool { + let start = Instant::now(); + loop { + if flag.load(Ordering::Acquire) { + return true; + } + if start.elapsed() >= duration { + return false; + } + 
std::thread::sleep(Duration::from_secs(1)); + } +} From 4f5695d43add3eab37ab12e897ac7c49f0d0787e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Fri, 6 Oct 2023 02:05:31 +0800 Subject: [PATCH 09/14] chain: improvements to `IndexedTxGraph` and `TxGraph` APIs For `IndexedTxGraph`: - Remove `InsertTxItem` type (this is too complex). - `batch_insert_relevant` now uses a simple tuple `(&tx, anchors)`. - `batch_insert` is now also removed, as the same functionality can be done elsewhere. - Add internal helper method `index_tx_graph_changeset` so we don't need to create a seprate `TxGraph` update in each method. - `batch_insert__unconfirmed` no longer takes in an option of last_seen. - `batch_insert_unconfirmed` no longer takes a reference of a transaction (since we apply all transactions anyway, so there is no need to clone). For `TxGraph`: - Add `batch_insert_unconfirmed` method. --- crates/bdk/src/wallet/mod.rs | 2 +- crates/bitcoind_rpc/tests/test_emitter.rs | 21 +-- crates/chain/src/indexed_tx_graph.rs | 169 ++++++++---------- crates/chain/src/tx_graph.rs | 17 ++ crates/chain/tests/test_indexed_tx_graph.rs | 5 +- .../example_bitcoind_rpc_polling/src/main.rs | 5 +- 6 files changed, 104 insertions(+), 115 deletions(-) diff --git a/crates/bdk/src/wallet/mod.rs b/crates/bdk/src/wallet/mod.rs index 9ee72b4b6..090a9ca60 100644 --- a/crates/bdk/src/wallet/mod.rs +++ b/crates/bdk/src/wallet/mod.rs @@ -509,7 +509,7 @@ impl Wallet { where D: PersistBackend, { - let additions = self.indexed_graph.insert_txout(outpoint, &txout); + let additions = self.indexed_graph.insert_txout(outpoint, txout); self.persist.stage(ChangeSet::from(additions)); } diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 7a4b7e4d1..601fb5616 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -3,7 +3,6 @@ use std::collections::{BTreeMap, BTreeSet}; use bdk_bitcoind_rpc::Emitter; use bdk_chain::{ bitcoin::{Address, Amount, BlockHash, Txid}, - indexed_tx_graph::InsertTxItem, keychain::Balance, local_chain::{self, CheckPoint, LocalChain}, Append, BlockId, IndexedTxGraph, SpkTxOutIndex, @@ -180,17 +179,6 @@ fn block_to_chain_update(block: &bitcoin::Block, height: u32) -> local_chain::Up } } -fn block_to_tx_graph_update( - block: &bitcoin::Block, - height: u32, -) -> impl Iterator>> { - let anchor = BlockId { - hash: block.block_hash(), - height, - }; - block.txdata.iter().map(move |tx| (tx, Some(anchor), None)) -} - /// Ensure that blocks are emitted in order even after reorg. /// /// 1. Mine 101 blocks. @@ -321,8 +309,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> { while let Some((height, block)) = emitter.next_block()? 
{ let _ = chain.apply_update(block_to_chain_update(&block, height))?; - let indexed_additions = - indexed_tx_graph.batch_insert_relevant(block_to_tx_graph_update(&block, height)); + let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height); assert!(indexed_additions.is_empty()); } @@ -350,8 +337,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> { assert!(emitter.next_block()?.is_none()); let mempool_txs = emitter.mempool()?; - let indexed_additions = indexed_tx_graph - .batch_insert_unconfirmed(mempool_txs.iter().map(|(tx, time)| (tx, Some(*time)))); + let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs); assert_eq!( indexed_additions .graph @@ -383,8 +369,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> { { let (height, block) = emitter.next_block()?.expect("must get mined block"); let _ = chain.apply_update(block_to_chain_update(&block, height))?; - let indexed_additions = - indexed_tx_graph.batch_insert_relevant(block_to_tx_graph_update(&block, height)); + let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height); assert!(indexed_additions.graph.txs.is_empty()); assert!(indexed_additions.graph.txouts.is_empty()); assert_eq!(indexed_additions.graph.anchors, exp_anchors); diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index 065e0892a..e65b6868a 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -72,32 +72,34 @@ impl IndexedTxGraph where I::ChangeSet: Default + Append, { + fn index_tx_graph_changeset( + &mut self, + tx_graph_changeset: &tx_graph::ChangeSet, + ) -> I::ChangeSet { + let mut changeset = I::ChangeSet::default(); + for added_tx in &tx_graph_changeset.txs { + changeset.append(self.index.index_tx(added_tx)); + } + for (&added_outpoint, added_txout) in &tx_graph_changeset.txouts { + changeset.append(self.index.index_txout(added_outpoint, added_txout)); + } + changeset + } + /// Apply an `update` directly. /// /// `update` is a [`TxGraph`] and the resultant changes is returned as [`ChangeSet`]. pub fn apply_update(&mut self, update: TxGraph) -> ChangeSet { let graph = self.graph.apply_update(update); - - let mut indexer = I::ChangeSet::default(); - for added_tx in &graph.txs { - indexer.append(self.index.index_tx(added_tx)); - } - for (&added_outpoint, added_txout) in &graph.txouts { - indexer.append(self.index.index_txout(added_outpoint, added_txout)); - } - + let indexer = self.index_tx_graph_changeset(&graph); ChangeSet { graph, indexer } } /// Insert a floating `txout` of given `outpoint`. - pub fn insert_txout( - &mut self, - outpoint: OutPoint, - txout: &TxOut, - ) -> ChangeSet { - let mut update = TxGraph::::default(); - let _ = update.insert_txout(outpoint, txout.clone()); - self.apply_update(update) + pub fn insert_txout(&mut self, outpoint: OutPoint, txout: TxOut) -> ChangeSet { + let graph = self.graph.insert_txout(outpoint, txout); + let indexer = self.index_tx_graph_changeset(&graph); + ChangeSet { graph, indexer } } /// Insert and index a transaction into the graph. 
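
A toy model of the `index_tx_graph_changeset` helper added above: the indexer delta is derived directly from a graph delta instead of building a separate `TxGraph` update for each call. All types here are stand-ins (`u64` for `Txid`), not `bdk_chain` types.

    /// Toy deltas: real changesets also carry txouts, anchors and last-seen times.
    #[derive(Default, Debug, PartialEq)]
    struct GraphDelta {
        txs: Vec<u64>,
    }

    #[derive(Default, Debug, PartialEq)]
    struct IndexDelta {
        indexed: Vec<u64>,
    }

    #[derive(Default)]
    struct Index {
        seen: Vec<u64>,
    }

    impl Index {
        /// Index one tx, returning the delta (empty if the tx was already known).
        fn index_tx(&mut self, txid: u64) -> IndexDelta {
            if self.seen.contains(&txid) {
                return IndexDelta::default();
            }
            self.seen.push(txid);
            IndexDelta { indexed: vec![txid] }
        }

        /// The shape of `index_tx_graph_changeset`: derive the indexer delta
        /// straight from a graph delta instead of replaying a whole update.
        fn index_graph_delta(&mut self, graph: &GraphDelta) -> IndexDelta {
            let mut delta = IndexDelta::default();
            for &txid in &graph.txs {
                delta.indexed.extend(self.index_tx(txid).indexed);
            }
            delta
        }
    }

    fn main() {
        let mut index = Index::default();
        let graph_delta = GraphDelta { txs: vec![10, 11, 10] };
        let index_delta = index.index_graph_delta(&graph_delta);
        assert_eq!(index_delta.indexed, vec![10, 11]); // the duplicate indexes once
    }
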
@@ -112,18 +114,19 @@ where ) -> ChangeSet { let txid = tx.txid(); - let mut update = TxGraph::::default(); + let mut graph = tx_graph::ChangeSet::default(); if self.graph.get_tx(txid).is_none() { - let _ = update.insert_tx(tx.clone()); + graph.append(self.graph.insert_tx(tx.clone())); } for anchor in anchors.into_iter() { - let _ = update.insert_anchor(txid, anchor); + graph.append(self.graph.insert_anchor(txid, anchor)); } if let Some(seen_at) = seen_at { - let _ = update.insert_seen_at(txid, seen_at); + graph.append(self.graph.insert_seen_at(txid, seen_at)); } - self.apply_update(update) + let indexer = self.index_tx_graph_changeset(&graph); + ChangeSet { graph, indexer } } /// Batch insert transactions, filtering out those that are irrelevant. @@ -132,7 +135,7 @@ where /// transactions in `txs` will be ignored. `txs` do not need to be in topological order. pub fn batch_insert_relevant<'t>( &mut self, - txs: impl IntoIterator>>, + txs: impl IntoIterator)>, ) -> ChangeSet { // The algorithm below allows for non-topologically ordered transactions by using two loops. // This is achieved by: @@ -140,38 +143,25 @@ where // not store anything about them. // 2. decide whether to insert them into the graph depending on whether `is_tx_relevant` // returns true or not. (in a second loop). - let mut changeset = ChangeSet::::default(); + let txs = txs.into_iter().collect::>(); - let txs = txs - .into_iter() - .inspect(|(tx, _, _)| changeset.indexer.append(self.index.index_tx(tx))) - .collect::>(); + let mut indexer = I::ChangeSet::default(); + for (tx, _) in &txs { + indexer.append(self.index.index_tx(tx)); + } - for (tx, anchors, seen_at) in txs { + let mut graph = tx_graph::ChangeSet::default(); + for (tx, anchors) in txs { if self.index.is_tx_relevant(tx) { - changeset.append(self.insert_tx(tx, anchors, seen_at)); + let txid = tx.txid(); + graph.append(self.graph.insert_tx(tx.clone())); + for anchor in anchors { + graph.append(self.graph.insert_anchor(txid, anchor)); + } } } - changeset - } - - /// Batch insert transactions. - /// - /// All transactions in `txs` will be inserted. To filter out irrelevant transactions, use - /// [`batch_insert_relevant`] instead. - /// - /// [`batch_insert_relevant`]: IndexedTxGraph::batch_insert_relevant - pub fn batch_insert<'t>( - &mut self, - txs: impl IntoIterator>>, - ) -> ChangeSet { - let mut changeset = ChangeSet::::default(); - for (tx, anchors, seen_at) in txs { - changeset.indexer.append(self.index.index_tx(tx)); - changeset.append(self.insert_tx(tx, anchors, seen_at)); - } - changeset + ChangeSet { graph, indexer } } /// Batch insert unconfirmed transactions, filtering out those that are irrelevant. @@ -179,38 +169,51 @@ where /// Relevancy is determined by the internal [`Indexer::is_tx_relevant`] implementation of `I`. /// Irrelevant tansactions in `txs` will be ignored. /// - /// Items of `txs` are tuples containing the transaction and an optional *last seen* timestamp. - /// The *last seen* communicates when the transaction is last seen in the mempool which is used - /// for conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details). + /// Items of `txs` are tuples containing the transaction and a *last seen* timestamp. The + /// *last seen* communicates when the transaction is last seen in the mempool which is used for + /// conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details). 
pub fn batch_insert_relevant_unconfirmed<'t>( &mut self, - unconfirmed_txs: impl IntoIterator)>, + unconfirmed_txs: impl IntoIterator, ) -> ChangeSet { - self.batch_insert_relevant( - unconfirmed_txs - .into_iter() - .map(|(tx, last_seen)| (tx, core::iter::empty(), last_seen)), - ) + // The algorithm below allows for non-topologically ordered transactions by using two loops. + // This is achieved by: + // 1. insert all txs into the index. If they are irrelevant then that's fine it will just + // not store anything about them. + // 2. decide whether to insert them into the graph depending on whether `is_tx_relevant` + // returns true or not. (in a second loop). + let txs = unconfirmed_txs.into_iter().collect::>(); + + let mut indexer = I::ChangeSet::default(); + for (tx, _) in &txs { + indexer.append(self.index.index_tx(tx)); + } + + let graph = self.graph.batch_insert_unconfirmed( + txs.into_iter() + .filter(|(tx, _)| self.index.is_tx_relevant(tx)) + .map(|(tx, seen_at)| (tx.clone(), seen_at)), + ); + + ChangeSet { graph, indexer } } /// Batch insert unconfirmed transactions. /// - /// Items of `txs` are tuples containing the transaction and an optional *last seen* timestamp. - /// The *last seen* communicates when the transaction is last seen in the mempool which is used - /// for conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details). + /// Items of `txs` are tuples containing the transaction and a *last seen* timestamp. The + /// *last seen* communicates when the transaction is last seen in the mempool which is used for + /// conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details). /// /// To filter out irrelevant transactions, use [`batch_insert_relevant_unconfirmed`] instead. /// /// [`batch_insert_relevant_unconfirmed`]: IndexedTxGraph::batch_insert_relevant_unconfirmed - pub fn batch_insert_unconfirmed<'t>( + pub fn batch_insert_unconfirmed( &mut self, - unconfirmed_txs: impl IntoIterator)>, + txs: impl IntoIterator, ) -> ChangeSet { - self.batch_insert( - unconfirmed_txs - .into_iter() - .map(|(tx, last_seen)| (tx, core::iter::empty(), last_seen)), - ) + let graph = self.graph.batch_insert_unconfirmed(txs); + let indexer = self.index_tx_graph_changeset(&graph); + ChangeSet { graph, indexer } } } @@ -241,7 +244,6 @@ where ( tx, core::iter::once(A::from_block_position(&block, block_id, tx_pos)), - None, ) }); self.batch_insert_relevant(txs) @@ -260,30 +262,17 @@ where hash: block.block_hash(), height, }; - let txs = block.txdata.iter().enumerate().map(|(tx_pos, tx)| { - ( - tx, - core::iter::once(A::from_block_position(&block, block_id, tx_pos)), - None, - ) - }); - self.batch_insert(txs) + let mut graph = tx_graph::ChangeSet::default(); + for (tx_pos, tx) in block.txdata.iter().enumerate() { + let anchor = A::from_block_position(&block, block_id, tx_pos); + graph.append(self.graph.insert_anchor(tx.txid(), anchor)); + graph.append(self.graph.insert_tx(tx.clone())); + } + let indexer = self.index_tx_graph_changeset(&graph); + ChangeSet { graph, indexer } } } -/// A tuple of a transaction, and associated metadata, that are to be inserted into [`IndexedTxGraph`]. -/// -/// This tuple contains fields in the following order: -/// * A reference to the transaction. -/// * A collection of [`Anchor`]s. -/// * An optional last-seen timestamp. -/// -/// This is used as a input item of [`batch_insert_relevant`] and [`batch_insert`]. 
-/// -/// [`batch_insert_relevant`]: IndexedTxGraph::batch_insert_relevant -/// [`batch_insert`]: IndexedTxGraph::batch_insert -pub type InsertTxItem<'t, A> = (&'t Transaction, A, Option); - /// A structure that represents changes to an [`IndexedTxGraph`]. #[derive(Clone, Debug, PartialEq)] #[cfg_attr( diff --git a/crates/chain/src/tx_graph.rs b/crates/chain/src/tx_graph.rs index edc1e4966..698af76c4 100644 --- a/crates/chain/src/tx_graph.rs +++ b/crates/chain/src/tx_graph.rs @@ -451,6 +451,23 @@ impl TxGraph { self.apply_update(update) } + /// Batch insert unconfirmed transactions. + /// + /// Items of `txs` are tuples containing the transaction and a *last seen* timestamp. The + /// *last seen* communicates when the transaction is last seen in the mempool which is used for + /// conflict-resolution (refer to [`TxGraph::insert_seen_at`] for details). + pub fn batch_insert_unconfirmed( + &mut self, + txs: impl IntoIterator, + ) -> ChangeSet { + let mut changeset = ChangeSet::::default(); + for (tx, seen_at) in txs { + changeset.append(self.insert_seen_at(tx.txid(), seen_at)); + changeset.append(self.insert_tx(tx)); + } + changeset + } + /// Inserts the given `anchor` into [`TxGraph`]. /// /// The [`ChangeSet`] returned will be empty if graph already knows that `txid` exists in diff --git a/crates/chain/tests/test_indexed_tx_graph.rs b/crates/chain/tests/test_indexed_tx_graph.rs index 5f95e111d..3dc22ef5b 100644 --- a/crates/chain/tests/test_indexed_tx_graph.rs +++ b/crates/chain/tests/test_indexed_tx_graph.rs @@ -74,7 +74,7 @@ fn insert_relevant_txs() { }; assert_eq!( - graph.batch_insert_relevant(txs.iter().map(|tx| (tx, None, None))), + graph.batch_insert_relevant(txs.iter().map(|tx| (tx, None))), changeset, ); @@ -225,11 +225,10 @@ fn test_list_owned_txouts() { anchor_block, confirmation_height: anchor_block.height, }), - None, ) })); - let _ = graph.batch_insert_relevant([&tx4, &tx5].iter().map(|tx| (*tx, None, Some(100)))); + let _ = graph.batch_insert_relevant_unconfirmed([&tx4, &tx5].iter().map(|tx| (*tx, 100))); // A helper lambda to extract and filter data from the graph. let fetch = diff --git a/example-crates/example_bitcoind_rpc_polling/src/main.rs b/example-crates/example_bitcoind_rpc_polling/src/main.rs index 6fb557f7a..c9bcc9728 100644 --- a/example-crates/example_bitcoind_rpc_polling/src/main.rs +++ b/example-crates/example_bitcoind_rpc_polling/src/main.rs @@ -212,8 +212,7 @@ fn main() -> anyhow::Result<()> { // mempool let mempool_txs = emitter.mempool()?; - let graph_changeset = graph - .batch_insert_unconfirmed(mempool_txs.iter().map(|(tx, time)| (tx, Some(*time)))); + let graph_changeset = graph.batch_insert_unconfirmed(mempool_txs); db.stage((local_chain::ChangeSet::default(), graph_changeset)); // commit one last time! @@ -291,7 +290,7 @@ fn main() -> anyhow::Result<()> { } Emission::Mempool(mempool_txs) => { let graph_changeset = graph.batch_insert_relevant_unconfirmed( - mempool_txs.iter().map(|(tx, time)| (tx, Some(*time))), + mempool_txs.iter().map(|(tx, time)| (tx, *time)), ); (local_chain::ChangeSet::default(), graph_changeset) } From 6d4b33ef91a6c3e3443f6321cf3e3d186f77c595 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Fri, 6 Oct 2023 11:07:00 +0800 Subject: [PATCH 10/14] chain: split `IndexedTxGraph::insert_tx` into 3 methods Instead of inserting anchors and seen_at timestamp in the same method, we have three separate methods. This makes the API easier to understand and makes `IndexedTxGraph` more consistent with the `TxGraph` API. 
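
A sketch of the call pattern the three methods enable, mirroring the wallet change below: each small mutation returns its own changeset and the caller appends them into one staged batch. The method calls are the ones introduced by this patch; the import paths and trait bounds (`Anchor`, `Indexer`, `Append`) are my reading of the crate and may not match it exactly, so treat this as illustrative rather than compile-checked.

    // NOTE: import paths and trait bounds below are assumptions.
    use bdk_chain::bitcoin::Transaction;
    use bdk_chain::{indexed_tx_graph, Anchor, Append, IndexedTxGraph, Indexer};

    /// Stage a transaction plus optional anchor and last-seen timestamp with the
    /// three new methods, appending the per-call changesets into one batch.
    fn stage_tx<A, I>(
        graph: &mut IndexedTxGraph<A, I>,
        tx: Transaction,
        anchor: Option<A>,
        last_seen: Option<u64>,
    ) -> indexed_tx_graph::ChangeSet<A, I::ChangeSet>
    where
        A: Anchor,
        I: Indexer,
        I::ChangeSet: Default + Append,
    {
        let txid = tx.txid();
        let mut changeset = graph.insert_tx(tx);
        if let Some(anchor) = anchor {
            changeset.append(graph.insert_anchor(txid, anchor));
        }
        if let Some(seen_at) = last_seen {
            changeset.append(graph.insert_seen_at(txid, seen_at));
        }
        changeset
    }
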
--- crates/bdk/src/wallet/mod.rs | 11 +++++++- crates/chain/src/indexed_tx_graph.rs | 39 +++++++++++---------------- example-crates/example_cli/src/lib.rs | 3 +-- 3 files changed, 27 insertions(+), 26 deletions(-) diff --git a/crates/bdk/src/wallet/mod.rs b/crates/bdk/src/wallet/mod.rs index 090a9ca60..659da90a2 100644 --- a/crates/bdk/src/wallet/mod.rs +++ b/crates/bdk/src/wallet/mod.rs @@ -738,7 +738,16 @@ impl Wallet { ConfirmationTime::Unconfirmed { last_seen } => (None, Some(last_seen)), }; - let changeset: ChangeSet = self.indexed_graph.insert_tx(&tx, anchor, last_seen).into(); + let mut changeset = ChangeSet::default(); + let txid = tx.txid(); + changeset.append(self.indexed_graph.insert_tx(tx).into()); + if let Some(anchor) = anchor { + changeset.append(self.indexed_graph.insert_anchor(txid, anchor).into()); + } + if let Some(last_seen) = last_seen { + changeset.append(self.indexed_graph.insert_seen_at(txid, last_seen).into()); + } + let changed = !changeset.is_empty(); self.persist.stage(changeset); Ok(changed) diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index e65b6868a..0e2620e0d 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -3,7 +3,7 @@ //! This is essentially a [`TxGraph`] combined with an indexer. use alloc::vec::Vec; -use bitcoin::{Block, OutPoint, Transaction, TxOut}; +use bitcoin::{Block, OutPoint, Transaction, TxOut, Txid}; use crate::{ keychain, @@ -103,32 +103,25 @@ where } /// Insert and index a transaction into the graph. - /// - /// `anchors` can be provided to anchor the transaction to various blocks. `seen_at` is a - /// unix timestamp of when the transaction is last seen. - pub fn insert_tx( - &mut self, - tx: &Transaction, - anchors: impl IntoIterator, - seen_at: Option, - ) -> ChangeSet { - let txid = tx.txid(); - - let mut graph = tx_graph::ChangeSet::default(); - if self.graph.get_tx(txid).is_none() { - graph.append(self.graph.insert_tx(tx.clone())); - } - for anchor in anchors.into_iter() { - graph.append(self.graph.insert_anchor(txid, anchor)); - } - if let Some(seen_at) = seen_at { - graph.append(self.graph.insert_seen_at(txid, seen_at)); - } - + pub fn insert_tx(&mut self, tx: Transaction) -> ChangeSet { + let graph = self.graph.insert_tx(tx); let indexer = self.index_tx_graph_changeset(&graph); ChangeSet { graph, indexer } } + /// Insert an `anchor` for a given transaction. + pub fn insert_anchor(&mut self, txid: Txid, anchor: A) -> ChangeSet { + self.graph.insert_anchor(txid, anchor).into() + } + + /// Insert a unix timestamp of when a transaction is seen in the mempool. + /// + /// This is used for transaction conflict resolution in [`TxGraph`] where the transaction with + /// the later last-seen is prioritized. + pub fn insert_seen_at(&mut self, txid: Txid, seen_at: u64) -> ChangeSet { + self.graph.insert_seen_at(txid, seen_at).into() + } + /// Batch insert transactions, filtering out those that are irrelevant. /// /// Relevancy is determined by the [`Indexer::is_tx_relevant`] implementation of `I`. 
Irrelevant diff --git a/example-crates/example_cli/src/lib.rs b/example-crates/example_cli/src/lib.rs index 1982c30c6..9e572a892 100644 --- a/example-crates/example_cli/src/lib.rs +++ b/example-crates/example_cli/src/lib.rs @@ -624,8 +624,7 @@ where Ok(_) => { println!("Broadcasted Tx : {}", transaction.txid()); - let keychain_changeset = - graph.lock().unwrap().insert_tx(&transaction, None, None); + let keychain_changeset = graph.lock().unwrap().insert_tx(transaction); // We know the tx is at least unconfirmed now. Note if persisting here fails, // it's not a big deal since we can always find it again form From 57590e0a1f2dad09a63fadb11f01e9f704cdcffb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Fri, 6 Oct 2023 17:39:22 +0800 Subject: [PATCH 11/14] bitcoind_rpc: rm `BlockHash` from `Emitter::last_mempool_tip` Instead of comparing the blockhash against the emitted_blocks map to see whether the block is part of the emitter's best chain, we reduce the `last_mempool_tip` height to the last agreement height during the polling logic. The benefits of this is we have tighter bounds for avoiding re- emission. Also, it will be easier to replace `emitted_blocks` to a `CheckPoint` (since we no longer rely on map lookup). --- crates/bitcoind_rpc/src/lib.rs | 38 ++++++++++------ crates/bitcoind_rpc/tests/test_emitter.rs | 53 ++++++++++++++++++----- 2 files changed, 66 insertions(+), 25 deletions(-) diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index a4b28c8e8..8ed646c81 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -33,7 +33,7 @@ pub struct Emitter<'c, C> { /// The last emitted block during our last mempool emission. This is used to determine whether /// there has been a reorg since our last mempool emission. - last_mempool_tip: Option<(u32, BlockHash)>, + last_mempool_tip: Option, } impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { @@ -65,12 +65,17 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { pub fn mempool(&mut self) -> Result, bitcoincore_rpc::Error> { let client = self.client; - let prev_mempool_tip = match self.last_mempool_tip { - // use 'avoid-re-emission' logic if there is no reorg - Some((height, hash)) if self.emitted_blocks.get(&height) == Some(&hash) => height, - _ => 0, - }; - + // This is the emitted tip height during the last mempool emission. + let prev_mempool_tip = self + .last_mempool_tip + // We use `start_height - 1` as we cannot guarantee that the block at + // `start_height` has been emitted. + .unwrap_or(self.start_height.saturating_sub(1)); + + // Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep + // track of the latest mempool tx's timestamp to determine whether we have seen a tx + // before. `prev_mempool_time` is the previous timestamp and `last_time` records what will + // be the new latest timestamp. 
let prev_mempool_time = self.last_mempool_time; let mut latest_time = prev_mempool_time; @@ -109,11 +114,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { .collect::, _>>()?; self.last_mempool_time = latest_time; - self.last_mempool_tip = self - .emitted_blocks - .iter() - .last() - .map(|(&height, &hash)| (height, hash)); + self.last_mempool_tip = self.emitted_blocks.iter().last().map(|(&height, _)| height); Ok(txs_to_emit) } @@ -209,7 +210,18 @@ where continue; } PollResponse::AgreementFound(res) => { - emitter.emitted_blocks.split_off(&(res.height as u32 + 1)); + let agreement_h = res.height as u32; + + // get rid of evicted blocks + emitter.emitted_blocks.split_off(&(agreement_h + 1)); + + // The tip during the last mempool emission needs to in the best chain, we reduce + // it if it is not. + if let Some(h) = emitter.last_mempool_tip.as_mut() { + if *h > agreement_h { + *h = agreement_h; + } + } emitter.last_block = Some(res); continue; } diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 601fb5616..5d57bde18 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -368,7 +368,8 @@ fn test_into_tx_graph() -> anyhow::Result<()> { // must receive mined block which will confirm the transactions. { let (height, block) = emitter.next_block()?.expect("must get mined block"); - let _ = chain.apply_update(block_to_chain_update(&block, height))?; + let _ = chain + .apply_update(CheckPoint::from_header(&block.header, height).into_update(false))?; let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height); assert!(indexed_additions.graph.txs.is_empty()); assert!(indexed_additions.graph.txouts.is_empty()); @@ -685,34 +686,59 @@ fn mempool_during_reorg() -> anyhow::Result<()> { env.send(&addr, Amount::from_sat(2100))?; } - // perform reorgs at different heights - for reorg_count in 1..TIP_DIFF { - // sync emitter to tip - while emitter.next_header()?.is_some() {} + // sync emitter to tip, first mempool emission should include all txs (as we haven't emitted + // from the mempool yet) + while emitter.next_header()?.is_some() {} + assert_eq!( + emitter + .mempool()? + .into_iter() + .map(|(tx, _)| tx.txid()) + .collect::>(), + env.client + .get_raw_mempool()? + .into_iter() + .collect::>(), + "first mempool emission should include all txs", + ); + // perform reorgs at different heights, these reorgs will not comfirm transactions in the + // mempool + for reorg_count in 1..TIP_DIFF { println!("REORG COUNT: {}", reorg_count); env.reorg_empty_blocks(reorg_count)?; - // we recalculate this at every loop as reorgs may evict transactions from mempool - let tx_introductions = env + // This is a map of mempool txids to tip height where the tx was introduced to the mempool + // we recalculate this at every loop as reorgs may evict transactions from mempool. We use + // the introduction height to determine whether we expect a tx to appear in a mempool + // emission. + // TODO: How can have have reorg logic in `TestEnv` NOT blacklast old blocks first? + let tx_introductions = dbg!(env .client .get_raw_mempool_verbose()? .into_iter() .map(|(txid, entry)| (txid, entry.height as usize)) - .collect::>(); + .collect::>()); + // `next_header` emits the replacement block of the reorg if let Some((height, _)) = emitter.next_header()? 
{ - // the mempool emission (that follows the first block emission after reorg) should return - // the entire mempool contents + println!("\t- replacement height: {}", height); + + // the mempool emission (that follows the first block emission after reorg) should only + // include mempool txs introduced at reorg height or greater let mempool = emitter .mempool()? .into_iter() .map(|(tx, _)| tx.txid()) .collect::>(); - let exp_mempool = tx_introductions.keys().copied().collect::>(); + let exp_mempool = tx_introductions + .iter() + .filter(|(_, &intro_h)| intro_h >= (height as usize)) + .map(|(&txid, _)| txid) + .collect::>(); assert_eq!( mempool, exp_mempool, - "the first mempool emission after reorg should include all mempool txs" + "the first mempool emission after reorg should only include mempool txs introduced at reorg height or greater" ); let mempool = emitter @@ -738,6 +764,9 @@ fn mempool_during_reorg() -> anyhow::Result<()> { .collect::>(), ); } + + // sync emitter to tip + while emitter.next_header()?.is_some() {} } Ok(()) From 5f34df8489fedae2aa3fd001036cc9ef6abe9a7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Sat, 7 Oct 2023 00:56:01 +0800 Subject: [PATCH 12/14] bitcoind_rpc!: bring back `CheckPoint`s to `Emitter` * `bdk_chain` dependency is added. In the future, we will introduce a separate `bdk_core` crate to contain shared types. * replace `Emitter::new` with `from_height` and `from_checkpoint` * `from_height` emits from the given start height * `from_checkpoint` uses the provided cp to find agreement point * introduce logic that ensures emitted blocks can connect with receiver's `LocalChain` * in our rpc example, we can now `expect()` chain updates to always since we are using checkpoints and receiving blocks in order --- crates/bitcoind_rpc/Cargo.toml | 6 +- crates/bitcoind_rpc/src/lib.rs | 76 ++++++++++++++----- crates/bitcoind_rpc/tests/test_emitter.rs | 16 ++-- .../example_bitcoind_rpc_polling/src/main.rs | 29 +++---- 4 files changed, 84 insertions(+), 43 deletions(-) diff --git a/crates/bitcoind_rpc/Cargo.toml b/crates/bitcoind_rpc/Cargo.toml index eeb9de581..f04469d27 100644 --- a/crates/bitcoind_rpc/Cargo.toml +++ b/crates/bitcoind_rpc/Cargo.toml @@ -9,13 +9,13 @@ edition = "2021" # For no-std, remember to enable the bitcoin/no-std feature bitcoin = { version = "0.30", default-features = false } bitcoincore-rpc = { version = "0.17" } +bdk_chain = { path = "../chain", version = "0.5", default-features = false } [dev-dependencies] -bdk_chain = { path = "../chain", version = "0.5", features = ["serde", "miniscript"] } bitcoind = { version = "0.33", features = ["25_0"] } anyhow = { version = "1" } [features] default = ["std"] -std = ["bitcoin/std"] -serde = ["bitcoin/serde"] +std = ["bitcoin/std", "bdk_chain/std"] +serde = ["bitcoin/serde", "bdk_chain/serde"] diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index 8ed646c81..f200550bd 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -9,8 +9,7 @@ //! mempool. #![warn(missing_docs)] -use std::collections::BTreeMap; - +use bdk_chain::{local_chain::CheckPoint, BlockId}; use bitcoin::{block::Header, Block, BlockHash, Transaction}; pub use bitcoincore_rpc; use bitcoincore_rpc::bitcoincore_rpc_json; @@ -24,7 +23,7 @@ pub struct Emitter<'c, C> { client: &'c C, start_height: u32, - emitted_blocks: BTreeMap, + last_cp: Option, last_block: Option, /// The latest first-seen epoch of emitted mempool transactions. 
This is used to determine @@ -37,14 +36,29 @@ pub struct Emitter<'c, C> { } impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { - /// Constructs a new [`Emitter`] with the provided [`bitcoincore_rpc::Client`]. + /// Construct a new [`Emitter`] with the given RPC `client` and `start_height`. /// /// `start_height` is the block height to start emitting blocks from. - pub fn new(client: &'c C, start_height: u32) -> Self { + pub fn from_height(client: &'c C, start_height: u32) -> Self { Self { client, start_height, - emitted_blocks: BTreeMap::new(), + last_cp: None, + last_block: None, + last_mempool_time: 0, + last_mempool_tip: None, + } + } + + /// Construct a new [`Emitter`] with the given RPC `client` and `checkpoint`. + /// + /// `checkpoint` is used to find the latest block which is still part of the best chain. The + /// [`Emitter`] will emit blocks starting right above this block. + pub fn from_checkpoint(client: &'c C, checkpoint: CheckPoint) -> Self { + Self { + client, + start_height: 0, + last_cp: Some(checkpoint), last_block: None, last_mempool_time: 0, last_mempool_tip: None, @@ -114,7 +128,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { .collect::, _>>()?; self.last_mempool_time = latest_time; - self.last_mempool_tip = self.emitted_blocks.iter().last().map(|(&height, _)| height); + self.last_mempool_tip = self.last_cp.as_ref().map(|cp| cp.height()); Ok(txs_to_emit) } @@ -135,7 +149,7 @@ enum PollResponse { NoMoreBlocks, /// Fetched block is not in the best chain. BlockNotInBestChain, - AgreementFound(bitcoincore_rpc_json::GetBlockResult), + AgreementFound(bitcoincore_rpc_json::GetBlockResult, CheckPoint), AgreementPointNotFound, } @@ -146,7 +160,10 @@ where let client = emitter.client; if let Some(last_res) = &emitter.last_block { - assert!(!emitter.emitted_blocks.is_empty()); + assert!( + emitter.last_cp.is_some(), + "must not have block result without last cp" + ); let next_hash = match last_res.nextblockhash { None => return Ok(PollResponse::NoMoreBlocks), @@ -160,7 +177,7 @@ where return Ok(PollResponse::Block(res)); } - if emitter.emitted_blocks.is_empty() { + if emitter.last_cp.is_none() { let hash = client.get_block_hash(emitter.start_height as _)?; let res = client.get_block_info(&hash)?; @@ -170,15 +187,15 @@ where return Ok(PollResponse::Block(res)); } - for (&_, hash) in emitter.emitted_blocks.iter().rev() { - let res = client.get_block_info(hash)?; + for cp in emitter.last_cp.iter().flat_map(CheckPoint::iter) { + let res = client.get_block_info(&cp.hash())?; if res.confirmations < 0 { // block is not in best chain continue; } // agreement point found - return Ok(PollResponse::AgreementFound(res)); + return Ok(PollResponse::AgreementFound(res, cp)); } Ok(PollResponse::AgreementPointNotFound) @@ -196,9 +213,28 @@ where match poll_once(emitter)? { PollResponse::Block(res) => { let height = res.height as u32; - let item = get_item(&res.hash)?; - assert_eq!(emitter.emitted_blocks.insert(height, res.hash), None); + let hash = res.hash; + let item = get_item(&hash)?; + + let this_id = BlockId { height, hash }; + let prev_id = res.previousblockhash.map(|prev_hash| BlockId { + height: height - 1, + hash: prev_hash, + }); + + match (&mut emitter.last_cp, prev_id) { + (Some(cp), _) => *cp = cp.clone().push(this_id).expect("must push"), + (last_cp, None) => *last_cp = Some(CheckPoint::new(this_id)), + // When the receiver constructs a local_chain update from a block, the previous + // checkpoint is also included in the update. 
We need to reflect this state in + // `Emitter::last_cp` as well. + (last_cp, Some(prev_id)) => { + *last_cp = Some(CheckPoint::new(prev_id).push(this_id).expect("must push")) + } + } + emitter.last_block = Some(res); + return Ok(Some((height, item))); } PollResponse::NoMoreBlocks => { @@ -209,11 +245,11 @@ where emitter.last_block = None; continue; } - PollResponse::AgreementFound(res) => { + PollResponse::AgreementFound(res, cp) => { let agreement_h = res.height as u32; // get rid of evicted blocks - emitter.emitted_blocks.split_off(&(agreement_h + 1)); + emitter.last_cp = Some(cp); // The tip during the last mempool emission needs to in the best chain, we reduce // it if it is not. @@ -226,7 +262,11 @@ where continue; } PollResponse::AgreementPointNotFound => { - emitter.emitted_blocks.clear(); + // We want to clear `last_cp` and set `start_height` to the first checkpoint's + // height. This way, the first checkpoint in `LocalChain` can be replaced. + if let Some(last_cp) = emitter.last_cp.take() { + emitter.start_height = last_cp.height(); + } emitter.last_block = None; continue; } diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 5d57bde18..f0bbd3d15 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -189,7 +189,7 @@ fn block_to_chain_update(block: &bitcoin::Block, height: u32) -> local_chain::Up pub fn test_sync_local_chain() -> anyhow::Result<()> { let env = TestEnv::new()?; let mut local_chain = LocalChain::default(); - let mut emitter = Emitter::new(&env.client, 0); + let mut emitter = Emitter::from_height(&env.client, 0); // mine some blocks and returned the actual block hashes let exp_hashes = { @@ -305,7 +305,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> { index }); - let emitter = &mut Emitter::new(&env.client, 0); + let emitter = &mut Emitter::from_height(&env.client, 0); while let Some((height, block)) = emitter.next_block()? 
{ let _ = chain.apply_update(block_to_chain_update(&block, height))?; @@ -393,7 +393,7 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> { const CHAIN_TIP_HEIGHT: usize = 110; let env = TestEnv::new()?; - let mut emitter = Emitter::new(&env.client, EMITTER_START_HEIGHT as _); + let mut emitter = Emitter::from_height(&env.client, EMITTER_START_HEIGHT as _); env.mine_blocks(CHAIN_TIP_HEIGHT, None)?; while emitter.next_header()?.is_some() {} @@ -461,7 +461,7 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> { const SEND_AMOUNT: Amount = Amount::from_sat(10_000); let env = TestEnv::new()?; - let mut emitter = Emitter::new(&env.client, 0); + let mut emitter = Emitter::from_height(&env.client, 0); // setup addresses let addr_to_mine = env.client.get_new_address(None, None)?.assume_checked(); @@ -542,7 +542,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { const MEMPOOL_TX_COUNT: usize = 2; let env = TestEnv::new()?; - let mut emitter = Emitter::new(&env.client, 0); + let mut emitter = Emitter::from_height(&env.client, 0); // mine blocks and sync up emitter let addr = env.client.get_new_address(None, None)?.assume_checked(); @@ -597,7 +597,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() const MEMPOOL_TX_COUNT: usize = 21; let env = TestEnv::new()?; - let mut emitter = Emitter::new(&env.client, 0); + let mut emitter = Emitter::from_height(&env.client, 0); // mine blocks to get initial balance, sync emitter up to tip let addr = env.client.get_new_address(None, None)?.assume_checked(); @@ -674,7 +674,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> { const PREMINE_COUNT: usize = 101; let env = TestEnv::new()?; - let mut emitter = Emitter::new(&env.client, 0); + let mut emitter = Emitter::from_height(&env.client, 0); // mine blocks to get initial balance let addr = env.client.get_new_address(None, None)?.assume_checked(); @@ -789,7 +789,7 @@ fn no_agreement_point() -> anyhow::Result<()> { let env = TestEnv::new()?; // start height is 99 - let mut emitter = Emitter::new(&env.client, (PREMINE_COUNT - 2) as u32); + let mut emitter = Emitter::from_height(&env.client, (PREMINE_COUNT - 2) as u32); // mine 101 blocks env.mine_blocks(PREMINE_COUNT, None)?; diff --git a/example-crates/example_bitcoind_rpc_polling/src/main.rs b/example-crates/example_bitcoind_rpc_polling/src/main.rs index c9bcc9728..ad77030ae 100644 --- a/example-crates/example_bitcoind_rpc_polling/src/main.rs +++ b/example-crates/example_bitcoind_rpc_polling/src/main.rs @@ -27,8 +27,6 @@ const DB_MAGIC: &[u8] = b"bdk_example_rpc"; const DB_PATH: &str = ".bdk_example_rpc.db"; const CHANNEL_BOUND: usize = 10; -/// The block depth which we assume no reorgs can happen at. -const ASSUME_FINAL_DEPTH: u32 = 6; /// Delay for printing status to stdout. const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6); /// Delay between mempool emissions. 
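
A std-only illustration of why the `ASSUME_FINAL_DEPTH` constant removed above is no longer needed: a fixed-depth restart always re-fetches the same number of blocks, while checkpoint-based agreement (sketched here with stand-in types, `u64` in place of a block hash) resumes exactly above the deepest block the node still agrees with.

    /// Old approach: always restart a fixed depth below the local tip.
    fn restart_height_fixed_depth(local_tip: u32, assume_final_depth: u32) -> u32 {
        local_tip.saturating_sub(assume_final_depth)
    }

    /// Checkpoint approach: restart right above the highest local block that the
    /// node still agrees with, however deep (or shallow) the reorg actually was.
    fn restart_height_from_agreement(
        local_hashes: &[(u32, u64)],
        node_hash_at: impl Fn(u32) -> u64,
    ) -> u32 {
        local_hashes
            .iter()
            .rev()
            .find(|&&(height, hash)| node_hash_at(height) == hash)
            .map(|&(height, _)| height + 1)
            .unwrap_or(0)
    }

    fn main() {
        // local chain knows heights 0..=110; the node reorged out only height 110
        let local: Vec<(u32, u64)> = (0..=110).map(|h| (h, h as u64)).collect();
        let node = |h: u32| if h == 110 { 999 } else { h as u64 };

        // fixed-depth restart re-fetches 6 blocks even for a 1-block reorg
        assert_eq!(restart_height_fixed_depth(110, 6), 104);
        // agreement-based restart resumes exactly where needed
        assert_eq!(restart_height_from_agreement(&local, node), 110);
    }
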
@@ -160,13 +158,12 @@ fn main() -> anyhow::Result<()> {
             let mut db = db.lock().unwrap();
 
             graph.index.set_lookahead_for_all(lookahead);
 
-            // we start at a height lower than last-seen tip in case of reorgs
-            let start_height = chain.tip().as_ref().map_or(fallback_height, |cp| {
-                cp.height().saturating_sub(ASSUME_FINAL_DEPTH)
-            });
             let rpc_client = rpc_args.new_client()?;
-            let mut emitter = Emitter::new(&rpc_client, start_height);
+            let mut emitter = match chain.tip() {
+                Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
+                None => Emitter::from_height(&rpc_client, fallback_height),
+            };
 
             let mut last_db_commit = Instant::now();
             let mut last_print = Instant::now();
@@ -174,7 +171,9 @@ fn main() -> anyhow::Result<()> {
             while let Some((height, block)) = emitter.next_block()? {
                 let chain_update = CheckPoint::from_header(&block.header, height).into_update(false);
-                let chain_changeset = chain.apply_update(chain_update)?;
+                let chain_changeset = chain
+                    .apply_update(chain_update)
+                    .expect("must always apply as we receive blocks in order from emitter");
                 let graph_changeset = graph.apply_block_relevant(block, height);
                 db.stage((chain_changeset, graph_changeset));
 
@@ -227,17 +226,17 @@ fn main() -> anyhow::Result<()> {
            let sigterm_flag = start_ctrlc_handler();
 
            graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
-            // we start at a height lower than last-seen tip in case of reorgs
-            let start_height = chain.lock().unwrap().tip().map_or(fallback_height, |cp| {
-                cp.height().saturating_sub(ASSUME_FINAL_DEPTH)
-            });
+            let last_cp = chain.lock().unwrap().tip();
 
            let (tx, rx) = std::sync::mpsc::sync_channel::(CHANNEL_BOUND);
            let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
                println!("emitter thread started...");
 
                let rpc_client = rpc_args.new_client()?;
-                let mut emitter = Emitter::new(&rpc_client, start_height);
+                let mut emitter = match last_cp {
+                    Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
+                    None => Emitter::from_height(&rpc_client, fallback_height),
+                };
 
                let mut block_count = rpc_client.get_block_count()?
as u32;
                tx.send(Emission::Tip(block_count))?;
@@ -284,7 +283,9 @@ fn main() -> anyhow::Result<()> {
                     Emission::Block { height, block } => {
                         let chain_update =
                             CheckPoint::from_header(&block.header, height).into_update(false);
-                        let chain_changeset = chain.apply_update(chain_update)?;
+                        let chain_changeset = chain
+                            .apply_update(chain_update)
+                            .expect("must always apply as we receive blocks in order from emitter");
                         let graph_changeset = graph.apply_block_relevant(block, height);
                         (chain_changeset, graph_changeset)
                     }

From b69c13ddf6aa7cfb9be8c841b255e7f5f13ad328 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BF=97=E5=AE=87?=
Date: Sun, 8 Oct 2023 02:29:04 +0800
Subject: [PATCH 13/14] example_bitcoind_rpc: tweaks

* avoid holding mutex lock over io
* document `CHANNEL_BOUND` const
* use the `relevant` variant of `batch_insert_unconfirmed`
* print elapsed time in stdout for various updates
---
 .../example_bitcoind_rpc_polling/src/main.rs | 72 ++++++++++++-------
 1 file changed, 47 insertions(+), 25 deletions(-)

diff --git a/example-crates/example_bitcoind_rpc_polling/src/main.rs b/example-crates/example_bitcoind_rpc_polling/src/main.rs
index ad77030ae..32735022d 100644
--- a/example-crates/example_bitcoind_rpc_polling/src/main.rs
+++ b/example-crates/example_bitcoind_rpc_polling/src/main.rs
@@ -26,12 +26,13 @@ use example_cli::{
 const DB_MAGIC: &[u8] = b"bdk_example_rpc";
 const DB_PATH: &str = ".bdk_example_rpc.db";
 
+/// The mpsc channel bound for emissions from [`Emitter`].
 const CHANNEL_BOUND: usize = 10;
 /// Delay for printing status to stdout.
 const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6);
 /// Delay between mempool emissions.
 const MEMPOOL_EMIT_DELAY: Duration = Duration::from_secs(30);
-/// Delay for commiting to persistance.
+/// Delay for committing to persistence.
 const DB_COMMIT_DELAY: Duration = Duration::from_secs(60);
 
 type ChangeSet = (
@@ -111,18 +112,30 @@ enum RpcCommands {
 }
 
 fn main() -> anyhow::Result<()> {
+    let start = Instant::now();
+
     let (args, keymap, index, db, init_changeset) = example_cli::init::(DB_MAGIC, DB_PATH)?;
+    println!(
+        "[{:>10}s] loaded initial changeset from db",
+        start.elapsed().as_secs_f32()
+    );
     let graph = Mutex::new({
         let mut graph = IndexedTxGraph::new(index);
         graph.apply_changeset(init_changeset.1);
         graph
     });
-    println!("loaded indexed tx graph from db");
+    println!(
+        "[{:>10}s] loaded indexed tx graph from changeset",
+        start.elapsed().as_secs_f32()
+    );
 
     let chain = Mutex::new(LocalChain::from_changeset(init_changeset.0));
-    println!("loaded local chain from db");
+    println!(
+        "[{:>10}s] loaded local chain from changeset",
+        start.elapsed().as_secs_f32()
+    );
 
     let rpc_cmd = match args.command {
         example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd,
@@ -153,14 +166,11 @@ fn main() -> anyhow::Result<()> {
                 ..
             } = rpc_args;
 
-            let mut chain = chain.lock().unwrap();
-            let mut graph = graph.lock().unwrap();
-            let mut db = db.lock().unwrap();
-
-            graph.index.set_lookahead_for_all(lookahead);
+            graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
+            let chain_tip = chain.lock().unwrap().tip();
 
             let rpc_client = rpc_args.new_client()?;
-            let mut emitter = match chain.tip() {
+            let mut emitter = match chain_tip {
                 Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
                 None => Emitter::from_height(&rpc_client, fallback_height),
             };
@@ -169,6 +179,10 @@ fn main() -> anyhow::Result<()> {
             let mut last_print = Instant::now();
 
             while let Some((height, block)) = emitter.next_block()?
{
+                let mut chain = chain.lock().unwrap();
+                let mut graph = graph.lock().unwrap();
+                let mut db = db.lock().unwrap();
+
                 let chain_update = CheckPoint::from_header(&block.header, height).into_update(false);
                 let chain_changeset = chain
@@ -182,7 +196,8 @@ fn main() -> anyhow::Result<()> {
                     last_db_commit = Instant::now();
                     db.commit()?;
                     println!(
-                        "commited to db (took {}s)",
+                        "[{:>10}s] committed to db (took {}s)",
+                        start.elapsed().as_secs_f32(),
                         last_db_commit.elapsed().as_secs_f32()
                     );
                 }
@@ -200,7 +215,8 @@ fn main() -> anyhow::Result<()> {
                     )
                 };
                 println!(
-                    "synced to {} @ {} | total: {} sats",
+                    "[{:>10}s] synced to {} @ {} | total: {} sats",
+                    start.elapsed().as_secs_f32(),
                     synced_to.hash(),
                     synced_to.height(),
                     balance.total()
@@ -209,13 +225,15 @@ fn main() -> anyhow::Result<()> {
                 }
             }
 
-            // mempool
             let mempool_txs = emitter.mempool()?;
-            let graph_changeset = graph.batch_insert_unconfirmed(mempool_txs);
-            db.stage((local_chain::ChangeSet::default(), graph_changeset));
-
-            // commit one last time!
-            db.commit()?;
+            let graph_changeset = graph.lock().unwrap().batch_insert_relevant_unconfirmed(
+                mempool_txs.iter().map(|(tx, time)| (tx, *time)),
+            );
+            {
+                let mut db = db.lock().unwrap();
+                db.stage((local_chain::ChangeSet::default(), graph_changeset));
+                db.commit()?; // commit one last time
+            }
         }
         RpcCommands::Live { rpc_args } => {
             let RpcArgs {
@@ -228,10 +246,12 @@ fn main() -> anyhow::Result<()> {
             graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
             let last_cp = chain.lock().unwrap().tip();
+            println!(
+                "[{:>10}s] starting emitter thread...",
+                start.elapsed().as_secs_f32()
+            );
 
             let (tx, rx) = std::sync::mpsc::sync_channel::(CHANNEL_BOUND);
             let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
-                println!("emitter thread started...");
-
                 let rpc_client = rpc_args.new_client()?;
                 let mut emitter = match last_cp {
                     Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
@@ -270,15 +290,15 @@ fn main() -> anyhow::Result<()> {
                 Ok(())
             });
 
-            let mut db = db.lock().unwrap();
-            let mut graph = graph.lock().unwrap();
-            let mut chain = chain.lock().unwrap();
             let mut tip_height = 0_u32;
-
             let mut last_db_commit = Instant::now();
             let mut last_print = Option::::None;
 
             for emission in rx {
+                let mut db = db.lock().unwrap();
+                let mut graph = graph.lock().unwrap();
+                let mut chain = chain.lock().unwrap();
+
                 let changeset = match emission {
                     Emission::Block { height, block } => {
                         let chain_update =
@@ -307,7 +327,8 @@ fn main() -> anyhow::Result<()> {
                     last_db_commit = Instant::now();
                     db.commit()?;
                     println!(
-                        "commited to db (took {}s)",
+                        "[{:>10}s] committed to db (took {}s)",
+                        start.elapsed().as_secs_f32(),
                         last_db_commit.elapsed().as_secs_f32()
                     );
                 }
@@ -324,7 +345,8 @@ fn main() -> anyhow::Result<()> {
                     )
                 };
                 println!(
-                    "synced to {} @ {} / {} | total: {} sats",
+                    "[{:>10}s] synced to {} @ {} / {} | total: {} sats",
+                    start.elapsed().as_secs_f32(),
                     synced_to.hash(),
                     synced_to.height(),
                     tip_height,

From 85c62532a55cfc94eade4d20ca3075dc4cd4882e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BF=97=E5=AE=87?=
Date: Sun, 8 Oct 2023 03:04:13 +0800
Subject: [PATCH 14/14] docs(bitcoind_rpc): better `Emitter::mempool` explanation

Also better docs for `Emitter` fields.
---
 crates/bitcoind_rpc/src/lib.rs | 26 ++++++++++++++----------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs
index f200550bd..a5016ce6f 100644
--- a/crates/bitcoind_rpc/src/lib.rs
+++ b/crates/bitcoind_rpc/src/lib.rs
@@ -1,5 +1,5 @@
-//!
This crate is used for emitting blockchain data from the `bitcoind` RPC interface (excluding the -//! RPC wallet API). +//! This crate is used for emitting blockchain data from the `bitcoind` RPC interface. It does not +//! use the wallet RPC API, so this crate can be used with wallet-disabled Bitcoin Core nodes. //! //! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`]. //! @@ -23,7 +23,14 @@ pub struct Emitter<'c, C> { client: &'c C, start_height: u32, + /// The checkpoint of the last-emitted block that is in the best chain. If it is later found + /// that the block is no longer in the best chain, it will be popped off from here. last_cp: Option, + + /// The block result returned from rpc of the last-emitted block. As this result contains the + /// next block's block hash (which we use to fetch the next block), we set this to `None` + /// whenever there are no more blocks, or the next block is no longer in the best chain. This + /// gives us an opportunity to re-fetch this result. last_block: Option, /// The latest first-seen epoch of emitted mempool transactions. This is used to determine @@ -67,15 +74,14 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { /// Emit mempool transactions, alongside their first-seen unix timestamps. /// - /// Ideally, this method would only emit the same transaction once. However, if the receiver - /// filters transactions based on whether it alters the output set of tracked script pubkeys, - /// there are situations where we would want to re-emit. For example, if an emitted mempool - /// transaction spends a tracked UTXO which is confirmed at height `h`, but the receiver has - /// only seen up to block of height `h-1`, we want to re-emit this transaction until the - /// receiver has seen the block at height `h`. + /// This method emits each transaction only once, unless we cannot guarantee the transaction's + /// ancestors are already emitted. /// - /// In other words, we want to re-emit a transaction if we cannot guarantee it's ancestors are - /// already emitted. + /// To understand why, consider a receiver which filters transactions based on whether it + /// alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction spends a + /// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block + /// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block + /// at height `h`. pub fn mempool(&mut self) -> Result, bitcoincore_rpc::Error> { let client = self.client;
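
For orientation, here is a minimal sketch (illustration only, not part of the patch series) of how the emitter API touched above might be driven end to end. It relies only on calls shown in these diffs -- `Emitter::from_height`, `next_block`, and `mempool` -- and assumes `mempool` yields `(Transaction, u64)` pairs of transaction and first-seen timestamp as the doc comment describes; the RPC URL and credentials are placeholders.

use bdk_bitcoind_rpc::Emitter;
use bitcoincore_rpc::{Auth, Client};

fn main() -> anyhow::Result<()> {
    // Placeholder connection details -- substitute your own node's URL and auth.
    let client = Client::new(
        "http://127.0.0.1:8332",
        Auth::UserPass("user".to_string(), "password".to_string()),
    )?;

    // Start from genesis here; a caller with a persisted chain would resume with
    // `Emitter::from_checkpoint` instead, as the example binary above does.
    let mut emitter = Emitter::from_height(&client, 0);

    // Blocks are emitted in height order, so each one can be applied as it arrives.
    while let Some((height, block)) = emitter.next_block()? {
        println!("block {} {}", height, block.block_hash());
    }

    // Mempool transactions come with their first-seen unix timestamps; a transaction
    // may be re-emitted until all of its ancestors have been emitted in blocks.
    for (tx, first_seen) in emitter.mempool()? {
        println!("mempool tx {} first seen at {}", tx.txid(), first_seen);
    }
    Ok(())
}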