Skip to content

Commit

Permalink
better cli args types + use channel
Browse files Browse the repository at this point in the history
  • Loading branch information
F3kilo committed Nov 29, 2024
1 parent 3e1a018 commit 992f6d1
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 48 deletions.
12 changes: 6 additions & 6 deletions bin/reth/src/bitfinity_tasks/send_txs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ pub struct BitfinityTransactionSender {
queue: SharedQueue,
rpc_url: String,
period: Duration,
batch_size: u32,
txs_per_execution_threshold: u32,
batch_size: usize,
txs_per_execution_threshold: usize,
}

impl BitfinityTransactionSender {
Expand All @@ -36,8 +36,8 @@ impl BitfinityTransactionSender {
queue: SharedQueue,
rpc_url: String,
period: Duration,
batch_size: u32,
txs_per_execution_threshold: u32,
batch_size: usize,
txs_per_execution_threshold: usize,
) -> Self {
Self { queue, rpc_url, period, batch_size, txs_per_execution_threshold }
}
Expand Down Expand Up @@ -90,14 +90,14 @@ impl BitfinityTransactionSender {
}

total_sent += to_send.len();
if total_sent > self.txs_per_execution_threshold as usize {
if total_sent > self.txs_per_execution_threshold {
return Ok(());
}
}
}

async fn get_transactions_to_send(&self) -> Vec<(U256, Vec<u8>)> {
let mut batch = Vec::with_capacity(self.batch_size as _);
let mut batch = Vec::with_capacity(self.batch_size);
let mut queue = self.queue.lock().await;
let txs_to_pop = self.batch_size.max(1); // if batch size is zero, take at least one tx.

Expand Down
4 changes: 2 additions & 2 deletions bin/reth/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fn main() {
// Add custom forwarder with transactions priority queue.
queue
.blocking_lock()
.set_size_limit(ctx.config().bitfinity_import_arg.tx_queue_size as _);
.set_size_limit(ctx.config().bitfinity_import_arg.tx_queue_size);

let forwarder = BitfinityTransactionsForwarder::new(queue);
ctx.registry.set_eth_raw_transaction_forwarder(Arc::new(forwarder));
Expand Down Expand Up @@ -76,7 +76,7 @@ fn main() {
let url = bitfinity.send_raw_transaction_rpc_url.unwrap_or(bitfinity.rpc_url);

// Init transaction sending cycle.
let period = Duration::from_secs(bitfinity.send_queued_txs_period_secs as _);
let period = Duration::from_secs(bitfinity.send_queued_txs_period_secs);
let transaction_sending = BitfinityTransactionSender::new(
queue_clone,
url,
Expand Down
63 changes: 29 additions & 34 deletions bin/reth/tests/commands/bitfinity_node_it.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use reth_transaction_pool::test_utils::MockTransaction;
use revm_primitives::{Address, B256, U256};
use std::time::Duration;
use std::{net::SocketAddr, str::FromStr, sync::Arc};
use tokio::sync::mpsc::Receiver;
use tokio::sync::Mutex;

#[tokio::test]
Expand All @@ -45,7 +46,7 @@ async fn bitfinity_test_node_forward_ic_or_eth_get_last_certified_block() {
// Arrange
let _log = init_logs();

let eth_server = EthImpl::new();
let eth_server = EthImpl::default();
let (_server, eth_server_address) =
mock_eth_server_start(EthServer::into_rpc(eth_server)).await;
let (reth_client, _reth_node) =
Expand Down Expand Up @@ -75,7 +76,7 @@ async fn bitfinity_test_node_forward_get_gas_price_requests() {
// Arrange
let _log = init_logs();

let eth_server = EthImpl::new();
let eth_server = EthImpl::default();
let gas_price = eth_server.gas_price;
let (_server, eth_server_address) =
mock_eth_server_start(EthServer::into_rpc(eth_server)).await;
Expand All @@ -94,7 +95,7 @@ async fn bitfinity_test_node_forward_max_priority_fee_per_gas_requests() {
// Arrange
let _log = init_logs();

let eth_server = EthImpl::new();
let eth_server = EthImpl::default();
let max_priority_fee_per_gas = eth_server.max_priority_fee_per_gas;
let (_server, eth_server_address) =
mock_eth_server_start(EthServer::into_rpc(eth_server)).await;
Expand All @@ -113,7 +114,7 @@ async fn bitfinity_test_node_forward_eth_get_genesis_balances() {
// Arrange
let _log = init_logs();

let eth_server = EthImpl::new();
let eth_server = EthImpl::default();
let (_server, eth_server_address) =
mock_eth_server_start(EthServer::into_rpc(eth_server)).await;
let (reth_client, _reth_node) =
Expand Down Expand Up @@ -147,7 +148,7 @@ async fn bitfinity_test_node_forward_ic_get_genesis_balances() {
// Arrange
let _log = init_logs();

let eth_server = EthImpl::new();
let eth_server = EthImpl::default();
let (_server, eth_server_address) =
mock_eth_server_start(EthServer::into_rpc(eth_server)).await;
let (reth_client, _reth_node) =
Expand All @@ -174,8 +175,8 @@ async fn bitfinity_test_node_forward_send_raw_transaction_requests() {
// Arrange
let _log = init_logs();

let eth_server = EthImpl::new();
let received_txs = eth_server.txs.clone();
let (tx_sender, mut tx_receiver) = tokio::sync::mpsc::channel(10);
let eth_server = EthImpl::new(Some(tx_sender));

let (_server, eth_server_address) =
mock_eth_server_start(EthServer::into_rpc(eth_server)).await;
Expand All @@ -193,18 +194,18 @@ async fn bitfinity_test_node_forward_send_raw_transaction_requests() {
// Assert
assert_eq!(result.to_fixed_bytes(), expected_tx_hash.0.to_fixed_bytes());

assert!(check_transactions_received(&received_txs, 1).await);
let received_txs = consume_received_txs(&mut tx_receiver, 1).await.unwrap();

assert_eq!(received_txs.lock().await[0].0, expected_tx_hash.0.to_fixed_bytes());
assert_eq!(received_txs[0].0, expected_tx_hash.0.to_fixed_bytes());
}

#[tokio::test]
async fn bitfinity_test_node_send_raw_transaction_in_gas_price_order() {
// Arrange
let _log = init_logs();

let eth_server = EthImpl::new();
let received_txs = eth_server.txs.clone();
let (tx_sender, mut tx_receiver) = tokio::sync::mpsc::channel(10);
let eth_server = EthImpl::new(Some(tx_sender));

let (_server, eth_server_address) =
mock_eth_server_start(EthServer::into_rpc(eth_server)).await;
Expand All @@ -226,29 +227,27 @@ async fn bitfinity_test_node_send_raw_transaction_in_gas_price_order() {
assert_eq!(hash.to_fixed_bytes(), expected_hash.0.to_fixed_bytes());
}

assert!(check_transactions_received(&received_txs, TXS_NUMBER).await);
let received_txs = consume_received_txs(&mut tx_receiver, 10).await.unwrap();

for (idx, expected_hash) in expected_hashes.iter().rev().enumerate() {
assert_eq!(received_txs.lock().await[idx].0, expected_hash.0.to_fixed_bytes());
assert_eq!(received_txs[idx].0, expected_hash.0.to_fixed_bytes());
}
}

/// Waits until `n` transactions appear in `received_txs` with one second timeout.
/// Returns true if `received_txs` contains at least `n` transactions.
async fn check_transactions_received(received_txs: &Mutex<Vec<B256>>, n: usize) -> bool {
async fn consume_received_txs(received_txs: &mut Receiver<B256>, n: usize) -> Option<Vec<B256>> {
let wait_future = async {
loop {
let txs_number = received_txs.lock().await.len();
if txs_number >= n {
break;
}

tokio::time::sleep(Duration::from_millis(50)).await;
let mut txs = Vec::with_capacity(n);
while txs.len() < n {
let tx = received_txs.recv().await.unwrap();
txs.push(tx);
}
txs
};

let wait_result = tokio::time::timeout(Duration::from_secs(3), wait_future).await;
wait_result.is_ok()
wait_result.ok()
}

fn transaction_with_gas_price(gas_price: u128) -> TransactionSigned {
Expand Down Expand Up @@ -378,14 +377,12 @@ async fn mock_eth_server_start(methods: impl Into<Methods>) -> (ServerHandle, So
}

pub mod eth_server {
use std::sync::Arc;

use alloy_rlp::{Bytes, Decodable};
use ethereum_json_rpc_client::{Block, CertifiedResult, H256};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use reth_primitives::TransactionSigned;
use revm_primitives::{hex, Address, B256, U256};
use tokio::sync::Mutex;
use tokio::sync::mpsc::Sender;

#[rpc(server, namespace = "eth")]
pub trait Eth {
Expand All @@ -409,22 +406,18 @@ pub mod eth_server {
pub struct EthImpl {
pub gas_price: u128,
pub max_priority_fee_per_gas: u128,
pub txs: Arc<Mutex<Vec<B256>>>,
pub txs_sender: Option<Sender<B256>>,
}

impl EthImpl {
pub fn new() -> Self {
Self {
gas_price: rand::random(),
max_priority_fee_per_gas: rand::random(),
txs: Arc::default(),
}
pub fn new(txs_sender: Option<Sender<B256>>) -> Self {
Self { gas_price: rand::random(), max_priority_fee_per_gas: rand::random(), txs_sender }
}
}

impl Default for EthImpl {
fn default() -> Self {
Self::new()
Self::new(None)
}
}

Expand All @@ -442,7 +435,9 @@ pub mod eth_server {
let decoded = hex::decode(&tx).unwrap();
let tx = TransactionSigned::decode(&mut decoded.as_ref()).unwrap();
let hash = tx.hash();
self.txs.lock().await.push(hash);
if let Some(sender) = &self.txs_sender {
sender.send(hash).await.unwrap();
}
Ok(hash)
}

Expand Down
15 changes: 10 additions & 5 deletions crates/node/core/src/args/bitfinity_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,30 +58,35 @@ pub struct BitfinityImportArgs {
pub ic_root_key: String,

/// Enable transactions priority queue
/// Default: true
#[arg(long, value_name = "TX_PRIORITY_QUEUE", default_value = "true")]
pub tx_queue: bool,

/// Transactions priority queue will contain this much transactions at max.
/// Default: 1000
#[arg(long, value_name = "TX_PRIORITY_QUEUE_SIZE", default_value = "1000")]
pub tx_queue_size: u32,
pub tx_queue_size: usize,

/// Time period to send transactions batch from the priority queue.
/// Do nothing, if `tx_queue` is disabled.
/// Default: 3
#[arg(long, value_name = "SEND_QUEUED_TXS_PERIOD_SECS", default_value = "3")]
pub send_queued_txs_period_secs: u32,
pub send_queued_txs_period_secs: u64,

/// Send queued transactions by batches with this number of entries.
/// If set to 0 or 1, no batching is used.
/// Do nothing, if `tx_queue` is disabled.
/// Default: 10
#[arg(long, value_name = "QUEUED_TXS_BATCH_SIZE", default_value = "10")]
pub queued_txs_batch_size: u32,
pub queued_txs_batch_size: usize,

/// If transaction sender sent more queued transactions at single execution,
/// it will wait for next execution to continue.
/// If set to 0, transaction sender will try to empty queue at each execution.
/// Do nothing, if `tx_queue` is disabled.
#[arg(long, value_name = "QUEUED_TXS_PER_EXECUTION", default_value = "100")]
pub queued_txs_per_execution_threshold: u32,
/// Default: 500
#[arg(long, value_name = "QUEUED_TXS_PER_EXECUTION", default_value = "500")]
pub queued_txs_per_execution_threshold: usize,
}

/// Bitfinity Related Args
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/rpc-eth-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ reth-rpc-server-types.workspace = true

# ethereum
alloy-dyn-abi = { workspace = true, features = ["eip712"] }
alloy-rlp.workspace = true

# rpc
jsonrpsee = { workspace = true, features = ["server", "macros"] }
Expand All @@ -49,6 +48,7 @@ dyn-clone.workspace = true
tracing.workspace = true

# Bitfinity dependencies
alloy-rlp.workspace = true
ethereum-json-rpc-client.workspace = true

[features]
Expand Down

0 comments on commit 992f6d1

Please sign in to comment.