From faeccc45a8c633766fefdd7b30ff4e2d810c8ffc Mon Sep 17 00:00:00 2001 From: edwin Date: Wed, 15 May 2024 14:17:26 +0700 Subject: [PATCH] refactor: renaming --- indexer/src/candidates_validator.rs | 14 +++++++------- indexer/src/main.rs | 7 +++---- .../src/{rabbit_publisher.rs => rmq_publisher.rs} | 10 +++++----- 3 files changed, 15 insertions(+), 16 deletions(-) rename indexer/src/{rabbit_publisher.rs => rmq_publisher.rs} (98%) diff --git a/indexer/src/candidates_validator.rs b/indexer/src/candidates_validator.rs index 10e54f292..7594d212e 100644 --- a/indexer/src/candidates_validator.rs +++ b/indexer/src/candidates_validator.rs @@ -12,8 +12,8 @@ use crate::{ block_listener::CandidateData, errors::Result, metrics::{make_candidates_validator_metrics, CandidatesListener, Metricable}, - rabbit_publisher::RabbitPublisherHandle, - rabbit_publisher::{get_routing_key, PublishData, PublishOptions, PublishPayload, PublisherContext}, + rmq_publisher::RmqPublisherHandle, + rmq_publisher::{get_routing_key, PublishData, PublishOptions, PublishPayload, PublisherContext}, }; const CANDIDATES_VALIDATOR: &str = "candidates_validator"; @@ -37,7 +37,7 @@ impl CandidatesValidator { async fn ticker( mut done: oneshot::Receiver<()>, queue_protected: ProtectedQueue, - mut rmq_handle: RabbitPublisherHandle, + mut rmq_handle: RmqPublisherHandle, view_client: actix::Addr, listener: Option, ) { @@ -61,7 +61,7 @@ impl CandidatesValidator { // Assumes queue is under mutex async fn flush( queue: &mut VecDeque, - rmq_handle: &mut RabbitPublisherHandle, + rmq_handle: &mut RmqPublisherHandle, view_client: &actix::Addr, listener: Option, ) -> Result { @@ -115,7 +115,7 @@ impl CandidatesValidator { .unwrap_or(FinalExecutionStatus::NotStarted)) } - async fn send(candidate_data: &CandidateData, rmq_handle: &mut RabbitPublisherHandle) -> Result<()> { + async fn send(candidate_data: &CandidateData, rmq_handle: &mut RmqPublisherHandle) -> Result<()> { // TODO: is sequential order important here? for data in candidate_data.clone().payloads { rmq_handle @@ -141,7 +141,7 @@ impl CandidatesValidator { async fn process_candidates( self, mut receiver: mpsc::Receiver, - mut rmq_handle: RabbitPublisherHandle, + mut rmq_handle: RmqPublisherHandle, ) -> Result<()> { let Self { view_client, listener } = self; @@ -210,7 +210,7 @@ impl CandidatesValidator { let (sender, receiver) = mpsc::channel(1000); actix::spawn( self.clone() - .process_candidates(candidates_receiver, RabbitPublisherHandle { sender }), + .process_candidates(candidates_receiver, RmqPublisherHandle { sender }), ); receiver diff --git a/indexer/src/main.rs b/indexer/src/main.rs index 130002ff4..cf8e6bcdb 100644 --- a/indexer/src/main.rs +++ b/indexer/src/main.rs @@ -5,8 +5,7 @@ use tracing::{error, info}; use crate::{ candidates_validator::CandidatesValidator, configs::RunConfigArgs, errors::Error, errors::Result, - indexer_wrapper::IndexerWrapper, metrics::Metricable, metrics_server::MetricsServer, - rabbit_publisher::RabbitPublisher, + indexer_wrapper::IndexerWrapper, metrics::Metricable, metrics_server::MetricsServer, rmq_publisher::RmqPublisher, }; mod block_listener; @@ -16,7 +15,7 @@ mod errors; mod indexer_wrapper; mod metrics; mod metrics_server; -mod rabbit_publisher; +mod rmq_publisher; const INDEXER: &str = "indexer"; @@ -53,7 +52,7 @@ fn run(home_dir: std::path::PathBuf, config: RunConfigArgs) -> Result<()> { } let validated_stream = candidates_validator.run(candidates_stream); - let mut rmq_publisher = RabbitPublisher::new(&config.rmq_address)?; + let mut rmq_publisher = RmqPublisher::new(&config.rmq_address)?; if let Some(_) = config.metrics_ip_port_address { rmq_publisher.enable_metrics(registry.clone())?; } diff --git a/indexer/src/rabbit_publisher.rs b/indexer/src/rmq_publisher.rs similarity index 98% rename from indexer/src/rabbit_publisher.rs rename to indexer/src/rmq_publisher.rs index 206c4b6ed..9fd2989cf 100644 --- a/indexer/src/rabbit_publisher.rs +++ b/indexer/src/rmq_publisher.rs @@ -69,11 +69,11 @@ pub struct PublishData { } #[derive(Clone)] -pub struct RabbitPublisherHandle { +pub struct RmqPublisherHandle { pub sender: mpsc::Sender, } -impl RabbitPublisherHandle { +impl RmqPublisherHandle { pub async fn publish(&mut self, publish_data: PublishData) -> Result<()> { Ok(self.sender.send(publish_data).await?) } @@ -84,12 +84,12 @@ impl RabbitPublisherHandle { } #[derive(Clone)] -pub struct RabbitPublisher { +pub struct RmqPublisher { connection_pool: Pool, listener: Option, } -impl RabbitPublisher { +impl RmqPublisher { pub fn new(addr: &str) -> Result { let connection_pool = create_connection_pool(addr.into())?; @@ -106,7 +106,7 @@ impl RabbitPublisher { } } -impl Metricable for RabbitPublisher { +impl Metricable for RmqPublisher { fn enable_metrics(&mut self, registry: Registry) -> Result<()> { let listener = make_publisher_metrics(registry)?; self.listener = Some(listener);