From a35a991836c9e9155c984fe3b1f77a22b43acf4a Mon Sep 17 00:00:00 2001 From: Lev Roitman Date: Thu, 11 Jul 2024 12:43:23 +0300 Subject: [PATCH] feat: adding component start(initialization) to the server start commit-id:77f186d4 --- crates/gateway/src/gateway.rs | 4 +- crates/mempool/src/communication.rs | 4 ++ crates/mempool_infra/src/component_runner.rs | 10 +++-- crates/mempool_infra/src/component_server.rs | 46 ++++++++++++-------- crates/mempool_infra/tests/common/mod.rs | 7 +++ 5 files changed, 48 insertions(+), 23 deletions(-) diff --git a/crates/gateway/src/gateway.rs b/crates/gateway/src/gateway.rs index 25eefe936..3724c3283 100644 --- a/crates/gateway/src/gateway.rs +++ b/crates/gateway/src/gateway.rs @@ -8,7 +8,7 @@ use axum::routing::{get, post}; use axum::{Json, Router}; use starknet_api::rpc_transaction::RPCTransaction; use starknet_api::transaction::TransactionHash; -use starknet_mempool_infra::component_runner::{ComponentRunner, ComponentStartError}; +use starknet_mempool_infra::component_runner::{ComponentStartError, ComponentStarter}; use starknet_mempool_types::communication::SharedMempoolClient; use starknet_mempool_types::mempool_types::{Account, MempoolInput}; use tracing::{info, instrument}; @@ -146,7 +146,7 @@ pub fn create_gateway( } #[async_trait] -impl ComponentRunner for Gateway { +impl ComponentStarter for Gateway { async fn start(&mut self) -> Result<(), ComponentStartError> { info!("Gateway::start()"); self.run().await.map_err(|_| ComponentStartError::InternalComponentError) diff --git a/crates/mempool/src/communication.rs b/crates/mempool/src/communication.rs index 166f99a74..ebd1c4338 100644 --- a/crates/mempool/src/communication.rs +++ b/crates/mempool/src/communication.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use starknet_mempool_infra::component_definitions::ComponentRequestHandler; +use starknet_mempool_infra::component_runner::ComponentStarter; use starknet_mempool_infra::component_server::ComponentServer; use starknet_mempool_types::communication::{ MempoolRequest, MempoolRequestAndResponseSender, MempoolResponse, @@ -52,3 +53,6 @@ impl ComponentRequestHandler for MempoolCommuni } } } + +#[async_trait] +impl ComponentStarter for MempoolCommunicationWrapper {} diff --git a/crates/mempool_infra/src/component_runner.rs b/crates/mempool_infra/src/component_runner.rs index 7785bcb23..de8ab6b51 100644 --- a/crates/mempool_infra/src/component_runner.rs +++ b/crates/mempool_infra/src/component_runner.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; -#[derive(thiserror::Error, Debug, PartialEq)] +#[derive(thiserror::Error, Debug, PartialEq, Clone)] pub enum ComponentStartError { #[error("Error in the component configuration.")] ComponentConfigError, @@ -10,7 +10,9 @@ pub enum ComponentStartError { /// Interface to start components. #[async_trait] -pub trait ComponentRunner { - /// Start the component. Normally this function should never return. - async fn start(&mut self) -> Result<(), ComponentStartError>; +pub trait ComponentStarter { + /// Start the component. By default do nothing. + async fn start(&mut self) -> Result<(), ComponentStartError> { + Ok(()) + } } diff --git a/crates/mempool_infra/src/component_server.rs b/crates/mempool_infra/src/component_server.rs index 17cb17bd0..79cfcbbbb 100644 --- a/crates/mempool_infra/src/component_server.rs +++ b/crates/mempool_infra/src/component_server.rs @@ -17,11 +17,11 @@ use crate::component_definitions::{ ComponentRequestAndResponseSender, ComponentRequestHandler, ServerError, APPLICATION_OCTET_STREAM, }; -use crate::component_runner::ComponentRunner; +use crate::component_runner::ComponentStarter; pub struct ComponentServer where - Component: ComponentRequestHandler, + Component: ComponentRequestHandler + ComponentStarter, Request: Send + Sync, Response: Send + Sync, { @@ -31,7 +31,7 @@ where impl ComponentServer where - Component: ComponentRequestHandler, + Component: ComponentRequestHandler + ComponentStarter, Request: Send + Sync, Response: Send + Sync, { @@ -52,22 +52,37 @@ pub trait ComponentServerStarter: Send + Sync { impl ComponentServerStarter for ComponentServer where - Component: ComponentRequestHandler + Send + Sync, + Component: ComponentRequestHandler + ComponentStarter + Send + Sync, Request: Send + Sync, Response: Send + Sync, { async fn start(&mut self) { - while let Some(request_and_res_tx) = self.rx.recv().await { - let request = request_and_res_tx.request; - let tx = request_and_res_tx.tx; + if start_component(&mut self.component).await { + while let Some(request_and_res_tx) = self.rx.recv().await { + let request = request_and_res_tx.request; + let tx = request_and_res_tx.tx; - let res = self.component.handle_request(request).await; + let res = self.component.handle_request(request).await; - tx.send(res).await.expect("Response connection should be open."); + tx.send(res).await.expect("Response connection should be open."); + } } } } +pub async fn start_component(component: &mut Component) -> bool +where + Component: ComponentStarter + Sync + Send, +{ + if let Err(err) = component.start().await { + error!("ComponentServer::start() failed: {:?}", err); + return false; + } + + info!("ComponentServer::start() completed."); + true +} + pub struct ComponentServerHttp where Component: ComponentRequestHandler + Send + 'static, @@ -139,26 +154,23 @@ where } } -pub struct EmptyServer { +pub struct EmptyServer { component: T, } -impl EmptyServer { +impl EmptyServer { pub fn new(component: T) -> Self { Self { component } } } #[async_trait] -impl ComponentServerStarter for EmptyServer { +impl ComponentServerStarter for EmptyServer { async fn start(&mut self) { - match self.component.start().await { - Ok(_) => info!("ComponentServer::start() completed."), - Err(err) => error!("ComponentServer::start() failed: {:?}", err), - } + start_component(&mut self.component).await; } } -pub fn create_empty_server(component: T) -> EmptyServer { +pub fn create_empty_server(component: T) -> EmptyServer { EmptyServer::new(component) } diff --git a/crates/mempool_infra/tests/common/mod.rs b/crates/mempool_infra/tests/common/mod.rs index 71781ca53..141425a5a 100644 --- a/crates/mempool_infra/tests/common/mod.rs +++ b/crates/mempool_infra/tests/common/mod.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use starknet_mempool_infra::component_client::ClientResult; +use starknet_mempool_infra::component_runner::ComponentStarter; pub(crate) type ValueA = u32; pub(crate) type ValueB = u8; @@ -55,6 +56,9 @@ impl ComponentA { } } +#[async_trait] +impl ComponentStarter for ComponentA {} + pub(crate) struct ComponentB { value: ValueB, _a: Box, @@ -69,3 +73,6 @@ impl ComponentB { self.value } } + +#[async_trait] +impl ComponentStarter for ComponentB {}