Skip to content

Commit

Permalink
refactor: renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
taco-paco committed May 15, 2024
1 parent 953ba7e commit faeccc4
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 16 deletions.
14 changes: 7 additions & 7 deletions indexer/src/candidates_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use crate::{
block_listener::CandidateData,
errors::Result,
metrics::{make_candidates_validator_metrics, CandidatesListener, Metricable},
rabbit_publisher::RabbitPublisherHandle,
rabbit_publisher::{get_routing_key, PublishData, PublishOptions, PublishPayload, PublisherContext},
rmq_publisher::RmqPublisherHandle,
rmq_publisher::{get_routing_key, PublishData, PublishOptions, PublishPayload, PublisherContext},
};

const CANDIDATES_VALIDATOR: &str = "candidates_validator";
Expand All @@ -37,7 +37,7 @@ impl CandidatesValidator {
async fn ticker(
mut done: oneshot::Receiver<()>,
queue_protected: ProtectedQueue,
mut rmq_handle: RabbitPublisherHandle,
mut rmq_handle: RmqPublisherHandle,
view_client: actix::Addr<near_client::ViewClientActor>,
listener: Option<CandidatesListener>,
) {
Expand All @@ -61,7 +61,7 @@ impl CandidatesValidator {
// Assumes queue is under mutex
async fn flush(
queue: &mut VecDeque<CandidateData>,
rmq_handle: &mut RabbitPublisherHandle,
rmq_handle: &mut RmqPublisherHandle,
view_client: &actix::Addr<near_client::ViewClientActor>,
listener: Option<CandidatesListener>,
) -> Result<bool> {
Expand Down Expand Up @@ -115,7 +115,7 @@ impl CandidatesValidator {
.unwrap_or(FinalExecutionStatus::NotStarted))
}

async fn send(candidate_data: &CandidateData, rmq_handle: &mut RabbitPublisherHandle) -> Result<()> {
async fn send(candidate_data: &CandidateData, rmq_handle: &mut RmqPublisherHandle) -> Result<()> {
// TODO: is sequential order important here?
for data in candidate_data.clone().payloads {
rmq_handle
Expand All @@ -141,7 +141,7 @@ impl CandidatesValidator {
async fn process_candidates(
self,
mut receiver: mpsc::Receiver<CandidateData>,
mut rmq_handle: RabbitPublisherHandle,
mut rmq_handle: RmqPublisherHandle,
) -> Result<()> {
let Self { view_client, listener } = self;

Expand Down Expand Up @@ -210,7 +210,7 @@ impl CandidatesValidator {
let (sender, receiver) = mpsc::channel(1000);
actix::spawn(
self.clone()
.process_candidates(candidates_receiver, RabbitPublisherHandle { sender }),
.process_candidates(candidates_receiver, RmqPublisherHandle { sender }),
);

receiver
Expand Down
7 changes: 3 additions & 4 deletions indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ use tracing::{error, info};

use crate::{
candidates_validator::CandidatesValidator, configs::RunConfigArgs, errors::Error, errors::Result,
indexer_wrapper::IndexerWrapper, metrics::Metricable, metrics_server::MetricsServer,
rabbit_publisher::RabbitPublisher,
indexer_wrapper::IndexerWrapper, metrics::Metricable, metrics_server::MetricsServer, rmq_publisher::RmqPublisher,
};

mod block_listener;
Expand All @@ -16,7 +15,7 @@ mod errors;
mod indexer_wrapper;
mod metrics;
mod metrics_server;
mod rabbit_publisher;
mod rmq_publisher;

const INDEXER: &str = "indexer";

Expand Down Expand Up @@ -53,7 +52,7 @@ fn run(home_dir: std::path::PathBuf, config: RunConfigArgs) -> Result<()> {
}

let validated_stream = candidates_validator.run(candidates_stream);
let mut rmq_publisher = RabbitPublisher::new(&config.rmq_address)?;
let mut rmq_publisher = RmqPublisher::new(&config.rmq_address)?;
if let Some(_) = config.metrics_ip_port_address {
rmq_publisher.enable_metrics(registry.clone())?;
}
Expand Down
10 changes: 5 additions & 5 deletions indexer/src/rabbit_publisher.rs → indexer/src/rmq_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ pub struct PublishData {
}

#[derive(Clone)]
pub struct RabbitPublisherHandle {
pub struct RmqPublisherHandle {
pub sender: mpsc::Sender<PublishData>,
}

impl RabbitPublisherHandle {
impl RmqPublisherHandle {
pub async fn publish(&mut self, publish_data: PublishData) -> Result<()> {
Ok(self.sender.send(publish_data).await?)
}
Expand All @@ -84,12 +84,12 @@ impl RabbitPublisherHandle {
}

#[derive(Clone)]
pub struct RabbitPublisher {
pub struct RmqPublisher {
connection_pool: Pool,
listener: Option<PublisherListener>,
}

impl RabbitPublisher {
impl RmqPublisher {
pub fn new(addr: &str) -> Result<Self> {
let connection_pool = create_connection_pool(addr.into())?;

Expand All @@ -106,7 +106,7 @@ impl RabbitPublisher {
}
}

impl Metricable for RabbitPublisher {
impl Metricable for RmqPublisher {
fn enable_metrics(&mut self, registry: Registry) -> Result<()> {
let listener = make_publisher_metrics(registry)?;
self.listener = Some(listener);
Expand Down

0 comments on commit faeccc4

Please sign in to comment.