Skip to content

Commit

Permalink
feat(chain)!: Add time_of_sync to SyncRequest and FullScanRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
LagginTimes committed Aug 21, 2024
1 parent acccb59 commit e47204a
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 146 deletions.
14 changes: 14 additions & 0 deletions crates/chain/src/spk_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ pub struct SyncRequest<I = ()> {
txids_consumed: usize,
outpoints: VecDeque<OutPoint>,
outpoints_consumed: usize,
time_of_sync: u64,
inspect: Box<InspectSync<I>>,
}

Expand All @@ -252,6 +253,7 @@ impl<I> Default for SyncRequest<I> {
txids_consumed: 0,
outpoints: VecDeque::new(),
outpoints_consumed: 0,
time_of_sync: std::time::UNIX_EPOCH.elapsed().unwrap().as_secs(),
inspect: Box::new(|_, _| {}),
}
}
Expand Down Expand Up @@ -333,6 +335,11 @@ impl<I> SyncRequest<I> {
SyncIter::<I, OutPoint>::new(self)
}

/// Retrive the `time_of_sync`.
pub fn get_time_of_sync(&self) -> u64 {
self.time_of_sync
}

fn _call_inspect(&mut self, item: SyncItem<I>) {
let progress = self.progress();
(*self.inspect)(item, progress);
Expand Down Expand Up @@ -435,6 +442,7 @@ impl<K: Ord> FullScanRequestBuilder<K> {
pub struct FullScanRequest<K> {
chain_tip: Option<CheckPoint>,
spks_by_keychain: BTreeMap<K, Box<dyn Iterator<Item = Indexed<ScriptBuf>> + Send>>,
time_of_sync: u64,
inspect: Box<InspectFullScan<K>>,
}

Expand All @@ -449,6 +457,7 @@ impl<K> Default for FullScanRequest<K> {
Self {
chain_tip: None,
spks_by_keychain: Default::default(),
time_of_sync: std::time::UNIX_EPOCH.elapsed().unwrap().as_secs(),
inspect: Box::new(|_, _, _| {}),
}
}
Expand Down Expand Up @@ -488,6 +497,11 @@ impl<K: Ord + Clone> FullScanRequest<K> {
inspect,
}
}

/// Retrive the `time_of_sync`.
pub fn get_time_of_sync(&self) -> u64 {
self.time_of_sync
}
}

/// Data returned from a spk-based blockchain client full scan.
Expand Down
65 changes: 1 addition & 64 deletions crates/chain/src/tx_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,76 +565,13 @@ impl<A: Clone + Ord> TxGraph<A> {

/// Inserts the given `seen_at` for `txid` into [`TxGraph`].
///
/// Note that [`TxGraph`] only keeps track of the latest `seen_at`. To batch
/// update all unconfirmed transactions with the latest `seen_at`, see
/// [`update_last_seen_unconfirmed`].
///
/// [`update_last_seen_unconfirmed`]: Self::update_last_seen_unconfirmed
/// Note that [`TxGraph`] only keeps track of the latest `seen_at`.
pub fn insert_seen_at(&mut self, txid: Txid, seen_at: u64) -> ChangeSet<A> {
let mut update = Self::default();
update.last_seen.insert(txid, seen_at);
self.apply_update(update)
}

/// Update the last seen time for all unconfirmed transactions.
///
/// This method updates the last seen unconfirmed time for this [`TxGraph`] by inserting
/// the given `seen_at` for every transaction not yet anchored to a confirmed block,
/// and returns the [`ChangeSet`] after applying all updates to `self`.
///
/// This is useful for keeping track of the latest time a transaction was seen
/// unconfirmed, which is important for evaluating transaction conflicts in the same
/// [`TxGraph`]. For details of how [`TxGraph`] resolves conflicts, see the docs for
/// [`try_get_chain_position`].
///
/// A normal use of this method is to call it with the current system time. Although
/// block headers contain a timestamp, using the header time would be less effective
/// at tracking mempool transactions, because it can drift from actual clock time, plus
/// we may want to update a transaction's last seen time repeatedly between blocks.
///
/// # Example
///
/// ```rust
/// # use bdk_chain::example_utils::*;
/// # use std::time::UNIX_EPOCH;
/// # let tx = tx_from_hex(RAW_TX_1);
/// # let mut tx_graph = bdk_chain::TxGraph::<()>::new([tx]);
/// let now = std::time::SystemTime::now()
/// .duration_since(UNIX_EPOCH)
/// .expect("valid duration")
/// .as_secs();
/// let changeset = tx_graph.update_last_seen_unconfirmed(now);
/// assert!(!changeset.last_seen.is_empty());
/// ```
///
/// Note that [`TxGraph`] only keeps track of the latest `seen_at`, so the given time must
/// by strictly greater than what is currently stored for a transaction to have an effect.
/// To insert a last seen time for a single txid, see [`insert_seen_at`].
///
/// [`insert_seen_at`]: Self::insert_seen_at
/// [`try_get_chain_position`]: Self::try_get_chain_position
pub fn update_last_seen_unconfirmed(&mut self, seen_at: u64) -> ChangeSet<A> {
let mut changeset = ChangeSet::default();
let unanchored_txs: Vec<Txid> = self
.txs
.iter()
.filter_map(
|(&txid, (_, anchors))| {
if anchors.is_empty() {
Some(txid)
} else {
None
}
},
)
.collect();

for txid in unanchored_txs {
changeset.merge(self.insert_seen_at(txid, seen_at));
}
changeset
}

/// Extends this graph with another so that `self` becomes the union of the two sets of
/// transactions.
///
Expand Down
36 changes: 0 additions & 36 deletions crates/chain/tests/test_tx_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,42 +1088,6 @@ fn test_changeset_last_seen_merge() {
}
}

#[test]
fn update_last_seen_unconfirmed() {
let mut graph = TxGraph::<()>::default();
let tx = new_tx(0);
let txid = tx.compute_txid();

// insert a new tx
// initially we have a last_seen of None and no anchors
let _ = graph.insert_tx(tx);
let tx = graph.full_txs().next().unwrap();
assert_eq!(tx.last_seen_unconfirmed, None);
assert!(tx.anchors.is_empty());

// higher timestamp should update last seen
let changeset = graph.update_last_seen_unconfirmed(2);
assert_eq!(changeset.last_seen.get(&txid).unwrap(), &2);

// lower timestamp has no effect
let changeset = graph.update_last_seen_unconfirmed(1);
assert!(changeset.last_seen.is_empty());

// once anchored, last seen is not updated
let _ = graph.insert_anchor(txid, ());
let changeset = graph.update_last_seen_unconfirmed(4);
assert!(changeset.is_empty());
assert_eq!(
graph
.full_txs()
.next()
.unwrap()
.last_seen_unconfirmed
.unwrap(),
2
);
}

#[test]
fn transactions_inserted_into_tx_graph_are_not_canonical_until_they_have_an_anchor_in_best_chain() {
let txs = vec![new_tx(0), new_tx(1)];
Expand Down
47 changes: 38 additions & 9 deletions crates/electrum/src/bdk_electrum_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,19 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?),
None => None,
};
let time_of_sync = request.get_time_of_sync();

let mut graph_update = TxGraph::<ConfirmationBlockTime>::default();
let mut last_active_indices = BTreeMap::<K, u32>::default();
for keychain in request.keychains() {
let spks = request.iter_spks(keychain.clone());
if let Some(last_active_index) =
self.populate_with_spks(&mut graph_update, spks, stop_gap, batch_size)?
{
if let Some(last_active_index) = self.populate_with_spks(
&mut graph_update,
spks,
stop_gap,
batch_size,
time_of_sync,
)? {
last_active_indices.insert(keychain, last_active_index);
}
}
Expand Down Expand Up @@ -204,6 +209,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?),
None => None,
};
let time_of_sync = request.get_time_of_sync();

let mut graph_update = TxGraph::<ConfirmationBlockTime>::default();
self.populate_with_spks(
Expand All @@ -214,9 +220,10 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
.map(|(i, spk)| (i as u32, spk)),
usize::MAX,
batch_size,
time_of_sync,
)?;
self.populate_with_txids(&mut graph_update, request.iter_txids())?;
self.populate_with_outpoints(&mut graph_update, request.iter_outpoints())?;
self.populate_with_txids(&mut graph_update, request.iter_txids(), time_of_sync)?;
self.populate_with_outpoints(&mut graph_update, request.iter_outpoints(), time_of_sync)?;

// Fetch previous `TxOut`s for fee calculation if flag is enabled.
if fetch_prev_txouts {
Expand Down Expand Up @@ -249,6 +256,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
mut spks: impl Iterator<Item = (u32, ScriptBuf)>,
stop_gap: usize,
batch_size: usize,
time_of_sync: u64,
) -> Result<Option<u32>, Error> {
let mut unused_spk_count = 0_usize;
let mut last_active_index = Option::<u32>::None;
Expand Down Expand Up @@ -279,7 +287,12 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {

for tx_res in spk_history {
let _ = graph_update.insert_tx(self.fetch_tx(tx_res.tx_hash)?);
self.validate_merkle_for_anchor(graph_update, tx_res.tx_hash, tx_res.height)?;
self.validate_merkle_for_anchor(
graph_update,
tx_res.tx_hash,
tx_res.height,
time_of_sync,
)?;
}
}
}
Expand All @@ -293,6 +306,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
&self,
graph_update: &mut TxGraph<ConfirmationBlockTime>,
outpoints: impl IntoIterator<Item = OutPoint>,
time_of_sync: u64,
) -> Result<(), Error> {
for outpoint in outpoints {
let op_txid = outpoint.txid;
Expand All @@ -315,7 +329,12 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
if !has_residing && res.tx_hash == op_txid {
has_residing = true;
let _ = graph_update.insert_tx(Arc::clone(&op_tx));
self.validate_merkle_for_anchor(graph_update, res.tx_hash, res.height)?;
self.validate_merkle_for_anchor(
graph_update,
res.tx_hash,
res.height,
time_of_sync,
)?;
}

if !has_spending && res.tx_hash != op_txid {
Expand All @@ -329,7 +348,12 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
continue;
}
let _ = graph_update.insert_tx(Arc::clone(&res_tx));
self.validate_merkle_for_anchor(graph_update, res.tx_hash, res.height)?;
self.validate_merkle_for_anchor(
graph_update,
res.tx_hash,
res.height,
time_of_sync,
)?;
}
}
}
Expand All @@ -341,6 +365,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
&self,
graph_update: &mut TxGraph<ConfirmationBlockTime>,
txids: impl IntoIterator<Item = Txid>,
time_of_sync: u64,
) -> Result<(), Error> {
for txid in txids {
let tx = match self.fetch_tx(txid) {
Expand All @@ -363,7 +388,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
.into_iter()
.find(|r| r.tx_hash == txid)
{
self.validate_merkle_for_anchor(graph_update, txid, r.height)?;
self.validate_merkle_for_anchor(graph_update, txid, r.height, time_of_sync)?;
}

let _ = graph_update.insert_tx(tx);
Expand All @@ -378,6 +403,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
graph_update: &mut TxGraph<ConfirmationBlockTime>,
txid: Txid,
confirmation_height: i32,
time_of_sync: u64,
) -> Result<(), Error> {
if let Ok(merkle_res) = self
.inner
Expand Down Expand Up @@ -413,6 +439,9 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
},
);
}
} else {
// If no merkle proof is returned, then the tx is unconfirmed and we set the last_seen.
let _ = graph_update.insert_seen_at(txid, time_of_sync);
}
Ok(())
}
Expand Down
9 changes: 1 addition & 8 deletions crates/electrum/tests/test_electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,12 @@ where
Spks: IntoIterator<Item = ScriptBuf>,
Spks::IntoIter: ExactSizeIterator + Send + 'static,
{
let mut update = client.sync(
let update = client.sync(
SyncRequest::builder().chain_tip(chain.tip()).spks(spks),
BATCH_SIZE,
true,
)?;

// Update `last_seen` to be able to calculate balance for unconfirmed transactions.
let now = std::time::UNIX_EPOCH
.elapsed()
.expect("must get time")
.as_secs();
let _ = update.graph_update.update_last_seen_unconfirmed(now);

if let Some(chain_update) = update.chain_update.clone() {
let _ = chain
.apply_update(chain_update)
Expand Down
4 changes: 2 additions & 2 deletions crates/wallet/src/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2446,10 +2446,10 @@ impl Wallet {
.revealed_spks_from_indexer(&self.indexed_graph.index, ..)
}

/// Create a [`FullScanRequest] for this wallet.
/// Create a [`FullScanRequest`] for this wallet.
///
/// This is the first step when performing a spk-based wallet full scan, the returned
/// [`FullScanRequest] collects iterators for the wallet's keychain script pub keys needed to
/// [`FullScanRequest`] collects iterators for the wallet's keychain script pub keys needed to
/// start a blockchain full scan with a spk based blockchain client.
///
/// This operation is generally only used when importing or restoring a previously used wallet
Expand Down
8 changes: 1 addition & 7 deletions example-crates/example_electrum/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ fn main() -> anyhow::Result<()> {
// Tell the electrum client about the txs we've already got locally so it doesn't re-download them
client.populate_tx_cache(&*graph.lock().unwrap());

let (chain_update, mut graph_update, keychain_update) = match electrum_cmd.clone() {
let (chain_update, graph_update, keychain_update) = match electrum_cmd.clone() {
ElectrumCommands::Scan {
stop_gap,
scan_options,
Expand Down Expand Up @@ -248,12 +248,6 @@ fn main() -> anyhow::Result<()> {
}
};

let now = std::time::UNIX_EPOCH
.elapsed()
.expect("must get time")
.as_secs();
let _ = graph_update.update_last_seen_unconfirmed(now);

let db_changeset = {
let mut chain = chain.lock().unwrap();
let mut graph = graph.lock().unwrap();
Expand Down
12 changes: 2 additions & 10 deletions example-crates/example_esplora/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,10 @@ fn main() -> anyhow::Result<()> {
// is reached. It returns a `TxGraph` update (`graph_update`) and a structure that
// represents the last active spk derivation indices of keychains
// (`keychain_indices_update`).
let mut update = client
let update = client
.full_scan(request, *stop_gap, scan_options.parallel_requests)
.context("scanning for transactions")?;

// We want to keep track of the latest time a transaction was seen unconfirmed.
let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs();
let _ = update.graph_update.update_last_seen_unconfirmed(now);

let mut graph = graph.lock().expect("mutex must not be poisoned");
let mut chain = chain.lock().expect("mutex must not be poisoned");
// Because we did a stop gap based scan we are likely to have some updates to our
Expand Down Expand Up @@ -265,11 +261,7 @@ fn main() -> anyhow::Result<()> {
}
}

let mut update = client.sync(request, scan_options.parallel_requests)?;

// Update last seen unconfirmed
let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs();
let _ = update.graph_update.update_last_seen_unconfirmed(now);
let update = client.sync(request, scan_options.parallel_requests)?;

(
chain
Expand Down
Loading

0 comments on commit e47204a

Please sign in to comment.