From f6bd8aaf5e08ced70dbc66f87aa9ec4b8a3520af Mon Sep 17 00:00:00 2001 From: Lev Roitman Date: Thu, 20 Jun 2024 13:02:07 +0300 Subject: [PATCH] feat: add function to create all clients commit-id:414d7889 --- crates/mempool_infra/src/component_runner.rs | 4 +- crates/mempool_node/src/communication.rs | 53 ++++++++++++++++---- 2 files changed, 44 insertions(+), 13 deletions(-) diff --git a/crates/mempool_infra/src/component_runner.rs b/crates/mempool_infra/src/component_runner.rs index 7458987a7..19e2d9800 100644 --- a/crates/mempool_infra/src/component_runner.rs +++ b/crates/mempool_infra/src/component_runner.rs @@ -13,12 +13,12 @@ pub enum ComponentStartError { InternalComponentError, } -/// Interface to create memepool components. +/// Interface to create components. pub trait ComponentCreator { fn create(config: T) -> Self; } -/// Interface to start memepool components. +/// Interface to start components. #[async_trait] pub trait ComponentRunner { /// Start the component. Normally this function should never return. diff --git a/crates/mempool_node/src/communication.rs b/crates/mempool_node/src/communication.rs index 6dc3fffe2..09a8eaeef 100644 --- a/crates/mempool_node/src/communication.rs +++ b/crates/mempool_node/src/communication.rs @@ -1,17 +1,25 @@ -use starknet_mempool_types::communication::MempoolRequestAndResponseSender; +// use std::sync::Arc; + +use std::sync::Arc; + +use starknet_mempool_types::communication::{ + MempoolClientImpl, MempoolRequestAndResponseSender, SharedMempoolClient, +}; use tokio::sync::mpsc::{channel, Receiver, Sender}; +use crate::config::MempoolNodeConfig; + pub struct ComponentCommunication { - tx: Sender, + tx: Option>, rx: Option>, } impl ComponentCommunication { - fn get_tx(&self) -> Sender { - self.tx.clone() + fn take_tx(&self) -> Sender { + self.tx.to_owned().expect("Sender should be available, could be taken only once") } - fn get_rx(&mut self) -> Receiver { - self.rx.take().expect("Receiver already taken") + fn take_rx(&mut self) -> Receiver { + self.rx.take().expect("Receiver should be available, could be taken only once") } } @@ -20,11 +28,11 @@ pub struct MempoolNodeCommunication { } impl MempoolNodeCommunication { - pub fn get_mempool_tx(&self) -> Sender { - self.mempool_channel.get_tx() + pub fn take_mempool_tx(&self) -> Sender { + self.mempool_channel.take_tx() } - pub fn get_mempool_rx(&mut self) -> Receiver { - self.mempool_channel.get_rx() + pub fn take_mempool_rx(&mut self) -> Receiver { + self.mempool_channel.take_rx() } } @@ -33,6 +41,29 @@ pub fn create_node_channels() -> MempoolNodeCommunication { let (tx_mempool, rx_mempool) = channel::(MEMPOOL_INVOCATIONS_QUEUE_SIZE); MempoolNodeCommunication { - mempool_channel: ComponentCommunication { tx: tx_mempool, rx: Some(rx_mempool) }, + mempool_channel: ComponentCommunication { tx: Some(tx_mempool), rx: Some(rx_mempool) }, + } +} + +pub struct MempoolNodeClients { + mempool_client: Option, + // TODO (Lev 25/06/2024): Change to Option>. +} + +impl MempoolNodeClients { + pub fn get_mempool_client(&self) -> Option { + self.mempool_client.clone() } } + +pub fn create_node_clients( + config: &MempoolNodeConfig, + channels: &MempoolNodeCommunication, +) -> MempoolNodeClients { + let mempool_client: Option = + match config.components.gateway_component.execute { + true => Some(Arc::new(MempoolClientImpl::new(channels.take_mempool_tx()))), + false => None, + }; + MempoolNodeClients { mempool_client } +}