Skip to content

Commit

Permalink
Implement incremental Esplora syncing for the on-chain wallet
Browse files Browse the repository at this point in the history
  • Loading branch information
tnull committed Oct 3, 2024
1 parent 62ff22e commit b551fa3
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 69 deletions.
119 changes: 74 additions & 45 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,71 @@ impl ChainSource {
})?;
}

let res =
{
let full_scan_request = onchain_wallet.get_full_scan_request();
let res = {
// If this is our first sync, do a full scan with the configured gap limit.
// Otherwise just do an incremental sync.
let incremental_sync =
latest_onchain_wallet_sync_timestamp.read().unwrap().is_some();

macro_rules! get_and_apply_wallet_update {
($sync_future: expr) => {{
let now = Instant::now();
match $sync_future.await {
Ok(res) => match res {
Ok(update) => match onchain_wallet.apply_update(update) {
Ok(()) => {
log_info!(
logger,
"{} of on-chain wallet finished in {}ms.",
if incremental_sync { "Incremental sync" } else { "Sync" },
now.elapsed().as_millis()
);
let unix_time_secs_opt = SystemTime::now()
.duration_since(UNIX_EPOCH)
.ok()
.map(|d| d.as_secs());
*latest_onchain_wallet_sync_timestamp.write().unwrap() =
unix_time_secs_opt;
Ok(())
},
Err(e) => Err(e),
},
Err(e) => match *e {
esplora_client::Error::Reqwest(he) => {
log_error!(
logger,
"{} of on-chain wallet failed due to HTTP connection error: {}",
if incremental_sync { "Incremental sync" } else { "Sync" },
he
);
Err(Error::WalletOperationFailed)
},
_ => {
log_error!(
logger,
"{} of on-chain wallet failed due to Esplora error: {}",
if incremental_sync { "Incremental sync" } else { "Sync" },
e
);
Err(Error::WalletOperationFailed)
},
},
},
Err(e) => {
log_error!(
logger,
"{} of on-chain wallet timed out: {}",
if incremental_sync { "Incremental sync" } else { "Sync" },
e
);
Err(Error::WalletOperationTimeout)
},
}
}}
}

if incremental_sync {
let full_scan_request = onchain_wallet.get_full_scan_request();
let wallet_sync_timeout_fut = tokio::time::timeout(
Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS),
esplora_client.full_scan(
Expand All @@ -240,48 +301,16 @@ impl ChainSource {
BDK_CLIENT_CONCURRENCY,
),
);

let now = Instant::now();
match wallet_sync_timeout_fut.await {
Ok(res) => match res {
Ok(update) => match onchain_wallet.apply_update(update) {
Ok(()) => {
log_info!(
logger,
"Sync of on-chain wallet finished in {}ms.",
now.elapsed().as_millis()
);
let unix_time_secs_opt = SystemTime::now()
.duration_since(UNIX_EPOCH)
.ok()
.map(|d| d.as_secs());
*latest_onchain_wallet_sync_timestamp.write().unwrap() =
unix_time_secs_opt;
Ok(())
},
Err(e) => Err(e),
},
Err(e) => match *e {
esplora_client::Error::Reqwest(he) => {
log_error!(
logger,
"Sync failed due to HTTP connection error: {}",
he
);
Err(Error::WalletOperationFailed)
},
_ => {
log_error!(logger, "Sync of on-chain wallet failed due to Esplora error: {}", e);
Err(Error::WalletOperationFailed)
},
},
},
Err(e) => {
log_error!(logger, "On-chain wallet sync timed out: {}", e);
Err(Error::WalletOperationTimeout)
},
}
};
get_and_apply_wallet_update!(wallet_sync_timeout_fut)
} else {
let sync_request = onchain_wallet.get_incremental_sync_request();
let wallet_sync_timeout_fut = tokio::time::timeout(
Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS),
esplora_client.sync(sync_request, BDK_CLIENT_CONCURRENCY),
);
get_and_apply_wallet_update!(wallet_sync_timeout_fut)
}
};

onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res);

Expand Down
24 changes: 1 addition & 23 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,31 +240,9 @@ impl Node {

// Block to ensure we update our fee rate cache once on startup
let chain_source = Arc::clone(&self.chain_source);
let sync_logger = Arc::clone(&self.logger);
let sync_fee_rate_update_timestamp =
Arc::clone(&self.latest_fee_rate_cache_update_timestamp);
let runtime_ref = &runtime;
tokio::task::block_in_place(move || {
runtime_ref.block_on(async move {
let now = Instant::now();
match chain_source.update_fee_rate_estimates().await {
Ok(()) => {
log_info!(
sync_logger,
"Initial fee rate cache update finished in {}ms.",
now.elapsed().as_millis()
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*sync_fee_rate_update_timestamp.write().unwrap() = unix_time_secs_opt;
Ok(())
},
Err(e) => {
log_error!(sync_logger, "Initial fee rate cache update failed: {}", e,);
Err(e)
},
}
})
runtime_ref.block_on(async move { chain_source.update_fee_rate_estimates().await })
})?;

// Spawn background task continuously syncing onchain, lightning, and fee rate cache.
Expand Down
6 changes: 5 additions & 1 deletion src/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use lightning::sign::{
use lightning::util::message_signing;
use lightning_invoice::RawBolt11Invoice;

use bdk_chain::spk_client::FullScanRequest;
use bdk_chain::spk_client::{FullScanRequest, SyncRequest};
use bdk_chain::ChainPosition;
use bdk_wallet::{KeychainKind, PersistedWallet, SignOptions, Update};

Expand Down Expand Up @@ -80,6 +80,10 @@ where
self.inner.lock().unwrap().start_full_scan().build()
}

pub(crate) fn get_incremental_sync_request(&self) -> SyncRequest<(KeychainKind, u32)> {
self.inner.lock().unwrap().start_sync_with_revealed_spks().build()
}

pub(crate) fn apply_update(&self, update: impl Into<Update>) -> Result<(), Error> {
let mut locked_wallet = self.inner.lock().unwrap();
match locked_wallet.apply_update(update) {
Expand Down

0 comments on commit b551fa3

Please sign in to comment.