Skip to content

Commit

Permalink
feat(wallet)!: change persist API to use StageExt and StageExtAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
evanlinjin committed Jun 13, 2024
1 parent e004e93 commit ded0bfe
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 49 deletions.
1 change: 1 addition & 0 deletions crates/wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ js-sys = "0.3"
[features]
default = ["std"]
std = ["bitcoin/std", "miniscript/std", "bdk_chain/std"]
async = ["bdk_chain/async"]
compiler = ["miniscript/compiler"]
all-keys = ["keys-bip39"]
keys-bip39 = ["bip39"]
Expand Down
91 changes: 56 additions & 35 deletions crates/wallet/src/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use bdk_chain::{
local_chain::{
self, ApplyHeaderError, CannotConnectError, CheckPoint, CheckPointIter, LocalChain,
},
persist::{PersistBackend, StageExt},
spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult},
tx_graph::{CanonicalTx, TxGraph},
Append, BlockId, ChainPosition, ConfirmationTime, ConfirmationTimeHeightAnchor, FullTxOut,
Expand All @@ -40,7 +41,6 @@ use bitcoin::{
use bitcoin::{consensus::encode::serialize, transaction, BlockHash, Psbt};
use bitcoin::{constants::genesis_block, Amount};
use core::fmt;
use core::mem;
use core::ops::Deref;
use descriptor::error::Error as DescriptorError;
use miniscript::psbt::{PsbtExt, PsbtInputExt, PsbtInputSatisfier};
Expand Down Expand Up @@ -393,18 +393,6 @@ impl Wallet {
})
}

/// Stage a ['ChangeSet'] to be persisted later.
///
/// [`commit`]: Self::commit
fn stage(&mut self, changeset: ChangeSet) {
self.stage.append(changeset)
}

/// Take the staged [`ChangeSet`] to be persisted now.
pub fn take_staged(&mut self) -> ChangeSet {
mem::take(&mut self.stage)
}

/// Load [`Wallet`] from the given previously persisted [`ChangeSet`].
///
/// Note that the descriptor secret keys are not persisted to the db; this means that after
Expand Down Expand Up @@ -687,7 +675,7 @@ impl Wallet {
/// # let changeset = ChangeSet::default();
/// # let mut wallet = Wallet::load_from_changeset(changeset).expect("load wallet");
/// let next_address = wallet.reveal_next_address(KeychainKind::External);
/// db.write_changes(&wallet.take_staged())?;
/// wallet.commit_to(&mut db)?;
///
/// // Now it's safe to show the user their next address!
/// println!("Next address: {}", next_address.address);
Expand Down Expand Up @@ -730,7 +718,7 @@ impl Wallet {
.reveal_to_target(&keychain, index)
.expect("keychain must exist");

self.stage(indexed_tx_graph::ChangeSet::from(index_changeset).into());
self.stage.append(index_changeset.into());

spk_iter.map(move |(index, spk)| AddressInfo {
index,
Expand Down Expand Up @@ -913,7 +901,7 @@ impl Wallet {
/// [`list_output`]: Self::list_output
pub fn insert_txout(&mut self, outpoint: OutPoint, txout: TxOut) {
let additions = self.indexed_graph.insert_txout(outpoint, txout);
self.stage(ChangeSet::from(additions));
self.stage.append(additions.into());
}

/// Calculates the fee of a given transaction. Returns [`Amount::ZERO`] if `tx` is a coinbase transaction.
Expand Down Expand Up @@ -1082,7 +1070,7 @@ impl Wallet {
) -> Result<bool, local_chain::AlterCheckPointError> {
let changeset = self.chain.insert_block(block_id)?;
let changed = !changeset.is_empty();
self.stage(changeset.into());
self.stage.append(changeset.into());
Ok(changed)
}

Expand Down Expand Up @@ -1144,7 +1132,7 @@ impl Wallet {
}

let changed = !changeset.is_empty();
self.stage(changeset);
self.stage.append(changeset);
Ok(changed)
}

Expand Down Expand Up @@ -1469,9 +1457,7 @@ impl Wallet {
.expect("keychain must exist");
let spk = spk.into();
self.indexed_graph.index.mark_used(change_keychain, index);
self.stage(ChangeSet::from(indexed_tx_graph::ChangeSet::from(
index_changeset,
)));
self.stage.append(index_changeset.into());
spk
}
};
Expand Down Expand Up @@ -2281,25 +2267,60 @@ impl Wallet {
/// [`commit`]: Self::commit
pub fn apply_update(&mut self, update: impl Into<Update>) -> Result<(), CannotConnectError> {
let update = update.into();
let mut changeset = match update.chain {
Some(chain_update) => ChangeSet::from(self.chain.apply_update(chain_update)?),
None => ChangeSet::default(),
};

let mut changeset = ChangeSet::default();
if let Some(chain_update) = update.chain {
changeset.append(self.chain.apply_update(chain_update)?.into());
}
let (_, index_changeset) = self
.indexed_graph
.index
.reveal_to_target_multi(&update.last_active_indices);
changeset.append(ChangeSet::from(indexed_tx_graph::ChangeSet::from(
index_changeset,
)));
changeset.append(ChangeSet::from(
self.indexed_graph.apply_update(update.graph),
));
self.stage(changeset);
changeset.append(index_changeset.into());
changeset.append(self.indexed_graph.apply_update(update.graph).into());
self.stage.append(changeset);
Ok(())
}

/// Commits all currently [`staged`](Wallet::staged) changes to the `persist_backend`.
///
/// This returns whether anything was persisted.
///
/// # Error
///
/// Returns a backend-defined error if this fails.
pub fn commit_to<B>(&mut self, persist_backend: &mut B) -> Result<bool, B::WriteError>
where
B: PersistBackend<ChangeSet>,
{
let committed = StageExt::commit_to(&mut self.stage, persist_backend)?;
Ok(committed.is_some())
}

/// Commits all currently [`staged`](Wallet::staged) changes to the async `persist_backend`.
///
/// This returns whether anything was persisted.
///
/// # Error
///
/// Returns a backend-defined error if this fails.
#[cfg(feature = "async")]
pub async fn commit_to_async<B>(
&mut self,
persist_backend: &mut B,
) -> Result<bool, B::WriteError>
where
B: bdk_chain::persist::PersistBackendAsync<ChangeSet> + Send + Sync,
{
let committed =
bdk_chain::persist::StageExtAsync::commit_to(&mut self.stage, persist_backend).await?;
Ok(committed.is_some())
}

/// Get the staged [`ChangeSet`] that is yet to be committed.
pub fn staged(&self) -> &ChangeSet {
&self.stage
}

/// Get a reference to the inner [`TxGraph`].
pub fn tx_graph(&self) -> &TxGraph<ConfirmationTimeHeightAnchor> {
self.indexed_graph.graph()
Expand Down Expand Up @@ -2369,7 +2390,7 @@ impl Wallet {
.apply_block_relevant(block, height)
.into(),
);
self.stage(changeset);
self.stage.append(changeset);
Ok(())
}

Expand All @@ -2392,7 +2413,7 @@ impl Wallet {
let indexed_graph_changeset = self
.indexed_graph
.batch_insert_relevant_unconfirmed(unconfirmed_txs);
self.stage(ChangeSet::from(indexed_graph_changeset));
self.stage.append(indexed_graph_changeset.into());
}
}

Expand Down
11 changes: 5 additions & 6 deletions crates/wallet/tests/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,10 @@ fn load_recovers_wallet() -> anyhow::Result<()> {
wallet.reveal_next_address(KeychainKind::External);

// persist new wallet changes
let staged_changeset = wallet.take_staged();
let db = &mut create_new(&file_path).expect("must create db");
db.write_changes(&staged_changeset)
let mut db = create_new(&file_path).expect("must create db");
wallet
.commit_to(&mut db)
.map_err(|e| anyhow!("write changes error: {}", e))?;

wallet.spk_index().clone()
};

Expand Down Expand Up @@ -158,9 +157,9 @@ fn new_or_load() -> anyhow::Result<()> {
let wallet_keychains: BTreeMap<_, _> = {
let wallet = &mut Wallet::new_or_load(desc, change_desc, None, Network::Testnet)
.expect("must init wallet");
let staged_changeset = wallet.take_staged();
let mut db = new_or_load(&file_path).expect("must create db");
db.write_changes(&staged_changeset)
wallet
.commit_to(&mut db)
.map_err(|e| anyhow!("write changes error: {}", e))?;
wallet.keychains().map(|(k, v)| (*k, v.clone())).collect()
};
Expand Down
4 changes: 2 additions & 2 deletions example-crates/wallet_electrum/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn main() -> Result<(), anyhow::Error> {
)?;

let address = wallet.next_unused_address(KeychainKind::External);
db.write_changes(&wallet.take_staged())?;
wallet.commit_to(&mut db)?;
println!("Generated Address: {}", address);

let balance = wallet.balance();
Expand Down Expand Up @@ -72,7 +72,7 @@ fn main() -> Result<(), anyhow::Error> {
println!();

wallet.apply_update(update)?;
db.write_changes(&wallet.take_staged())?;
wallet.commit_to(&mut db)?;

let balance = wallet.balance();
println!("Wallet balance after syncing: {} sats", balance.total());
Expand Down
4 changes: 2 additions & 2 deletions example-crates/wallet_esplora_async/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn main() -> Result<(), anyhow::Error> {
)?;

let address = wallet.next_unused_address(KeychainKind::External);
db.write_changes(&wallet.take_staged())?;
wallet.commit_to(&mut db)?;
println!("Generated Address: {}", address);

let balance = wallet.balance();
Expand Down Expand Up @@ -78,7 +78,7 @@ async fn main() -> Result<(), anyhow::Error> {
let _ = update.graph_update.update_last_seen_unconfirmed(now);

wallet.apply_update(update)?;
db.write_changes(&wallet.take_staged())?;
wallet.commit_to(&mut db)?;
println!();

let balance = wallet.balance();
Expand Down
4 changes: 2 additions & 2 deletions example-crates/wallet_esplora_blocking/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ fn main() -> Result<(), anyhow::Error> {
)?;

let address = wallet.next_unused_address(KeychainKind::External);
db.write_changes(&wallet.take_staged())?;
wallet.commit_to(&mut db)?;
println!("Generated Address: {}", address);

let balance = wallet.balance();
Expand All @@ -55,7 +55,7 @@ fn main() -> Result<(), anyhow::Error> {
let _ = update.graph_update.update_last_seen_unconfirmed(now);

wallet.apply_update(update)?;
db.write_changes(&wallet.take_staged())?;
wallet.commit_to(&mut db)?;
println!();

let balance = wallet.balance();
Expand Down
4 changes: 2 additions & 2 deletions example-crates/wallet_rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ fn main() -> anyhow::Result<()> {
let connected_to = block_emission.connected_to();
let start_apply_block = Instant::now();
wallet.apply_block_connected_to(&block_emission.block, height, connected_to)?;
db.write_changes(&wallet.take_staged())?;
wallet.commit_to(&mut db)?;
let elapsed = start_apply_block.elapsed().as_secs_f32();
println!(
"Applied block {} at height {} in {}s",
Expand All @@ -157,7 +157,7 @@ fn main() -> anyhow::Result<()> {
Emission::Mempool(mempool_emission) => {
let start_apply_mempool = Instant::now();
wallet.apply_unconfirmed_txs(mempool_emission.iter().map(|(tx, time)| (tx, *time)));
db.write_changes(&wallet.take_staged())?;
wallet.commit_to(&mut db)?;
println!(
"Applied unconfirmed transactions in {}s",
start_apply_mempool.elapsed().as_secs_f32()
Expand Down

0 comments on commit ded0bfe

Please sign in to comment.