From ae399c4f9c4e2eff42b4b243f6aaab05d71de4ef Mon Sep 17 00:00:00 2001 From: Ash Manning Date: Fri, 21 Jun 2024 21:05:11 +0800 Subject: [PATCH] fix wallet sync, bump version number for release --- Cargo.lock | 8 ++-- Cargo.toml | 2 +- app/app.rs | 107 ++++++++++++++++++++++++++++++++---------------- lib/node/mod.rs | 7 ++++ lib/state.rs | 20 ++++++--- 5 files changed, 99 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ece839d..7db7b0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4532,7 +4532,7 @@ dependencies = [ [[package]] name = "thunder" -version = "0.9.2" +version = "0.9.3" dependencies = [ "anyhow", "bincode", @@ -4570,7 +4570,7 @@ dependencies = [ [[package]] name = "thunder_app" -version = "0.9.2" +version = "0.9.3" dependencies = [ "anyhow", "base64 0.21.7", @@ -4607,7 +4607,7 @@ dependencies = [ [[package]] name = "thunder_app_cli" -version = "0.9.2" +version = "0.9.3" dependencies = [ "anyhow", "bip300301", @@ -4622,7 +4622,7 @@ dependencies = [ [[package]] name = "thunder_app_rpc_api" -version = "0.9.2" +version = "0.9.3" dependencies = [ "bip300301", "jsonrpsee", diff --git a/Cargo.toml b/Cargo.toml index 27fa1be..7975000 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ authors = [ "Nikita Chashchinskii " ] edition = "2021" -version = "0.9.2" +version = "0.9.3" [workspace.dependencies.bip300301] git = "https://github.com/Ash-L2L/bip300301.git" diff --git a/app/app.rs b/app/app.rs index 27358e8..9ba1cdd 100644 --- a/app/app.rs +++ b/app/app.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, sync::Arc}; +use futures::{StreamExt, TryFutureExt}; use parking_lot::RwLock; use rustreexo::accumulator::proof::Proof; use thunder::{ @@ -10,7 +11,7 @@ use thunder::{ types::{self, OutPoint, Output, Transaction}, wallet::{self, Wallet}, }; -use tokio::sync::RwLock as TokioRwLock; +use tokio::{spawn, sync::RwLock as TokioRwLock, task::JoinHandle}; use tokio_util::task::LocalPoolHandle; use crate::cli::Config; @@ -33,18 +34,67 @@ pub enum Error { Wallet(#[from] wallet::Error), } +fn update_wallet(node: &Node, wallet: &Wallet) -> Result<(), Error> { + let addresses = wallet.get_addresses()?; + let utxos = node.get_utxos_by_addresses(&addresses)?; + let outpoints: Vec<_> = wallet.get_utxos()?.into_keys().collect(); + let spent: Vec<_> = node + .get_spent_utxos(&outpoints)? + .into_iter() + .map(|(outpoint, spent_output)| (outpoint, spent_output.inpoint)) + .collect(); + wallet.put_utxos(&utxos)?; + wallet.spend_utxos(&spent)?; + Ok(()) +} + +/// Update utxos & wallet +fn update( + node: &Node, + utxos: &mut HashMap, + wallet: &Wallet, +) -> Result<(), Error> { + let () = update_wallet(node, wallet)?; + *utxos = wallet.get_utxos()?; + Ok(()) +} + #[derive(Clone)] pub struct App { pub node: Arc, pub wallet: Wallet, pub miner: Arc>, pub utxos: Arc>>, + task: Arc>, pub transaction: Arc>, pub runtime: Arc, pub local_pool: LocalPoolHandle, } impl App { + async fn task( + node: Arc, + utxos: Arc>>, + wallet: Wallet, + ) -> Result<(), Error> { + let mut state_changes = node.watch_state(); + while let Some(()) = state_changes.next().await { + let () = update(&node, &mut utxos.write(), &wallet)?; + } + Ok(()) + } + + fn spawn_task( + node: Arc, + utxos: Arc>>, + wallet: Wallet, + ) -> JoinHandle<()> { + spawn(Self::task(node, utxos, wallet).unwrap_or_else(|err| { + let err = anyhow::Error::from(err); + tracing::error!("{err:#}") + })) + } + pub fn new(config: &Config) -> Result { // Node launches some tokio tasks for p2p networking, that is why we need a tokio runtime // here. @@ -72,7 +122,6 @@ impl App { &config.main_password, local_pool.clone(), )?; - drop(rt_guard); let utxos = { let mut utxos = wallet.get_utxos()?; let transactions = node.get_all_transactions()?; @@ -81,13 +130,18 @@ impl App { utxos.remove(outpoint); } } - utxos + Arc::new(RwLock::new(utxos)) }; + let node = Arc::new(node); + let task = + Self::spawn_task(node.clone(), utxos.clone(), wallet.clone()); + drop(rt_guard); Ok(Self { - node: Arc::new(node), + node, wallet, miner: Arc::new(TokioRwLock::new(miner)), - utxos: Arc::new(RwLock::new(utxos)), + utxos, + task: Arc::new(task), transaction: Arc::new(RwLock::new(Transaction { inputs: vec![], proof: Proof::default(), @@ -98,10 +152,15 @@ impl App { }) } + /// Update utxos & wallet + fn update(&self) -> Result<(), Error> { + update(self.node.as_ref(), &mut self.utxos.write(), &self.wallet) + } + pub fn sign_and_send(&self, tx: Transaction) -> Result<(), Error> { let authorized_transaction = self.wallet.authorize(tx)?; self.node.submit_transaction(authorized_transaction)?; - self.update_utxos()?; + let () = self.update()?; Ok(()) } @@ -191,39 +250,11 @@ impl App { self.node.submit_block(main_hash, &header, &body).await?; } drop(miner_write); - self.update_wallet()?; - self.update_utxos()?; + let () = self.update()?; self.node.regenerate_proof(&mut self.transaction.write())?; Ok(()) } - fn update_wallet(&self) -> Result<(), Error> { - let addresses = self.wallet.get_addresses()?; - let utxos = self.node.get_utxos_by_addresses(&addresses)?; - let outpoints: Vec<_> = self.wallet.get_utxos()?.into_keys().collect(); - let spent: Vec<_> = self - .node - .get_spent_utxos(&outpoints)? - .into_iter() - .map(|(outpoint, spent_output)| (outpoint, spent_output.inpoint)) - .collect(); - self.wallet.put_utxos(&utxos)?; - self.wallet.spend_utxos(&spent)?; - Ok(()) - } - - fn update_utxos(&self) -> Result<(), Error> { - let mut utxos = self.wallet.get_utxos()?; - let transactions = self.node.get_all_transactions()?; - for transaction in &transactions { - for (outpoint, _) in &transaction.transaction.inputs { - utxos.remove(outpoint); - } - } - *self.utxos.write() = utxos; - Ok(()) - } - pub fn deposit( &mut self, amount: bitcoin::Amount, @@ -249,3 +280,9 @@ impl App { }) } } + +impl Drop for App { + fn drop(&mut self) { + self.task.abort() + } +} diff --git a/lib/node/mod.rs b/lib/node/mod.rs index ac83276..02b0645 100644 --- a/lib/node/mod.rs +++ b/lib/node/mod.rs @@ -7,6 +7,7 @@ use std::{ use bip300301::{bitcoin, DepositInfo}; use fallible_iterator::FallibleIterator; +use futures::Stream; use tokio_util::task::LocalPoolHandle; use crate::{ @@ -19,6 +20,7 @@ use crate::{ Body, GetValue, Header, OutPoint, Output, SpentOutput, Tip, Transaction, Txid, WithdrawalBundle, }, + util::Watchable, }; mod mainchain_task; @@ -548,4 +550,9 @@ impl Node { } Ok(true) } + + /// Get a notification whenever the tip changes + pub fn watch_state(&self) -> impl Stream { + self.state.watch() + } } diff --git a/lib/state.rs b/lib/state.rs index 84653c1..e6ce533 100644 --- a/lib/state.rs +++ b/lib/state.rs @@ -1,5 +1,6 @@ use std::collections::{BTreeMap, HashMap, HashSet}; +use futures::Stream; use heed::{types::SerdeBincode, Database, RoTxn, RwTxn}; use bip300301::{ @@ -19,7 +20,7 @@ use crate::{ PointedOutput, SpentOutput, Transaction, Txid, Verify, WithdrawalBundle, }, - util::UnitKey, + util::{EnvExt, UnitKey, Watchable, WatchableDb}, }; #[derive(Debug, thiserror::Error)] @@ -84,7 +85,7 @@ pub enum Error { #[derive(Clone)] pub struct State { /// Current tip - tip: Database, SerdeBincode>, + tip: WatchableDb, SerdeBincode>, /// Current height height: Database, SerdeBincode>, pub utxos: Database, SerdeBincode>, @@ -110,7 +111,7 @@ impl State { pub fn new(env: &heed::Env) -> Result { let mut rwtxn = env.write_txn()?; - let tip = env.create_database(&mut rwtxn, Some("tip"))?; + let tip = env.create_watchable_db(&mut rwtxn, "tip")?; let height = env.create_database(&mut rwtxn, Some("height"))?; let utxos = env.create_database(&mut rwtxn, Some("utxos"))?; let stxos = env.create_database(&mut rwtxn, Some("stxos"))?; @@ -136,7 +137,7 @@ impl State { } pub fn get_tip(&self, rotxn: &RoTxn) -> Result { - let tip = self.tip.get(rotxn, &UnitKey)?.unwrap_or_default(); + let tip = self.tip.try_get(rotxn, &UnitKey)?.unwrap_or_default(); Ok(tip) } @@ -936,7 +937,7 @@ impl State { header: &Header, body: &Body, ) -> Result<(), Error> { - let tip_hash = self.tip.get(rwtxn, &UnitKey)?.unwrap_or_default(); + let tip_hash = self.tip.try_get(rwtxn, &UnitKey)?.unwrap_or_default(); if tip_hash != header.hash() { let err = InvalidHeaderError::BlockHash { expected: tip_hash, @@ -1070,3 +1071,12 @@ impl State { Ok(total_wealth) } } + +impl Watchable<()> for State { + type WatchStream = impl Stream; + + /// Get a signal that notifies whenever the tip changes + fn watch(&self) -> Self::WatchStream { + tokio_stream::wrappers::WatchStream::new(self.tip.watch()) + } +}