Skip to content

Commit

Permalink
feat(mempool_p2p): create mempool p2p servers and start them (#1645)
Browse files Browse the repository at this point in the history
* feat(mempool_p2p): create mempool p2p components - test failures because of channels being dropped

* feat(mempool_p2p): create mempool p2p servers and start them
  • Loading branch information
AlonLStarkWare authored Oct 29, 2024
1 parent 0f33a02 commit 33d23e8
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 9 deletions.
50 changes: 50 additions & 0 deletions config/mempool/default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,56 @@
"privacy": "Public",
"value": "0.0.0.0:8080"
},
"components.mempool_p2p.execution_mode.LocalExecution.enable_remote_connection": {
"description": "Specifies whether the component, when running locally, allows remote connections.",
"privacy": "Public",
"value": false
},
"components.mempool_p2p.local_config.#is_none": {
"description": "Flag for an optional field.",
"privacy": "TemporaryValue",
"value": false
},
"components.mempool_p2p.local_config.channel_buffer_size": {
"description": "The communication channel buffer size.",
"privacy": "Public",
"value": 32
},
"components.mempool_p2p.remote_client_config.#is_none": {
"description": "Flag for an optional field.",
"privacy": "TemporaryValue",
"value": true
},
"components.mempool_p2p.remote_client_config.idle_connections": {
"description": "The maximum number of idle connections to keep alive.",
"privacy": "Public",
"value": 18446744073709551615
},
"components.mempool_p2p.remote_client_config.idle_timeout": {
"description": "The duration in seconds to keep an idle connection open before closing.",
"privacy": "Public",
"value": 90
},
"components.mempool_p2p.remote_client_config.retries": {
"description": "The max number of retries for sending a message.",
"privacy": "Public",
"value": 3
},
"components.mempool_p2p.remote_client_config.socket": {
"description": "The remote component server socket.",
"privacy": "Public",
"value": "0.0.0.0:8080"
},
"components.mempool_p2p.remote_server_config.#is_none": {
"description": "Flag for an optional field.",
"privacy": "TemporaryValue",
"value": true
},
"components.mempool_p2p.remote_server_config.socket": {
"description": "The remote component server socket.",
"privacy": "Public",
"value": "0.0.0.0:8080"
},
"components.monitoring_endpoint.execution_mode.LocalExecution.enable_remote_connection": {
"description": "Specifies whether the component, when running locally, allows remote connections.",
"privacy": "Public",
Expand Down
30 changes: 24 additions & 6 deletions crates/mempool_node/src/components.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::sync::Arc;

use starknet_batcher::batcher::{create_batcher, Batcher};
use starknet_consensus_manager::consensus_manager::ConsensusManager;
use starknet_gateway::gateway::{create_gateway, Gateway};
use starknet_http_server::http_server::{create_http_server, HttpServer};
use starknet_mempool::communication::{create_mempool, MempoolCommunicationWrapper};
use starknet_mempool_p2p::propagator::EmptyMempoolP2pPropagatorClient;
use starknet_mempool_p2p::create_p2p_propagator_and_runner;
use starknet_mempool_p2p::propagator::MempoolP2pPropagator;
use starknet_mempool_p2p::runner::MempoolP2pRunner;
use starknet_monitoring_endpoint::monitoring_endpoint::{
create_monitoring_endpoint,
MonitoringEndpoint,
Expand All @@ -22,6 +22,8 @@ pub struct SequencerNodeComponents {
pub http_server: Option<HttpServer>,
pub mempool: Option<MempoolCommunicationWrapper>,
pub monitoring_endpoint: Option<MonitoringEndpoint>,
pub mempool_p2p_propagator: Option<MempoolP2pPropagator>,
pub mempool_p2p_runner: Option<MempoolP2pRunner>,
}

pub fn create_node_components(
Expand Down Expand Up @@ -68,11 +70,25 @@ pub fn create_node_components(
ComponentExecutionMode::Disabled => None,
};

let (mempool_p2p_propagator, mempool_p2p_runner) =
match config.components.mempool_p2p.execution_mode {
ComponentExecutionMode::LocalExecution { enable_remote_connection: _ } => {
let gateway_client =
clients.get_gateway_client().expect("Gateway Client should be available");
let (mempool_p2p_propagator, mempool_p2p_runner) = create_p2p_propagator_and_runner(
config.mempool_p2p_config.clone(),
gateway_client,
);
(Some(mempool_p2p_propagator), Some(mempool_p2p_runner))
}
ComponentExecutionMode::Disabled => (None, None),
};

let mempool = match config.components.mempool.execution_mode {
ComponentExecutionMode::LocalExecution { enable_remote_connection: _ } => {
// TODO(Lukach): obtain the mempool_p2p_propagator_client from 'clients', pass it as an
// argument to create_mempool.
let mempool_p2p_propagator_client = Arc::new(EmptyMempoolP2pPropagatorClient);
let mempool_p2p_propagator_client = clients
.get_mempool_p2p_propagator_client()
.expect("Propagator Client should be available");
let mempool = create_mempool(mempool_p2p_propagator_client);
Some(mempool)
}
Expand All @@ -94,5 +110,7 @@ pub fn create_node_components(
http_server,
mempool,
monitoring_endpoint,
mempool_p2p_propagator,
mempool_p2p_runner,
}
}
4 changes: 4 additions & 0 deletions crates/mempool_node/src/config/component_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub struct ComponentConfig {
#[validate]
pub mempool: ComponentExecutionConfig,
#[validate]
pub mempool_p2p: ComponentExecutionConfig,
#[validate]
pub monitoring_endpoint: ComponentExecutionConfig,
}

Expand All @@ -32,6 +34,7 @@ impl Default for ComponentConfig {
gateway: ComponentExecutionConfig::gateway_default_config(),
http_server: ComponentExecutionConfig::http_server_default_config(),
mempool: ComponentExecutionConfig::mempool_default_config(),
mempool_p2p: ComponentExecutionConfig::mempool_p2p_default_config(),
monitoring_endpoint: ComponentExecutionConfig::monitoring_endpoint_default_config(),
}
}
Expand All @@ -46,6 +49,7 @@ impl SerializeConfig for ComponentConfig {
append_sub_config_name(self.gateway.dump(), "gateway"),
append_sub_config_name(self.http_server.dump(), "http_server"),
append_sub_config_name(self.mempool.dump(), "mempool"),
append_sub_config_name(self.mempool_p2p.dump(), "mempool_p2p"),
append_sub_config_name(self.monitoring_endpoint.dump(), "monitoring_endpoint"),
];

Expand Down
11 changes: 11 additions & 0 deletions crates/mempool_node/src/config/component_execution_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,17 @@ impl ComponentExecutionConfig {
remote_server_config: None,
}
}

pub fn mempool_p2p_default_config() -> Self {
Self {
execution_mode: ComponentExecutionMode::LocalExecution {
enable_remote_connection: false,
},
local_config: Some(LocalComponentCommunicationConfig::default()),
remote_client_config: None,
remote_server_config: None,
}
}
}

pub fn validate_single_component_config(
Expand Down
54 changes: 52 additions & 2 deletions crates/mempool_node/src/servers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ use starknet_consensus_manager::communication::{
use starknet_gateway::communication::{create_gateway_server, LocalGatewayServer};
use starknet_http_server::communication::{create_http_server, HttpServer};
use starknet_mempool::communication::{create_mempool_server, LocalMempoolServer};
use starknet_mempool_p2p::propagator::{
create_mempool_p2p_propagator_server,
LocalMempoolP2pPropagatorServer,
};
use starknet_mempool_p2p::runner::MempoolP2pRunnerServer;
use starknet_monitoring_endpoint::communication::{
create_monitoring_endpoint_server,
MonitoringEndpointServer,
Expand All @@ -27,13 +32,15 @@ struct LocalServers {
pub(crate) batcher: Option<Box<LocalBatcherServer>>,
pub(crate) gateway: Option<Box<LocalGatewayServer>>,
pub(crate) mempool: Option<Box<LocalMempoolServer>>,
pub(crate) mempool_p2p_propagator: Option<Box<LocalMempoolP2pPropagatorServer>>,
}

// Component servers that wrap a component without a server.
struct WrapperServers {
pub(crate) consensus_manager: Option<Box<ConsensusManagerServer>>,
pub(crate) http_server: Option<Box<HttpServer>>,
pub(crate) monitoring_endpoint: Option<Box<MonitoringEndpointServer>>,
pub(crate) mempool_p2p_runner: Option<Box<MempoolP2pRunnerServer>>,
}

pub struct SequencerNodeServers {
Expand Down Expand Up @@ -96,13 +103,39 @@ pub fn create_node_servers(
ComponentExecutionMode::Disabled => None,
};

let local_servers =
LocalServers { batcher: batcher_server, gateway: gateway_server, mempool: mempool_server };
let mempool_p2p_propagator_server = match config.components.mempool_p2p.execution_mode {
ComponentExecutionMode::LocalExecution { enable_remote_connection: _ } => {
Some(Box::new(create_mempool_p2p_propagator_server(
components
.mempool_p2p_propagator
.expect("Mempool P2P Propagator is not initialized."),
communication.take_mempool_p2p_propagator_rx(),
)))
}
ComponentExecutionMode::Disabled => None,
};

let mempool_p2p_runner_server = match config.components.mempool_p2p.execution_mode {
ComponentExecutionMode::LocalExecution { enable_remote_connection: _ } => {
Some(Box::new(MempoolP2pRunnerServer::new(
components.mempool_p2p_runner.expect("Mempool P2P Runner is not initialized."),
)))
}
ComponentExecutionMode::Disabled => None,
};

let local_servers = LocalServers {
batcher: batcher_server,
gateway: gateway_server,
mempool: mempool_server,
mempool_p2p_propagator: mempool_p2p_propagator_server,
};

let wrapper_servers = WrapperServers {
consensus_manager: consensus_manager_server,
http_server,
monitoring_endpoint: monitoring_endpoint_server,
mempool_p2p_runner: mempool_p2p_runner_server,
};

SequencerNodeServers { local_servers, wrapper_servers }
Expand All @@ -127,13 +160,22 @@ pub async fn run_component_servers(servers: SequencerNodeServers) -> anyhow::Res
// Sequencer Monitoring server.
let monitoring_endpoint_future = get_server_future(servers.wrapper_servers.monitoring_endpoint);

// MempoolP2pPropagator server.
let mempool_p2p_propagator_future =
get_server_future(servers.local_servers.mempool_p2p_propagator);

// MempoolP2pRunner server.
let mempool_p2p_runner_future = get_server_future(servers.wrapper_servers.mempool_p2p_runner);

// Start servers.
let batcher_handle = tokio::spawn(batcher_future);
let consensus_manager_handle = tokio::spawn(consensus_manager_future);
let gateway_handle = tokio::spawn(gateway_future);
let http_server_handle = tokio::spawn(http_server_future);
let mempool_handle = tokio::spawn(mempool_future);
let monitoring_endpoint_handle = tokio::spawn(monitoring_endpoint_future);
let mempool_p2p_propagator_handle = tokio::spawn(mempool_p2p_propagator_future);
let mempool_p2p_runner_handle = tokio::spawn(mempool_p2p_runner_future);

let result = tokio::select! {
res = batcher_handle => {
Expand All @@ -160,6 +202,14 @@ pub async fn run_component_servers(servers: SequencerNodeServers) -> anyhow::Res
error!("Monitoring Endpoint Server stopped.");
res?
}
res = mempool_p2p_propagator_handle => {
error!("Mempool P2P Propagator Server stopped.");
res?
}
res = mempool_p2p_runner_handle => {
error!("Mempool P2P Runner Server stopped.");
res?
}
};
error!("Servers ended with unexpected Ok.");

Expand Down
20 changes: 19 additions & 1 deletion crates/mempool_p2p/src/propagator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ use starknet_mempool_p2p_types::communication::{
MempoolP2pPropagatorClient,
MempoolP2pPropagatorClientResult,
MempoolP2pPropagatorRequest,
MempoolP2pPropagatorRequestAndResponseSender,
MempoolP2pPropagatorResponse,
};
use starknet_mempool_p2p_types::errors::MempoolP2pPropagatorError;
use starknet_sequencer_infra::component_definitions::ComponentRequestHandler;
use starknet_sequencer_infra::component_definitions::{ComponentRequestHandler, ComponentStarter};
use starknet_sequencer_infra::component_server::LocalComponentServer;
use tokio::sync::mpsc::Receiver;

pub struct MempoolP2pPropagator {
broadcast_topic_client: BroadcastTopicClient<RpcTransactionWrapper>,
Expand Down Expand Up @@ -72,3 +75,18 @@ impl MempoolP2pPropagatorClient for EmptyMempoolP2pPropagatorClient {
Ok(())
}
}

pub type LocalMempoolP2pPropagatorServer = LocalComponentServer<
MempoolP2pPropagator,
MempoolP2pPropagatorRequest,
MempoolP2pPropagatorResponse,
>;

impl ComponentStarter for MempoolP2pPropagator {}

pub fn create_mempool_p2p_propagator_server(
mempool_p2p_propagator: MempoolP2pPropagator,
rx_mempool_p2p_propagator: Receiver<MempoolP2pPropagatorRequestAndResponseSender>,
) -> LocalMempoolP2pPropagatorServer {
LocalComponentServer::new(mempool_p2p_propagator, rx_mempool_p2p_propagator)
}
9 changes: 9 additions & 0 deletions crates/mempool_p2p/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use papyrus_protobuf::mempool::RpcTransactionWrapper;
use starknet_gateway_types::communication::SharedGatewayClient;
use starknet_gateway_types::gateway_types::GatewayInput;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_sequencer_infra::component_server::WrapperServer;
use starknet_sequencer_infra::errors::ComponentError;
use tracing::warn;

Expand Down Expand Up @@ -83,3 +84,11 @@ impl ComponentStarter for MempoolP2pRunner {
}
}
}

pub type MempoolP2pRunnerServer = WrapperServer<MempoolP2pRunner>;

pub fn create_mempool_p2p_runner_server(
mempool_p2p_runner: MempoolP2pRunner,
) -> MempoolP2pRunnerServer {
WrapperServer::new(mempool_p2p_runner)
}

0 comments on commit 33d23e8

Please sign in to comment.