[EPROD-1061][EPROD-1083] Pool and batching for eth_sendRawTransaction calls #46

Open

Wants to merge 33 commits into base: bitfinity-archive-node

Changes from 23 commits

Commits (33)
713006f
txs priority queue
F3kilo Nov 21, 2024
5faa2d3
added forwarder to the node
F3kilo Nov 21, 2024
c359295
transaction sender
F3kilo Nov 22, 2024
2d87ebb
tests
F3kilo Nov 25, 2024
67fe67d
single tx test with queue
F3kilo Nov 25, 2024
119312f
test transactions order
F3kilo Nov 25, 2024
690b162
test fixed
F3kilo Nov 27, 2024
5289ab1
tests fixed
F3kilo Nov 27, 2024
3a5b76d
clear global state
F3kilo Nov 27, 2024
4502c54
build fixed
F3kilo Nov 27, 2024
d26ccc7
remove global txs list
F3kilo Nov 29, 2024
3e1a018
move send txs code to tasks module
F3kilo Nov 29, 2024
992f6d1
better cli args types + use channel
F3kilo Nov 29, 2024
b4ca64a
get transactions from forwarder, reth, and evmc
F3kilo Dec 3, 2024
b589177
better tests + clippy fixes
F3kilo Dec 4, 2024
ccbec99
query transactions test
F3kilo Dec 5, 2024
b2a6db0
taks manager live long enough
F3kilo Dec 6, 2024
01c73c6
fixed block number bug
F3kilo Dec 6, 2024
6afe3f5
increased timeout to fix test
F3kilo Dec 6, 2024
170cecc
increase timeout in another test
F3kilo Dec 6, 2024
da1747b
higher timeout for bitfinity_test_reset_should_extract_all_accounts_data
F3kilo Dec 9, 2024
76b59b3
much more timeout
F3kilo Dec 9, 2024
bc30706
send tx batches concurrently
F3kilo Dec 10, 2024
78ccfba
use newer evm-sdk
F3kilo Dec 16, 2024
a0da333
Merge branch 'bitfinity-archive-node' into send_raw_transaction_pool
F3kilo Jan 6, 2025
79dd6b8
conflicts solved + generic tx in BitfinityEvmRpc
F3kilo Jan 13, 2025
3f1457c
Merge branch 'bitfinity-archive-node' into send_raw_transaction_pool
F3kilo Jan 13, 2025
ae0eb85
tx forwarder as EthApi module
F3kilo Jan 14, 2025
82c8696
put tx forwarder into EthApi
F3kilo Jan 14, 2025
31537d0
compilation fixed
F3kilo Jan 14, 2025
297e6b2
tests fixed
F3kilo Jan 14, 2025
7228809
revert unnecessary changes in reth files
F3kilo Jan 16, 2025
e4cbd25
re-apply bitfinity changes
F3kilo Jan 16, 2025
414 changes: 219 additions & 195 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
@@ -530,14 +530,14 @@ iai-callgrind = "0.11"
# Bitfinity Deps
async-channel = "2"
candid = "0.10"
-did = { git = "https://github.com/bitfinity-network/bitfinity-evm-sdk", package = "did", features = ["alloy-primitives-07"], tag = "v0.34.x" }
+did = { git = "https://github.com/bitfinity-network/bitfinity-evm-sdk", package = "did", features = ["alloy-primitives-07"], tag = "v0.36.x" }
dirs = "5.0.1"
-ethereum-json-rpc-client = { git = "https://github.com/bitfinity-network/bitfinity-evm-sdk", package = "ethereum-json-rpc-client", tag = "v0.34.x", features = [
+ethereum-json-rpc-client = { git = "https://github.com/bitfinity-network/bitfinity-evm-sdk", package = "ethereum-json-rpc-client", tag = "v0.36.x", features = [
    "reqwest",
] }
evm-canister-client = { git = "https://github.com/bitfinity-network/bitfinity-evm-sdk", package = "evm-canister-client", features = [
    "ic-agent-client",
-], tag = "v0.34.x" }
+], tag = "v0.36.x" }
ic-cbor = "2.3"
ic-certificate-verification = "2.3"
ic-certification = "2.3"
3 changes: 3 additions & 0 deletions bin/reth/src/bitfinity_tasks.rs
@@ -0,0 +1,3 @@
//! This module contains tasks to run in parallel with reth node.

pub mod send_txs;
370 changes: 370 additions & 0 deletions bin/reth/src/bitfinity_tasks/send_txs.rs
@@ -0,0 +1,370 @@
//! Utils for raw transaction batching.

use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
use std::time::Duration;

use crate::primitives::ruint::Uint;
use alloy_rlp::Decodable;
use did::H256;
use ethereum_json_rpc_client::reqwest::ReqwestClient;
use ethereum_json_rpc_client::{EthJsonRpcClient, Id, Params};
use eyre::eyre;
use futures::future::join_all;
use lightspeed_scheduler::job::Job;
use lightspeed_scheduler::scheduler::Scheduler;
use lightspeed_scheduler::JobExecutor;
use reth_node_core::version::SHORT_VERSION;
use reth_primitives::hex;
use reth_primitives::{TransactionSigned, B256, U256};
use reth_rpc::eth::RawTransactionForwarder;
use reth_rpc_eth_types::{EthApiError, EthResult};
use tokio::sync::Mutex;
use tracing::{debug, info, trace, warn};

/// Alias for the shared, thread-safe transactions priority queue.
pub type SharedQueue = Arc<Mutex<TransactionsPriorityQueue>>;

/// Periodically sends transactions from priority queue.
#[derive(Debug, Clone)]
pub struct BitfinityTransactionSender {
queue: SharedQueue,
rpc_url: String,
period: Duration,
batch_size: usize,
txs_per_execution_threshold: usize,
}

impl BitfinityTransactionSender {
/// Creates new instance of the transaction sender.
pub const fn new(
queue: SharedQueue,
rpc_url: String,
period: Duration,
batch_size: usize,
txs_per_execution_threshold: usize,
) -> Self {
Self { queue, rpc_url, period, batch_size, txs_per_execution_threshold }
}

/// Schedule the transaction sending job and return a handle to it.
pub async fn schedule_execution(
self,
job_executor: Option<JobExecutor>,
) -> eyre::Result<(JobExecutor, tokio::task::JoinHandle<()>)> {
info!(target: "reth::cli - BitfinityTransactionSender", "reth {} starting", SHORT_VERSION);

let job_executor = job_executor.unwrap_or_else(JobExecutor::new_with_local_tz);

// Schedule the import job
{
let interval =
Scheduler::Interval { interval_duration: self.period, execute_at_startup: true };
job_executor
.add_job_with_scheduler(
interval,
Job::new("send transactions", "bitfinity tx sending", None, move || {
let tx_sender = self.clone();
Box::pin(async move {
tx_sender.single_execution().await?;
Ok(())
})
}),
)
.await;
}

let job_handle = job_executor.run().await?;
Ok((job_executor, job_handle))
}

/// Execute the transaction sending job.
pub async fn single_execution(&self) -> eyre::Result<()> {
let mut to_send = self.get_transactions_to_send().await;
let batch_size = self.batch_size.max(1);
let mut send_futures = vec![];

loop {
let last_idx = batch_size.min(to_send.len());
if last_idx == 0 {
break;
}

let to_send_batch: Vec<_> = to_send.drain(..last_idx).collect();

let send_future = async move {
let result = match to_send_batch.len() {
0 => return,
1 => self.send_single_tx(&to_send_batch[0].1).await,
_ => self.send_txs_batch(&to_send_batch).await,
};

if let Err(e) = result {
warn!("Failed to send transactions to EVM: {e}");
}
};
send_futures.push(send_future);
}

join_all(send_futures).await;

Ok(())
}

async fn get_transactions_to_send(&self) -> Vec<(U256, Vec<u8>)> {
let mut batch = Vec::with_capacity(self.txs_per_execution_threshold);
let mut queue = self.queue.lock().await;
let txs_to_pop = self.txs_per_execution_threshold.max(1); // if the threshold is zero, still take at least one tx.

for _ in 0..txs_to_pop {
let Some(entry) = queue.pop_tx_with_highest_price() else {
break;
};

batch.push(entry);
}

batch
}

async fn send_single_tx(&self, to_send: &[u8]) -> Result<(), eyre::Error> {
let client = self.get_client()?;
let hash = client
.send_raw_transaction_bytes(to_send)
.await
.map_err(|e| eyre!("failed to send single transaction: {e}"))?;

trace!("Single transaction with hash {hash} sent.");

Ok(())
}

async fn send_txs_batch(&self, to_send: &[(U256, Vec<u8>)]) -> Result<(), eyre::Error> {
let client = self.get_client()?;

let params =
to_send.iter().map(|(_, raw)| (Params::Array(vec![hex::encode(raw).into()]), Id::Null));
let max_batch_size = usize::MAX;
let hashes = client
.batch_request::<H256>("eth_sendRawTransaction".into(), params, max_batch_size)
.await
.map_err(|e| eyre!("failed to send transaction batch: {e}"))?;

trace!("Raw transactions batch sent. Hashes: {hashes:?}");

Ok(())
}

fn get_client(&self) -> eyre::Result<EthJsonRpcClient<ReqwestClient>> {
let client = EthJsonRpcClient::new(ReqwestClient::new(self.rpc_url.clone()));

Ok(client)
}
}

/// Forwarder to push transactions to the priority queue.
#[derive(Debug)]
pub struct BitfinityTransactionsForwarder {
queue: SharedQueue,
}

impl BitfinityTransactionsForwarder {
/// Creates new forwarder with the given parameters.
pub const fn new(queue: SharedQueue) -> Self {
Self { queue }
}
}

#[async_trait::async_trait]
impl RawTransactionForwarder for BitfinityTransactionsForwarder {
async fn forward_raw_transaction(&self, raw: &[u8]) -> EthResult<()> {
let typed_tx = TransactionSigned::decode(&mut (&raw[..])).map_err(|e| {
warn!("Failed to decode signed transaction in the BitfinityTransactionsForwarder: {e}");
EthApiError::FailedToDecodeSignedTransaction
})?;

debug!("Pushing tx with hash {} to priority queue", typed_tx.hash);
let gas_price = typed_tx.effective_gas_price(None);

self.queue.lock().await.push(typed_tx.hash(), Uint::from(gas_price), raw.to_vec());

Ok(())
}

async fn get_transaction_by_hash(&self, hash: B256) -> Option<Vec<u8>> {
self.queue.lock().await.get(&hash)
}
}

/// Priority queue to get transactions sorted by gas price.
#[derive(Debug)]
pub struct TransactionsPriorityQueue {
priority: BTreeSet<TxKey>,
transactions: HashMap<B256, Vec<u8>>,
size_limit: usize,
}

impl TransactionsPriorityQueue {
/// Creates a new instance of the queue with the given size limit.
pub fn new(size_limit: usize) -> Self {
Self { priority: BTreeSet::default(), transactions: HashMap::default(), size_limit }
}

/// Adds the tx with the given gas price.
pub fn push(&mut self, hash: B256, gas_price: U256, tx: Vec<u8>) {
let key = TxKey { gas_price, hash };
self.priority.insert(key);
self.transactions.insert(hash, tx);

if self.len() > self.size_limit {
self.pop_tx_with_lowest_price();
}
}

/// Returns raw transaction if it is present in the queue.
pub fn get(&self, hash: &B256) -> Option<Vec<u8>> {
self.transactions.get(hash).cloned()
}

/// Returns tx with highest gas price, if present.
pub fn pop_tx_with_highest_price(&mut self) -> Option<(U256, Vec<u8>)> {
let tx_key = self.priority.pop_last()?;
let Some(tx) = self.transactions.remove(&tx_key.hash) else {
warn!("Transaction key present in priority queue, but not found in transactions map.");
return None;
};

Some((tx_key.gas_price, tx))
}

/// Returns tx with lowest gas price, if present.
pub fn pop_tx_with_lowest_price(&mut self) -> Option<(U256, Vec<u8>)> {
let tx_key = self.priority.pop_first()?;
let Some(tx) = self.transactions.remove(&tx_key.hash) else {
warn!("Transaction key present in priority queue, but not found in transactions map.");
return None;
};

Some((tx_key.gas_price, tx))
}

/// Number of transactions in the queue.
pub fn len(&self) -> usize {
self.transactions.len()
}

/// Change size limit of the queue.
pub fn set_size_limit(&mut self, new_limit: usize) {
self.size_limit = new_limit;

while self.len() > self.size_limit {
self.pop_tx_with_lowest_price();
}
}

/// Returns true if length == 0.
pub fn is_empty(&self) -> bool {
self.transactions.is_empty()
}
}

/// Key used to sort transactions by gas price; when gas prices are equal,
/// the keys still differ because the transaction hashes differ.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct TxKey {
gas_price: U256,
hash: B256,
}

#[cfg(test)]
mod tests {
use super::TransactionsPriorityQueue;
use crate::primitives::U256;
use reth_primitives::TransactionSigned;
use reth_transaction_pool::test_utils::MockTransaction;
use reth_transaction_pool::PoolTransaction;

#[test]
fn test_pop_order() {
let mut queue = TransactionsPriorityQueue::new(10);
let tx1 = transaction_with_gas_price(100);
let tx2 = transaction_with_gas_price(300);
let tx3 = transaction_with_gas_price(200);

let tx1_bytes = alloy_rlp::encode(&tx1);
let tx2_bytes = alloy_rlp::encode(&tx2);
let tx3_bytes = alloy_rlp::encode(&tx3);

queue.push(tx1.hash(), U256::from(tx1.effective_gas_price(None)), tx1_bytes.clone());
queue.push(tx2.hash(), U256::from(tx2.effective_gas_price(None)), tx2_bytes.clone());
queue.push(tx3.hash(), U256::from(tx3.effective_gas_price(None)), tx3_bytes.clone());

let expected_order = [tx2_bytes, tx3_bytes, tx1_bytes];
for expected_tx in expected_order {
let popped_tx = queue.pop_tx_with_highest_price().unwrap().1;
assert_eq!(popped_tx, expected_tx);
}

assert!(queue.is_empty())
}

#[test]
fn test_size_limit_should_shrink_tx_with_lowest_price() {
let mut queue = TransactionsPriorityQueue::new(2);
let tx1 = transaction_with_gas_price(100);
let tx2 = transaction_with_gas_price(300);
let tx3 = transaction_with_gas_price(200);

let tx1_bytes = alloy_rlp::encode(&tx1);
let tx2_bytes = alloy_rlp::encode(&tx2);
let tx3_bytes = alloy_rlp::encode(&tx3);

queue.push(tx1.hash(), U256::from(tx1.effective_gas_price(None)), tx1_bytes);
queue.push(tx2.hash(), U256::from(tx2.effective_gas_price(None)), tx2_bytes.clone());
queue.push(tx3.hash(), U256::from(tx3.effective_gas_price(None)), tx3_bytes.clone());

let expected_order = [tx2_bytes, tx3_bytes];
for expected_tx in expected_order {
let popped_tx = queue.pop_tx_with_highest_price().unwrap().1;
assert_eq!(popped_tx, expected_tx);
}

assert!(queue.is_empty())
}

#[test]
fn test_get_transaction_from_queue() {
let mut queue = TransactionsPriorityQueue::new(100);
let tx1 = transaction_with_gas_price(100);
let tx2 = transaction_with_gas_price(300);
let tx3 = transaction_with_gas_price(200);

let tx1_bytes = alloy_rlp::encode(&tx1);
let tx2_bytes = alloy_rlp::encode(&tx2);
let tx3_bytes = alloy_rlp::encode(&tx3);

queue.push(tx1.hash(), U256::from(tx1.effective_gas_price(None)), tx1_bytes);
queue.push(tx2.hash(), U256::from(tx2.effective_gas_price(None)), tx2_bytes);
queue.push(tx3.hash(), U256::from(tx3.effective_gas_price(None)), tx3_bytes);

let hashes = [tx1.hash(), tx2.hash(), tx3.hash()];
for hash in hashes {
assert!(queue.get(&hash).is_some());
}

assert_eq!(queue.len(), 3);

let tx4 = transaction_with_gas_price(400);
assert!(queue.get(&tx4.hash()).is_none());
}

fn transaction_with_gas_price(gas_price: u128) -> TransactionSigned {
let tx = MockTransaction::legacy().with_gas_price(gas_price).rng_hash();

TransactionSigned {
hash: *tx.hash(),
signature: Default::default(),
transaction: tx.into(),
}
}
}
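
For orientation, here is a minimal sketch of how the pieces added in this file are meant to fit together: RPC handlers hand raw eth_sendRawTransaction payloads to the forwarder, the forwarder queues them by effective gas price, and the scheduled sender periodically drains the queue and forwards the highest-priced transactions to the EVM endpoint, singly or as a batch request. This sketch is not part of the PR; the import path, RPC URL, and all numeric parameters are placeholder assumptions, and the real wiring into the node and EthApi lives elsewhere in the branch.

// Illustrative sketch only: the module path and all parameter values below are assumptions.
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

// Assumed import path for the types introduced in bin/reth/src/bitfinity_tasks/send_txs.rs.
use reth::bitfinity_tasks::send_txs::{
    BitfinityTransactionSender, BitfinityTransactionsForwarder, SharedQueue,
    TransactionsPriorityQueue,
};

#[tokio::main]
async fn main() -> eyre::Result<()> {
    // Shared priority queue, capped at 10_000 raw transactions (placeholder capacity).
    let queue: SharedQueue = Arc::new(Mutex::new(TransactionsPriorityQueue::new(10_000)));

    // Forwarder handed to the RPC layer; eth_sendRawTransaction calls push raw bytes here.
    let _forwarder = BitfinityTransactionsForwarder::new(queue.clone());

    // Sender job: every 5 seconds drain up to 100 queued transactions and forward them
    // to the EVM RPC in batches of 10 (all values are placeholders).
    let sender = BitfinityTransactionSender::new(
        queue,
        "http://127.0.0.1:8545".to_string(), // placeholder EVM RPC URL
        Duration::from_secs(5),              // period between executions
        10,                                  // batch_size
        100,                                 // txs_per_execution_threshold
    );

    // Schedule the recurring job and keep the executor alive for the node's lifetime.
    let (_executor, handle) = sender.schedule_execution(None).await?;
    handle.await?;
    Ok(())
}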