Skip to content

Commit

Permalink
Introduce persisted NodeMetrics struct
Browse files Browse the repository at this point in the history
Previously, we persisted some of the `latest_` fields exposed via
`NodeStatus`. Here, we now refactor this via a persisted `NodeMetrics`
struct which allows to persist more fields across restarts. In
particular, we now persist the latest time we sync the on-chain wallet,
resulting in only doing a full scan on first initialization, and doing
incremental syncing afterwards.

As both of these operations are really really lightweight, we don't
bother to migrate the old persisted timestamps for RGS updates and node
announcement broadcasts over to the new data format.
  • Loading branch information
tnull committed Oct 3, 2024
1 parent 12f9140 commit d39ec15
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 217 deletions.
3 changes: 2 additions & 1 deletion bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,12 @@ dictionary NodeStatus {
boolean is_running;
boolean is_listening;
BestBlock current_best_block;
u64? latest_wallet_sync_timestamp;
u64? latest_lightning_wallet_sync_timestamp;
u64? latest_onchain_wallet_sync_timestamp;
u64? latest_fee_rate_cache_update_timestamp;
u64? latest_rgs_snapshot_timestamp;
u64? latest_node_announcement_broadcast_timestamp;
u32? latest_channel_monitor_archival_height;
};

dictionary BestBlock {
Expand Down
66 changes: 32 additions & 34 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use crate::connection::ConnectionManager;
use crate::event::EventQueue;
use crate::fee_estimator::OnchainFeeEstimator;
use crate::gossip::GossipSource;
use crate::io;
use crate::io::sqlite_store::SqliteStore;
use crate::io::utils::{read_node_metrics, write_node_metrics};
#[cfg(any(vss, vss_test))]
use crate::io::vss_store::VssStore;
use crate::liquidity::LiquiditySource;
Expand All @@ -28,6 +28,7 @@ use crate::types::{
};
use crate::wallet::persist::KVStoreWalletPersister;
use crate::wallet::Wallet;
use crate::{io, NodeMetrics};
use crate::{LogLevel, Node};

use lightning::chain::{chainmonitor, BestBlock, Watch};
Expand Down Expand Up @@ -531,12 +532,16 @@ fn build_with_store_internal(
) -> Result<Node, BuildError> {
// Initialize the status fields.
let is_listening = Arc::new(AtomicBool::new(false));
let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None));
let latest_onchain_wallet_sync_timestamp = Arc::new(RwLock::new(None));
let latest_fee_rate_cache_update_timestamp = Arc::new(RwLock::new(None));
let latest_rgs_snapshot_timestamp = Arc::new(RwLock::new(None));
let latest_node_announcement_broadcast_timestamp = Arc::new(RwLock::new(None));
let latest_channel_monitor_archival_height = Arc::new(RwLock::new(None));
let node_metrics = match read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)) {
Ok(metrics) => Arc::new(RwLock::new(metrics)),
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
Arc::new(RwLock::new(NodeMetrics::default()))
} else {
return Err(BuildError::ReadFailed);
}
},
};

// Initialize the on-chain wallet and chain access
let xprv = bitcoin::bip32::Xpriv::new_master(config.network, &seed_bytes).map_err(|e| {
Expand Down Expand Up @@ -585,12 +590,10 @@ fn build_with_store_internal(
Arc::clone(&wallet),
Arc::clone(&fee_estimator),
Arc::clone(&tx_broadcaster),
Arc::clone(&kv_store),
Arc::clone(&config),
Arc::clone(&logger),
Arc::clone(&latest_wallet_sync_timestamp),
Arc::clone(&latest_onchain_wallet_sync_timestamp),
Arc::clone(&latest_fee_rate_cache_update_timestamp),
latest_channel_monitor_archival_height,
Arc::clone(&node_metrics),
)),
None => {
// Default to Esplora client.
Expand All @@ -600,12 +603,10 @@ fn build_with_store_internal(
Arc::clone(&wallet),
Arc::clone(&fee_estimator),
Arc::clone(&tx_broadcaster),
Arc::clone(&kv_store),
Arc::clone(&config),
Arc::clone(&logger),
Arc::clone(&latest_wallet_sync_timestamp),
Arc::clone(&latest_onchain_wallet_sync_timestamp),
Arc::clone(&latest_fee_rate_cache_update_timestamp),
latest_channel_monitor_archival_height,
Arc::clone(&node_metrics),
))
},
};
Expand Down Expand Up @@ -797,23 +798,24 @@ fn build_with_store_internal(
Arc::new(GossipSource::new_p2p(Arc::clone(&network_graph), Arc::clone(&logger)));

// Reset the RGS sync timestamp in case we somehow switch gossip sources
io::utils::write_latest_rgs_sync_timestamp(
0,
Arc::clone(&kv_store),
Arc::clone(&logger),
)
.map_err(|e| {
log_error!(logger, "Failed writing to store: {}", e);
BuildError::WriteFailed
})?;
{
let mut locked_node_metrics = node_metrics.write().unwrap();
locked_node_metrics.latest_rgs_snapshot_timestamp = None;
write_node_metrics(
&*locked_node_metrics,
Arc::clone(&kv_store),
Arc::clone(&logger),
)
.map_err(|e| {
log_error!(logger, "Failed writing to store: {}", e);
BuildError::WriteFailed
})?;
}
p2p_source
},
GossipSourceConfig::RapidGossipSync(rgs_server) => {
let latest_sync_timestamp = io::utils::read_latest_rgs_sync_timestamp(
Arc::clone(&kv_store),
Arc::clone(&logger),
)
.unwrap_or(0);
let latest_sync_timestamp =
node_metrics.read().unwrap().latest_rgs_snapshot_timestamp.unwrap_or(0);
Arc::new(GossipSource::new_rgs(
rgs_server.clone(),
latest_sync_timestamp,
Expand Down Expand Up @@ -998,11 +1000,7 @@ fn build_with_store_internal(
peer_store,
payment_store,
is_listening,
latest_wallet_sync_timestamp,
latest_onchain_wallet_sync_timestamp,
latest_fee_rate_cache_update_timestamp,
latest_rgs_snapshot_timestamp,
latest_node_announcement_broadcast_timestamp,
node_metrics,
})
}

Expand Down
83 changes: 52 additions & 31 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ use crate::fee_estimator::{
apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target,
OnchainFeeEstimator,
};
use crate::io::utils::write_node_metrics;
use crate::logger::{log_bytes, log_error, log_info, log_trace, FilesystemLogger, Logger};
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, Sweeper, Wallet};
use crate::Error;
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
use crate::{Error, NodeMetrics};

use lightning::chain::{Confirm, Filter};
use lightning::util::ser::Writeable;
Expand Down Expand Up @@ -102,23 +103,18 @@ pub(crate) enum ChainSource {
lightning_wallet_sync_status: Mutex<WalletSyncStatus>,
fee_estimator: Arc<OnchainFeeEstimator>,
tx_broadcaster: Arc<Broadcaster>,
kv_store: Arc<DynStore>,
config: Arc<Config>,
logger: Arc<FilesystemLogger>,
latest_wallet_sync_timestamp: Arc<RwLock<Option<u64>>>,
latest_onchain_wallet_sync_timestamp: Arc<RwLock<Option<u64>>>,
latest_fee_rate_cache_update_timestamp: Arc<RwLock<Option<u64>>>,
latest_channel_monitor_archival_height: Arc<RwLock<Option<u32>>>,
node_metrics: Arc<RwLock<NodeMetrics>>,
},
}

impl ChainSource {
pub(crate) fn new_esplora(
server_url: String, onchain_wallet: Arc<Wallet>, fee_estimator: Arc<OnchainFeeEstimator>,
tx_broadcaster: Arc<Broadcaster>, config: Arc<Config>, logger: Arc<FilesystemLogger>,
latest_wallet_sync_timestamp: Arc<RwLock<Option<u64>>>,
latest_onchain_wallet_sync_timestamp: Arc<RwLock<Option<u64>>>,
latest_fee_rate_cache_update_timestamp: Arc<RwLock<Option<u64>>>,
latest_channel_monitor_archival_height: Arc<RwLock<Option<u32>>>,
tx_broadcaster: Arc<Broadcaster>, kv_store: Arc<DynStore>, config: Arc<Config>,
logger: Arc<FilesystemLogger>, node_metrics: Arc<RwLock<NodeMetrics>>,
) -> Self {
let mut client_builder = esplora_client::Builder::new(&server_url);
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
Expand All @@ -135,12 +131,10 @@ impl ChainSource {
lightning_wallet_sync_status,
fee_estimator,
tx_broadcaster,
kv_store,
config,
logger,
latest_wallet_sync_timestamp,
latest_onchain_wallet_sync_timestamp,
latest_fee_rate_cache_update_timestamp,
latest_channel_monitor_archival_height,
node_metrics,
}
}

Expand Down Expand Up @@ -211,8 +205,9 @@ impl ChainSource {
esplora_client,
onchain_wallet,
onchain_wallet_sync_status,
kv_store,
logger,
latest_onchain_wallet_sync_timestamp,
node_metrics,
..
} => {
let receiver_res = {
Expand All @@ -232,7 +227,7 @@ impl ChainSource {
// If this is our first sync, do a full scan with the configured gap limit.
// Otherwise just do an incremental sync.
let incremental_sync =
latest_onchain_wallet_sync_timestamp.read().unwrap().is_some();
node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some();

macro_rules! get_and_apply_wallet_update {
($sync_future: expr) => {{
Expand All @@ -251,8 +246,11 @@ impl ChainSource {
.duration_since(UNIX_EPOCH)
.ok()
.map(|d| d.as_secs());
*latest_onchain_wallet_sync_timestamp.write().unwrap() =
unix_time_secs_opt;
{
let mut locked_node_metrics = node_metrics.write().unwrap();
locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt;
write_node_metrics(&*locked_node_metrics, Arc::clone(&kv_store), Arc::clone(&logger))?;
}
Ok(())
},
Err(e) => Err(e),
Expand Down Expand Up @@ -327,9 +325,9 @@ impl ChainSource {
Self::Esplora {
tx_sync,
lightning_wallet_sync_status,
kv_store,
logger,
latest_wallet_sync_timestamp,
latest_channel_monitor_archival_height,
node_metrics,
..
} => {
let sync_cman = Arc::clone(&channel_manager);
Expand Down Expand Up @@ -372,13 +370,24 @@ impl ChainSource {
.duration_since(UNIX_EPOCH)
.ok()
.map(|d| d.as_secs());
*latest_wallet_sync_timestamp.write().unwrap() = unix_time_secs_opt;
{
let mut locked_node_metrics = node_metrics.write().unwrap();
locked_node_metrics.latest_lightning_wallet_sync_timestamp =
unix_time_secs_opt;
write_node_metrics(
&*locked_node_metrics,
Arc::clone(&kv_store),
Arc::clone(&logger),
)?;
}

periodically_archive_fully_resolved_monitors(
Arc::clone(&channel_manager),
Arc::clone(&chain_monitor),
Arc::clone(&latest_channel_monitor_archival_height),
);
Arc::clone(&kv_store),
Arc::clone(&logger),
Arc::clone(&node_metrics),
)?;
Ok(())
},
Err(e) => {
Expand Down Expand Up @@ -406,8 +415,9 @@ impl ChainSource {
esplora_client,
fee_estimator,
config,
kv_store,
logger,
latest_fee_rate_cache_update_timestamp,
node_metrics,
..
} => {
let now = Instant::now();
Expand Down Expand Up @@ -479,7 +489,15 @@ impl ChainSource {
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*latest_fee_rate_cache_update_timestamp.write().unwrap() = unix_time_secs_opt;
{
let mut locked_node_metrics = node_metrics.write().unwrap();
locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt;
write_node_metrics(
&*locked_node_metrics,
Arc::clone(&kv_store),
Arc::clone(&logger),
)?;
}

Ok(())
},
Expand Down Expand Up @@ -580,16 +598,19 @@ impl Filter for ChainSource {

fn periodically_archive_fully_resolved_monitors(
channel_manager: Arc<ChannelManager>, chain_monitor: Arc<ChainMonitor>,
latest_channel_monitor_archival_height: Arc<RwLock<Option<u32>>>,
) {
let mut latest_archival_height_lock = latest_channel_monitor_archival_height.write().unwrap();
kv_store: Arc<DynStore>, logger: Arc<FilesystemLogger>, node_metrics: Arc<RwLock<NodeMetrics>>,
) -> Result<(), Error> {
let mut locked_node_metrics = node_metrics.write().unwrap();
let cur_height = channel_manager.current_best_block().height;
let should_archive = latest_archival_height_lock
let should_archive = locked_node_metrics
.latest_channel_monitor_archival_height
.as_ref()
.map_or(true, |h| cur_height >= h + RESOLVED_CHANNEL_MONITOR_ARCHIVAL_INTERVAL);

if should_archive {
chain_monitor.archive_fully_resolved_channel_monitors();
*latest_archival_height_lock = Some(cur_height);
locked_node_metrics.latest_channel_monitor_archival_height = Some(cur_height);
write_node_metrics(&*locked_node_metrics, kv_store, logger)?;
}
Ok(())
}
10 changes: 0 additions & 10 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,6 @@ pub(crate) const NODE_METRICS_PRIMARY_NAMESPACE: &str = "";
pub(crate) const NODE_METRICS_SECONDARY_NAMESPACE: &str = "";
pub(crate) const NODE_METRICS_KEY: &str = "node_metrics";

/// RapidGossipSync's `latest_sync_timestamp` will be persisted under this key.
pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_PRIMARY_NAMESPACE: &str = "";
pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_SECONDARY_NAMESPACE: &str = "";
pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_KEY: &str = "latest_rgs_sync_timestamp";

/// The last time we broadcast a node announcement will be persisted under this key.
pub(crate) const LATEST_NODE_ANN_BCAST_TIMESTAMP_PRIMARY_NAMESPACE: &str = "";
pub(crate) const LATEST_NODE_ANN_BCAST_TIMESTAMP_SECONDARY_NAMESPACE: &str = "";
pub(crate) const LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY: &str = "latest_node_ann_bcast_timestamp";

/// The BDK wallet's [`ChangeSet::descriptor`] will be persisted under this key.
///
/// [`ChangeSet::descriptor`]: bdk_wallet::ChangeSet::descriptor
Expand Down
Loading

0 comments on commit d39ec15

Please sign in to comment.