Skip to content

Commit

Permalink
feat: adding component start(initialization) to the server start
Browse files Browse the repository at this point in the history
commit-id:77f186d4
  • Loading branch information
lev-starkware committed Jul 14, 2024
1 parent 724ac68 commit 73bf292
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 23 deletions.
4 changes: 2 additions & 2 deletions crates/gateway/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use axum::routing::{get, post};
use axum::{Json, Router};
use starknet_api::rpc_transaction::RPCTransaction;
use starknet_api::transaction::TransactionHash;
use starknet_mempool_infra::component_runner::{ComponentRunner, ComponentStartError};
use starknet_mempool_infra::component_runner::{ComponentStartError, ComponentStarter};
use starknet_mempool_types::communication::SharedMempoolClient;
use starknet_mempool_types::mempool_types::{Account, MempoolInput};
use tracing::{info, instrument};
Expand Down Expand Up @@ -146,7 +146,7 @@ pub fn create_gateway(
}

#[async_trait]
impl ComponentRunner for Gateway {
impl ComponentStarter for Gateway {
async fn start(&mut self) -> Result<(), ComponentStartError> {
info!("Gateway::start()");
self.run().await.map_err(|_| ComponentStartError::InternalComponentError)
Expand Down
4 changes: 4 additions & 0 deletions crates/mempool/src/communication.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use starknet_mempool_infra::component_definitions::ComponentRequestHandler;
use starknet_mempool_infra::component_runner::ComponentStarter;
use starknet_mempool_infra::component_server::ComponentServer;
use starknet_mempool_types::communication::{
MempoolRequest, MempoolRequestAndResponseSender, MempoolResponse,
Expand Down Expand Up @@ -52,3 +53,6 @@ impl ComponentRequestHandler<MempoolRequest, MempoolResponse> for MempoolCommuni
}
}
}

#[async_trait]
impl ComponentStarter for MempoolCommunicationWrapper {}
10 changes: 6 additions & 4 deletions crates/mempool_infra/src/component_runner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_trait::async_trait;

#[derive(thiserror::Error, Debug, PartialEq)]
#[derive(thiserror::Error, Debug, PartialEq, Clone)]
pub enum ComponentStartError {
#[error("Error in the component configuration.")]
ComponentConfigError,
Expand All @@ -10,7 +10,9 @@ pub enum ComponentStartError {

/// Interface to start components.
#[async_trait]
pub trait ComponentRunner {
/// Start the component. Normally this function should never return.
async fn start(&mut self) -> Result<(), ComponentStartError>;
pub trait ComponentStarter {
/// Start the component. By default do nothing.
async fn start(&mut self) -> Result<(), ComponentStartError> {
Ok(())
}
}
46 changes: 29 additions & 17 deletions crates/mempool_infra/src/component_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use crate::component_definitions::{
ComponentRequestAndResponseSender, ComponentRequestHandler, ServerError,
APPLICATION_OCTET_STREAM,
};
use crate::component_runner::ComponentRunner;
use crate::component_runner::ComponentStarter;

pub struct ComponentServer<Component, Request, Response>
where
Component: ComponentRequestHandler<Request, Response>,
Component: ComponentRequestHandler<Request, Response> + ComponentStarter,
Request: Send + Sync,
Response: Send + Sync,
{
Expand All @@ -31,7 +31,7 @@ where

impl<Component, Request, Response> ComponentServer<Component, Request, Response>
where
Component: ComponentRequestHandler<Request, Response>,
Component: ComponentRequestHandler<Request, Response> + ComponentStarter,
Request: Send + Sync,
Response: Send + Sync,
{
Expand All @@ -52,22 +52,37 @@ pub trait ComponentServerStarter: Send + Sync {
impl<Component, Request, Response> ComponentServerStarter
for ComponentServer<Component, Request, Response>
where
Component: ComponentRequestHandler<Request, Response> + Send + Sync,
Component: ComponentRequestHandler<Request, Response> + ComponentStarter + Send + Sync,
Request: Send + Sync,
Response: Send + Sync,
{
async fn start(&mut self) {
while let Some(request_and_res_tx) = self.rx.recv().await {
let request = request_and_res_tx.request;
let tx = request_and_res_tx.tx;
if start_component(&mut self.component).await {
while let Some(request_and_res_tx) = self.rx.recv().await {
let request = request_and_res_tx.request;
let tx = request_and_res_tx.tx;

let res = self.component.handle_request(request).await;
let res = self.component.handle_request(request).await;

tx.send(res).await.expect("Response connection should be open.");
tx.send(res).await.expect("Response connection should be open.");
}
}
}
}

pub async fn start_component<Component>(component: &mut Component) -> bool
where
Component: ComponentStarter + Sync + Send,
{
if let Err(err) = component.start().await {
error!("ComponentServer::start() failed: {:?}", err);
return false;
}

info!("ComponentServer::start() completed.");
true
}

pub struct ComponentServerHttp<Component, Request, Response>
where
Component: ComponentRequestHandler<Request, Response> + Send + 'static,
Expand Down Expand Up @@ -139,26 +154,23 @@ where
}
}

pub struct EmptyServer<T: ComponentRunner + Send + Sync> {
pub struct EmptyServer<T: ComponentStarter + Send + Sync> {
component: T,
}

impl<T: ComponentRunner + Send + Sync> EmptyServer<T> {
impl<T: ComponentStarter + Send + Sync> EmptyServer<T> {
pub fn new(component: T) -> Self {
Self { component }
}
}

#[async_trait]
impl<T: ComponentRunner + Send + Sync> ComponentServerStarter for EmptyServer<T> {
impl<T: ComponentStarter + Send + Sync> ComponentServerStarter for EmptyServer<T> {
async fn start(&mut self) {
match self.component.start().await {
Ok(_) => info!("ComponentServer::start() completed."),
Err(err) => error!("ComponentServer::start() failed: {:?}", err),
}
start_component(&mut self.component).await;
}
}

pub fn create_empty_server<T: ComponentRunner + Send + Sync>(component: T) -> EmptyServer<T> {
pub fn create_empty_server<T: ComponentStarter + Send + Sync>(component: T) -> EmptyServer<T> {
EmptyServer::new(component)
}
7 changes: 7 additions & 0 deletions crates/mempool_infra/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use starknet_mempool_infra::component_client::ClientResult;
use starknet_mempool_infra::component_runner::ComponentStarter;

pub(crate) type ValueA = u32;
pub(crate) type ValueB = u8;
Expand Down Expand Up @@ -55,6 +56,9 @@ impl ComponentA {
}
}

#[async_trait]
impl ComponentStarter for ComponentA {}

pub(crate) struct ComponentB {
value: ValueB,
_a: Box<dyn ComponentAClientTrait>,
Expand All @@ -69,3 +73,6 @@ impl ComponentB {
self.value
}
}

#[async_trait]
impl ComponentStarter for ComponentB {}

0 comments on commit 73bf292

Please sign in to comment.