Skip to content

Commit

Permalink
Merge branch 'main' into feature/opt-out-operator
Browse files Browse the repository at this point in the history
  • Loading branch information
emlautarom1 committed May 23, 2024
2 parents 9d45fbe + 3d5e228 commit e1287fa
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 107 deletions.
4 changes: 2 additions & 2 deletions aggregator/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,10 +451,10 @@ func MakeAggregatorMetrics(registry *prometheus.Registry) (AggregatorEventListen

return &SelectiveAggregatorListener{
ObserveLastStateRootUpdateAggregatedCb: func(rollupId uint32, blockNumber uint64) {
lastStateRootUpdateAggregated.WithLabelValues(fmt.Sprintf("%x", rollupId)).Set(float64(blockNumber))
lastStateRootUpdateAggregated.WithLabelValues(fmt.Sprintf("%d", rollupId)).Set(float64(blockNumber))
},
ObserveLastStateRootUpdateReceivedCb: func(rollupId uint32, blockNumber uint64) {
lastStateRootUpdateReceived.WithLabelValues(fmt.Sprintf("%x", rollupId)).Set(float64(blockNumber))
lastStateRootUpdateReceived.WithLabelValues(fmt.Sprintf("%d", rollupId)).Set(float64(blockNumber))
},
ObserveLastOperatorSetUpdateAggregatedCb: func(operatorSetUpdateId uint64) {
lastOperatorSetUpdateAggregated.Set(float64(operatorSetUpdateId))
Expand Down
2 changes: 0 additions & 2 deletions core/safeclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,6 @@ func (c *SafeEthClient) handleReinitEvent() {
return
}

defer c.wg.Done()

c.isReinitializing = true

c.wg.Add(1)
Expand Down
14 changes: 7 additions & 7 deletions indexer/src/candidates_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use tracing::info;
use crate::{
errors::Result,
metrics::{make_candidates_validator_metrics, CandidatesListener, Metricable},
rabbit_publisher::RabbitPublisherHandle,
rabbit_publisher::{get_routing_key, PublishData, PublishOptions, PublishPayload, PublisherContext},
rmq_publisher::RmqPublisherHandle,
rmq_publisher::{get_routing_key, PublishData, PublishOptions, PublishPayload, PublisherContext},
types,
};

Expand All @@ -35,7 +35,7 @@ impl CandidatesValidator {
async fn ticker(
mut done: oneshot::Receiver<()>,
queue_protected: types::ProtectedQueue<types::CandidateData>,
mut rmq_handle: RabbitPublisherHandle,
mut rmq_handle: RmqPublisherHandle,
view_client: actix::Addr<near_client::ViewClientActor>,
listener: Option<CandidatesListener>,
) {
Expand All @@ -59,7 +59,7 @@ impl CandidatesValidator {
// Assumes queue is under mutex
async fn flush(
queue: &mut VecDeque<types::CandidateData>,
rmq_handle: &mut RabbitPublisherHandle,
rmq_handle: &mut RmqPublisherHandle,
view_client: &actix::Addr<near_client::ViewClientActor>,
listener: Option<CandidatesListener>,
) -> Result<bool> {
Expand Down Expand Up @@ -113,7 +113,7 @@ impl CandidatesValidator {
.unwrap_or(FinalExecutionStatus::NotStarted))
}

async fn send(candidate_data: &types::CandidateData, rmq_handle: &mut RabbitPublisherHandle) -> Result<()> {
async fn send(candidate_data: &types::CandidateData, rmq_handle: &mut RmqPublisherHandle) -> Result<()> {
// TODO: is sequential order important here?
for data in candidate_data.clone().payloads {
rmq_handle
Expand All @@ -139,7 +139,7 @@ impl CandidatesValidator {
async fn process_candidates(
self,
mut receiver: mpsc::Receiver<types::CandidateData>,
mut rmq_handle: RabbitPublisherHandle,
mut rmq_handle: RmqPublisherHandle,
) -> Result<()> {
let Self { view_client, listener } = self;

Expand Down Expand Up @@ -207,7 +207,7 @@ impl CandidatesValidator {
let (sender, receiver) = mpsc::channel(1000);
actix::spawn(
self.clone()
.process_candidates(candidates_receiver, RabbitPublisherHandle { sender }),
.process_candidates(candidates_receiver, RmqPublisherHandle { sender }),
);

receiver
Expand Down
7 changes: 3 additions & 4 deletions indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ use tracing::{error, info};

use crate::{
candidates_validator::CandidatesValidator, configs::RunConfigArgs, errors::Error, errors::Result,
indexer_wrapper::IndexerWrapper, metrics::Metricable, metrics_server::MetricsServer,
rabbit_publisher::RabbitPublisher,
indexer_wrapper::IndexerWrapper, metrics::Metricable, metrics_server::MetricsServer, rmq_publisher::RmqPublisher,
};

mod block_listener;
Expand All @@ -16,7 +15,7 @@ mod errors;
mod indexer_wrapper;
mod metrics;
mod metrics_server;
mod rabbit_publisher;
mod rmq_publisher;
mod types;

const INDEXER: &str = "indexer";
Expand Down Expand Up @@ -54,7 +53,7 @@ fn run(home_dir: std::path::PathBuf, config: RunConfigArgs) -> Result<()> {
}

let validated_stream = candidates_validator.run(candidates_stream);
let mut rmq_publisher = RabbitPublisher::new(&config.rmq_address)?;
let mut rmq_publisher = RmqPublisher::new(&config.rmq_address)?;
if let Some(_) = config.metrics_ip_port_address {
rmq_publisher.enable_metrics(registry.clone())?;
}
Expand Down
216 changes: 132 additions & 84 deletions indexer/src/rabbit_publisher.rs → indexer/src/rmq_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use borsh::{BorshDeserialize, BorshSerialize};
use deadpool::managed::PoolError;
use deadpool_lapin::{Manager, Pool};
use lapin::{
options::{BasicPublishOptions, ExchangeDeclareOptions},
Expand All @@ -7,11 +8,14 @@ use lapin::{
};
use near_indexer::near_primitives::hash::CryptoHash;
use prometheus::Registry;
use tokio::sync::mpsc;
use std::time::Duration;
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::{error, info};

use crate::errors::{Error, Result};
use crate::metrics::{make_publisher_metrics, Metricable, PublisherListener};
use crate::{
errors::{Error, Result},
metrics::{make_publisher_metrics, Metricable, PublisherListener},
};

const PUBLISHER: &str = "publisher";
const EXCHANGE_NAME: &str = "rollup_exchange";
Expand Down Expand Up @@ -65,11 +69,11 @@ pub struct PublishData {
}

#[derive(Clone)]
pub struct RabbitPublisherHandle {
pub struct RmqPublisherHandle {
pub sender: mpsc::Sender<PublishData>,
}

impl RabbitPublisherHandle {
impl RmqPublisherHandle {
pub async fn publish(&mut self, publish_data: PublishData) -> Result<()> {
Ok(self.sender.send(publish_data).await?)
}
Expand All @@ -80,12 +84,12 @@ impl RabbitPublisherHandle {
}

#[derive(Clone)]
pub struct RabbitPublisher {
pub struct RmqPublisher {
connection_pool: Pool,
listener: Option<PublisherListener>,
}

impl RabbitPublisher {
impl RmqPublisher {
pub fn new(addr: &str) -> Result<Self> {
let connection_pool = create_connection_pool(addr.into())?;

Expand All @@ -95,11 +99,72 @@ impl RabbitPublisher {
})
}

pub fn run(&self, receiver: mpsc::Receiver<PublishData>) {
actix::spawn(self.clone().publisher(receiver));
pub fn run(&self, receiver: mpsc::Receiver<PublishData>) -> JoinHandle<()> {
let task = RmqPublisherTask::new(self.connection_pool.clone(), self.listener.clone(), receiver);

actix::spawn(task.run())
}
}

impl Metricable for RmqPublisher {
fn enable_metrics(&mut self, registry: Registry) -> Result<()> {
let listener = make_publisher_metrics(registry)?;
self.listener = Some(listener);

Ok(())
}
}

enum RmqPublisherState {
Shutdown,
WaitingForConnection,
Connected { connection: Connection },
}

struct RmqPublisherTask {
connection_pool: Pool,
receiver: mpsc::Receiver<PublishData>,
listener: Option<PublisherListener>,
}

impl RmqPublisherTask {
pub fn new(
connection_pool: Pool,
listener: Option<PublisherListener>,
receiver: mpsc::Receiver<PublishData>,
) -> Self {
Self {
connection_pool,
receiver,
listener,
}
}

pub async fn run(mut self) {
const RECONNECTION_INTERVAL: Duration = Duration::from_secs(2);

let mut next_step = self.connect().await;
loop {
next_step = match next_step {
RmqPublisherState::WaitingForConnection => {
tokio::time::sleep(RECONNECTION_INTERVAL).await;

info!(target: PUBLISHER, "Reconnecting to RMQ");
self.connect().await
}
RmqPublisherState::Connected { connection } => {
info!(target: PUBLISHER, "RMQ connected");
self.process_stream(connection).await
}
RmqPublisherState::Shutdown => {
self.receiver.close();
return;
}
}
}
}

async fn exchange_declare(connection: &Connection) -> Result<()> {
async fn exchange_declare(connection: &Connection) -> Result<(), lapin::Error> {
let channel = connection.create_channel().await?;
channel
.exchange_declare(
Expand All @@ -119,91 +184,83 @@ impl RabbitPublisher {
Ok(())
}

async fn publisher(self, mut receiver: mpsc::Receiver<PublishData>) {
const ERROR_CODE: i32 = 1;

let Self {
connection_pool,
listener,
} = self;
let mut connection = match connection_pool.get().await {
async fn connect(&mut self) -> RmqPublisherState {
let Self { connection_pool, .. } = self;
let connection = match connection_pool.get().await {
Ok(connection) => connection,
Err(err) => {
Self::handle_error(err, None);
actix::System::current().stop_with_code(ERROR_CODE);
return;
return match err {
PoolError::Timeout(_) | PoolError::Backend(_) => RmqPublisherState::WaitingForConnection,
PoolError::Closed | PoolError::NoRuntimeSpecified | PoolError::PostCreateHook(_) => {
RmqPublisherState::Shutdown
}
}
}
};

match Self::exchange_declare(&connection).await {
Ok(_) => {}
Ok(_) => RmqPublisherState::Connected { connection },
Err(err) => {
error!(target: PUBLISHER, "Failed to declare exchange: {}", err);
receiver.close();
actix::System::current().stop_with_code(ERROR_CODE);
}
};

let logic = |connection_pool: Pool, mut connection: Connection, publish_data: PublishData| async move {
if !connection.status().connected() {
connection = connection_pool.get().await?;
RmqPublisherState::WaitingForConnection
}
}
}

let channel = connection.create_channel().await?;
async fn publish(
connection: &Connection,
payload: &[u8],
publish_options: PublishOptions,
) -> Result<(), lapin::Error> {
let PublishOptions {
exchange,
routing_key,
basic_publish_options,
basic_properties,
} = publish_options;

let PublishOptions {
exchange,
routing_key,
let channel = connection.create_channel().await?;
channel
.basic_publish(
&exchange,
&routing_key,
basic_publish_options,
&payload,
basic_properties,
} = publish_data.publish_options.clone();
)
.await?;

Ok(())
}

async fn process_stream(&mut self, connection: Connection) -> RmqPublisherState {
while let Some(publish_data) = self.receiver.recv().await {
let mut payload: Vec<u8> = Vec::new();
publish_data.payload.serialize(&mut payload)?;

info!(target: PUBLISHER, "Publishing transaction: {:?}", publish_data.payload.transaction_id);

channel
.basic_publish(
&exchange,
&routing_key,
basic_publish_options,
&payload,
basic_properties,
)
.await?;

info!(target: PUBLISHER, "published tx: {}, routing_key: {}", publish_data.payload.transaction_id, routing_key);
Ok::<_, Error>(connection)
};
if let Err(err) = publish_data.payload.serialize(&mut payload) {
info!(target: PUBLISHER, "couldn't serialize publish payload {}", err.to_string());
continue;
}

let code = loop {
if let Some(publish_data) = receiver.recv().await {
let start_time = std::time::Instant::now();
match logic(connection_pool.clone(), connection, publish_data.clone()).await {
Ok(new_connection) => {
let duration = start_time.elapsed();
listener.as_ref().map(|l| {
l.num_published_blocks.inc();
l.publish_duration_histogram.observe(duration.as_millis() as f64);
});

connection = new_connection;
}
Err(err) => {
listener.as_ref().map(|l| l.num_failed_publishes.inc());
let start_time = std::time::Instant::now();
match Self::publish(&connection, &payload, publish_data.publish_options.clone()).await {
Ok(_) => {
let duration = start_time.elapsed();
self.listener.as_ref().map(|l| {
l.num_published_blocks.inc();
l.publish_duration_histogram.observe(duration.as_millis() as f64);
});
info!(target: PUBLISHER, "published tx: {}, routing_key: {}", publish_data.payload.transaction_id, publish_data.publish_options.routing_key);
}
Err(err) => {
self.listener.as_ref().map(|l| l.num_failed_publishes.inc());
Self::handle_error(err, Some(publish_data));

Self::handle_error(err, Some(publish_data));
break ERROR_CODE;
}
return RmqPublisherState::WaitingForConnection;
}
} else {
break 0;
}
};
}

receiver.close();
actix::System::current().stop_with_code(code);
RmqPublisherState::Shutdown
}

fn handle_error(error: impl Into<Error>, publish_data: Option<PublishData>) {
Expand Down Expand Up @@ -231,12 +288,3 @@ pub(crate) fn create_connection_pool(addr: String) -> Result<Pool> {

Ok(pool)
}

impl Metricable for RabbitPublisher {
fn enable_metrics(&mut self, registry: Registry) -> Result<()> {
let listener = make_publisher_metrics(registry)?;
self.listener = Some(listener);

Ok(())
}
}
Loading

0 comments on commit e1287fa

Please sign in to comment.