From 8c56cc451566cb14026256e91a6e700f014be407 Mon Sep 17 00:00:00 2001 From: Lev Roitman Date: Mon, 1 Jul 2024 21:12:19 +0300 Subject: [PATCH] refactor: refactor integration tests to use mempool_node modules commit-id:cd2418e0 --- Cargo.lock | 2 +- crates/tests-integration/Cargo.toml | 3 +- .../src/integration_test_setup.rs | 57 ++++++++++--------- .../src/integration_test_utils.rs | 30 +++------- crates/tests-integration/src/mock_batcher.rs | 14 ++--- 5 files changed, 45 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d80e0849..933b1395 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5340,8 +5340,8 @@ dependencies = [ "starknet_api", "starknet_client", "starknet_gateway", - "starknet_mempool", "starknet_mempool_infra", + "starknet_mempool_node", "starknet_mempool_types", "starknet_task_executor", "strum 0.24.1", diff --git a/crates/tests-integration/Cargo.toml b/crates/tests-integration/Cargo.toml index 9332a6c8..9db25987 100644 --- a/crates/tests-integration/Cargo.toml +++ b/crates/tests-integration/Cargo.toml @@ -21,8 +21,7 @@ serde_json.workspace = true starknet_api.workspace = true starknet_client.workspace = true starknet_gateway = { path = "../gateway", version = "0.0", features = ["testing"] } -starknet_mempool = { path = "../mempool", version = "0.0" } -starknet_mempool_infra = { path = "../mempool_infra", version = "0.0" } +starknet_mempool_node = { path = "../mempool_node", version = "0.0" } starknet_mempool_types = { path = "../mempool_types", version = "0.0" } starknet_task_executor = { path = "../task_executor", version = "0.0" } starknet-types-core.workspace = true diff --git a/crates/tests-integration/src/integration_test_setup.rs b/crates/tests-integration/src/integration_test_setup.rs index cce1a67e..4d4e0919 100644 --- a/crates/tests-integration/src/integration_test_setup.rs +++ b/crates/tests-integration/src/integration_test_setup.rs @@ -1,30 +1,26 @@ use std::net::SocketAddr; -use std::sync::Arc; use starknet_api::rpc_transaction::RPCTransaction; use starknet_api::transaction::TransactionHash; use starknet_gateway::config::GatewayNetworkConfig; use starknet_gateway::errors::GatewayError; -use starknet_mempool::communication::create_mempool_server; -use starknet_mempool::mempool::Mempool; -use starknet_mempool_infra::component_server::ComponentServerStarter; -use starknet_mempool_types::communication::{MempoolClientImpl, MempoolRequestAndResponseSender}; +use starknet_mempool_node::communication::{create_node_channels, create_node_clients}; +use starknet_mempool_node::components::create_components; +use starknet_mempool_node::servers::{create_servers, get_server_future}; use starknet_mempool_types::mempool_types::ThinTransaction; use starknet_task_executor::executor::TaskExecutor; use starknet_task_executor::tokio_executor::TokioExecutor; use tokio::runtime::Handle; -use tokio::sync::mpsc::channel; use tokio::task::JoinHandle; -use crate::integration_test_utils::{create_gateway, GatewayClient}; +use crate::integration_test_utils::{create_config, GatewayClient}; use crate::mock_batcher::MockBatcher; +use crate::state_reader::spawn_test_rpc_state_reader; pub struct IntegrationTestSetup { pub task_executor: TokioExecutor, pub gateway_client: GatewayClient, - pub batcher: MockBatcher, - pub gateway_handle: JoinHandle<()>, pub mempool_handle: JoinHandle<()>, } @@ -34,20 +30,29 @@ impl IntegrationTestSetup { let handle = Handle::current(); let task_executor = TokioExecutor::new(handle); - // TODO(Tsabary): wrap creation of channels in dedicated functions, take channel capacity - // from config. - const MEMPOOL_INVOCATIONS_QUEUE_SIZE: usize = 32; - let (tx_mempool, rx_mempool) = - channel::(MEMPOOL_INVOCATIONS_QUEUE_SIZE); - // Build and run gateway; initialize a gateway client. - let gateway_mempool_client = MempoolClientImpl::new(tx_mempool.clone()); - let mut gateway = - create_gateway(Arc::new(gateway_mempool_client), n_initialized_account_contracts).await; - let GatewayNetworkConfig { ip, port } = gateway.config.network_config; + // Spawn a papyrus rpc server for a papyrus storage reader. + let rpc_server_addr = spawn_test_rpc_state_reader(n_initialized_account_contracts).await; + + // Derive the configuration for the mempool node. + let config = create_config(rpc_server_addr).await; + + // Create the communication network for the mempool node. + let channels = create_node_channels(); + + // Create the clients for the mempool node. + let clients = create_node_clients(&config, &channels); + + // Create the components for the mempool node. + let components = create_components(&config, &clients); + + // Create the servers for the mempool node. + let servers = create_servers(&config, channels, components); + + let GatewayNetworkConfig { ip, port } = config.gateway_config.network_config; let gateway_client = GatewayClient::new(SocketAddr::from((ip, port))); - let gateway_handle = task_executor.spawn_with_handle(async move { - gateway.run().await.unwrap(); - }); + + let gateway_future = get_server_future("Gateway", true, servers.gateway); + let gateway_handle = task_executor.spawn_with_handle(gateway_future); // Wait for server to spin up. // TODO(Gilad): Replace with a persistant Client with a built-in retry mechanism, @@ -55,13 +60,11 @@ impl IntegrationTestSetup { tokio::time::sleep(std::time::Duration::from_millis(100)).await; // Build Batcher. - let batcher = MockBatcher::new(tx_mempool.clone()); + let batcher = MockBatcher::new(clients.get_mempool_client().unwrap()); // Build and run mempool. - let mut mempool_server = create_mempool_server(Mempool::empty(), rx_mempool); - let mempool_handle = task_executor.spawn_with_handle(async move { - mempool_server.start().await; - }); + let mempool_future = get_server_future("Mempool", true, servers.mempool); + let mempool_handle = task_executor.spawn_with_handle(mempool_future); Self { task_executor, gateway_client, batcher, gateway_handle, mempool_handle } } diff --git a/crates/tests-integration/src/integration_test_utils.rs b/crates/tests-integration/src/integration_test_utils.rs index b1a3995e..2d6147fe 100644 --- a/crates/tests-integration/src/integration_test_utils.rs +++ b/crates/tests-integration/src/integration_test_utils.rs @@ -1,5 +1,4 @@ use std::net::SocketAddr; -use std::sync::Arc; use axum::body::Body; use reqwest::{Client, Response}; @@ -10,18 +9,11 @@ use starknet_gateway::config::{ StatelessTransactionValidatorConfig, }; use starknet_gateway::errors::GatewayError; -use starknet_gateway::gateway::Gateway; -use starknet_gateway::rpc_state_reader::RpcStateReaderFactory; -use starknet_mempool_types::communication::SharedMempoolClient; +use starknet_mempool_node::config::MempoolNodeConfig; use test_utils::starknet_api_test_utils::external_tx_to_json; use tokio::net::TcpListener; -use crate::state_reader::spawn_test_rpc_state_reader; - -pub async fn create_gateway( - mempool_client: SharedMempoolClient, - n_initialized_account_contracts: u16, -) -> Gateway { +async fn create_gateway_config() -> GatewayConfig { let stateless_tx_validator_config = StatelessTransactionValidatorConfig { validate_non_zero_l1_gas_fee: true, max_calldata_length: 10, @@ -33,17 +25,13 @@ pub async fn create_gateway( let network_config = GatewayNetworkConfig { ip: socket.ip(), port: socket.port() }; let stateful_tx_validator_config = StatefulTransactionValidatorConfig::create_for_testing(); - let gateway_config = GatewayConfig { - network_config, - stateless_tx_validator_config, - stateful_tx_validator_config, - }; - - let rpc_server_addr = spawn_test_rpc_state_reader(n_initialized_account_contracts).await; - let rpc_state_reader_config = spawn_test_rpc_state_reader_config(rpc_server_addr); - let state_reader_factory = Arc::new(RpcStateReaderFactory { config: rpc_state_reader_config }); + GatewayConfig { network_config, stateless_tx_validator_config, stateful_tx_validator_config } +} - Gateway::new(gateway_config, state_reader_factory, mempool_client) +pub async fn create_config(rpc_server_addr: SocketAddr) -> MempoolNodeConfig { + let gateway_config = create_gateway_config().await; + let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr); + MempoolNodeConfig { gateway_config, rpc_state_reader_config, ..MempoolNodeConfig::default() } } /// A test utility client for interacting with a gateway server. @@ -84,7 +72,7 @@ impl GatewayClient { } } -fn spawn_test_rpc_state_reader_config(rpc_server_addr: SocketAddr) -> RpcStateReaderConfig { +fn test_rpc_state_reader_config(rpc_server_addr: SocketAddr) -> RpcStateReaderConfig { const RPC_SPEC_VERION: &str = "V0_7"; const JSON_RPC_VERSION: &str = "2.0"; RpcStateReaderConfig { diff --git a/crates/tests-integration/src/mock_batcher.rs b/crates/tests-integration/src/mock_batcher.rs index 0e261800..b86c12db 100644 --- a/crates/tests-integration/src/mock_batcher.rs +++ b/crates/tests-integration/src/mock_batcher.rs @@ -1,20 +1,14 @@ -use starknet_mempool_infra::component_definitions::ComponentRequestAndResponseSender; -use starknet_mempool_types::communication::{ - MempoolClient, MempoolClientImpl, MempoolRequest, MempoolResponse, -}; +use starknet_mempool_types::communication::SharedMempoolClient; use starknet_mempool_types::mempool_types::ThinTransaction; -use tokio::sync::mpsc::Sender; #[derive(Clone)] pub struct MockBatcher { - mempool_client: MempoolClientImpl, + mempool_client: SharedMempoolClient, } impl MockBatcher { - pub fn new( - mempool_sender: Sender>, - ) -> Self { - Self { mempool_client: MempoolClientImpl::new(mempool_sender) } + pub fn new(mempool_client: SharedMempoolClient) -> Self { + Self { mempool_client } } pub async fn get_txs(&self, n_txs: usize) -> Vec {