From e89ace62d5e043b47abcae600f867848b60c7a7a Mon Sep 17 00:00:00 2001 From: Lev Roitman Date: Wed, 3 Jul 2024 16:38:18 +0300 Subject: [PATCH] feat: feat: adding main module commit-id:8df0f3c6 --- Cargo.lock | 3 ++ crates/mempool_node/Cargo.toml | 3 ++ crates/mempool_node/src/main.rs | 34 +++++++++++++++++-- crates/mempool_node/src/servers.rs | 53 +++++++++++++++++++++++++++++- 4 files changed, 89 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c1ff8d3c..d80e0849 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5354,17 +5354,20 @@ dependencies = [ name = "starknet_mempool_node" version = "0.0.0" dependencies = [ + "anyhow", "assert-json-diff", "assert_matches", "clap", "colored", "const_format", + "futures", "papyrus_config", "pretty_assertions", "serde", "serde_json", "starknet_gateway", "starknet_mempool", + "starknet_mempool_infra", "starknet_mempool_types", "test_utils", "tokio", diff --git a/crates/mempool_node/Cargo.toml b/crates/mempool_node/Cargo.toml index 72b6f96a..60ca054a 100644 --- a/crates/mempool_node/Cargo.toml +++ b/crates/mempool_node/Cargo.toml @@ -9,9 +9,12 @@ license.workspace = true workspace = true [dependencies] +anyhow.workspace = true clap.workspace = true const_format.workspace = true +futures.workspace = true starknet_gateway = { path = "../gateway", version = "0.0" } +starknet_mempool_infra = { path = "../mempool_infra", version = "0.0" } starknet_mempool = { path = "../mempool", version = "0.0" } starknet_mempool_types = { path = "../mempool_types", version = "0.0" } serde.workspace = true diff --git a/crates/mempool_node/src/main.rs b/crates/mempool_node/src/main.rs index 9668ab69..699e95f9 100644 --- a/crates/mempool_node/src/main.rs +++ b/crates/mempool_node/src/main.rs @@ -1,5 +1,33 @@ +use std::env::args; +use std::process::exit; + +use papyrus_config::validators::config_validate; +use papyrus_config::ConfigError; +use starknet_mempool_node::communication::{create_node_channels, create_node_clients}; +use starknet_mempool_node::components::create_components; +use starknet_mempool_node::config::MempoolNodeConfig; +use starknet_mempool_node::servers::{create_servers, run_server_components}; + #[tokio::main] -async fn main() { - let my_string = "Main function placeholder"; - println!("{}", my_string); +async fn main() -> anyhow::Result<()> { + let config = MempoolNodeConfig::load_and_process(args().collect()); + if let Err(ConfigError::CommandInput(clap_err)) = config { + clap_err.exit(); + } + + let config = config?; + if let Err(error) = config_validate(&config) { + println!("Error: {}", error); + exit(1); + } + + let channels = create_node_channels(); + let clients = create_node_clients(&config, &channels); + let components = create_components(&config, &clients); + let servers = create_servers(&config, channels, components); + + println!("Info: Starting components!"); + run_server_components(&config, servers).await?; + + Ok(()) } diff --git a/crates/mempool_node/src/servers.rs b/crates/mempool_node/src/servers.rs index 662d3a0a..74b601c4 100644 --- a/crates/mempool_node/src/servers.rs +++ b/crates/mempool_node/src/servers.rs @@ -1,5 +1,10 @@ +use std::future::pending; +use std::pin::Pin; + +use futures::{Future, FutureExt}; use starknet_gateway::communication::{create_gateway_server, GatewayServer}; use starknet_mempool::communication::{create_mempool_server, MempoolServer}; +use starknet_mempool_infra::component_server::ComponentServerStarter; use crate::communication::MempoolNodeCommunication; use crate::components::Components; @@ -35,4 +40,50 @@ pub fn create_servers( Servers { gateway: gateway_server, mempool: mempool_server } } -// TODO (Lev): Implement the run server components function. +pub async fn run_server_components( + config: &MempoolNodeConfig, + servers: Servers, +) -> anyhow::Result<()> { + // Gateway component. + let gateway_future = + get_server_future("Gateway", config.components.gateway_component.execute, servers.gateway); + + // Mempool component. + let mempool_future = + get_server_future("Mempool", config.components.mempool_component.execute, servers.mempool); + + let gateway_handle = tokio::spawn(gateway_future); + let mempool_handle = tokio::spawn(mempool_future); + + tokio::select! { + res = gateway_handle => { + println!("Error: Gateway Server stopped."); + res? + } + res = mempool_handle => { + println!("Error: Mempool Server stopped."); + res? + } + }; + println!("Error: Servers ended with unexpected Ok."); + + Ok(()) +} + +pub fn get_server_future( + name: &str, + execute_flag: bool, + server: Option>, +) -> Pin + Send>> { + let server_future = match execute_flag { + true => { + let mut server = match server { + Some(server) => server, + _ => panic!("{} component is not initialized.", name), + }; + async move { server.start().await }.boxed() + } + false => pending().boxed(), + }; + server_future +}