Skip to content

Commit

Permalink
feat: add function to create all clients
Browse files Browse the repository at this point in the history
commit-id:414d7889
  • Loading branch information
lev-starkware committed Jun 26, 2024
1 parent 495f4a7 commit f6bd8aa
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 13 deletions.
4 changes: 2 additions & 2 deletions crates/mempool_infra/src/component_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ pub enum ComponentStartError {
InternalComponentError,
}

/// Interface to create memepool components.
/// Interface to create components.
pub trait ComponentCreator<T: SerializeConfig> {
fn create(config: T) -> Self;
}

/// Interface to start memepool components.
/// Interface to start components.
#[async_trait]
pub trait ComponentRunner {
/// Start the component. Normally this function should never return.
Expand Down
53 changes: 42 additions & 11 deletions crates/mempool_node/src/communication.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
use starknet_mempool_types::communication::MempoolRequestAndResponseSender;
// use std::sync::Arc;

use std::sync::Arc;

use starknet_mempool_types::communication::{
MempoolClientImpl, MempoolRequestAndResponseSender, SharedMempoolClient,
};
use tokio::sync::mpsc::{channel, Receiver, Sender};

use crate::config::MempoolNodeConfig;

pub struct ComponentCommunication<T: Send + Sync> {
tx: Sender<T>,
tx: Option<Sender<T>>,
rx: Option<Receiver<T>>,
}

impl<T: Send + Sync> ComponentCommunication<T> {
fn get_tx(&self) -> Sender<T> {
self.tx.clone()
fn take_tx(&self) -> Sender<T> {
self.tx.to_owned().expect("Sender should be available, could be taken only once")
}
fn get_rx(&mut self) -> Receiver<T> {
self.rx.take().expect("Receiver already taken")
fn take_rx(&mut self) -> Receiver<T> {
self.rx.take().expect("Receiver should be available, could be taken only once")
}
}

Expand All @@ -20,11 +28,11 @@ pub struct MempoolNodeCommunication {
}

impl MempoolNodeCommunication {
pub fn get_mempool_tx(&self) -> Sender<MempoolRequestAndResponseSender> {
self.mempool_channel.get_tx()
pub fn take_mempool_tx(&self) -> Sender<MempoolRequestAndResponseSender> {
self.mempool_channel.take_tx()
}
pub fn get_mempool_rx(&mut self) -> Receiver<MempoolRequestAndResponseSender> {
self.mempool_channel.get_rx()
pub fn take_mempool_rx(&mut self) -> Receiver<MempoolRequestAndResponseSender> {
self.mempool_channel.take_rx()
}
}

Expand All @@ -33,6 +41,29 @@ pub fn create_node_channels() -> MempoolNodeCommunication {
let (tx_mempool, rx_mempool) =
channel::<MempoolRequestAndResponseSender>(MEMPOOL_INVOCATIONS_QUEUE_SIZE);
MempoolNodeCommunication {
mempool_channel: ComponentCommunication { tx: tx_mempool, rx: Some(rx_mempool) },
mempool_channel: ComponentCommunication { tx: Some(tx_mempool), rx: Some(rx_mempool) },
}
}

pub struct MempoolNodeClients {
mempool_client: Option<SharedMempoolClient>,
// TODO (Lev 25/06/2024): Change to Option<Box<dyn MemPoolClient>>.
}

impl MempoolNodeClients {
pub fn get_mempool_client(&self) -> Option<SharedMempoolClient> {
self.mempool_client.clone()
}
}

pub fn create_node_clients(
config: &MempoolNodeConfig,
channels: &MempoolNodeCommunication,
) -> MempoolNodeClients {
let mempool_client: Option<SharedMempoolClient> =
match config.components.gateway_component.execute {
true => Some(Arc::new(MempoolClientImpl::new(channels.take_mempool_tx()))),
false => None,
};
MempoolNodeClients { mempool_client }
}

0 comments on commit f6bd8aa

Please sign in to comment.