From 800bdebb60087204bbbe6f2194235d4e559e396e Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Thu, 5 Dec 2024 09:23:28 +0100 Subject: [PATCH] delete pending payments unknown to the node --- libs/Cargo.toml | 1 + libs/sdk-core/src/breez_services.rs | 6 +- libs/sdk-core/src/greenlight/node_api.rs | 73 ++++++++++++++++++++++- libs/sdk-core/src/models.rs | 1 + libs/sdk-core/src/node_api.rs | 1 + libs/sdk-core/src/persist/db.rs | 1 + libs/sdk-core/src/persist/transactions.rs | 46 ++++++++++++++ libs/sdk-core/src/test_utils.rs | 2 + 8 files changed, 128 insertions(+), 3 deletions(-) diff --git a/libs/Cargo.toml b/libs/Cargo.toml index 0c2a5dd6e..5b2d97ed9 100644 --- a/libs/Cargo.toml +++ b/libs/Cargo.toml @@ -37,6 +37,7 @@ rusqlite = { version = "0.29", features = [ "backup", "trace", "hooks", + "array", ] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/libs/sdk-core/src/breez_services.rs b/libs/sdk-core/src/breez_services.rs index a5d9ccf5a..107f62c0d 100644 --- a/libs/sdk-core/src/breez_services.rs +++ b/libs/sdk-core/src/breez_services.rs @@ -1158,9 +1158,10 @@ impl BreezServices { // First query the changes since last sync state. let sync_state = self.persister.get_sync_state()?; + let pending_payments = self.persister.get_pending_payment_hashes()?; let new_data = &self .node_api - .pull_changed(sync_state.clone(), match_local_balance) + .pull_changed(sync_state.clone(), match_local_balance, pending_payments) .await?; debug!( @@ -1168,6 +1169,9 @@ impl BreezServices { sync_state, new_data.sync_state ); + // Delete any pending payments that don't exist on the node. + self.persister.delete_payments(&new_data.deleted)?; + // update node state and channels state self.persister.set_node_state(&new_data.node_state)?; diff --git a/libs/sdk-core/src/greenlight/node_api.rs b/libs/sdk-core/src/greenlight/node_api.rs index e2905a864..97d162d2b 100644 --- a/libs/sdk-core/src/greenlight/node_api.rs +++ b/libs/sdk-core/src/greenlight/node_api.rs @@ -9,7 +9,8 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use anyhow::{anyhow, Result}; use ecies::symmetric::{sym_decrypt, sym_encrypt}; -use futures::{Future, Stream}; +use futures::future::join_all; +use futures::{Future, FutureExt, Stream}; use gl_client::credentials::{Device, Nobody}; use gl_client::node; use gl_client::node::ClnClient; @@ -21,7 +22,8 @@ use gl_client::pb::cln::listsendpays_request::ListsendpaysIndex; use gl_client::pb::cln::{ self, Amount, GetrouteRequest, GetrouteRoute, ListchannelsRequest, ListclosedchannelsClosedchannels, ListpaysPays, ListpeerchannelsChannels, ListsendpaysPayments, - PreapproveinvoiceRequest, SendpayRequest, SendpayRoute, WaitsendpayRequest, + ListsendpaysRequest, PreapproveinvoiceRequest, SendpayRequest, SendpayRoute, + WaitsendpayRequest, }; use gl_client::pb::{OffChainPayment, TrampolinePayRequest}; use gl_client::scheduler::Scheduler; @@ -729,6 +731,68 @@ impl Greenlight { Ok((new_state, payments)) } + /// If there are any pending transactions supplied, this function checks + /// whether there was an update for that pending transaction in the current + /// sync round. If not, it checks whether the transaction exists on the + /// remote node. If it doesn't exist, the pending transaction is marked to + /// be deleted. + async fn update_pending_transactions( + &self, + payments: &[Payment], + pending: Vec, + ) -> NodeResult> { + if pending.is_empty() { + return Ok(Vec::new()); + } + + let mut pending_map: HashSet = HashSet::from_iter(pending); + for payment in payments { + if pending_map.is_empty() { + return Ok(Vec::new()); + } + + // If the payment is already in the update, the pending payment will + // be resolved automatically. + pending_map.remove(&payment.id); + } + + if pending_map.is_empty() { + return Ok(Vec::new()); + } + + let client = self.get_node_client().await?; + + // If there's any pending payments remaining, check whether they exist + // in parallel. + let mut futures = Vec::new(); + for hash in pending_map { + let mut client = client.clone(); + let hash_vec = hex::decode(&hash)?; + let req = ListsendpaysRequest { + index: Some(ListinvoicesIndex::Created.into()), + limit: Some(1), + payment_hash: Some(hash_vec), + ..Default::default() + }; + futures.push(async move { + with_connection_retry!(client.list_send_pays(req.clone())) + .map(|r| (hash, r)) + .await + }); + } + + let mut deleted = Vec::new(); + let results = join_all(futures).await; + for (hash, result) in results { + let result = result?.into_inner(); + if result.payments.is_empty() { + deleted.push(hash); + } + } + + Ok(deleted) + } + async fn pull_receive_payments( &self, state: &SyncIndex, @@ -1096,6 +1160,7 @@ impl NodeAPI for Greenlight { &self, sync_state: Option, match_local_balance: bool, + pending_payments: Vec, ) -> NodeResult { let sync_state: SyncState = match sync_state { Some(sync_state) => serde_json::from_value(sync_state)?, @@ -1204,12 +1269,16 @@ impl NodeAPI for Greenlight { } let (new_sync_state, payments) = self.pull_transactions(&sync_state, htlc_list).await?; + let deleted = self + .update_pending_transactions(&payments, pending_payments) + .await?; Ok(SyncResponse { sync_state: serde_json::to_value(new_sync_state)?, node_state, payments, channels: all_channel_models, + deleted, }) } diff --git a/libs/sdk-core/src/models.rs b/libs/sdk-core/src/models.rs index 9e655b461..cc38b316c 100644 --- a/libs/sdk-core/src/models.rs +++ b/libs/sdk-core/src/models.rs @@ -645,6 +645,7 @@ pub struct SyncResponse { pub node_state: NodeState, pub payments: Vec, pub channels: Vec, + pub deleted: Vec, } /// The status of a payment diff --git a/libs/sdk-core/src/node_api.rs b/libs/sdk-core/src/node_api.rs index 05dba8306..d3851c120 100644 --- a/libs/sdk-core/src/node_api.rs +++ b/libs/sdk-core/src/node_api.rs @@ -123,6 +123,7 @@ pub trait NodeAPI: Send + Sync { &self, sync_state: Option, match_local_balance: bool, + pending_payments: Vec, ) -> NodeResult; /// As per the `pb::PayRequest` docs, `amount_msat` is only needed when the invoice doesn't specify an amount async fn send_payment( diff --git a/libs/sdk-core/src/persist/db.rs b/libs/sdk-core/src/persist/db.rs index 0794a1458..a0b817427 100644 --- a/libs/sdk-core/src/persist/db.rs +++ b/libs/sdk-core/src/persist/db.rs @@ -68,6 +68,7 @@ impl SqliteStorage { pub(crate) fn get_connection(&self) -> PersistResult { let con = Connection::open(self.main_db_file.clone())?; + rusqlite::vtab::array::load_module(&con)?; let sql = "ATTACH DATABASE ? AS sync;"; con.execute(sql, [self.sync_db_file.clone()])?; // We want to notify any subscribers with hook events. diff --git a/libs/sdk-core/src/persist/transactions.rs b/libs/sdk-core/src/persist/transactions.rs index 9d3cbec06..f84c30b33 100644 --- a/libs/sdk-core/src/persist/transactions.rs +++ b/libs/sdk-core/src/persist/transactions.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, HashSet}; +use std::rc::Rc; use std::str::FromStr; use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSql, ToSqlOutput, Type, ValueRef}; @@ -177,6 +178,51 @@ impl SqliteStorage { Ok(()) } + pub fn get_pending_payment_hashes(&self) -> PersistResult> { + let con = self.get_connection()?; + let mut stmt = con.prepare( + "SELECT p.id + FROM payments p + WHERE p.status = :status AND p.payment_type = :payment_type", + )?; + let res = stmt + .query_map( + named_params! { + ":status": PaymentStatus::Pending, + ":payment_type": PaymentType::Sent.to_string(), + }, + |row| { + let id: String = row.get(0)?; + Ok(id) + }, + )? + .map(|i| i.unwrap()) + .collect(); + Ok(res) + } + + pub fn delete_payments(&self, hashes: &[String]) -> PersistResult<()> { + if hashes.is_empty() { + return Ok(()); + } + + let values = Rc::new( + hashes + .iter() + .cloned() + .map(rusqlite::types::Value::from) + .collect::>(), + ); + let con = self.get_connection()?; + let mut stmt = con.prepare( + "DELETE FROM payments + WHERE id in rarray(?1)", + )?; + stmt.execute([values])?; + debug!("deleted {} payments", hashes.len()); + Ok(()) + } + /// Constructs [Payment] by joining data in the `payment` and `payments_external_info` tables /// /// This queries all payments. To query a single payment, see [Self::get_payment_by_hash] diff --git a/libs/sdk-core/src/test_utils.rs b/libs/sdk-core/src/test_utils.rs index d237ba7ad..7d92e88de 100644 --- a/libs/sdk-core/src/test_utils.rs +++ b/libs/sdk-core/src/test_utils.rs @@ -352,6 +352,7 @@ impl NodeAPI for MockNodeAPI { &self, _sync_state: Option, _match_local_balance: bool, + _pending_payments: Vec, ) -> NodeResult { Ok(SyncResponse { sync_state: Value::Null, @@ -365,6 +366,7 @@ impl NodeAPI for MockNodeAPI { .flat_map(TryInto::try_into) .collect(), channels: Vec::new(), + deleted: Vec::new(), }) }