Skip to content

Commit

Permalink
delete pending payments unknown to the node
Browse files Browse the repository at this point in the history
  • Loading branch information
JssDWt committed Dec 5, 2024
1 parent f7518ea commit 800bdeb
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 3 deletions.
1 change: 1 addition & 0 deletions libs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ rusqlite = { version = "0.29", features = [
"backup",
"trace",
"hooks",
"array",
] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Expand Down
6 changes: 5 additions & 1 deletion libs/sdk-core/src/breez_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1158,16 +1158,20 @@ impl BreezServices {

// First query the changes since last sync state.
let sync_state = self.persister.get_sync_state()?;
let pending_payments = self.persister.get_pending_payment_hashes()?;
let new_data = &self
.node_api
.pull_changed(sync_state.clone(), match_local_balance)
.pull_changed(sync_state.clone(), match_local_balance, pending_payments)
.await?;

debug!(
"pull changed old state={:?} new state={:?}",
sync_state, new_data.sync_state
);

// Delete any pending payments that don't exist on the node.
self.persister.delete_payments(&new_data.deleted)?;

// update node state and channels state
self.persister.set_node_state(&new_data.node_state)?;

Expand Down
73 changes: 71 additions & 2 deletions libs/sdk-core/src/greenlight/node_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::{anyhow, Result};
use ecies::symmetric::{sym_decrypt, sym_encrypt};
use futures::{Future, Stream};
use futures::future::join_all;
use futures::{Future, FutureExt, Stream};
use gl_client::credentials::{Device, Nobody};
use gl_client::node;
use gl_client::node::ClnClient;
Expand All @@ -21,7 +22,8 @@ use gl_client::pb::cln::listsendpays_request::ListsendpaysIndex;
use gl_client::pb::cln::{
self, Amount, GetrouteRequest, GetrouteRoute, ListchannelsRequest,
ListclosedchannelsClosedchannels, ListpaysPays, ListpeerchannelsChannels, ListsendpaysPayments,
PreapproveinvoiceRequest, SendpayRequest, SendpayRoute, WaitsendpayRequest,
ListsendpaysRequest, PreapproveinvoiceRequest, SendpayRequest, SendpayRoute,
WaitsendpayRequest,
};
use gl_client::pb::{OffChainPayment, TrampolinePayRequest};
use gl_client::scheduler::Scheduler;
Expand Down Expand Up @@ -729,6 +731,68 @@ impl Greenlight {
Ok((new_state, payments))
}

/// If there are any pending transactions supplied, this function checks
/// whether there was an update for that pending transaction in the current
/// sync round. If not, it checks whether the transaction exists on the
/// remote node. If it doesn't exist, the pending transaction is marked to
/// be deleted.
async fn update_pending_transactions(
&self,
payments: &[Payment],
pending: Vec<String>,
) -> NodeResult<Vec<String>> {
if pending.is_empty() {
return Ok(Vec::new());
}

let mut pending_map: HashSet<String> = HashSet::from_iter(pending);
for payment in payments {
if pending_map.is_empty() {
return Ok(Vec::new());
}

// If the payment is already in the update, the pending payment will
// be resolved automatically.
pending_map.remove(&payment.id);
}

if pending_map.is_empty() {
return Ok(Vec::new());
}

let client = self.get_node_client().await?;

// If there's any pending payments remaining, check whether they exist
// in parallel.
let mut futures = Vec::new();
for hash in pending_map {
let mut client = client.clone();
let hash_vec = hex::decode(&hash)?;
let req = ListsendpaysRequest {
index: Some(ListinvoicesIndex::Created.into()),
limit: Some(1),
payment_hash: Some(hash_vec),
..Default::default()
};
futures.push(async move {
with_connection_retry!(client.list_send_pays(req.clone()))
.map(|r| (hash, r))
.await
});
}

let mut deleted = Vec::new();
let results = join_all(futures).await;
for (hash, result) in results {
let result = result?.into_inner();
if result.payments.is_empty() {
deleted.push(hash);
}
}

Ok(deleted)
}

async fn pull_receive_payments(
&self,
state: &SyncIndex,
Expand Down Expand Up @@ -1096,6 +1160,7 @@ impl NodeAPI for Greenlight {
&self,
sync_state: Option<Value>,
match_local_balance: bool,
pending_payments: Vec<String>,
) -> NodeResult<SyncResponse> {
let sync_state: SyncState = match sync_state {
Some(sync_state) => serde_json::from_value(sync_state)?,
Expand Down Expand Up @@ -1204,12 +1269,16 @@ impl NodeAPI for Greenlight {
}

let (new_sync_state, payments) = self.pull_transactions(&sync_state, htlc_list).await?;
let deleted = self
.update_pending_transactions(&payments, pending_payments)
.await?;

Ok(SyncResponse {
sync_state: serde_json::to_value(new_sync_state)?,
node_state,
payments,
channels: all_channel_models,
deleted,
})
}

Expand Down
1 change: 1 addition & 0 deletions libs/sdk-core/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ pub struct SyncResponse {
pub node_state: NodeState,
pub payments: Vec<crate::models::Payment>,
pub channels: Vec<crate::models::Channel>,
pub deleted: Vec<String>,
}

/// The status of a payment
Expand Down
1 change: 1 addition & 0 deletions libs/sdk-core/src/node_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ pub trait NodeAPI: Send + Sync {
&self,
sync_state: Option<Value>,
match_local_balance: bool,
pending_payments: Vec<String>,
) -> NodeResult<SyncResponse>;
/// As per the `pb::PayRequest` docs, `amount_msat` is only needed when the invoice doesn't specify an amount
async fn send_payment(
Expand Down
1 change: 1 addition & 0 deletions libs/sdk-core/src/persist/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl SqliteStorage {

pub(crate) fn get_connection(&self) -> PersistResult<Connection> {
let con = Connection::open(self.main_db_file.clone())?;
rusqlite::vtab::array::load_module(&con)?;
let sql = "ATTACH DATABASE ? AS sync;";
con.execute(sql, [self.sync_db_file.clone()])?;
// We want to notify any subscribers with hook events.
Expand Down
46 changes: 46 additions & 0 deletions libs/sdk-core/src/persist/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::{HashMap, HashSet};
use std::rc::Rc;
use std::str::FromStr;

use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSql, ToSqlOutput, Type, ValueRef};
Expand Down Expand Up @@ -177,6 +178,51 @@ impl SqliteStorage {
Ok(())
}

pub fn get_pending_payment_hashes(&self) -> PersistResult<Vec<String>> {
let con = self.get_connection()?;
let mut stmt = con.prepare(
"SELECT p.id
FROM payments p
WHERE p.status = :status AND p.payment_type = :payment_type",
)?;
let res = stmt
.query_map(
named_params! {
":status": PaymentStatus::Pending,
":payment_type": PaymentType::Sent.to_string(),
},
|row| {
let id: String = row.get(0)?;
Ok(id)
},
)?
.map(|i| i.unwrap())
.collect();
Ok(res)
}

pub fn delete_payments(&self, hashes: &[String]) -> PersistResult<()> {
if hashes.is_empty() {
return Ok(());
}

let values = Rc::new(
hashes
.iter()
.cloned()
.map(rusqlite::types::Value::from)
.collect::<Vec<_>>(),
);
let con = self.get_connection()?;
let mut stmt = con.prepare(
"DELETE FROM payments
WHERE id in rarray(?1)",
)?;
stmt.execute([values])?;
debug!("deleted {} payments", hashes.len());
Ok(())
}

/// Constructs [Payment] by joining data in the `payment` and `payments_external_info` tables
///
/// This queries all payments. To query a single payment, see [Self::get_payment_by_hash]
Expand Down
2 changes: 2 additions & 0 deletions libs/sdk-core/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ impl NodeAPI for MockNodeAPI {
&self,
_sync_state: Option<Value>,
_match_local_balance: bool,
_pending_payments: Vec<String>,
) -> NodeResult<SyncResponse> {
Ok(SyncResponse {
sync_state: Value::Null,
Expand All @@ -365,6 +366,7 @@ impl NodeAPI for MockNodeAPI {
.flat_map(TryInto::try_into)
.collect(),
channels: Vec::new(),
deleted: Vec::new(),
})
}

Expand Down

0 comments on commit 800bdeb

Please sign in to comment.