Skip to content

Commit

Permalink
Use the same function for saving data transfer sessions in all places
Browse files Browse the repository at this point in the history
This is a relatively large query; keeping a single copy of it means we no longer have to keep multiple copies in sync.

first_timestamp was added to handle the case where pending data transfer sessions are moved back into the regular data_transfer_sessions table, so the original session start time is preserved.
  • Loading branch information
michaeldjeffrey committed Jan 22, 2025
1 parent 50f1b08 commit e9b1181
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 45 deletions.
2 changes: 1 addition & 1 deletion mobile_packet_verifier/src/accumulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub async fn accumulate_sessions(
continue;
}

pending_burns::save(&mut *txn, &report.report, curr_file_ts).await?;
pending_burns::save_data_transfer_session_req(&mut *txn, &report.report, curr_file_ts).await?;
}

Ok(())
Expand Down
49 changes: 37 additions & 12 deletions mobile_packet_verifier/src/pending_burns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,35 +114,60 @@ pub async fn get_all_payer_burns(conn: &Pool<Postgres>) -> anyhow::Result<Vec<Pe
Ok(pending_payer_burns)
}

pub async fn save(
pub async fn save_data_transfer_session_req(
txn: &mut Transaction<'_, Postgres>,
req: &DataTransferSessionReq,
last_timestamp: DateTime<Utc>,
) -> anyhow::Result<()> {
) -> Result<(), sqlx::Error> {
save_data_transfer_session(
txn,
&DataTransferSession {
pub_key: req.data_transfer_usage.pub_key.clone(),
payer: req.data_transfer_usage.payer.clone(),
uploaded_bytes: req.data_transfer_usage.upload_bytes as i64,
downloaded_bytes: req.data_transfer_usage.download_bytes as i64,
rewardable_bytes: req.rewardable_bytes as i64,
// timestamps are the same upon ingest
first_timestamp: last_timestamp,
last_timestamp: last_timestamp,
},
)
.await?;

let dc_to_burn = bytes_to_dc(req.rewardable_bytes);
increment_metric(&req.data_transfer_usage.payer, dc_to_burn);

Ok(())
}

/// Upserts a `DataTransferSession` into `data_transfer_sessions`.
///
/// This is the single shared query for saving sessions (used both for fresh
/// ingest and for moving pending sessions back into the main table). On
/// conflict for a (pub_key, payer) pair the byte counters are accumulated and
/// the timestamp window is widened: `first_timestamp` keeps the earliest value
/// (LEAST) and `last_timestamp` the latest (GREATEST), so re-inserting a
/// previously pending session cannot shrink the window.
///
/// # Errors
/// Returns `sqlx::Error` if the query fails.
pub async fn save_data_transfer_session(
    txn: &mut Transaction<'_, Postgres>,
    data_transfer_session: &DataTransferSession,
) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"
        INSERT INTO data_transfer_sessions
            (pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, first_timestamp, last_timestamp)
        VALUES
            ($1, $2, $3, $4, $5, $6, $7)
        ON CONFLICT (pub_key, payer) DO UPDATE SET
            uploaded_bytes = data_transfer_sessions.uploaded_bytes + EXCLUDED.uploaded_bytes,
            downloaded_bytes = data_transfer_sessions.downloaded_bytes + EXCLUDED.downloaded_bytes,
            rewardable_bytes = data_transfer_sessions.rewardable_bytes + EXCLUDED.rewardable_bytes,
            first_timestamp = LEAST(data_transfer_sessions.first_timestamp, EXCLUDED.first_timestamp),
            last_timestamp = GREATEST(data_transfer_sessions.last_timestamp, EXCLUDED.last_timestamp)
        "#,
    )
    .bind(&data_transfer_session.pub_key)
    .bind(&data_transfer_session.payer)
    .bind(data_transfer_session.uploaded_bytes)
    .bind(data_transfer_session.downloaded_bytes)
    .bind(data_transfer_session.rewardable_bytes)
    .bind(data_transfer_session.first_timestamp)
    .bind(data_transfer_session.last_timestamp)
    .execute(txn)
    .await?;

    Ok(())
}

Expand Down
42 changes: 11 additions & 31 deletions mobile_packet_verifier/src/pending_txns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use helium_crypto::PublicKeyBinary;
use solana::Signature;
use sqlx::{postgres::PgRow, FromRow, PgPool, Row};

use crate::pending_burns::DataTransferSession;
use crate::pending_burns::{self, DataTransferSession};

#[derive(Debug)]
pub struct PendingTxn {
Expand Down Expand Up @@ -135,42 +135,22 @@ pub async fn remove_pending_txn_failure(
.execute(&mut *txn)
.await?;

sqlx::query(
// Move pending data sessions back to the main table
let transfer_sessions: Vec<DataTransferSession> = sqlx::query_as(
r#"
WITH moved_rows AS (
DELETE FROM pending_data_transfer_sessions
WHERE signature = $1
RETURNING *
)
INSERT INTO data_transfer_sessions (
pub_key,
payer,
uploaded_bytes,
downloaded_bytes,
rewardable_bytes,
first_timestamp,
last_timestamp
)
SELECT
pub_key,
payer,
uploaded_bytes,
downloaded_bytes,
rewardable_bytes,
first_timestamp,
last_timestamp
FROM moved_rows
ON CONFLICT (pub_key, payer) DO UPDATE SET
uploaded_bytes = data_transfer_sessions.uploaded_bytes + EXCLUDED.uploaded_bytes,
downloaded_bytes = data_transfer_sessions.downloaded_bytes + EXCLUDED.downloaded_bytes,
rewardable_bytes = data_transfer_sessions.rewardable_bytes + EXCLUDED.rewardable_bytes,
last_timestamp = GREATEST(data_transfer_sessions.last_timestamp, EXCLUDED.last_timestamp)
DELETE FROM pending_data_transfer_sessions
WHERE signature = $1
RETURNING *
"#,
)
.bind(signature.to_string())
.execute(&mut *txn)
.fetch_all(&mut *txn)
.await?;

for session in transfer_sessions.iter() {
pending_burns::save_data_transfer_session(&mut txn, session).await?;
}

txn.commit().await?;

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion mobile_packet_verifier/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ async fn save_data_transfer_sessions(
let mut txn = pool.begin().await?;
for (payer, pubkey, amount) in sessions {
let session = mk_data_transfer_session(payer, pubkey, *amount);
pending_burns::save(&mut txn, &session, Utc::now()).await?;
pending_burns::save_data_transfer_session_req(&mut txn, &session, Utc::now()).await?;
}
txn.commit().await?;

Expand Down

0 comments on commit e9b1181

Please sign in to comment.