From 992f6d1809804149e08c0d34fd890fe628a4fba5 Mon Sep 17 00:00:00 2001 From: f3kilo Date: Fri, 29 Nov 2024 12:23:58 +0300 Subject: [PATCH] better cli args types + use channel --- bin/reth/src/bitfinity_tasks/send_txs.rs | 12 ++-- bin/reth/src/main.rs | 4 +- bin/reth/tests/commands/bitfinity_node_it.rs | 63 +++++++++----------- crates/node/core/src/args/bitfinity_args.rs | 15 +++-- crates/rpc/rpc-eth-api/Cargo.toml | 2 +- 5 files changed, 48 insertions(+), 48 deletions(-) diff --git a/bin/reth/src/bitfinity_tasks/send_txs.rs b/bin/reth/src/bitfinity_tasks/send_txs.rs index 33fad3856fbd..a5ceb341d379 100644 --- a/bin/reth/src/bitfinity_tasks/send_txs.rs +++ b/bin/reth/src/bitfinity_tasks/send_txs.rs @@ -26,8 +26,8 @@ pub struct BitfinityTransactionSender { queue: SharedQueue, rpc_url: String, period: Duration, - batch_size: u32, - txs_per_execution_threshold: u32, + batch_size: usize, + txs_per_execution_threshold: usize, } impl BitfinityTransactionSender { @@ -36,8 +36,8 @@ impl BitfinityTransactionSender { queue: SharedQueue, rpc_url: String, period: Duration, - batch_size: u32, - txs_per_execution_threshold: u32, + batch_size: usize, + txs_per_execution_threshold: usize, ) -> Self { Self { queue, rpc_url, period, batch_size, txs_per_execution_threshold } } @@ -90,14 +90,14 @@ impl BitfinityTransactionSender { } total_sent += to_send.len(); - if total_sent > self.txs_per_execution_threshold as usize { + if total_sent > self.txs_per_execution_threshold { return Ok(()); } } } async fn get_transactions_to_send(&self) -> Vec<(U256, Vec)> { - let mut batch = Vec::with_capacity(self.batch_size as _); + let mut batch = Vec::with_capacity(self.batch_size); let mut queue = self.queue.lock().await; let txs_to_pop = self.batch_size.max(1); // if batch size is zero, take at least one tx. diff --git a/bin/reth/src/main.rs b/bin/reth/src/main.rs index 5dc8d76c8393..f8ec74db2444 100644 --- a/bin/reth/src/main.rs +++ b/bin/reth/src/main.rs @@ -40,7 +40,7 @@ fn main() { // Add custom forwarder with transactions priority queue. queue .blocking_lock() - .set_size_limit(ctx.config().bitfinity_import_arg.tx_queue_size as _); + .set_size_limit(ctx.config().bitfinity_import_arg.tx_queue_size); let forwarder = BitfinityTransactionsForwarder::new(queue); ctx.registry.set_eth_raw_transaction_forwarder(Arc::new(forwarder)); @@ -76,7 +76,7 @@ fn main() { let url = bitfinity.send_raw_transaction_rpc_url.unwrap_or(bitfinity.rpc_url); // Init transaction sending cycle. - let period = Duration::from_secs(bitfinity.send_queued_txs_period_secs as _); + let period = Duration::from_secs(bitfinity.send_queued_txs_period_secs); let transaction_sending = BitfinityTransactionSender::new( queue_clone, url, diff --git a/bin/reth/tests/commands/bitfinity_node_it.rs b/bin/reth/tests/commands/bitfinity_node_it.rs index 447a4ebdd53c..adce2b7a5da4 100644 --- a/bin/reth/tests/commands/bitfinity_node_it.rs +++ b/bin/reth/tests/commands/bitfinity_node_it.rs @@ -28,6 +28,7 @@ use reth_transaction_pool::test_utils::MockTransaction; use revm_primitives::{Address, B256, U256}; use std::time::Duration; use std::{net::SocketAddr, str::FromStr, sync::Arc}; +use tokio::sync::mpsc::Receiver; use tokio::sync::Mutex; #[tokio::test] @@ -45,7 +46,7 @@ async fn bitfinity_test_node_forward_ic_or_eth_get_last_certified_block() { // Arrange let _log = init_logs(); - let eth_server = EthImpl::new(); + let eth_server = EthImpl::default(); let (_server, eth_server_address) = mock_eth_server_start(EthServer::into_rpc(eth_server)).await; let (reth_client, _reth_node) = @@ -75,7 +76,7 @@ async fn bitfinity_test_node_forward_get_gas_price_requests() { // Arrange let _log = init_logs(); - let eth_server = EthImpl::new(); + let eth_server = EthImpl::default(); let gas_price = eth_server.gas_price; let (_server, eth_server_address) = mock_eth_server_start(EthServer::into_rpc(eth_server)).await; @@ -94,7 +95,7 @@ async fn bitfinity_test_node_forward_max_priority_fee_per_gas_requests() { // Arrange let _log = init_logs(); - let eth_server = EthImpl::new(); + let eth_server = EthImpl::default(); let max_priority_fee_per_gas = eth_server.max_priority_fee_per_gas; let (_server, eth_server_address) = mock_eth_server_start(EthServer::into_rpc(eth_server)).await; @@ -113,7 +114,7 @@ async fn bitfinity_test_node_forward_eth_get_genesis_balances() { // Arrange let _log = init_logs(); - let eth_server = EthImpl::new(); + let eth_server = EthImpl::default(); let (_server, eth_server_address) = mock_eth_server_start(EthServer::into_rpc(eth_server)).await; let (reth_client, _reth_node) = @@ -147,7 +148,7 @@ async fn bitfinity_test_node_forward_ic_get_genesis_balances() { // Arrange let _log = init_logs(); - let eth_server = EthImpl::new(); + let eth_server = EthImpl::default(); let (_server, eth_server_address) = mock_eth_server_start(EthServer::into_rpc(eth_server)).await; let (reth_client, _reth_node) = @@ -174,8 +175,8 @@ async fn bitfinity_test_node_forward_send_raw_transaction_requests() { // Arrange let _log = init_logs(); - let eth_server = EthImpl::new(); - let received_txs = eth_server.txs.clone(); + let (tx_sender, mut tx_receiver) = tokio::sync::mpsc::channel(10); + let eth_server = EthImpl::new(Some(tx_sender)); let (_server, eth_server_address) = mock_eth_server_start(EthServer::into_rpc(eth_server)).await; @@ -193,9 +194,9 @@ async fn bitfinity_test_node_forward_send_raw_transaction_requests() { // Assert assert_eq!(result.to_fixed_bytes(), expected_tx_hash.0.to_fixed_bytes()); - assert!(check_transactions_received(&received_txs, 1).await); + let received_txs = consume_received_txs(&mut tx_receiver, 1).await.unwrap(); - assert_eq!(received_txs.lock().await[0].0, expected_tx_hash.0.to_fixed_bytes()); + assert_eq!(received_txs[0].0, expected_tx_hash.0.to_fixed_bytes()); } #[tokio::test] @@ -203,8 +204,8 @@ async fn bitfinity_test_node_send_raw_transaction_in_gas_price_order() { // Arrange let _log = init_logs(); - let eth_server = EthImpl::new(); - let received_txs = eth_server.txs.clone(); + let (tx_sender, mut tx_receiver) = tokio::sync::mpsc::channel(10); + let eth_server = EthImpl::new(Some(tx_sender)); let (_server, eth_server_address) = mock_eth_server_start(EthServer::into_rpc(eth_server)).await; @@ -226,29 +227,27 @@ async fn bitfinity_test_node_send_raw_transaction_in_gas_price_order() { assert_eq!(hash.to_fixed_bytes(), expected_hash.0.to_fixed_bytes()); } - assert!(check_transactions_received(&received_txs, TXS_NUMBER).await); + let received_txs = consume_received_txs(&mut tx_receiver, 10).await.unwrap(); for (idx, expected_hash) in expected_hashes.iter().rev().enumerate() { - assert_eq!(received_txs.lock().await[idx].0, expected_hash.0.to_fixed_bytes()); + assert_eq!(received_txs[idx].0, expected_hash.0.to_fixed_bytes()); } } /// Waits until `n` transactions appear in `received_txs` with one second timeout. /// Returns true if `received_txs` contains at least `n` transactions. -async fn check_transactions_received(received_txs: &Mutex>, n: usize) -> bool { +async fn consume_received_txs(received_txs: &mut Receiver, n: usize) -> Option> { let wait_future = async { - loop { - let txs_number = received_txs.lock().await.len(); - if txs_number >= n { - break; - } - - tokio::time::sleep(Duration::from_millis(50)).await; + let mut txs = Vec::with_capacity(n); + while txs.len() < n { + let tx = received_txs.recv().await.unwrap(); + txs.push(tx); } + txs }; let wait_result = tokio::time::timeout(Duration::from_secs(3), wait_future).await; - wait_result.is_ok() + wait_result.ok() } fn transaction_with_gas_price(gas_price: u128) -> TransactionSigned { @@ -378,14 +377,12 @@ async fn mock_eth_server_start(methods: impl Into) -> (ServerHandle, So } pub mod eth_server { - use std::sync::Arc; - use alloy_rlp::{Bytes, Decodable}; use ethereum_json_rpc_client::{Block, CertifiedResult, H256}; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; use reth_primitives::TransactionSigned; use revm_primitives::{hex, Address, B256, U256}; - use tokio::sync::Mutex; + use tokio::sync::mpsc::Sender; #[rpc(server, namespace = "eth")] pub trait Eth { @@ -409,22 +406,18 @@ pub mod eth_server { pub struct EthImpl { pub gas_price: u128, pub max_priority_fee_per_gas: u128, - pub txs: Arc>>, + pub txs_sender: Option>, } impl EthImpl { - pub fn new() -> Self { - Self { - gas_price: rand::random(), - max_priority_fee_per_gas: rand::random(), - txs: Arc::default(), - } + pub fn new(txs_sender: Option>) -> Self { + Self { gas_price: rand::random(), max_priority_fee_per_gas: rand::random(), txs_sender } } } impl Default for EthImpl { fn default() -> Self { - Self::new() + Self::new(None) } } @@ -442,7 +435,9 @@ pub mod eth_server { let decoded = hex::decode(&tx).unwrap(); let tx = TransactionSigned::decode(&mut decoded.as_ref()).unwrap(); let hash = tx.hash(); - self.txs.lock().await.push(hash); + if let Some(sender) = &self.txs_sender { + sender.send(hash).await.unwrap(); + } Ok(hash) } diff --git a/crates/node/core/src/args/bitfinity_args.rs b/crates/node/core/src/args/bitfinity_args.rs index 3af14af963ae..099df41955f2 100644 --- a/crates/node/core/src/args/bitfinity_args.rs +++ b/crates/node/core/src/args/bitfinity_args.rs @@ -58,30 +58,35 @@ pub struct BitfinityImportArgs { pub ic_root_key: String, /// Enable transactions priority queue + /// Default: true #[arg(long, value_name = "TX_PRIORITY_QUEUE", default_value = "true")] pub tx_queue: bool, /// Transactions priority queue will contain this much transactions at max. + /// Default: 1000 #[arg(long, value_name = "TX_PRIORITY_QUEUE_SIZE", default_value = "1000")] - pub tx_queue_size: u32, + pub tx_queue_size: usize, /// Time period to send transactions batch from the priority queue. /// Do nothing, if `tx_queue` is disabled. + /// Default: 3 #[arg(long, value_name = "SEND_QUEUED_TXS_PERIOD_SECS", default_value = "3")] - pub send_queued_txs_period_secs: u32, + pub send_queued_txs_period_secs: u64, /// Send queued transactions by batches with this number of entries. /// If set to 0 or 1, no batching is used. /// Do nothing, if `tx_queue` is disabled. + /// Default: 10 #[arg(long, value_name = "QUEUED_TXS_BATCH_SIZE", default_value = "10")] - pub queued_txs_batch_size: u32, + pub queued_txs_batch_size: usize, /// If transaction sender sent more queued transactions at single execution, /// it will wait for next execution to continue. /// If set to 0, transaction sender will try to empty queue at each execution. /// Do nothing, if `tx_queue` is disabled. - #[arg(long, value_name = "QUEUED_TXS_PER_EXECUTION", default_value = "100")] - pub queued_txs_per_execution_threshold: u32, + /// Default: 500 + #[arg(long, value_name = "QUEUED_TXS_PER_EXECUTION", default_value = "500")] + pub queued_txs_per_execution_threshold: usize, } /// Bitfinity Related Args diff --git a/crates/rpc/rpc-eth-api/Cargo.toml b/crates/rpc/rpc-eth-api/Cargo.toml index 114201465580..825c52f05719 100644 --- a/crates/rpc/rpc-eth-api/Cargo.toml +++ b/crates/rpc/rpc-eth-api/Cargo.toml @@ -32,7 +32,6 @@ reth-rpc-server-types.workspace = true # ethereum alloy-dyn-abi = { workspace = true, features = ["eip712"] } -alloy-rlp.workspace = true # rpc jsonrpsee = { workspace = true, features = ["server", "macros"] } @@ -49,6 +48,7 @@ dyn-clone.workspace = true tracing.workspace = true # Bitfinity dependencies +alloy-rlp.workspace = true ethereum-json-rpc-client.workspace = true [features]