diff --git a/CHANGELOG.md b/CHANGELOG.md index fa369a47f..3fda3e060 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add the ability to specify whether a taproot transaction should be signed using the internal key or not, using `sign_with_tap_internal_key` in `SignOptions` - Consolidate params `fee_amount` and `amount_needed` in `target_amount` in `CoinSelectionAlgorithm::coin_select` signature. - Change the meaning of the `fee_amount` field inside `CoinSelectionResult`: from now on the `fee_amount` will represent only the fees asociated with the utxos in the `selected` field of `CoinSelectionResult`. +- New `RpcBlockchain` implementation with various fixes. ## [v0.20.0] - [v0.19.0] diff --git a/examples/rpcwallet.rs b/examples/rpcwallet.rs index 3178af6bb..24a555910 100644 --- a/examples/rpcwallet.rs +++ b/examples/rpcwallet.rs @@ -103,7 +103,7 @@ fn main() -> Result<(), Box> { auth: bitcoind_auth, network: Network::Regtest, wallet_name, - skip_blocks: None, + sync_params: None, }; // Use the above configuration to create a RPC blockchain backend diff --git a/src/blockchain/rpc.rs b/src/blockchain/rpc.rs index 1d0d884c0..914d03759 100644 --- a/src/blockchain/rpc.rs +++ b/src/blockchain/rpc.rs @@ -26,30 +26,33 @@ //! }, //! network: bdk::bitcoin::Network::Testnet, //! wallet_name: "wallet_name".to_string(), -//! skip_blocks: None, +//! sync_params: None, //! }; //! let blockchain = RpcBlockchain::from_config(&config); //! ``` -use crate::bitcoin::consensus::deserialize; use crate::bitcoin::hashes::hex::ToHex; -use crate::bitcoin::{Address, Network, OutPoint, Transaction, TxOut, Txid}; +use crate::bitcoin::{Network, OutPoint, Transaction, TxOut, Txid}; use crate::blockchain::*; -use crate::database::{BatchDatabase, DatabaseUtils}; +use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils}; use crate::descriptor::get_checksum; +use crate::error::MissingCachedScripts; use crate::{BlockTime, Error, FeeRate, KeychainKind, LocalUtxo, TransactionDetails}; +use bitcoin::Script; use bitcoincore_rpc::json::{ - GetAddressInfoResultLabel, ImportMultiOptions, ImportMultiRequest, - ImportMultiRequestScriptPubkey, ImportMultiRescanSince, + GetTransactionResultDetailCategory, ImportMultiOptions, ImportMultiRequest, + ImportMultiRequestScriptPubkey, ImportMultiRescanSince, ListTransactionResult, + ListUnspentResultEntry, ScanningDetails, }; use bitcoincore_rpc::jsonrpc::serde_json::{json, Value}; use bitcoincore_rpc::Auth as RpcAuth; use bitcoincore_rpc::{Client, RpcApi}; -use log::debug; +use log::{debug, info}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use std::path::PathBuf; -use std::str::FromStr; +use std::thread; +use std::time::Duration; /// The main struct for RPC backend implementing the [crate::blockchain::Blockchain] trait #[derive(Debug)] @@ -60,11 +63,8 @@ pub struct RpcBlockchain { is_descriptors: bool, /// Blockchain capabilities, cached here at startup capabilities: HashSet, - /// Skip this many blocks of the blockchain at the first rescan, if None the rescan is done from the genesis block - skip_blocks: Option, - - /// This is a fixed Address used as a hack key to store information on the node - _storage_address: Address, + /// Sync parameters. + sync_params: RpcSyncParams, } /// RpcBlockchain configuration options @@ -78,8 +78,36 @@ pub struct RpcConfig { pub network: Network, /// The wallet name in the bitcoin node, consider using [crate::wallet::wallet_name_from_descriptor] for this pub wallet_name: String, - /// Skip this many blocks of the blockchain at the first rescan, if None the rescan is done from the genesis block - pub skip_blocks: Option, + /// Sync parameters + pub sync_params: Option, +} + +/// Sync parameters for Bitcoin Core RPC. +/// +/// In general, BDK tries to sync `scriptPubKey`s cached in [`crate::database::Database`] with +/// `scriptPubKey`s imported in the Bitcoin Core Wallet. These parameters are used for determining +/// how the `importdescriptors` RPC calls are to be made. +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct RpcSyncParams { + /// The minimum number of scripts to scan for on initial sync. + pub start_script_count: usize, + /// Time in unix seconds in which initial sync will start scanning from (0 to start from genesis). + pub start_time: u64, + /// Forces every sync to use `start_time` as import timestamp. + pub force_start_time: bool, + /// RPC poll rate (in seconds) to get state updates. + pub poll_rate_sec: u64, +} + +impl Default for RpcSyncParams { + fn default() -> Self { + Self { + start_script_count: 100, + start_time: 0, + force_start_time: false, + poll_rate_sec: 3, + } + } } /// This struct is equivalent to [bitcoincore_rpc::Auth] but it implements [serde::Serialize] @@ -115,27 +143,6 @@ impl From for RpcAuth { } } -impl RpcBlockchain { - fn get_node_synced_height(&self) -> Result { - let info = self.client.get_address_info(&self._storage_address)?; - if let Some(GetAddressInfoResultLabel::Simple(label)) = info.labels.first() { - Ok(label - .parse::() - .unwrap_or_else(|_| self.skip_blocks.unwrap_or(0))) - } else { - Ok(self.skip_blocks.unwrap_or(0)) - } - } - - /// Set the synced height in the core node by using a label of a fixed address so that - /// another client with the same descriptor doesn't rescan the blockchain - fn set_node_synced_height(&self, height: u32) -> Result<(), Error> { - Ok(self - .client - .set_label(&self._storage_address, &height.to_string())?) - } -} - impl Blockchain for RpcBlockchain { fn get_capabilities(&self) -> HashSet { self.capabilities.clone() @@ -176,226 +183,15 @@ impl GetBlockHash for RpcBlockchain { } impl WalletSync for RpcBlockchain { - fn wallet_setup( - &self, - database: &mut D, - progress_update: Box, - ) -> Result<(), Error> { - let mut scripts_pubkeys = database.iter_script_pubkeys(Some(KeychainKind::External))?; - scripts_pubkeys.extend(database.iter_script_pubkeys(Some(KeychainKind::Internal))?); - debug!( - "importing {} script_pubkeys (some maybe already imported)", - scripts_pubkeys.len() - ); - - if self.is_descriptors { - // Core still doesn't support complex descriptors like BDK, but when the wallet type is - // "descriptors" we should import individual addresses using `importdescriptors` rather - // than `importmulti`, using the `raw()` descriptor which allows us to specify an - // arbitrary script - let requests = Value::Array( - scripts_pubkeys - .iter() - .map(|s| { - let desc = format!("raw({})", s.to_hex()); - json!({ - "timestamp": "now", - "desc": format!("{}#{}", desc, get_checksum(&desc).unwrap()), - }) - }) - .collect(), - ); - - let res: Vec = self.client.call("importdescriptors", &[requests])?; - res.into_iter() - .map(|v| match v["success"].as_bool() { - Some(true) => Ok(()), - Some(false) => Err(Error::Generic( - v["error"]["message"] - .as_str() - .unwrap_or("Unknown error") - .to_string(), - )), - _ => Err(Error::Generic("Unexpected response from Core".to_string())), - }) - .collect::, _>>()?; - } else { - let requests: Vec<_> = scripts_pubkeys - .iter() - .map(|s| ImportMultiRequest { - timestamp: ImportMultiRescanSince::Timestamp(0), - script_pubkey: Some(ImportMultiRequestScriptPubkey::Script(s)), - watchonly: Some(true), - ..Default::default() - }) - .collect(); - let options = ImportMultiOptions { - rescan: Some(false), - }; - self.client.import_multi(&requests, Some(&options))?; - } - - loop { - let current_height = self.get_height()?; - - // min because block invalidate may cause height to go down - let node_synced = self.get_node_synced_height()?.min(current_height); - - let sync_up_to = node_synced.saturating_add(10_000).min(current_height); - - debug!("rescan_blockchain from:{} to:{}", node_synced, sync_up_to); - self.client - .rescan_blockchain(Some(node_synced as usize), Some(sync_up_to as usize))?; - progress_update.update((sync_up_to as f32) / (current_height as f32), None)?; - - self.set_node_synced_height(sync_up_to)?; - - if sync_up_to == current_height { - break; - } - } - - self.wallet_sync(database, progress_update) - } - - fn wallet_sync( - &self, - db: &mut D, - _progress_update: Box, - ) -> Result<(), Error> { - let mut indexes = HashMap::new(); - for keykind in &[KeychainKind::External, KeychainKind::Internal] { - indexes.insert(*keykind, db.get_last_index(*keykind)?.unwrap_or(0)); - } - - let mut known_txs: HashMap<_, _> = db - .iter_txs(true)? - .into_iter() - .map(|tx| (tx.txid, tx)) - .collect(); - let known_utxos: HashSet<_> = db.iter_utxos()?.into_iter().collect(); - - //TODO list_since_blocks would be more efficient - let current_utxo = self - .client - .list_unspent(Some(0), None, None, Some(true), None)?; - debug!("current_utxo len {}", current_utxo.len()); - - //TODO supported up to 1_000 txs, should use since_blocks or do paging - let list_txs = self - .client - .list_transactions(None, Some(1_000), None, Some(true))?; - let mut list_txs_ids = HashSet::new(); - - for tx_result in list_txs.iter().filter(|t| { - // list_txs returns all conflicting txs, we want to - // filter out replaced tx => unconfirmed and not in the mempool - t.info.confirmations > 0 || self.client.get_mempool_entry(&t.info.txid).is_ok() - }) { - let txid = tx_result.info.txid; - list_txs_ids.insert(txid); - if let Some(mut known_tx) = known_txs.get_mut(&txid) { - let confirmation_time = - BlockTime::new(tx_result.info.blockheight, tx_result.info.blocktime); - if confirmation_time != known_tx.confirmation_time { - // reorg may change tx height - debug!( - "updating tx({}) confirmation time to: {:?}", - txid, confirmation_time - ); - known_tx.confirmation_time = confirmation_time; - db.set_tx(known_tx)?; - } - } else { - //TODO check there is already the raw tx in db? - let tx_result = self.client.get_transaction(&txid, Some(true))?; - let tx: Transaction = deserialize(&tx_result.hex)?; - let mut received = 0u64; - let mut sent = 0u64; - for output in tx.output.iter() { - if let Ok(Some((kind, index))) = - db.get_path_from_script_pubkey(&output.script_pubkey) - { - if index > *indexes.get(&kind).unwrap() { - indexes.insert(kind, index); - } - received += output.value; - } - } - - for input in tx.input.iter() { - if let Some(previous_output) = db.get_previous_output(&input.previous_output)? { - if db.is_mine(&previous_output.script_pubkey)? { - sent += previous_output.value; - } - } - } - - let td = TransactionDetails { - transaction: Some(tx), - txid: tx_result.info.txid, - confirmation_time: BlockTime::new( - tx_result.info.blockheight, - tx_result.info.blocktime, - ), - received, - sent, - fee: tx_result.fee.map(|f| f.as_sat().unsigned_abs()), - }; - debug!( - "saving tx: {} tx_result.fee:{:?} td.fees:{:?}", - td.txid, tx_result.fee, td.fee - ); - db.set_tx(&td)?; - } - } - - for known_txid in known_txs.keys() { - if !list_txs_ids.contains(known_txid) { - debug!("removing tx: {}", known_txid); - db.del_tx(known_txid, false)?; - } - } - - // Filter out trasactions that are for script pubkeys that aren't in this wallet. - let current_utxos = current_utxo - .into_iter() - .filter_map( - |u| match db.get_path_from_script_pubkey(&u.script_pub_key) { - Err(e) => Some(Err(e)), - Ok(None) => None, - Ok(Some(path)) => Some(Ok(LocalUtxo { - outpoint: OutPoint::new(u.txid, u.vout), - keychain: path.0, - txout: TxOut { - value: u.amount.as_sat(), - script_pubkey: u.script_pub_key, - }, - is_spent: false, - })), - }, - ) - .collect::, Error>>()?; - - let spent: HashSet<_> = known_utxos.difference(¤t_utxos).collect(); - for utxo in spent { - debug!("setting as spent utxo: {:?}", utxo); - let mut spent_utxo = utxo.clone(); - spent_utxo.is_spent = true; - db.set_utxo(&spent_utxo)?; - } - let received: HashSet<_> = current_utxos.difference(&known_utxos).collect(); - for utxo in received { - debug!("adding utxo: {:?}", utxo); - db.set_utxo(utxo)?; - } - - for (keykind, index) in indexes { - debug!("{:?} max {}", keykind, index); - db.set_last_index(keykind, index)?; - } - - Ok(()) + fn wallet_setup(&self, db: &mut D, prog: Box) -> Result<(), Error> + where + D: BatchDatabase, + { + let batch = DbState::new(db, &self.sync_params, &*prog)? + .sync_with_core(&self.client, self.is_descriptors)? + .as_db_batch()?; + + db.commit_batch(batch) } } @@ -464,17 +260,11 @@ impl ConfigurableBlockchain for RpcBlockchain { } } - // this is just a fixed address used only to store a label containing the synced height in the node - let mut storage_address = - Address::from_str("bc1qst0rewf0wm4kw6qn6kv0e5tc56nkf9yhcxlhqv").unwrap(); - storage_address.network = network; - Ok(RpcBlockchain { client, capabilities, is_descriptors, - _storage_address: storage_address, - skip_blocks: config.skip_blocks, + sync_params: config.sync_params.clone().unwrap_or_default(), }) } } @@ -495,6 +285,519 @@ fn list_wallet_dir(client: &Client) -> Result, Error> { Ok(result.wallets.into_iter().map(|n| n.name).collect()) } +/// Represents the state of the [`crate::database::Database`]. +struct DbState<'a, D> { + db: &'a D, + params: &'a RpcSyncParams, + prog: &'a dyn Progress, + + ext_spks: Vec