diff --git a/Cargo.lock b/Cargo.lock index 710ccebac..0a93341ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5489,6 +5489,7 @@ name = "starknet_gateway" version = "0.0.0" dependencies = [ "assert_matches", + "async-trait", "axum", "blockifier 0.7.0-dev.1 (git+https://github.com/starkware-libs/blockifier.git?branch=main-mempool)", "cairo-lang-starknet-classes", @@ -5502,6 +5503,7 @@ dependencies = [ "serde_json", "starknet_api", "starknet_mempool", + "starknet_mempool_infra", "starknet_mempool_types", "starknet_sierra_compile", "thiserror", diff --git a/crates/gateway/Cargo.toml b/crates/gateway/Cargo.toml index 5402db98d..077c00603 100644 --- a/crates/gateway/Cargo.toml +++ b/crates/gateway/Cargo.toml @@ -12,6 +12,7 @@ workspace = true testing = [] [dependencies] +async-trait.workspace = true axum.workspace = true blockifier.workspace = true cairo-lang-starknet-classes.workspace = true @@ -22,6 +23,7 @@ reqwest.workspace = true serde.workspace = true serde_json.workspace = true starknet_api.workspace = true +starknet_mempool_infra = { path = "../mempool_infra", version = "0.0" } starknet_mempool_types = { path = "../mempool_types", version = "0.0" } starknet_sierra_compile = { path = "../starknet_sierra_compile", version = "0.0" } thiserror.workspace = true diff --git a/crates/gateway/src/gateway.rs b/crates/gateway/src/gateway.rs index 6efc97ba2..5678124c2 100644 --- a/crates/gateway/src/gateway.rs +++ b/crates/gateway/src/gateway.rs @@ -3,12 +3,15 @@ use std::net::SocketAddr; use std::panic; use std::sync::Arc; +use async_trait::async_trait; use axum::extract::State; use axum::routing::{get, post}; use axum::{Json, Router}; use blockifier::execution::contract_class::{ClassInfo, ContractClass, ContractClassV1}; use starknet_api::rpc_transaction::{RPCDeclareTransaction, RPCTransaction}; use starknet_api::transaction::TransactionHash; +use starknet_mempool_infra::component_runner::{ComponentRunner, ComponentStartError}; +use starknet_mempool_infra::empty_server::{create_empty_server, EmptyServer}; use starknet_mempool_types::communication::SharedMempoolClient; use starknet_mempool_types::mempool_types::{Account, MempoolInput}; use starknet_sierra_compile::compile::{compile_sierra_to_casm, CompilationUtilError}; @@ -177,3 +180,18 @@ pub fn create_gateway( let state_reader_factory = Arc::new(RpcStateReaderFactory { config: rpc_state_reader_config }); Gateway::new(config, state_reader_factory, client) } + +pub type GatewayCommunicationServer = EmptyServer; + +pub fn create_gateway_server(gateway: Gateway) -> GatewayCommunicationServer { + create_empty_server(gateway) +} + +#[async_trait] +impl ComponentRunner for Gateway { + async fn start(&mut self) -> Result<(), ComponentStartError> { + // TODO(Lev, 23/07/2024): Implement the real logic. + println!("Gateway::start()"); + Ok(()) + } +} diff --git a/crates/gateway/src/gateway_test.rs b/crates/gateway/src/gateway_test.rs index 684f4dff4..d9611652c 100644 --- a/crates/gateway/src/gateway_test.rs +++ b/crates/gateway/src/gateway_test.rs @@ -11,6 +11,7 @@ use starknet_api::rpc_transaction::RPCTransaction; use starknet_api::transaction::TransactionHash; use starknet_mempool::communication::create_mempool_server; use starknet_mempool::mempool::Mempool; +use starknet_mempool_infra::component_server::CommunicationServer; use starknet_mempool_types::communication::{MempoolClientImpl, MempoolRequestAndResponseSender}; use tokio::sync::mpsc::channel; use tokio::task; @@ -76,6 +77,7 @@ async fn test_add_tx( ) { // TODO(Tsabary): wrap creation of channels in dedicated functions, take channel capacity from // config. + let (tx_mempool, rx_mempool) = channel::(MEMPOOL_INVOCATIONS_QUEUE_SIZE); let mut mempool_server = create_mempool_server(mempool, rx_mempool); diff --git a/crates/mempool_infra/src/component_server.rs b/crates/mempool_infra/src/component_server.rs index 0a726100e..26880d72d 100644 --- a/crates/mempool_infra/src/component_server.rs +++ b/crates/mempool_infra/src/component_server.rs @@ -1,3 +1,4 @@ +use async_trait::async_trait; use tokio::sync::mpsc::Receiver; use crate::component_definitions::{ComponentRequestAndResponseSender, ComponentRequestHandler}; @@ -25,7 +26,7 @@ where Self { component, rx } } - pub async fn start(&mut self) { + pub async fn request_response_loop(&mut self) { while let Some(request_and_res_tx) = self.rx.recv().await { let request = request_and_res_tx.request; let tx = request_and_res_tx.tx; @@ -36,3 +37,22 @@ where } } } + +#[async_trait] +pub trait CommunicationServer: Send + Sync { + async fn start(&mut self); +} + +#[async_trait] +impl CommunicationServer + for ComponentServer +where + Component: ComponentRequestHandler + Send + Sync, + Request: Send + Sync, + Response: Send + Sync, +{ + async fn start(&mut self) { + self.request_response_loop().await; + println!("ComponentServer completed unexpectedly."); + } +} diff --git a/crates/mempool_infra/src/empty_server.rs b/crates/mempool_infra/src/empty_server.rs new file mode 100644 index 000000000..1b7860de8 --- /dev/null +++ b/crates/mempool_infra/src/empty_server.rs @@ -0,0 +1,33 @@ +use async_trait::async_trait; + +// use tokio::sync::mpsc::channel; + +// use crate::component_definitions::{ComponentRequestAndResponseSender, +// ComponentRequestHandler}; use crate::component_runner::{ComponentRunner, +// ComponentStartError}; +use crate::component_runner::ComponentRunner; +use crate::component_server::CommunicationServer; + +pub struct EmptyServer { + component: T, +} + +impl EmptyServer { + pub fn new(component: T) -> Self { + Self { component } + } +} + +#[async_trait] +impl CommunicationServer for EmptyServer { + async fn start(&mut self) { + match self.component.start().await { + Ok(_) => println!("ComponentServer::start() completed."), + Err(err) => println!("ComponentServer::start() failed: {:?}", err), + } + } +} + +pub fn create_empty_server(component: T) -> EmptyServer { + EmptyServer::new(component) +} diff --git a/crates/mempool_infra/src/lib.rs b/crates/mempool_infra/src/lib.rs index 6f843ec3c..208c4c39b 100644 --- a/crates/mempool_infra/src/lib.rs +++ b/crates/mempool_infra/src/lib.rs @@ -2,3 +2,4 @@ pub mod component_client; pub mod component_definitions; pub mod component_runner; pub mod component_server; +pub mod empty_server; diff --git a/crates/mempool_infra/tests/component_server_client_test.rs b/crates/mempool_infra/tests/component_server_client_test.rs index df66c5e6f..56b0ae573 100644 --- a/crates/mempool_infra/tests/component_server_client_test.rs +++ b/crates/mempool_infra/tests/component_server_client_test.rs @@ -6,7 +6,7 @@ use starknet_mempool_infra::component_client::ComponentClient; use starknet_mempool_infra::component_definitions::{ ComponentRequestAndResponseSender, ComponentRequestHandler, }; -use starknet_mempool_infra::component_server::ComponentServer; +use starknet_mempool_infra::component_server::{CommunicationServer, ComponentServer}; use tokio::sync::mpsc::{channel, Sender}; use tokio::task; diff --git a/crates/tests-integration/Cargo.toml b/crates/tests-integration/Cargo.toml index bfd56dbfb..2fb3eb1a4 100644 --- a/crates/tests-integration/Cargo.toml +++ b/crates/tests-integration/Cargo.toml @@ -22,6 +22,7 @@ starknet_api.workspace = true starknet_client.workspace = true starknet_gateway = { path = "../gateway", version = "0.0", features = ["testing"] } starknet_mempool = { path = "../mempool", version = "0.0" } +starknet_mempool_infra = { path = "../mempool_infra", version = "0.0" } starknet_mempool_types = { path = "../mempool_types", version = "0.0" } starknet_task_executor = { path = "../task_executor", version = "0.0" } strum.workspace = true diff --git a/crates/tests-integration/src/integration_test_setup.rs b/crates/tests-integration/src/integration_test_setup.rs index b51db5831..afb01fd9d 100644 --- a/crates/tests-integration/src/integration_test_setup.rs +++ b/crates/tests-integration/src/integration_test_setup.rs @@ -7,6 +7,7 @@ use starknet_gateway::config::GatewayNetworkConfig; use starknet_gateway::errors::GatewayError; use starknet_mempool::communication::create_mempool_server; use starknet_mempool::mempool::Mempool; +use starknet_mempool_infra::component_server::CommunicationServer; use starknet_mempool_types::communication::{ MempoolClient, MempoolClientImpl, MempoolRequestAndResponseSender, };