diff --git a/src/blockchain/any.rs b/src/blockchain/any.rs index 5ef1a33851..fea70cb25f 100644 --- a/src/blockchain/any.rs +++ b/src/blockchain/any.rs @@ -133,7 +133,7 @@ impl WalletSync for AnyBlockchain { &self, database: &mut D, progress_update: Box, - ) -> Result<(), Error> { + ) -> Result { maybe_await!(impl_inner_method!( self, wallet_sync, @@ -146,7 +146,7 @@ impl WalletSync for AnyBlockchain { &self, database: &mut D, progress_update: Box, - ) -> Result<(), Error> { + ) -> Result { maybe_await!(impl_inner_method!( self, wallet_setup, diff --git a/src/blockchain/compact_filters/mod.rs b/src/blockchain/compact_filters/mod.rs index 7ca78a2c34..d82aac9a54 100644 --- a/src/blockchain/compact_filters/mod.rs +++ b/src/blockchain/compact_filters/mod.rs @@ -276,7 +276,7 @@ impl WalletSync for CompactFiltersBlockchain { &self, database: &mut D, progress_update: Box, - ) -> Result<(), Error> { + ) -> Result { let first_peer = &self.peers[0]; let skip_blocks = self.skip_blocks.unwrap_or(0); @@ -474,7 +474,7 @@ impl WalletSync for CompactFiltersBlockchain { .unwrap() .update(100.0, Some("Done".into()))?; - Ok(()) + Ok(0) } } diff --git a/src/blockchain/electrum.rs b/src/blockchain/electrum.rs index faf7ea7565..70f179d185 100644 --- a/src/blockchain/electrum.rs +++ b/src/blockchain/electrum.rs @@ -110,7 +110,7 @@ impl WalletSync for ElectrumBlockchain { &self, database: &mut D, _progress_update: Box, - ) -> Result<(), Error> { + ) -> Result { let mut request = script_sync::start(database, self.stop_gap)?; let mut block_times = HashMap::::new(); let mut txid_to_height = HashMap::::new(); @@ -124,7 +124,9 @@ impl WalletSync for ElectrumBlockchain { // tranascations than in the request. This should never happen but we don't want to panic. let electrum_goof = || Error::Generic("electrum server misbehaving".to_string()); - let batch_update = loop { + // `missing_count` is the min number of scripts to cache, so that we may try to satisfy + // `stop_gap` with the next attempted sync + let (batch_update, missing_count) = loop { request = match request { Request::Script(script_req) => { let scripts = script_req.request().take(chunk_size); @@ -232,12 +234,14 @@ impl WalletSync for ElectrumBlockchain { tx_req.satisfy(full_details)? } - Request::Finish(batch_update) => break batch_update, + Request::Finish(batch_update, missing_count) => { + break (batch_update, missing_count) + } } }; database.commit_batch(batch_update)?; - Ok(()) + Ok(missing_count) } } diff --git a/src/blockchain/esplora/reqwest.rs b/src/blockchain/esplora/reqwest.rs index 0d40506082..fa5add7560 100644 --- a/src/blockchain/esplora/reqwest.rs +++ b/src/blockchain/esplora/reqwest.rs @@ -131,12 +131,12 @@ impl WalletSync for EsploraBlockchain { &self, database: &mut D, _progress_update: Box, - ) -> Result<(), Error> { + ) -> Result { use crate::blockchain::script_sync::Request; let mut request = script_sync::start(database, self.stop_gap)?; let mut tx_index: HashMap = HashMap::new(); - let batch_update = loop { + let (batch_update, missing_count) = loop { request = match request { Request::Script(script_req) => { let futures: FuturesOrdered<_> = script_req @@ -208,13 +208,14 @@ impl WalletSync for EsploraBlockchain { .collect::>()?; tx_req.satisfy(full_txs)? } - Request::Finish(batch_update) => break batch_update, + Request::Finish(batch_update, missing_count) => { + break (batch_update, missing_count) + } } }; database.commit_batch(batch_update)?; - - Ok(()) + Ok(missing_count) } } diff --git a/src/blockchain/esplora/ureq.rs b/src/blockchain/esplora/ureq.rs index 9899b90462..adec522c77 100644 --- a/src/blockchain/esplora/ureq.rs +++ b/src/blockchain/esplora/ureq.rs @@ -124,11 +124,11 @@ impl WalletSync for EsploraBlockchain { &self, database: &mut D, _progress_update: Box, - ) -> Result<(), Error> { + ) -> Result { use crate::blockchain::script_sync::Request; let mut request = script_sync::start(database, self.stop_gap)?; let mut tx_index: HashMap = HashMap::new(); - let batch_update = loop { + let (batch_update, missing_count) = loop { request = match request { Request::Script(script_req) => { let scripts = script_req @@ -206,13 +206,15 @@ impl WalletSync for EsploraBlockchain { .collect::>()?; tx_req.satisfy(full_txs)? } - Request::Finish(batch_update) => break batch_update, + Request::Finish(batch_update, missing_count) => { + break (batch_update, missing_count) + } } }; database.commit_batch(batch_update)?; - Ok(()) + Ok(missing_count) } } diff --git a/src/blockchain/mod.rs b/src/blockchain/mod.rs index 1dc5c95a1c..3437b5382b 100644 --- a/src/blockchain/mod.rs +++ b/src/blockchain/mod.rs @@ -135,7 +135,7 @@ pub trait WalletSync { &self, database: &mut D, progress_update: Box, - ) -> Result<(), Error>; + ) -> Result; /// If not overridden, it defaults to calling [`Self::wallet_setup`] internally. /// @@ -158,7 +158,7 @@ pub trait WalletSync { &self, database: &mut D, progress_update: Box, - ) -> Result<(), Error> { + ) -> Result { maybe_await!(self.wallet_setup(database, progress_update)) } } @@ -379,7 +379,7 @@ impl WalletSync for Arc { &self, database: &mut D, progress_update: Box, - ) -> Result<(), Error> { + ) -> Result { maybe_await!(self.deref().wallet_setup(database, progress_update)) } @@ -387,7 +387,7 @@ impl WalletSync for Arc { &self, database: &mut D, progress_update: Box, - ) -> Result<(), Error> { + ) -> Result { maybe_await!(self.deref().wallet_sync(database, progress_update)) } } diff --git a/src/blockchain/rpc.rs b/src/blockchain/rpc.rs index 1d0d884c09..30fd433c2a 100644 --- a/src/blockchain/rpc.rs +++ b/src/blockchain/rpc.rs @@ -180,7 +180,7 @@ impl WalletSync for RpcBlockchain { &self, database: &mut D, progress_update: Box, - ) -> Result<(), Error> { + ) -> Result { let mut scripts_pubkeys = database.iter_script_pubkeys(Some(KeychainKind::External))?; scripts_pubkeys.extend(database.iter_script_pubkeys(Some(KeychainKind::Internal))?); debug!( @@ -262,7 +262,7 @@ impl WalletSync for RpcBlockchain { &self, db: &mut D, _progress_update: Box, - ) -> Result<(), Error> { + ) -> Result { let mut indexes = HashMap::new(); for keykind in &[KeychainKind::External, KeychainKind::Internal] { indexes.insert(*keykind, db.get_last_index(*keykind)?.unwrap_or(0)); @@ -395,7 +395,7 @@ impl WalletSync for RpcBlockchain { db.set_last_index(keykind, index)?; } - Ok(()) + Ok(0) } } diff --git a/src/blockchain/script_sync.rs b/src/blockchain/script_sync.rs index 0575273608..d87fe23ec8 100644 --- a/src/blockchain/script_sync.rs +++ b/src/blockchain/script_sync.rs @@ -21,7 +21,9 @@ pub enum Request<'a, D: BatchDatabase> { /// A request for full transaction details of some transactions. Tx(TxReq<'a, D>), /// Requests are finished here's a batch database update to reflect data gathered. - Finish(D::Batch), + /// we also return a value which represents the minimum number of scripts that we should cache + /// for next round (or the missing cache count) + Finish(D::Batch, usize), } /// starts a sync @@ -34,7 +36,8 @@ pub fn start(db: &D, stop_gap: usize) -> Result let scripts_needed = db .iter_script_pubkeys(Some(keychain))? .into_iter() - .collect(); + .collect::>(); + println!("scripts_needed count: {}", scripts_needed.len()); let state = State::new(db); Ok(Request::Script(ScriptReq { @@ -117,39 +120,57 @@ impl<'a, D: BatchDatabase> ScriptReq<'a, D> { self.scripts_needed.pop_front(); } - let last_active_index = self + // last active index + // 0 => No last active + let last = self .state .last_active_index .get(&self.keychain) - .map(|x| x + 1) - .unwrap_or(0); // so no addresses active maps to 0 + .map(|&l| l + 1) + .unwrap_or(0); - Ok( - if self.script_index > last_active_index + self.stop_gap - || self.scripts_needed.is_empty() - { + // difference between current index and last active index + let diff = self.script_index - last; + + if diff <= self.stop_gap { + if !self.scripts_needed.is_empty() { + // we have not finished requesting txs with script batches, so continue + return Ok(Request::Script(self)); + } + + // if we obtained an active index in this sync, we have missing scripts in db cache, + // report it + if last > 0 { debug!( - "finished scanning for transactions for keychain {:?} at index {}", - self.keychain, last_active_index + "reporting missing with: current={}, last={}, diff={}, gap={}", + self.script_index, last, diff, self.stop_gap ); - // we're done here -- check if we need to do the next keychain - if let Some(keychain) = self.next_keychains.pop() { - self.keychain = keychain; - self.script_index = 0; - self.scripts_needed = self - .state - .db - .iter_script_pubkeys(Some(keychain))? - .into_iter() - .collect(); - Request::Script(self) - } else { - Request::Tx(TxReq { state: self.state }) - } - } else { - Request::Script(self) - }, - ) + self.state + .missing_script_counts + .insert(self.keychain, self.stop_gap - diff); + } + } + + debug!( + "finished scanning for txs of keychain {:?} at index {:?}", + self.keychain, last + ); + + if let Some(keychain) = self.next_keychains.pop() { + // we still have another keychain to request txs with + self.keychain = keychain; + self.script_index = 0; + self.scripts_needed = self + .state + .db + .iter_script_pubkeys(Some(keychain))? + .into_iter() + .collect(); + return Ok(Request::Script(self)); + } + + // We have finished requesting txids, let's get the actual txs. + Ok(Request::Tx(TxReq { state: self.state })) } } @@ -276,7 +297,18 @@ impl<'a, D: BatchDatabase> ConftimeReq<'a, D> { } if self.state.tx_missing_conftime.is_empty() { - Ok(Request::Finish(self.state.into_db_update()?)) + // Obtain largest missing count (between external/internal) for simplicity + // The reasoning is that there is no point returning a map - in the future, a single + // database will only handle a single descriptor + let missing = self + .state + .missing_script_counts + .clone() + .into_values() + .reduce(std::cmp::max) + .unwrap_or(0); + + Ok(Request::Finish(self.state.into_db_update()?, missing)) } else { Ok(Request::Conftime(self)) } @@ -294,6 +326,8 @@ struct State<'a, D> { tx_missing_conftime: BTreeMap, /// The start of the sync start_time: Instant, + /// Missing number of scripts to cache per keychain + missing_script_counts: HashMap, } impl<'a, D: BatchDatabase> State<'a, D> { @@ -305,6 +339,7 @@ impl<'a, D: BatchDatabase> State<'a, D> { tx_needed: BTreeSet::default(), tx_missing_conftime: BTreeMap::default(), start_time: Instant::new(), + missing_script_counts: HashMap::default(), } } fn into_db_update(self) -> Result { diff --git a/src/testutils/configurable_blockchain_tests.rs b/src/testutils/configurable_blockchain_tests.rs index a39608bb14..fcc29ef646 100644 --- a/src/testutils/configurable_blockchain_tests.rs +++ b/src/testutils/configurable_blockchain_tests.rs @@ -29,6 +29,7 @@ pub trait ConfigurableBlockchainTester: Sized { if self.config_with_stop_gap(test_client, 0).is_some() { test_wallet_sync_with_stop_gaps(test_client, self); + test_wallet_sync_fulfills_missing_script_cache(test_client, self); } else { println!( "{}: Skipped tests requiring config_with_stop_gap.", @@ -113,16 +114,21 @@ where } else { max_balance }; + let details = format!( + "test_vector: [stop_gap: {}, actual_gap: {}, addrs_before: {}, addrs_after: {}]", + stop_gap, actual_gap, addrs_before, addrs_after, + ); + println!("{}", details); // perform wallet sync wallet.sync(&blockchain, Default::default()).unwrap(); let wallet_balance = wallet.get_balance().unwrap(); - - let details = format!( - "test_vector: [stop_gap: {}, actual_gap: {}, addrs_before: {}, addrs_after: {}]", - stop_gap, actual_gap, addrs_before, addrs_after, + println!( + "max: {}, min: {}, actual: {}", + max_balance, min_balance, wallet_balance ); + assert!( wallet_balance <= max_balance, "wallet balance is greater than received amount: {}", @@ -138,3 +144,54 @@ where test_client.generate(1, None); } } + +/// With a `stop_gap` of x and every x addresses having a balance of 1000 (for y addresses), +/// we expect `Wallet::sync` to correctly self-cache addresses, so that the resulting balance, +/// after sync, should be y * 1000. +fn test_wallet_sync_fulfills_missing_script_cache(test_client: &mut TestClient, tester: &T) +where + T: ConfigurableBlockchainTester, + B: ConfigurableBlockchain, +{ + // wallet descriptor + let descriptor = "wpkh([c258d2e4/84h/1h/0h]tpubDDYkZojQFQjht8Tm4jsS3iuEmKjTiEGjG6KnuFNKKJb5A6ZUCUZKdvLdSDWofKi4ToRCwb9poe1XdqfUnP4jaJjCB2Zwv11ZLgSbnZSNecE/200/*)"; + + // amount in sats per tx + const AMOUNT_PER_TX: u64 = 1000; + + // addr constants + const ADDR_COUNT: usize = 6; + const ADDR_GAP: usize = 60; + + let blockchain = + B::from_config(&tester.config_with_stop_gap(test_client, ADDR_GAP).unwrap()).unwrap(); + + let wallet = Wallet::new(descriptor, None, Network::Regtest, MemoryDatabase::new()).unwrap(); + + let expected_balance = (0..ADDR_COUNT).fold(0_u64, |sum, i| { + let addr_i = i * ADDR_GAP; + let address = wallet.get_address(AddressIndex::Peek(addr_i as _)).unwrap(); + + println!( + "tx: {} sats => [{}] {}", + AMOUNT_PER_TX, + addr_i, + address.to_string() + ); + + test_client.receive(testutils! { + @tx ( (@addr address.address) => AMOUNT_PER_TX ) + }); + test_client.generate(1, None); + + sum + AMOUNT_PER_TX + }); + println!("expected balance: {}, syncing...", expected_balance); + + // perform sync + wallet.sync(&blockchain, Default::default()).unwrap(); + println!("sync done!"); + + let balance = wallet.get_balance().unwrap(); + assert_eq!(balance, expected_balance); +} diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 9231c3b749..098c5a655b 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -30,6 +30,7 @@ use bitcoin::{ Txid, Witness, }; +use log::warn; use miniscript::descriptor::DescriptorTrait; use miniscript::psbt::PsbtInputSatisfier; use miniscript::ToPublicKey; @@ -1689,16 +1690,62 @@ where let progress = progress.unwrap_or_else(|| Box::new(NoopProgress)); let run_setup = self.ensure_addresses_cached(CACHE_ADDR_BATCH_SIZE)?; - debug!("run_setup: {}", run_setup); + // TODO: what if i generate an address first and cache some addresses? // TODO: we should sync if generating an address triggers a new batch to be stored - if run_setup { - maybe_await!( - blockchain.wallet_setup(self.database.borrow_mut().deref_mut(), progress,) - )?; + + // first run, we record progress... + let mut missing_cache_count = if run_setup { + maybe_await!(blockchain.wallet_setup(self.database.borrow_mut().deref_mut(), progress))? } else { - maybe_await!(blockchain.wallet_sync(self.database.borrow_mut().deref_mut(), progress,))?; + maybe_await!(blockchain.wallet_sync(self.database.borrow_mut().deref_mut(), progress))? + }; + + // We need to ensure descriptor is derivable to fullfil "missing cache", otherwise we will + // end up with an infinite loop + let is_deriveable = self.descriptor.is_deriveable() + && (self.change_descriptor.is_none() + || self.change_descriptor.as_ref().unwrap().is_deriveable()); + + // Restrict max rounds in case of faulty "missing cache" implementation by blockchain + const MAX_ROUNDS: usize = 100; + let mut round = 0_usize; + + while is_deriveable && missing_cache_count > 0 { + // as each call to `wallet_{sync|setup}` is rather expensive, we wish to maximize on + // additional `scriptPubKey`s to search for on each call + let to_cache = std::cmp::max(missing_cache_count as u32, CACHE_ADDR_BATCH_SIZE); + + // cache and try again + for keychain in [KeychainKind::External, KeychainKind::Internal] { + let (_, keychain) = self._get_descriptor_for_keychain(keychain); + let from = self + .database + .borrow() + .iter_script_pubkeys(Some(keychain))? + .len() as _; + self.cache_addresses(keychain, from, to_cache)?; + } + + // TODO: for the next runs, we cannot reuse progress obj (due to trait restriction) + let progress = Box::new(NoopProgress); + + missing_cache_count = if run_setup { + maybe_await!( + blockchain.wallet_setup(self.database.borrow_mut().deref_mut(), progress) + )? + } else { + maybe_await!( + blockchain.wallet_sync(self.database.borrow_mut().deref_mut(), progress) + )? + }; + + round += 1; + if round > MAX_ROUNDS { + warn!("sync has reached max rounds, faulty `Blockchain` implementation?"); + break; + } } let sync_time = SyncTime {