From 33d23e835964b29b95cd87095b7e369bc525dd1f Mon Sep 17 00:00:00 2001 From: Alon-Lukatch-Starkware Date: Tue, 29 Oct 2024 14:11:40 +0200 Subject: [PATCH] feat(mempool_p2p): create mempool p2p servers and start them (#1645) * feat(mempool_p2p): create mempool p2p components - test failures because of channels being dropped * feat(mempool_p2p): create mempool p2p servers and start them --- config/mempool/default_config.json | 50 +++++++++++++++++ crates/mempool_node/src/components.rs | 30 ++++++++--- .../src/config/component_config.rs | 4 ++ .../src/config/component_execution_config.rs | 11 ++++ crates/mempool_node/src/servers.rs | 54 ++++++++++++++++++- crates/mempool_p2p/src/propagator/mod.rs | 20 ++++++- crates/mempool_p2p/src/runner/mod.rs | 9 ++++ 7 files changed, 169 insertions(+), 9 deletions(-) diff --git a/config/mempool/default_config.json b/config/mempool/default_config.json index 6b430c6804..787614c0a0 100644 --- a/config/mempool/default_config.json +++ b/config/mempool/default_config.json @@ -454,6 +454,56 @@ "privacy": "Public", "value": "0.0.0.0:8080" }, + "components.mempool_p2p.execution_mode.LocalExecution.enable_remote_connection": { + "description": "Specifies whether the component, when running locally, allows remote connections.", + "privacy": "Public", + "value": false + }, + "components.mempool_p2p.local_config.#is_none": { + "description": "Flag for an optional field.", + "privacy": "TemporaryValue", + "value": false + }, + "components.mempool_p2p.local_config.channel_buffer_size": { + "description": "The communication channel buffer size.", + "privacy": "Public", + "value": 32 + }, + "components.mempool_p2p.remote_client_config.#is_none": { + "description": "Flag for an optional field.", + "privacy": "TemporaryValue", + "value": true + }, + "components.mempool_p2p.remote_client_config.idle_connections": { + "description": "The maximum number of idle connections to keep alive.", + "privacy": "Public", + "value": 18446744073709551615 + }, + "components.mempool_p2p.remote_client_config.idle_timeout": { + "description": "The duration in seconds to keep an idle connection open before closing.", + "privacy": "Public", + "value": 90 + }, + "components.mempool_p2p.remote_client_config.retries": { + "description": "The max number of retries for sending a message.", + "privacy": "Public", + "value": 3 + }, + "components.mempool_p2p.remote_client_config.socket": { + "description": "The remote component server socket.", + "privacy": "Public", + "value": "0.0.0.0:8080" + }, + "components.mempool_p2p.remote_server_config.#is_none": { + "description": "Flag for an optional field.", + "privacy": "TemporaryValue", + "value": true + }, + "components.mempool_p2p.remote_server_config.socket": { + "description": "The remote component server socket.", + "privacy": "Public", + "value": "0.0.0.0:8080" + }, "components.monitoring_endpoint.execution_mode.LocalExecution.enable_remote_connection": { "description": "Specifies whether the component, when running locally, allows remote connections.", "privacy": "Public", diff --git a/crates/mempool_node/src/components.rs b/crates/mempool_node/src/components.rs index 111fbbce78..b1445be735 100644 --- a/crates/mempool_node/src/components.rs +++ b/crates/mempool_node/src/components.rs @@ -1,11 +1,11 @@ -use std::sync::Arc; - use starknet_batcher::batcher::{create_batcher, Batcher}; use starknet_consensus_manager::consensus_manager::ConsensusManager; use starknet_gateway::gateway::{create_gateway, Gateway}; use starknet_http_server::http_server::{create_http_server, HttpServer}; use starknet_mempool::communication::{create_mempool, MempoolCommunicationWrapper}; -use starknet_mempool_p2p::propagator::EmptyMempoolP2pPropagatorClient; +use starknet_mempool_p2p::create_p2p_propagator_and_runner; +use starknet_mempool_p2p::propagator::MempoolP2pPropagator; +use starknet_mempool_p2p::runner::MempoolP2pRunner; use starknet_monitoring_endpoint::monitoring_endpoint::{ create_monitoring_endpoint, MonitoringEndpoint, @@ -22,6 +22,8 @@ pub struct SequencerNodeComponents { pub http_server: Option, pub mempool: Option, pub monitoring_endpoint: Option, + pub mempool_p2p_propagator: Option, + pub mempool_p2p_runner: Option, } pub fn create_node_components( @@ -68,11 +70,25 @@ pub fn create_node_components( ComponentExecutionMode::Disabled => None, }; + let (mempool_p2p_propagator, mempool_p2p_runner) = + match config.components.mempool_p2p.execution_mode { + ComponentExecutionMode::LocalExecution { enable_remote_connection: _ } => { + let gateway_client = + clients.get_gateway_client().expect("Gateway Client should be available"); + let (mempool_p2p_propagator, mempool_p2p_runner) = create_p2p_propagator_and_runner( + config.mempool_p2p_config.clone(), + gateway_client, + ); + (Some(mempool_p2p_propagator), Some(mempool_p2p_runner)) + } + ComponentExecutionMode::Disabled => (None, None), + }; + let mempool = match config.components.mempool.execution_mode { ComponentExecutionMode::LocalExecution { enable_remote_connection: _ } => { - // TODO(Lukach): obtain the mempool_p2p_propagator_client from 'clients', pass it as an - // argument to create_mempool. - let mempool_p2p_propagator_client = Arc::new(EmptyMempoolP2pPropagatorClient); + let mempool_p2p_propagator_client = clients + .get_mempool_p2p_propagator_client() + .expect("Propagator Client should be available"); let mempool = create_mempool(mempool_p2p_propagator_client); Some(mempool) } @@ -94,5 +110,7 @@ pub fn create_node_components( http_server, mempool, monitoring_endpoint, + mempool_p2p_propagator, + mempool_p2p_runner, } } diff --git a/crates/mempool_node/src/config/component_config.rs b/crates/mempool_node/src/config/component_config.rs index e3da86da03..f957d46095 100644 --- a/crates/mempool_node/src/config/component_config.rs +++ b/crates/mempool_node/src/config/component_config.rs @@ -21,6 +21,8 @@ pub struct ComponentConfig { #[validate] pub mempool: ComponentExecutionConfig, #[validate] + pub mempool_p2p: ComponentExecutionConfig, + #[validate] pub monitoring_endpoint: ComponentExecutionConfig, } @@ -32,6 +34,7 @@ impl Default for ComponentConfig { gateway: ComponentExecutionConfig::gateway_default_config(), http_server: ComponentExecutionConfig::http_server_default_config(), mempool: ComponentExecutionConfig::mempool_default_config(), + mempool_p2p: ComponentExecutionConfig::mempool_p2p_default_config(), monitoring_endpoint: ComponentExecutionConfig::monitoring_endpoint_default_config(), } } @@ -46,6 +49,7 @@ impl SerializeConfig for ComponentConfig { append_sub_config_name(self.gateway.dump(), "gateway"), append_sub_config_name(self.http_server.dump(), "http_server"), append_sub_config_name(self.mempool.dump(), "mempool"), + append_sub_config_name(self.mempool_p2p.dump(), "mempool_p2p"), append_sub_config_name(self.monitoring_endpoint.dump(), "monitoring_endpoint"), ]; diff --git a/crates/mempool_node/src/config/component_execution_config.rs b/crates/mempool_node/src/config/component_execution_config.rs index d1b50ebe64..d1d4a11bda 100644 --- a/crates/mempool_node/src/config/component_execution_config.rs +++ b/crates/mempool_node/src/config/component_execution_config.rs @@ -155,6 +155,17 @@ impl ComponentExecutionConfig { remote_server_config: None, } } + + pub fn mempool_p2p_default_config() -> Self { + Self { + execution_mode: ComponentExecutionMode::LocalExecution { + enable_remote_connection: false, + }, + local_config: Some(LocalComponentCommunicationConfig::default()), + remote_client_config: None, + remote_server_config: None, + } + } } pub fn validate_single_component_config( diff --git a/crates/mempool_node/src/servers.rs b/crates/mempool_node/src/servers.rs index f7166eaf36..3792f2e13a 100644 --- a/crates/mempool_node/src/servers.rs +++ b/crates/mempool_node/src/servers.rs @@ -10,6 +10,11 @@ use starknet_consensus_manager::communication::{ use starknet_gateway::communication::{create_gateway_server, LocalGatewayServer}; use starknet_http_server::communication::{create_http_server, HttpServer}; use starknet_mempool::communication::{create_mempool_server, LocalMempoolServer}; +use starknet_mempool_p2p::propagator::{ + create_mempool_p2p_propagator_server, + LocalMempoolP2pPropagatorServer, +}; +use starknet_mempool_p2p::runner::MempoolP2pRunnerServer; use starknet_monitoring_endpoint::communication::{ create_monitoring_endpoint_server, MonitoringEndpointServer, @@ -27,6 +32,7 @@ struct LocalServers { pub(crate) batcher: Option>, pub(crate) gateway: Option>, pub(crate) mempool: Option>, + pub(crate) mempool_p2p_propagator: Option>, } // Component servers that wrap a component without a server. @@ -34,6 +40,7 @@ struct WrapperServers { pub(crate) consensus_manager: Option>, pub(crate) http_server: Option>, pub(crate) monitoring_endpoint: Option>, + pub(crate) mempool_p2p_runner: Option>, } pub struct SequencerNodeServers { @@ -96,13 +103,39 @@ pub fn create_node_servers( ComponentExecutionMode::Disabled => None, }; - let local_servers = - LocalServers { batcher: batcher_server, gateway: gateway_server, mempool: mempool_server }; + let mempool_p2p_propagator_server = match config.components.mempool_p2p.execution_mode { + ComponentExecutionMode::LocalExecution { enable_remote_connection: _ } => { + Some(Box::new(create_mempool_p2p_propagator_server( + components + .mempool_p2p_propagator + .expect("Mempool P2P Propagator is not initialized."), + communication.take_mempool_p2p_propagator_rx(), + ))) + } + ComponentExecutionMode::Disabled => None, + }; + + let mempool_p2p_runner_server = match config.components.mempool_p2p.execution_mode { + ComponentExecutionMode::LocalExecution { enable_remote_connection: _ } => { + Some(Box::new(MempoolP2pRunnerServer::new( + components.mempool_p2p_runner.expect("Mempool P2P Runner is not initialized."), + ))) + } + ComponentExecutionMode::Disabled => None, + }; + + let local_servers = LocalServers { + batcher: batcher_server, + gateway: gateway_server, + mempool: mempool_server, + mempool_p2p_propagator: mempool_p2p_propagator_server, + }; let wrapper_servers = WrapperServers { consensus_manager: consensus_manager_server, http_server, monitoring_endpoint: monitoring_endpoint_server, + mempool_p2p_runner: mempool_p2p_runner_server, }; SequencerNodeServers { local_servers, wrapper_servers } @@ -127,6 +160,13 @@ pub async fn run_component_servers(servers: SequencerNodeServers) -> anyhow::Res // Sequencer Monitoring server. let monitoring_endpoint_future = get_server_future(servers.wrapper_servers.monitoring_endpoint); + // MempoolP2pPropagator server. + let mempool_p2p_propagator_future = + get_server_future(servers.local_servers.mempool_p2p_propagator); + + // MempoolP2pRunner server. + let mempool_p2p_runner_future = get_server_future(servers.wrapper_servers.mempool_p2p_runner); + // Start servers. let batcher_handle = tokio::spawn(batcher_future); let consensus_manager_handle = tokio::spawn(consensus_manager_future); @@ -134,6 +174,8 @@ pub async fn run_component_servers(servers: SequencerNodeServers) -> anyhow::Res let http_server_handle = tokio::spawn(http_server_future); let mempool_handle = tokio::spawn(mempool_future); let monitoring_endpoint_handle = tokio::spawn(monitoring_endpoint_future); + let mempool_p2p_propagator_handle = tokio::spawn(mempool_p2p_propagator_future); + let mempool_p2p_runner_handle = tokio::spawn(mempool_p2p_runner_future); let result = tokio::select! { res = batcher_handle => { @@ -160,6 +202,14 @@ pub async fn run_component_servers(servers: SequencerNodeServers) -> anyhow::Res error!("Monitoring Endpoint Server stopped."); res? } + res = mempool_p2p_propagator_handle => { + error!("Mempool P2P Propagator Server stopped."); + res? + } + res = mempool_p2p_runner_handle => { + error!("Mempool P2P Runner Server stopped."); + res? + } }; error!("Servers ended with unexpected Ok."); diff --git a/crates/mempool_p2p/src/propagator/mod.rs b/crates/mempool_p2p/src/propagator/mod.rs index c671a590fe..a554aad051 100644 --- a/crates/mempool_p2p/src/propagator/mod.rs +++ b/crates/mempool_p2p/src/propagator/mod.rs @@ -10,10 +10,13 @@ use starknet_mempool_p2p_types::communication::{ MempoolP2pPropagatorClient, MempoolP2pPropagatorClientResult, MempoolP2pPropagatorRequest, + MempoolP2pPropagatorRequestAndResponseSender, MempoolP2pPropagatorResponse, }; use starknet_mempool_p2p_types::errors::MempoolP2pPropagatorError; -use starknet_sequencer_infra::component_definitions::ComponentRequestHandler; +use starknet_sequencer_infra::component_definitions::{ComponentRequestHandler, ComponentStarter}; +use starknet_sequencer_infra::component_server::LocalComponentServer; +use tokio::sync::mpsc::Receiver; pub struct MempoolP2pPropagator { broadcast_topic_client: BroadcastTopicClient, @@ -72,3 +75,18 @@ impl MempoolP2pPropagatorClient for EmptyMempoolP2pPropagatorClient { Ok(()) } } + +pub type LocalMempoolP2pPropagatorServer = LocalComponentServer< + MempoolP2pPropagator, + MempoolP2pPropagatorRequest, + MempoolP2pPropagatorResponse, +>; + +impl ComponentStarter for MempoolP2pPropagator {} + +pub fn create_mempool_p2p_propagator_server( + mempool_p2p_propagator: MempoolP2pPropagator, + rx_mempool_p2p_propagator: Receiver, +) -> LocalMempoolP2pPropagatorServer { + LocalComponentServer::new(mempool_p2p_propagator, rx_mempool_p2p_propagator) +} diff --git a/crates/mempool_p2p/src/runner/mod.rs b/crates/mempool_p2p/src/runner/mod.rs index a69147c70b..66d82f0dbe 100644 --- a/crates/mempool_p2p/src/runner/mod.rs +++ b/crates/mempool_p2p/src/runner/mod.rs @@ -13,6 +13,7 @@ use papyrus_protobuf::mempool::RpcTransactionWrapper; use starknet_gateway_types::communication::SharedGatewayClient; use starknet_gateway_types::gateway_types::GatewayInput; use starknet_sequencer_infra::component_definitions::ComponentStarter; +use starknet_sequencer_infra::component_server::WrapperServer; use starknet_sequencer_infra::errors::ComponentError; use tracing::warn; @@ -83,3 +84,11 @@ impl ComponentStarter for MempoolP2pRunner { } } } + +pub type MempoolP2pRunnerServer = WrapperServer; + +pub fn create_mempool_p2p_runner_server( + mempool_p2p_runner: MempoolP2pRunner, +) -> MempoolP2pRunnerServer { + WrapperServer::new(mempool_p2p_runner) +}