Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding component start(initialization) to the server start #447

Merged
merged 1 commit into from
Jul 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/gateway/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use axum::routing::{get, post};
use axum::{Json, Router};
use starknet_api::rpc_transaction::RPCTransaction;
use starknet_api::transaction::TransactionHash;
use starknet_mempool_infra::component_runner::{ComponentRunner, ComponentStartError};
use starknet_mempool_infra::component_runner::{ComponentStartError, ComponentStarter};
use starknet_mempool_types::communication::SharedMempoolClient;
use starknet_mempool_types::mempool_types::{Account, MempoolInput};
use tracing::{info, instrument};
Expand Down Expand Up @@ -146,7 +146,7 @@ pub fn create_gateway(
}

#[async_trait]
impl ComponentRunner for Gateway {
impl ComponentStarter for Gateway {
async fn start(&mut self) -> Result<(), ComponentStartError> {
info!("Gateway::start()");
self.run().await.map_err(|_| ComponentStartError::InternalComponentError)
Expand Down
4 changes: 4 additions & 0 deletions crates/mempool/src/communication.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use starknet_mempool_infra::component_definitions::ComponentRequestHandler;
use starknet_mempool_infra::component_runner::ComponentStarter;
use starknet_mempool_infra::component_server::ComponentServer;
use starknet_mempool_types::communication::{
MempoolRequest, MempoolRequestAndResponseSender, MempoolResponse,
Expand Down Expand Up @@ -52,3 +53,6 @@ impl ComponentRequestHandler<MempoolRequest, MempoolResponse> for MempoolCommuni
}
}
}

#[async_trait]
impl ComponentStarter for MempoolCommunicationWrapper {}
10 changes: 6 additions & 4 deletions crates/mempool_infra/src/component_runner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_trait::async_trait;

#[derive(thiserror::Error, Debug, PartialEq)]
#[derive(thiserror::Error, Debug, PartialEq, Clone)]
pub enum ComponentStartError {
#[error("Error in the component configuration.")]
ComponentConfigError,
Expand All @@ -10,7 +10,9 @@ pub enum ComponentStartError {

/// Interface to start components.
#[async_trait]
pub trait ComponentRunner {
/// Start the component. Normally this function should never return.
async fn start(&mut self) -> Result<(), ComponentStartError>;
pub trait ComponentStarter {
/// Start the component. By default do nothing.
async fn start(&mut self) -> Result<(), ComponentStartError> {
Ok(())
}
}
46 changes: 29 additions & 17 deletions crates/mempool_infra/src/component_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use crate::component_definitions::{
ComponentRequestAndResponseSender, ComponentRequestHandler, ServerError,
APPLICATION_OCTET_STREAM,
};
use crate::component_runner::ComponentRunner;
use crate::component_runner::ComponentStarter;

pub struct ComponentServer<Component, Request, Response>
where
Component: ComponentRequestHandler<Request, Response>,
Component: ComponentRequestHandler<Request, Response> + ComponentStarter,
Request: Send + Sync,
Response: Send + Sync,
{
Expand All @@ -31,7 +31,7 @@ where

impl<Component, Request, Response> ComponentServer<Component, Request, Response>
where
Component: ComponentRequestHandler<Request, Response>,
Component: ComponentRequestHandler<Request, Response> + ComponentStarter,
Request: Send + Sync,
Response: Send + Sync,
{
Expand All @@ -52,22 +52,37 @@ pub trait ComponentServerStarter: Send + Sync {
impl<Component, Request, Response> ComponentServerStarter
for ComponentServer<Component, Request, Response>
where
Component: ComponentRequestHandler<Request, Response> + Send + Sync,
Component: ComponentRequestHandler<Request, Response> + ComponentStarter + Send + Sync,
Request: Send + Sync,
Response: Send + Sync,
{
async fn start(&mut self) {
while let Some(request_and_res_tx) = self.rx.recv().await {
let request = request_and_res_tx.request;
let tx = request_and_res_tx.tx;
if start_component(&mut self.component).await {
while let Some(request_and_res_tx) = self.rx.recv().await {
let request = request_and_res_tx.request;
let tx = request_and_res_tx.tx;

let res = self.component.handle_request(request).await;
let res = self.component.handle_request(request).await;

tx.send(res).await.expect("Response connection should be open.");
tx.send(res).await.expect("Response connection should be open.");
}
}
}
}

pub async fn start_component<Component>(component: &mut Component) -> bool
where
Component: ComponentStarter + Sync + Send,
{
if let Err(err) = component.start().await {
error!("ComponentServer::start() failed: {:?}", err);
return false;
}

info!("ComponentServer::start() completed.");
true
}

pub struct ComponentServerHttp<Component, Request, Response>
where
Component: ComponentRequestHandler<Request, Response> + Send + 'static,
Expand Down Expand Up @@ -139,26 +154,23 @@ where
}
}

pub struct EmptyServer<T: ComponentRunner + Send + Sync> {
pub struct EmptyServer<T: ComponentStarter + Send + Sync> {
component: T,
}

impl<T: ComponentRunner + Send + Sync> EmptyServer<T> {
impl<T: ComponentStarter + Send + Sync> EmptyServer<T> {
pub fn new(component: T) -> Self {
Self { component }
}
}

#[async_trait]
impl<T: ComponentRunner + Send + Sync> ComponentServerStarter for EmptyServer<T> {
impl<T: ComponentStarter + Send + Sync> ComponentServerStarter for EmptyServer<T> {
async fn start(&mut self) {
match self.component.start().await {
Ok(_) => info!("ComponentServer::start() completed."),
Err(err) => error!("ComponentServer::start() failed: {:?}", err),
}
start_component(&mut self.component).await;
}
}

pub fn create_empty_server<T: ComponentRunner + Send + Sync>(component: T) -> EmptyServer<T> {
pub fn create_empty_server<T: ComponentStarter + Send + Sync>(component: T) -> EmptyServer<T> {
EmptyServer::new(component)
}
7 changes: 7 additions & 0 deletions crates/mempool_infra/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use starknet_mempool_infra::component_client::ClientResult;
use starknet_mempool_infra::component_runner::ComponentStarter;

pub(crate) type ValueA = u32;
pub(crate) type ValueB = u8;
Expand Down Expand Up @@ -55,6 +56,9 @@ impl ComponentA {
}
}

#[async_trait]
impl ComponentStarter for ComponentA {}

pub(crate) struct ComponentB {
value: ValueB,
_a: Box<dyn ComponentAClientTrait>,
Expand All @@ -69,3 +73,6 @@ impl ComponentB {
self.value
}
}

#[async_trait]
impl ComponentStarter for ComponentB {}
Loading