Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Helium Lib attempt: 2 #930

Open
wants to merge 61 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
a673ed3
use local helium-lib for testing
michaeldjeffrey Dec 20, 2024
a001518
Remove SolanaRpcError::Test
michaeldjeffrey Dec 20, 2024
cc9c8dd
misformatted tokio::select!
michaeldjeffrey Dec 20, 2024
d87b8e0
Use helium-lib for burn txns
michaeldjeffrey Dec 20, 2024
8f16c45
Initialize SolanaRpc with SubDao
michaeldjeffrey Dec 20, 2024
f174c33
update helium-lib deps
michaeldjeffrey Jan 7, 2025
bbc908f
Use helium-lib for sending and confirming mobile burn txn
michaeldjeffrey Jan 7, 2025
3ca6f90
use helium-lib for sending and confirming iot burn txns
michaeldjeffrey Jan 7, 2025
1ed6aa0
remove panics
michaeldjeffrey Jan 8, 2025
5d0f353
Update to receive TransactionWithBlockhash and Name
michaeldjeffrey Jan 8, 2025
63aac59
Start updating integration tests to use new helium-lib interactino, a…
michaeldjeffrey Jan 9, 2025
f59484c
Use sqlx for iot integration tests
michaeldjeffrey Jan 9, 2025
3a5c327
Box potential large error
michaeldjeffrey Jan 9, 2025
38b973e
remove unneccessary clone
michaeldjeffrey Jan 9, 2025
2413347
Add inner impl for adding pending transaction to override date
michaeldjeffrey Jan 9, 2025
a7607e1
remove duplicated integration test with a bunch of mocks
michaeldjeffrey Jan 9, 2025
e2a1a78
Move send_with_retry macro into start_boost
michaeldjeffrey Jan 10, 2025
89db58d
Bring in TxnSender from helium-lib to Oracles
michaeldjeffrey Jan 10, 2025
e9206f3
Remove retry options
michaeldjeffrey Jan 10, 2025
5d9eca4
Update logging
michaeldjeffrey Jan 10, 2025
ac87c51
Move TestSolanaClient to test file
michaeldjeffrey Jan 10, 2025
d169245
Point at published helium-lib branch
michaeldjeffrey Jan 10, 2025
8e3c19c
Remove unused callbacks
michaeldjeffrey Jan 10, 2025
08da5d4
Bring in sender unit tests
michaeldjeffrey Jan 10, 2025
7387c29
more reasonable real world default, and override for test
michaeldjeffrey Jan 10, 2025
003e929
Go all the way back to helium-lib master
michaeldjeffrey Jan 13, 2025
be04eb9
Use local version of wrapping transaction
michaeldjeffrey Jan 13, 2025
19c07fe
Unsplit error conditions
michaeldjeffrey Jan 13, 2025
6c1ffff
Rename so both stores have the same name
michaeldjeffrey Jan 13, 2025
4b5615b
Alias SolanaTransaction so we don't have ot use a qualified path ever…
michaeldjeffrey Jan 13, 2025
9eba25f
Start adding integration tests for mobile packet verifier
michaeldjeffrey Jan 13, 2025
7e2e71e
check balances and written reports
michaeldjeffrey Jan 13, 2025
1f3af73
Track pending txns for mobile burns
michaeldjeffrey Jan 13, 2025
db72c1a
confirm pending txns on startup
michaeldjeffrey Jan 13, 2025
75b01d7
Remove unused struct
michaeldjeffrey Jan 13, 2025
88b0b47
Move solana test client to solana workspace
michaeldjeffrey Jan 13, 2025
775f168
I think we're past this now
michaeldjeffrey Jan 13, 2025
f5ed667
fixup comments
michaeldjeffrey Jan 13, 2025
01354d1
use helium-lib for making boosting txn
michaeldjeffrey Jan 13, 2025
fa272a7
Remove unused noopstore
michaeldjeffrey Jan 14, 2025
0ddf2f9
local noopstore
michaeldjeffrey Jan 14, 2025
08d4acc
return full error from preparation
michaeldjeffrey Jan 14, 2025
f89d1eb
confirm_pending_txns takes a FileSinkClient
michaeldjeffrey Jan 14, 2025
4026834
test for confirm pending txns writing out data transfer sessions
michaeldjeffrey Jan 14, 2025
141524c
write out data transfer sessions for pending burns
michaeldjeffrey Jan 15, 2025
f5cae95
clippy fixes
michaeldjeffrey Jan 15, 2025
65c9f84
A txn that cannot be finalized will move back to available
michaeldjeffrey Jan 15, 2025
5f4adbb
Break out test saving multiple data transfer sessions
michaeldjeffrey Jan 15, 2025
5814ee3
Helper TestBurner
michaeldjeffrey Jan 15, 2025
a7116be
add logging around burn success/failure
michaeldjeffrey Jan 15, 2025
cb81c86
Move txn db function to new crate
michaeldjeffrey Jan 15, 2025
ad0627b
get_payer_balance helper
michaeldjeffrey Jan 15, 2025
1f4e88a
transcation -> txn
michaeldjeffrey Jan 15, 2025
dd24c59
move bytes_to_dc to lib
michaeldjeffrey Jan 15, 2025
8020117
clippy suggestions
michaeldjeffrey Jan 15, 2025
ef9008e
Add ability to panic solana client when sending transaction
michaeldjeffrey Jan 16, 2025
67fd9bb
Skip burning when there is a pending txn
michaeldjeffrey Jan 16, 2025
8ae7f65
Test for skipping burn when there are pending txns
michaeldjeffrey Jan 16, 2025
fb5cf0f
clippy updates
michaeldjeffrey Jan 16, 2025
50f1b08
always confirm before burn, don't confirm on startup
michaeldjeffrey Jan 22, 2025
a09b1d7
Use the same function for saving data transfer sessions in all places
michaeldjeffrey Jan 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
580 changes: 116 additions & 464 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,6 @@ sqlx = { git = "https://github.com/launchbadge/sqlx.git", rev = "42dd78fe931df65
# [patch.'https://github.com/helium/proto']
# helium-proto = { path = "../proto" }
# beacon = { path = "../proto/beacon" }

# [patch.'https://github.com/helium/helium-wallet-rs.git']
# helium-lib = { path = "../helium-wallet-rs/helium-lib" }
10 changes: 7 additions & 3 deletions boost_manager/tests/integrations/updater_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct MockTransaction {

pub struct MockSolanaConnection {
submitted: Mutex<Vec<MockTransaction>>,
error: Option<String>,
error: Option<tonic::Status>,
}

impl MockSolanaConnection {
Expand All @@ -34,7 +34,7 @@ impl MockSolanaConnection {
fn with_error(error: String) -> Self {
Self {
submitted: Mutex::new(vec![]),
error: Some(error),
error: Some(tonic::Status::internal(error)),
}
}
}
Expand All @@ -58,7 +58,11 @@ impl SolanaNetwork for MockSolanaConnection {

self.error
.as_ref()
.map(|err| Err(SolanaRpcError::Test(err.to_owned())))
.map(|err| {
Err(SolanaRpcError::HeliumLib(solana::error::Error::Grpc(
err.to_owned(),
)))
})
.unwrap_or(Ok(()))
}

Expand Down
5 changes: 5 additions & 0 deletions iot_packet_verifier/src/balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tokio::sync::Mutex;

/// Caches balances fetched from the solana chain and debits made by the
/// packet verifier.
#[derive(Clone)]
pub struct BalanceCache<S> {
payer_accounts: BalanceStore,
solana: S,
Expand Down Expand Up @@ -55,6 +56,10 @@ impl<S> BalanceCache<S> {
pub fn balances(&self) -> BalanceStore {
self.payer_accounts.clone()
}

pub async fn get_payer_balance(&self, payer: &PublicKeyBinary) -> Option<PayerAccount> {
self.payer_accounts.lock().await.get(payer).cloned()
}
}

#[async_trait::async_trait]
Expand Down
158 changes: 129 additions & 29 deletions iot_packet_verifier/src/burner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ use crate::{
},
};
use futures::{future::LocalBoxFuture, TryFutureExt};
use solana::{burn::SolanaNetwork, GetSignature, SolanaRpcError};
use helium_crypto::PublicKeyBinary;
use solana::{burn::SolanaNetwork, sender, SolanaRpcError};
use std::time::Duration;
use task_manager::ManagedTask;
use tokio::time::{self, MissedTickBehavior};
use tracing::Instrument;

pub struct Burner<P, S> {
pending_tables: P,
Expand All @@ -19,7 +21,7 @@ pub struct Burner<P, S> {

impl<P, S> ManagedTask for Burner<P, S>
where
P: PendingTables + Send + Sync + 'static,
P: PendingTables,
S: SolanaNetwork,
{
fn start_task(
Expand Down Expand Up @@ -76,9 +78,9 @@ where

loop {
tokio::select! {
biased;
_ = shutdown.clone() => break,
_ = burn_timer.tick() => {
biased;
_ = shutdown.clone() => break,
_ = burn_timer.tick() => {
match self.burn().await {
Ok(()) => continue,
Err(err) => {
Expand All @@ -94,49 +96,147 @@ where
}

pub async fn burn(&mut self) -> Result<(), BurnError> {
// There should only be a single pending txn at a time
let pending_txns = self.pending_tables.fetch_all_pending_txns().await?;
if !pending_txns.is_empty() {
tracing::info!(pending_txns = pending_txns.len(), "skipping burn");
return Ok(());
}

// Fetch the next payer and amount that should be burn. If no such burn
// exists, perform no action.
let Some(Burn { payer, amount }) = self.pending_tables.fetch_next_burn().await? else {
tracing::info!("no pending burns");
return Ok(());
};

tracing::info!(%amount, %payer, "Burning DC");

// Create a burn transaction and execute it:
let txn = self
.solana
.make_burn_transaction(&payer, amount)
.await
.map_err(BurnError::SolanaError)?;
self.pending_tables
.add_pending_transaction(&payer, amount, txn.get_signature())
.await?;

let store = BurnTxnStore::new(
self.pending_tables.clone(),
self.balances.clone(),
payer.clone(),
amount,
);

let burn_span = tracing::info_span!("burn_txn", %payer, amount);
self.solana
.submit_transaction(&txn)
.submit_transaction(&txn, &store)
.map_err(BurnError::SolanaError)
.instrument(burn_span)
.await
.map_err(BurnError::SolanaError)?;
}
}

pub struct BurnTxnStore<PT> {
pool: PT,
balances: BalanceStore,
payer: PublicKeyBinary,
amount: u64,
}

// Removing the pending transaction and subtract the burn amount
// now that we have confirmation that the burn transaction is confirmed
// on chain:
let mut pending_tables_txn = self.pending_tables.begin().await?;
pending_tables_txn
.remove_pending_transaction(txn.get_signature())
.await?;
pending_tables_txn
.subtract_burned_amount(&payer, amount)
.await?;
pending_tables_txn.commit().await?;
impl<PT: PendingTables + Clone> BurnTxnStore<PT> {
pub fn new(pool: PT, balances: BalanceStore, payer: PublicKeyBinary, amount: u64) -> Self {
Self {
pool,
balances,
payer,
amount,
}
}
}

let mut balance_lock = self.balances.lock().await;
let payer_account = balance_lock.get_mut(&payer).unwrap();
// Reduce the pending burn amount and the payer's balance by the amount
// we've burned.
payer_account.burned = payer_account.burned.saturating_sub(amount);
payer_account.balance = payer_account.balance.saturating_sub(amount);
#[async_trait::async_trait]
impl<PT: PendingTables> sender::TxnStore for BurnTxnStore<PT> {
async fn on_prepared(&self, txn: &solana::Transaction) -> sender::SenderResult<()> {
tracing::info!("txn prepared");

metrics::counter!("burned", "payer" => payer.to_string()).increment(amount);
let signature = txn.get_signature();
let add_pending = self
.pool
.add_pending_transaction(&self.payer, self.amount, signature);

let Ok(()) = add_pending.await else {
tracing::error!("failed to add pending transcation");
return Err(sender::SenderError::preparation(
"could not add pending transaction",
));
};

Ok(())
}

async fn on_finalized(&self, txn: &solana::Transaction) {
tracing::info!("txn finalized");

let Ok(mut db_txn) = self.pool.begin().await else {
tracing::error!("failed to start finalized txn db transaction");
return;
};

let signature = txn.get_signature();
let Ok(()) = db_txn.remove_pending_transaction(signature).await else {
tracing::error!("failed to remove pending");
return;
};

let Ok(()) = db_txn
.subtract_burned_amount(&self.payer, self.amount)
.await
else {
tracing::error!("failed to subtract burned amount");
return;
};

// Subtract balances from map before submitted db txn
let mut balance_lock = self.balances.lock().await;
let payer_account = balance_lock.get_mut(&self.payer).unwrap();
// Reduce the pending burn amount and the payer's balance by the amount we've burned
payer_account.burned = payer_account.burned.saturating_sub(self.amount);
payer_account.balance = payer_account.balance.saturating_sub(self.amount);

let Ok(()) = db_txn.commit().await else {
tracing::error!("failed to commit finalized txn db transaction");
return;
};

metrics::counter!(
"burned",
"payer" => self.payer.to_string(),
"success" => "true"
)
.increment(self.amount);
}

async fn on_error(&self, txn: &solana::Transaction, err: sender::SenderError) {
tracing::warn!(?err, "txn failed");
let Ok(mut db_txn) = self.pool.begin().await else {
tracing::error!("failed to start error transaction");
return;
};

let signature = txn.get_signature();
let Ok(()) = db_txn.remove_pending_transaction(signature).await else {
tracing::error!("failed to remove pending transaction on error");
return;
};

let Ok(()) = db_txn.commit().await else {
tracing::error!("failed to commit on error transaction");
return;
};

metrics::counter!(
"burned",
"payer" => self.payer.to_string(),
"success" => "false"
)
.increment(self.amount);
}
}
2 changes: 1 addition & 1 deletion iot_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl Cmd {
bail!("Missing solana section in settings");
};
// Set up the solana RpcClient:
Some(SolanaRpc::new(solana_settings).await?)
Some(SolanaRpc::new(solana_settings, solana::SubDao::Iot).await?)
} else {
None
};
Expand Down
Loading