Skip to content

Commit

Permalink
fix wallet sync, bump version number for release
Browse files Browse the repository at this point in the history
  • Loading branch information
Ash-L2L committed Jun 21, 2024
1 parent f68d25b commit ae399c4
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 45 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ authors = [
"Nikita Chashchinskii <[email protected]>"
]
edition = "2021"
version = "0.9.2"
version = "0.9.3"

[workspace.dependencies.bip300301]
git = "https://github.com/Ash-L2L/bip300301.git"
Expand Down
107 changes: 72 additions & 35 deletions app/app.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc};

use futures::{StreamExt, TryFutureExt};
use parking_lot::RwLock;
use rustreexo::accumulator::proof::Proof;
use thunder::{
Expand All @@ -10,7 +11,7 @@ use thunder::{
types::{self, OutPoint, Output, Transaction},
wallet::{self, Wallet},
};
use tokio::sync::RwLock as TokioRwLock;
use tokio::{spawn, sync::RwLock as TokioRwLock, task::JoinHandle};
use tokio_util::task::LocalPoolHandle;

use crate::cli::Config;
Expand All @@ -33,18 +34,67 @@ pub enum Error {
Wallet(#[from] wallet::Error),
}

fn update_wallet(node: &Node, wallet: &Wallet) -> Result<(), Error> {
let addresses = wallet.get_addresses()?;
let utxos = node.get_utxos_by_addresses(&addresses)?;
let outpoints: Vec<_> = wallet.get_utxos()?.into_keys().collect();
let spent: Vec<_> = node
.get_spent_utxos(&outpoints)?
.into_iter()
.map(|(outpoint, spent_output)| (outpoint, spent_output.inpoint))
.collect();
wallet.put_utxos(&utxos)?;
wallet.spend_utxos(&spent)?;
Ok(())
}

/// Update utxos & wallet
fn update(
node: &Node,
utxos: &mut HashMap<OutPoint, Output>,
wallet: &Wallet,
) -> Result<(), Error> {
let () = update_wallet(node, wallet)?;
*utxos = wallet.get_utxos()?;
Ok(())
}

#[derive(Clone)]
pub struct App {
pub node: Arc<Node>,
pub wallet: Wallet,
pub miner: Arc<TokioRwLock<Miner>>,
pub utxos: Arc<RwLock<HashMap<OutPoint, Output>>>,
task: Arc<JoinHandle<()>>,
pub transaction: Arc<RwLock<Transaction>>,
pub runtime: Arc<tokio::runtime::Runtime>,
pub local_pool: LocalPoolHandle,
}

impl App {
async fn task(
node: Arc<Node>,
utxos: Arc<RwLock<HashMap<OutPoint, Output>>>,
wallet: Wallet,
) -> Result<(), Error> {
let mut state_changes = node.watch_state();
while let Some(()) = state_changes.next().await {
let () = update(&node, &mut utxos.write(), &wallet)?;
}
Ok(())
}

fn spawn_task(
node: Arc<Node>,
utxos: Arc<RwLock<HashMap<OutPoint, Output>>>,
wallet: Wallet,
) -> JoinHandle<()> {
spawn(Self::task(node, utxos, wallet).unwrap_or_else(|err| {
let err = anyhow::Error::from(err);
tracing::error!("{err:#}")
}))
}

pub fn new(config: &Config) -> Result<Self, Error> {
// Node launches some tokio tasks for p2p networking, that is why we need a tokio runtime
// here.
Expand Down Expand Up @@ -72,7 +122,6 @@ impl App {
&config.main_password,
local_pool.clone(),
)?;
drop(rt_guard);
let utxos = {
let mut utxos = wallet.get_utxos()?;
let transactions = node.get_all_transactions()?;
Expand All @@ -81,13 +130,18 @@ impl App {
utxos.remove(outpoint);
}
}
utxos
Arc::new(RwLock::new(utxos))
};
let node = Arc::new(node);
let task =
Self::spawn_task(node.clone(), utxos.clone(), wallet.clone());
drop(rt_guard);
Ok(Self {
node: Arc::new(node),
node,
wallet,
miner: Arc::new(TokioRwLock::new(miner)),
utxos: Arc::new(RwLock::new(utxos)),
utxos,
task: Arc::new(task),
transaction: Arc::new(RwLock::new(Transaction {
inputs: vec![],
proof: Proof::default(),
Expand All @@ -98,10 +152,15 @@ impl App {
})
}

/// Update utxos & wallet
fn update(&self) -> Result<(), Error> {
update(self.node.as_ref(), &mut self.utxos.write(), &self.wallet)
}

pub fn sign_and_send(&self, tx: Transaction) -> Result<(), Error> {
let authorized_transaction = self.wallet.authorize(tx)?;
self.node.submit_transaction(authorized_transaction)?;
self.update_utxos()?;
let () = self.update()?;
Ok(())
}

Expand Down Expand Up @@ -191,39 +250,11 @@ impl App {
self.node.submit_block(main_hash, &header, &body).await?;
}
drop(miner_write);
self.update_wallet()?;
self.update_utxos()?;
let () = self.update()?;
self.node.regenerate_proof(&mut self.transaction.write())?;
Ok(())
}

fn update_wallet(&self) -> Result<(), Error> {
let addresses = self.wallet.get_addresses()?;
let utxos = self.node.get_utxos_by_addresses(&addresses)?;
let outpoints: Vec<_> = self.wallet.get_utxos()?.into_keys().collect();
let spent: Vec<_> = self
.node
.get_spent_utxos(&outpoints)?
.into_iter()
.map(|(outpoint, spent_output)| (outpoint, spent_output.inpoint))
.collect();
self.wallet.put_utxos(&utxos)?;
self.wallet.spend_utxos(&spent)?;
Ok(())
}

fn update_utxos(&self) -> Result<(), Error> {
let mut utxos = self.wallet.get_utxos()?;
let transactions = self.node.get_all_transactions()?;
for transaction in &transactions {
for (outpoint, _) in &transaction.transaction.inputs {
utxos.remove(outpoint);
}
}
*self.utxos.write() = utxos;
Ok(())
}

pub fn deposit(
&mut self,
amount: bitcoin::Amount,
Expand All @@ -249,3 +280,9 @@ impl App {
})
}
}

impl Drop for App {
fn drop(&mut self) {
self.task.abort()
}
}
7 changes: 7 additions & 0 deletions lib/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{

use bip300301::{bitcoin, DepositInfo};
use fallible_iterator::FallibleIterator;
use futures::Stream;
use tokio_util::task::LocalPoolHandle;

use crate::{
Expand All @@ -19,6 +20,7 @@ use crate::{
Body, GetValue, Header, OutPoint, Output, SpentOutput, Tip,
Transaction, Txid, WithdrawalBundle,
},
util::Watchable,
};

mod mainchain_task;
Expand Down Expand Up @@ -548,4 +550,9 @@ impl Node {
}
Ok(true)
}

/// Get a notification whenever the tip changes
pub fn watch_state(&self) -> impl Stream<Item = ()> {
self.state.watch()
}
}
20 changes: 15 additions & 5 deletions lib/state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::{BTreeMap, HashMap, HashSet};

use futures::Stream;
use heed::{types::SerdeBincode, Database, RoTxn, RwTxn};

use bip300301::{
Expand All @@ -19,7 +20,7 @@ use crate::{
PointedOutput, SpentOutput, Transaction, Txid, Verify,
WithdrawalBundle,
},
util::UnitKey,
util::{EnvExt, UnitKey, Watchable, WatchableDb},
};

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -84,7 +85,7 @@ pub enum Error {
#[derive(Clone)]
pub struct State {
/// Current tip
tip: Database<SerdeBincode<UnitKey>, SerdeBincode<BlockHash>>,
tip: WatchableDb<SerdeBincode<UnitKey>, SerdeBincode<BlockHash>>,
/// Current height
height: Database<SerdeBincode<UnitKey>, SerdeBincode<u32>>,
pub utxos: Database<SerdeBincode<OutPoint>, SerdeBincode<Output>>,
Expand All @@ -110,7 +111,7 @@ impl State {

pub fn new(env: &heed::Env) -> Result<Self, Error> {
let mut rwtxn = env.write_txn()?;
let tip = env.create_database(&mut rwtxn, Some("tip"))?;
let tip = env.create_watchable_db(&mut rwtxn, "tip")?;
let height = env.create_database(&mut rwtxn, Some("height"))?;
let utxos = env.create_database(&mut rwtxn, Some("utxos"))?;
let stxos = env.create_database(&mut rwtxn, Some("stxos"))?;
Expand All @@ -136,7 +137,7 @@ impl State {
}

pub fn get_tip(&self, rotxn: &RoTxn) -> Result<BlockHash, Error> {
let tip = self.tip.get(rotxn, &UnitKey)?.unwrap_or_default();
let tip = self.tip.try_get(rotxn, &UnitKey)?.unwrap_or_default();
Ok(tip)
}

Expand Down Expand Up @@ -936,7 +937,7 @@ impl State {
header: &Header,
body: &Body,
) -> Result<(), Error> {
let tip_hash = self.tip.get(rwtxn, &UnitKey)?.unwrap_or_default();
let tip_hash = self.tip.try_get(rwtxn, &UnitKey)?.unwrap_or_default();
if tip_hash != header.hash() {
let err = InvalidHeaderError::BlockHash {
expected: tip_hash,
Expand Down Expand Up @@ -1070,3 +1071,12 @@ impl State {
Ok(total_wealth)
}
}

impl Watchable<()> for State {
type WatchStream = impl Stream<Item = ()>;

/// Get a signal that notifies whenever the tip changes
fn watch(&self) -> Self::WatchStream {
tokio_stream::wrappers::WatchStream::new(self.tip.watch())
}
}

0 comments on commit ae399c4

Please sign in to comment.