Skip to content

Commit

Permalink
Clippy and fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
techraed committed Nov 28, 2024
1 parent a569591 commit 8b40859
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 56 deletions.
18 changes: 11 additions & 7 deletions ethexe/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ use ethexe_processor::{LocalOutcome, ProcessorConfig};
use ethexe_sequencer::agro::AggregatedCommitments;
use ethexe_signer::{Digest, PublicKey, Signature, Signer};
use ethexe_tx_pool::{
EthexeTransaction, InputTask, OutputTask, StandardInputTaskSender, StandardTxPoolInstantiationArtifacts, TxPoolService
EthexeTransaction, InputTask, OutputTask, StandardInputTaskSender,
StandardTxPoolInstantiationArtifacts,
};
use ethexe_validator::BlockCommitmentValidationRequest;
use futures::{future, stream::StreamExt, FutureExt};
Expand Down Expand Up @@ -208,10 +209,14 @@ impl Service {
.transpose()?;

log::info!("🚅 Tx pool service starting...");
let tx_pool_artifacts = TxPoolService::new((db.clone(),));
let tx_pool_artifacts = ethexe_tx_pool::new((db.clone(),));

let rpc = config.rpc_config.as_ref().map(|config| {
ethexe_rpc::RpcService::new(config.clone(), db.clone(), tx_pool_artifacts.input_sender.clone())
ethexe_rpc::RpcService::new(
config.clone(),
db.clone(),
tx_pool_artifacts.input_sender.clone(),
)
});

Ok(Self {
Expand Down Expand Up @@ -492,9 +497,8 @@ impl Service {
None
};


let StandardTxPoolInstantiationArtifacts {
service: tx_pool_service,
let StandardTxPoolInstantiationArtifacts {
service: tx_pool_service,
input_sender: tx_pool_input_task_sender,
output_receiver: mut tx_pool_ouput_task_receiver,
} = tx_pool_artifacts;
Expand Down Expand Up @@ -864,7 +868,7 @@ impl Service {
NetworkMessage::Transaction { transaction } => {
let _ = tx_pool_input_task_sender
.send(InputTask::AddTransaction {
transaction: transaction,
transaction,
response_sender: None,
})
.inspect_err(|e| {
Expand Down
63 changes: 34 additions & 29 deletions ethexe/cli/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,8 @@ async fn tx_pool_gossip() {

// Prepare tx data
let raw_message = b"hello world".to_vec();
let signature = env.signer
let signature = env
.signer
.sign(env.validators[2], &raw_message)
.expect("failed signing message");
let signature_bytes = signature.encode();
Expand All @@ -840,39 +841,44 @@ async fn tx_pool_gossip() {
})
.await
.expect("failed sending request");

assert!(resp.status().is_success());

tokio::time::sleep(Duration::from_secs(5)).await;

// Check that node0 received the message
let tx = EthexeTransaction::Message { raw_message, signature: signature_bytes };
let tx = EthexeTransaction::Message {
raw_message,
signature: signature_bytes,
};
let tx_hash = tx.tx_hash();

let tx_data = node0.db.validated_transaction(tx_hash).expect("tx not found");
let node0_db_tx: EthexeTransaction = Decode::decode(&mut &tx_data[..]).expect("failed to decode tx");

let tx_data = node0
.db
.validated_transaction(tx_hash)
.expect("tx not found");
let node0_db_tx: EthexeTransaction =
Decode::decode(&mut &tx_data[..]).expect("failed to decode tx");
assert_eq!(node0_db_tx, tx);
}

async fn send_json_request(rpc_server_url: String, create_request: impl Fn() -> serde_json::Value) -> Result<reqwest::Response, reqwest::Error> {
async fn send_json_request(
rpc_server_url: String,
create_request: impl Fn() -> serde_json::Value,
) -> Result<reqwest::Response, reqwest::Error> {
let client = reqwest::Client::new();
let req_body = create_request();

client
.post(rpc_server_url)
.json(&req_body)
.send()
.await
client.post(rpc_server_url).json(&req_body).send().await
}

mod utils {
use std::net::SocketAddr;
use super::*;
use ethexe_observer::SimpleBlockData;
use ethexe_rpc::{RpcConfig, RpcService};
use ethexe_tx_pool::TxPoolService;
use futures::StreamExt;
use gear_core::message::ReplyCode;
use std::net::SocketAddr;
use tokio::sync::{broadcast::Sender, Mutex};

pub struct TestEnv {
Expand Down Expand Up @@ -1208,10 +1214,7 @@ mod utils {

pub fn service_rpc(mut self, rpc_port: u16) -> Self {
let service_rpc_config = RpcConfig {
listen_addr: SocketAddr::new(
"127.0.0.1".parse().unwrap(),
rpc_port
)
listen_addr: SocketAddr::new("127.0.0.1".parse().unwrap(), rpc_port),
};
self.service_rpc_config = Some(service_rpc_config);

Expand Down Expand Up @@ -1421,13 +1424,15 @@ mod utils {
None => None,
};

let tx_pool_artifacts = TxPoolService::new((self.db.clone(),));
let tx_pool_artifacts = ethexe_tx_pool::new((self.db.clone(),));

let rpc = self.service_rpc_config
.as_ref()
.map(|service_rpc_config| {
RpcService::new(service_rpc_config.clone(), self.db.clone(), tx_pool_artifacts.input_sender.clone())
});
let rpc = self.service_rpc_config.as_ref().map(|service_rpc_config| {
RpcService::new(
service_rpc_config.clone(),
self.db.clone(),
tx_pool_artifacts.input_sender.clone(),
)
});

let service = Service::new_from_parts(
self.db.clone(),
Expand All @@ -1442,7 +1447,7 @@ mod utils {
validator,
None,
rpc,
tx_pool_artifacts
tx_pool_artifacts,
);

let handle = task::spawn(service.run());
Expand All @@ -1461,11 +1466,11 @@ mod utils {
let _ = handle.await;
self.multiaddr = None;
}

pub fn service_rpc_url(&self) -> Option<String> {
self.service_rpc_config.as_ref().map(|rpc|
format!("http://{}", rpc.listen_addr.to_string())
)
self.service_rpc_config
.as_ref()
.map(|rpc| format!("http://{}", rpc.listen_addr))
}
}

Expand Down
4 changes: 2 additions & 2 deletions ethexe/tx-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ mod transaction;
mod tests;

pub use service::{
InputTask, OutputTask, TxPoolInputTaskSender, TxPoolOutputTaskReceiver, TxPoolService,
TxPoolInstantiationArtifacts,
new, InputTask, OutputTask, TxPoolInputTaskSender, TxPoolInstantiationArtifacts,
TxPoolOutputTaskReceiver, TxPoolService,
};
pub use transaction::{EthexeTransaction, Transaction};

Expand Down
41 changes: 23 additions & 18 deletions ethexe/tx-pool/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,29 @@ use input::TxPoolInputTaskReceiver;
use output::TxPoolOutputTaskSender;
use tokio::sync::mpsc;

/// Creates a new transaction pool service.
pub fn new<Tx, TxPool>(tx_pool_core: impl Into<TxPool>) -> TxPoolInstantiationArtifacts<Tx, TxPool>
where
Tx: Transaction + Clone,
TxPool: TxPoolTrait<Transaction = Tx>,
{
let tx_pool_core = tx_pool_core.into();
let (tx_in, rx_in) = mpsc::unbounded_channel();
let (tx_out, rx_out) = mpsc::unbounded_channel();

let service = TxPoolService {
core: tx_pool_core,
input_interface: TxPoolInputTaskReceiver { receiver: rx_in },
output_inteface: TxPoolOutputTaskSender { sender: tx_out },
};

TxPoolInstantiationArtifacts {
service,
input_sender: TxPoolInputTaskSender { sender: tx_in },
output_receiver: TxPoolOutputTaskReceiver { receiver: rx_out },
}
}

/// Transaction pool instantiation artifacts carrier.
pub struct TxPoolInstantiationArtifacts<Tx: Transaction, TxPool: TxPoolTrait<Transaction = Tx>> {
pub service: TxPoolService<Tx, TxPool>,
Expand All @@ -43,24 +66,6 @@ pub struct TxPoolService<Tx: Transaction, TxPool: TxPoolTrait<Transaction = Tx>>
}

impl<Tx: Transaction + Clone, TxPool: TxPoolTrait<Transaction = Tx>> TxPoolService<Tx, TxPool> {
pub fn new(tx_pool_core: impl Into<TxPool>) -> TxPoolInstantiationArtifacts<Tx, TxPool> {
let tx_pool_core = tx_pool_core.into();
let (tx_in, rx_in) = mpsc::unbounded_channel();
let (tx_out, rx_out) = mpsc::unbounded_channel();

let service = Self {
core: tx_pool_core,
input_interface: TxPoolInputTaskReceiver { receiver: rx_in },
output_inteface: TxPoolOutputTaskSender { sender: tx_out },
};

TxPoolInstantiationArtifacts {
service,
input_sender: TxPoolInputTaskSender { sender: tx_in },
output_receiver: TxPoolOutputTaskReceiver { receiver: rx_out },
}
}

/// Runs transaction pool service expecting to receive tasks from the
/// tx pool input task sender.
pub async fn run(mut self) {
Expand Down

0 comments on commit 8b40859

Please sign in to comment.