diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index d3927a32c07..6f5b577f6dd 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -35,7 +35,8 @@ use ethexe_processor::{LocalOutcome, ProcessorConfig}; use ethexe_sequencer::agro::AggregatedCommitments; use ethexe_signer::{Digest, PublicKey, Signature, Signer}; use ethexe_tx_pool::{ - EthexeTransaction, InputTask, OutputTask, StandardInputTaskSender, StandardTxPoolInstantiationArtifacts, TxPoolService + EthexeTransaction, InputTask, OutputTask, StandardInputTaskSender, + StandardTxPoolInstantiationArtifacts, }; use ethexe_validator::BlockCommitmentValidationRequest; use futures::{future, stream::StreamExt, FutureExt}; @@ -208,10 +209,14 @@ impl Service { .transpose()?; log::info!("🚅 Tx pool service starting..."); - let tx_pool_artifacts = TxPoolService::new((db.clone(),)); + let tx_pool_artifacts = ethexe_tx_pool::new((db.clone(),)); let rpc = config.rpc_config.as_ref().map(|config| { - ethexe_rpc::RpcService::new(config.clone(), db.clone(), tx_pool_artifacts.input_sender.clone()) + ethexe_rpc::RpcService::new( + config.clone(), + db.clone(), + tx_pool_artifacts.input_sender.clone(), + ) }); Ok(Self { @@ -492,9 +497,8 @@ impl Service { None }; - - let StandardTxPoolInstantiationArtifacts { - service: tx_pool_service, + let StandardTxPoolInstantiationArtifacts { + service: tx_pool_service, input_sender: tx_pool_input_task_sender, output_receiver: mut tx_pool_ouput_task_receiver, } = tx_pool_artifacts; @@ -864,7 +868,7 @@ impl Service { NetworkMessage::Transaction { transaction } => { let _ = tx_pool_input_task_sender .send(InputTask::AddTransaction { - transaction: transaction, + transaction, response_sender: None, }) .inspect_err(|e| { diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index c94712da108..f5f7def73cf 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -820,7 +820,8 @@ async fn tx_pool_gossip() { // Prepare tx data let raw_message = b"hello world".to_vec(); - let signature = env.signer + let signature = env + .signer .sign(env.validators[2], &raw_message) .expect("failed signing message"); let signature_bytes = signature.encode(); @@ -840,39 +841,44 @@ async fn tx_pool_gossip() { }) .await .expect("failed sending request"); - + assert!(resp.status().is_success()); tokio::time::sleep(Duration::from_secs(5)).await; // Check that node0 received the message - let tx = EthexeTransaction::Message { raw_message, signature: signature_bytes }; + let tx = EthexeTransaction::Message { + raw_message, + signature: signature_bytes, + }; let tx_hash = tx.tx_hash(); - - let tx_data = node0.db.validated_transaction(tx_hash).expect("tx not found"); - let node0_db_tx: EthexeTransaction = Decode::decode(&mut &tx_data[..]).expect("failed to decode tx"); + + let tx_data = node0 + .db + .validated_transaction(tx_hash) + .expect("tx not found"); + let node0_db_tx: EthexeTransaction = + Decode::decode(&mut &tx_data[..]).expect("failed to decode tx"); assert_eq!(node0_db_tx, tx); } -async fn send_json_request(rpc_server_url: String, create_request: impl Fn() -> serde_json::Value) -> Result { +async fn send_json_request( + rpc_server_url: String, + create_request: impl Fn() -> serde_json::Value, +) -> Result { let client = reqwest::Client::new(); let req_body = create_request(); - client - .post(rpc_server_url) - .json(&req_body) - .send() - .await + client.post(rpc_server_url).json(&req_body).send().await } mod utils { - use std::net::SocketAddr; use super::*; use ethexe_observer::SimpleBlockData; use ethexe_rpc::{RpcConfig, RpcService}; - use ethexe_tx_pool::TxPoolService; use futures::StreamExt; use gear_core::message::ReplyCode; + use std::net::SocketAddr; use tokio::sync::{broadcast::Sender, Mutex}; pub struct TestEnv { @@ -1208,10 +1214,7 @@ mod utils { pub fn service_rpc(mut self, rpc_port: u16) -> Self { let service_rpc_config = RpcConfig { - listen_addr: SocketAddr::new( - "127.0.0.1".parse().unwrap(), - rpc_port - ) + listen_addr: SocketAddr::new("127.0.0.1".parse().unwrap(), rpc_port), }; self.service_rpc_config = Some(service_rpc_config); @@ -1421,13 +1424,15 @@ mod utils { None => None, }; - let tx_pool_artifacts = TxPoolService::new((self.db.clone(),)); + let tx_pool_artifacts = ethexe_tx_pool::new((self.db.clone(),)); - let rpc = self.service_rpc_config - .as_ref() - .map(|service_rpc_config| { - RpcService::new(service_rpc_config.clone(), self.db.clone(), tx_pool_artifacts.input_sender.clone()) - }); + let rpc = self.service_rpc_config.as_ref().map(|service_rpc_config| { + RpcService::new( + service_rpc_config.clone(), + self.db.clone(), + tx_pool_artifacts.input_sender.clone(), + ) + }); let service = Service::new_from_parts( self.db.clone(), @@ -1442,7 +1447,7 @@ mod utils { validator, None, rpc, - tx_pool_artifacts + tx_pool_artifacts, ); let handle = task::spawn(service.run()); @@ -1461,11 +1466,11 @@ mod utils { let _ = handle.await; self.multiaddr = None; } - + pub fn service_rpc_url(&self) -> Option { - self.service_rpc_config.as_ref().map(|rpc| - format!("http://{}", rpc.listen_addr.to_string()) - ) + self.service_rpc_config + .as_ref() + .map(|rpc| format!("http://{}", rpc.listen_addr)) } } diff --git a/ethexe/tx-pool/src/lib.rs b/ethexe/tx-pool/src/lib.rs index 9db31ebff2f..8dfa1251cb7 100644 --- a/ethexe/tx-pool/src/lib.rs +++ b/ethexe/tx-pool/src/lib.rs @@ -25,8 +25,8 @@ mod transaction; mod tests; pub use service::{ - InputTask, OutputTask, TxPoolInputTaskSender, TxPoolOutputTaskReceiver, TxPoolService, - TxPoolInstantiationArtifacts, + new, InputTask, OutputTask, TxPoolInputTaskSender, TxPoolInstantiationArtifacts, + TxPoolOutputTaskReceiver, TxPoolService, }; pub use transaction::{EthexeTransaction, Transaction}; diff --git a/ethexe/tx-pool/src/service.rs b/ethexe/tx-pool/src/service.rs index 8c95253eaa6..f49a4c6c1bb 100644 --- a/ethexe/tx-pool/src/service.rs +++ b/ethexe/tx-pool/src/service.rs @@ -26,6 +26,29 @@ use input::TxPoolInputTaskReceiver; use output::TxPoolOutputTaskSender; use tokio::sync::mpsc; +/// Creates a new transaction pool service. +pub fn new(tx_pool_core: impl Into) -> TxPoolInstantiationArtifacts +where + Tx: Transaction + Clone, + TxPool: TxPoolTrait, +{ + let tx_pool_core = tx_pool_core.into(); + let (tx_in, rx_in) = mpsc::unbounded_channel(); + let (tx_out, rx_out) = mpsc::unbounded_channel(); + + let service = TxPoolService { + core: tx_pool_core, + input_interface: TxPoolInputTaskReceiver { receiver: rx_in }, + output_inteface: TxPoolOutputTaskSender { sender: tx_out }, + }; + + TxPoolInstantiationArtifacts { + service, + input_sender: TxPoolInputTaskSender { sender: tx_in }, + output_receiver: TxPoolOutputTaskReceiver { receiver: rx_out }, + } +} + /// Transaction pool instantiation artifacts carrier. pub struct TxPoolInstantiationArtifacts> { pub service: TxPoolService, @@ -43,24 +66,6 @@ pub struct TxPoolService> } impl> TxPoolService { - pub fn new(tx_pool_core: impl Into) -> TxPoolInstantiationArtifacts { - let tx_pool_core = tx_pool_core.into(); - let (tx_in, rx_in) = mpsc::unbounded_channel(); - let (tx_out, rx_out) = mpsc::unbounded_channel(); - - let service = Self { - core: tx_pool_core, - input_interface: TxPoolInputTaskReceiver { receiver: rx_in }, - output_inteface: TxPoolOutputTaskSender { sender: tx_out }, - }; - - TxPoolInstantiationArtifacts { - service, - input_sender: TxPoolInputTaskSender { sender: tx_in }, - output_receiver: TxPoolOutputTaskReceiver { receiver: rx_out }, - } - } - /// Runs transaction pool service expecting to receive tasks from the /// tx pool input task sender. pub async fn run(mut self) {