diff --git a/indexer/src/block_listener.rs b/indexer/src/block_listener.rs
index c01e2510..a9a6c643 100644
--- a/indexer/src/block_listener.rs
+++ b/indexer/src/block_listener.rs
@@ -1,40 +1,46 @@
-use futures::future::join_all;
-use near_indexer::near_primitives::{types::AccountId, views::ActionView};
-use near_indexer::StreamerMessage;
+use near_indexer::{
+    near_primitives::{types::AccountId, views::ActionView},
+    StreamerMessage,
+};
 use prometheus::Registry;
-use std::{
-    collections::HashMap,
-    fmt::{self, Formatter},
-};
+use std::{collections::HashMap, collections::VecDeque, sync, time::Duration};
+use tokio::{
+    sync::{
+        mpsc::{self, error::TrySendError, Receiver},
+        oneshot, Mutex,
+    },
+    task::JoinHandle,
+    time,
+};
-use tokio::sync::mpsc;
-use tokio::sync::mpsc::Receiver;
-use tokio::task::JoinHandle;
 use tracing::info;
 
-use crate::metrics::{make_block_listener_metrics, BlockEventListener, Metricable};
-use crate::{errors::Result, INDEXER};
+use crate::{
+    errors::Result,
+    metrics::{make_block_listener_metrics, BlockEventListener, Metricable},
+    types,
+    types::CandidateData,
+    INDEXER,
+};
+
+#[cfg(not(test))]
+const EXPIRATION_TIMEOUT: Duration = Duration::from_secs(30);
+#[cfg(test)]
+const EXPIRATION_TIMEOUT: Duration = Duration::from_millis(200);
 
-#[derive(Clone, Debug)]
-pub(crate) struct CandidateData {
-    pub rollup_id: u32,
-    pub transaction: near_indexer::IndexerTransactionWithOutcome,
-    pub payloads: Vec<Vec<u8>>,
+#[derive(Clone)]
+struct ExpirableCandidateData {
+    timestamp: time::Instant,
+    inner: CandidateData,
 }
 
-impl fmt::Display for CandidateData {
-    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
-        f.write_fmt(format_args!(
-            "rollup_id: {}, id: {}, signer_id: {}, receiver_id {}",
-            self.rollup_id,
-            self.transaction.transaction.hash,
-            self.transaction.transaction.signer_id,
-            self.transaction.transaction.receiver_id
-        ))
+impl From<ExpirableCandidateData> for CandidateData {
+    fn from(value: ExpirableCandidateData) -> Self {
+        value.inner
     }
 }
 
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
-pub(crate) struct TransactionWithRollupId {
+struct TransactionWithRollupId {
     pub(crate) rollup_id: u32,
     pub(crate) transaction: near_indexer::IndexerTransactionWithOutcome,
 }
@@ -75,42 +81,127 @@ impl BlockListener {
         }
     }
 
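+    /// Periodically drains the retry queue into `candidates_sender`,
+    /// dropping expired entries, until `done` resolves.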
+    async fn ticker(
+        mut done: oneshot::Receiver<()>,
+        queue_protected: types::ProtectedQueue<ExpirableCandidateData>,
+        candidates_sender: mpsc::Sender<CandidateData>,
+        listener: Option<BlockEventListener>,
+    ) {
+        #[cfg(not(test))]
+        const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
+        #[cfg(test)]
+        const FLUSH_INTERVAL: Duration = Duration::from_millis(100);
+
+        let mut interval = time::interval(FLUSH_INTERVAL);
+        loop {
+            tokio::select! {
+                _ = interval.tick() => {
+                    let mut queue = queue_protected.lock().await;
+                    let _ = Self::flush(&mut queue, &candidates_sender);
+                    listener.as_ref().map(|l| l.current_queued_candidates.set(queue.len() as f64));
+
+                    interval.reset();
+                },
+                _ = &mut done => {
+                    return
+                }
+            }
+        }
+    }
+
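+    /// Attempts to drain the queue into the candidates channel, discarding
+    /// entries older than `EXPIRATION_TIMEOUT`. Returns `false` if the
+    /// channel could not accept every remaining entry.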
+    fn flush(queue: &mut VecDeque<ExpirableCandidateData>, candidates_sender: &mpsc::Sender<CandidateData>) -> bool {
+        if queue.is_empty() {
+            return true;
+        }
+
+        info!(target: INDEXER, "Flushing");
+
+        let now = time::Instant::now();
+        while let Some(candidate) = queue.front() {
+            if now.duration_since(candidate.timestamp) >= EXPIRATION_TIMEOUT {
+                queue.pop_front();
+                continue;
+            }
+
+            match candidates_sender.try_send(candidate.clone().into()) {
+                Ok(_) => {
+                    let _ = queue.pop_front();
+                }
+                // TODO: return TrySendError instead
+                Err(_) => return false,
+            }
+        }
+
+        true
+    }
+
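+    /// Collects the transactions addressed to a registered rollup contract
+    /// from every shard of the streamer message and maps them to candidates.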
+    fn extract_candidates(addresses_to_rollup_ids: &HashMap<AccountId, u32>, streamer_message: StreamerMessage) -> Vec<CandidateData> {
+        streamer_message
+            .shards
+            .into_iter()
+            .flat_map(|shard| shard.chunk)
+            .flat_map(|chunk| {
+                chunk.transactions.into_iter().filter_map(|transaction| {
+                    addresses_to_rollup_ids
+                        .get(&transaction.transaction.receiver_id)
+                        .map(|rollup_id| TransactionWithRollupId {
+                            rollup_id: *rollup_id,
+                            transaction,
+                        })
+                })
+            })
+            .filter_map(Self::transaction_filter_map)
+            .collect()
+    }
+
+    // TODO: introduce Task struct
     async fn process_stream(
         self,
         mut indexer_stream: Receiver<StreamerMessage>,
         candidates_sender: mpsc::Sender<CandidateData>,
-    ) -> Result<()> {
+    ) {
         let Self {
             addresses_to_rollup_ids,
             listener,
         } = self;
 
+        let queue_protected = sync::Arc::new(Mutex::new(VecDeque::new()));
+        let (done_sender, done_receiver) = oneshot::channel();
+        actix::spawn(Self::ticker(
+            done_receiver,
+            queue_protected.clone(),
+            candidates_sender.clone(),
+            listener.clone(),
+        ));
+
         while let Some(streamer_message) = indexer_stream.recv().await {
             info!(target: INDEXER, "Received streamer message");
 
-            // TODO: check receipt_receiver is closed?
-            let candidates_data: Vec<CandidateData> = streamer_message
-                .shards
-                .into_iter()
-                .flat_map(|shard| shard.chunk)
-                .flat_map(|chunk| {
-                    chunk.transactions.into_iter().filter_map(|transaction| {
-                        addresses_to_rollup_ids
-                            .get(&transaction.transaction.receiver_id)
-                            .map(|rollup_id| TransactionWithRollupId {
-                                rollup_id: *rollup_id,
-                                transaction,
-                            })
-                    })
-                })
-                .filter_map(Self::transaction_filter_map)
-                .collect();
-
+            let candidates_data = Self::extract_candidates(&addresses_to_rollup_ids, streamer_message);
             if candidates_data.is_empty() {
                 info!(target: INDEXER, "No candidate data found in the streamer message");
                 continue;
             }
 
+            // TODO: should we attempt flushing even when there are no new candidates?
+            // Flush old messages before the new ones
+            {
+                let mut queue = queue_protected.lock().await;
+                let flushed = Self::flush(&mut queue, &candidates_sender);
+                listener.as_ref().map(|l| l.current_queued_candidates.set(queue.len() as f64));
+                if !flushed {
+                    info!(target: INDEXER, "Not flushed, so enqueuing candidate data");
+
+                    let timestamp = time::Instant::now();
+                    queue.extend(
+                        candidates_data
+                            .into_iter()
+                            .map(|el| ExpirableCandidateData { timestamp, inner: el }),
+                    );
+                    continue;
+                }
+            }
+
             {
                 let candidates_len = candidates_data.len();
                 info!(target: INDEXER, "Found {} candidate(s)", candidates_len);
@@ -119,25 +210,49 @@ impl BlockListener {
                     .map(|listener| listener.num_candidates.inc_by(candidates_len as f64));
             }
 
-            // TODO: try_send instead checking for capacity. Not to stall
-            let results = join_all(
-                candidates_data
-                    .into_iter()
-                    .map(|receipt| candidates_sender.send(receipt)),
-            )
-            .await;
-            results.into_iter().collect::<Result<()>>()?;
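+            // Send without blocking; if the channel is full, park this and
+            // all remaining candidates in the retry queue for the ticker to flush.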
+            let mut iter = candidates_data.into_iter();
+            while let Some(candidate) = iter.next() {
+                match candidates_sender.try_send(candidate) {
+                    Ok(_) => {}
+                    Err(err) => match err {
+                        TrySendError::Full(candidate) => {
+                            let mut queue = queue_protected.lock().await;
+
+                            let timestamp = time::Instant::now();
+                            queue.push_back(ExpirableCandidateData {
+                                timestamp,
+                                inner: candidate,
+                            });
+                            queue.extend(iter.map(|el| ExpirableCandidateData { timestamp, inner: el }));
+                            listener.as_ref().map(|l| l.current_queued_candidates.set(queue.len() as f64));
+
+                            break;
+                        }
+                        TrySendError::Closed(_) => {
+                            return;
+                        }
+                    },
+                }
+            }
         }
 
-        Ok(())
+        let _ = done_sender.send(());
     }
 
     /// Filters indexer stream and returns receiving channel.
-    pub(crate) fn run(
+    pub(crate) fn run(&self, indexer_stream: Receiver<StreamerMessage>) -> (JoinHandle<()>, Receiver<CandidateData>) {
+        let (candidates_sender, candidates_receiver) = mpsc::channel(1000);
+        let handle = actix::spawn(Self::process_stream(self.clone(), indexer_stream, candidates_sender));
+
+        (handle, candidates_receiver)
+    }
+
+    #[cfg(test)]
+    pub(crate) fn test_run(
         &self,
         indexer_stream: Receiver<StreamerMessage>,
-    ) -> (JoinHandle<Result<()>>, Receiver<CandidateData>) {
-        let (candidates_sender, candidates_receiver) = mpsc::channel(1000);
+    ) -> (JoinHandle<()>, Receiver<CandidateData>) {
+        let (candidates_sender, candidates_receiver) = mpsc::channel(1);
         let handle = actix::spawn(Self::process_stream(self.clone(), indexer_stream, candidates_sender));
 
         (handle, candidates_receiver)
     }
@@ -155,7 +270,8 @@ impl Metricable for BlockListener {
 
 #[cfg(test)]
 mod tests {
-    use crate::block_listener::{BlockListener, CandidateData, TransactionWithRollupId};
+    use crate::block_listener::{BlockListener, TransactionWithRollupId, EXPIRATION_TIMEOUT};
+    use crate::types::CandidateData;
     use near_crypto::{KeyType, PublicKey, Signature};
     use near_indexer::near_primitives::hash::CryptoHash;
     use near_indexer::near_primitives::types::AccountId;
@@ -397,7 +513,7 @@ mod tests {
         }
 
         drop(stream_sender);
-        handle.await.unwrap().unwrap();
+        handle.await.unwrap();
 
         let mut counter = 0;
         while let Some(_) = candidates_receiver.recv().await {
@@ -417,7 +533,7 @@ mod tests {
         let (stream_sender, stream_receiver) = mpsc::channel(10);
 
         let listener = BlockListener::new(HashMap::from([(da_contract_id, rollup_id)]));
-        let (handle, mut candidates_receiver) = listener.run(stream_receiver);
+        let (handle, candidates_receiver) = listener.run(stream_receiver);
 
         for (i, el) in streamer_messages.empty.into_iter().enumerate() {
             stream_sender.send(el).await.unwrap();
@@ -430,6 +546,30 @@ mod tests {
 
         drop(candidates_receiver);
         // Sender::closed is triggered
-        assert!(handle.await.unwrap().is_ok());
+        assert!(handle.await.is_ok());
+    }
+
+    #[actix::test]
+    async fn test_expiration() {
+        let rollup_id = 1;
+        let da_contract_id = "da.test.near".parse().unwrap();
+
+        let streamer_messages = StreamerMessagesLoader::load();
+        let (stream_sender, stream_receiver) = mpsc::channel(10);
+
+        let listener = BlockListener::new(HashMap::from([(da_contract_id, rollup_id)]));
+        let (_, mut candidates_receiver) = listener.test_run(stream_receiver);
+
+        for el in streamer_messages.candidates {
+            stream_sender.send(el).await.unwrap();
+        }
+
+        // Let messages expire
+        tokio::time::sleep(2 * EXPIRATION_TIMEOUT).await;
+
+        // The first message should still be available
+        assert!(candidates_receiver.try_recv().is_ok(), "Receiver should have one value");
+        // The rest should have expired
+        assert_eq!(Err(TryRecvError::Empty), candidates_receiver.try_recv());
     }
 }
diff --git a/indexer/src/candidates_validator.rs b/indexer/src/candidates_validator.rs
index 10e54f29..c93ccffe 100644
--- a/indexer/src/candidates_validator.rs
+++ b/indexer/src/candidates_validator.rs
@@ -9,17 +9,15 @@ use tokio::{
 use tracing::info;
 
 use crate::{
-    block_listener::CandidateData,
     errors::Result,
     metrics::{make_candidates_validator_metrics, CandidatesListener, Metricable},
     rabbit_publisher::RabbitPublisherHandle,
     rabbit_publisher::{get_routing_key, PublishData, PublishOptions, PublishPayload, PublisherContext},
+    types,
 };
 
 const CANDIDATES_VALIDATOR: &str = "candidates_validator";
 
-type ProtectedQueue = sync::Arc<Mutex<VecDeque<CandidateData>>>;
-
 #[derive(Clone)]
 pub(crate) struct CandidatesValidator {
     view_client: actix::Addr<near_client::ViewClientActor>,
@@ -36,7 +34,7 @@ impl CandidatesValidator {
 
     async fn ticker(
         mut done: oneshot::Receiver<()>,
-        queue_protected: ProtectedQueue,
+        queue_protected: types::ProtectedQueue<types::CandidateData>,
         mut rmq_handle: RabbitPublisherHandle,
         view_client: actix::Addr<near_client::ViewClientActor>,
         listener: Option<CandidatesListener>,
@@ -60,7 +58,7 @@ impl CandidatesValidator {
 
     // Assumes queue is under mutex
     async fn flush(
-        queue: &mut VecDeque<CandidateData>,
+        queue: &mut VecDeque<types::CandidateData>,
         rmq_handle: &mut RabbitPublisherHandle,
         view_client: &actix::Addr<near_client::ViewClientActor>,
         listener: Option<CandidatesListener>,
@@ -97,7 +95,7 @@ impl CandidatesValidator {
 
     async fn fetch_execution_outcome(
         view_client: &actix::Addr<near_client::ViewClientActor>,
-        candidate_data: &CandidateData,
+        candidate_data: &types::CandidateData,
     ) -> Result<FinalExecutionStatus> {
         info!(target: CANDIDATES_VALIDATOR, "Fetching execution outcome for candidate data");
         Ok(view_client
@@ -115,7 +113,7 @@ impl CandidatesValidator {
             .unwrap_or(FinalExecutionStatus::NotStarted))
     }
 
-    async fn send(candidate_data: &CandidateData, rmq_handle: &mut RabbitPublisherHandle) -> Result<()> {
+    async fn send(candidate_data: &types::CandidateData, rmq_handle: &mut RabbitPublisherHandle) -> Result<()> {
         // TODO: is sequential order important here?
         for data in candidate_data.clone().payloads {
             rmq_handle
@@ -140,13 +138,12 @@ impl CandidatesValidator {
 
     async fn process_candidates(
         self,
-        mut receiver: mpsc::Receiver<CandidateData>,
+        mut receiver: mpsc::Receiver<types::CandidateData>,
         mut rmq_handle: RabbitPublisherHandle,
     ) -> Result<()> {
         let Self { view_client, listener } = self;
 
         let queue_protected = sync::Arc::new(Mutex::new(VecDeque::new()));
-
         let (done_sender, done_receiver) = oneshot::channel();
         actix::spawn(Self::ticker(
             done_receiver,
@@ -206,7 +203,7 @@ impl CandidatesValidator {
     }
 
     // TODO: JoinHandle or errC
-    pub(crate) fn run(&self, candidates_receiver: mpsc::Receiver<CandidateData>) -> mpsc::Receiver<PublishData> {
+    pub(crate) fn run(&self, candidates_receiver: mpsc::Receiver<types::CandidateData>) -> mpsc::Receiver<PublishData> {
         let (sender, receiver) = mpsc::channel(1000);
         actix::spawn(
             self.clone()
diff --git a/indexer/src/indexer_wrapper.rs b/indexer/src/indexer_wrapper.rs
index 3f20e05c..540905f4 100644
--- a/indexer/src/indexer_wrapper.rs
+++ b/indexer/src/indexer_wrapper.rs
@@ -3,11 +3,7 @@
 use prometheus::Registry;
 use std::collections::HashMap;
 use tokio::{sync::mpsc::Receiver, task::JoinHandle};
 
-use crate::{
-    block_listener::{BlockListener, CandidateData},
-    errors::Result,
-    metrics::Metricable,
-};
+use crate::{block_listener::BlockListener, errors::Result, metrics::Metricable, types};
 
 pub struct IndexerWrapper {
     indexer: near_indexer::Indexer,
@@ -34,7 +30,7 @@ impl IndexerWrapper {
         self.indexer.client_actors()
     }
 
-    pub fn run(self) -> (JoinHandle<Result<()>>, Receiver<CandidateData>) {
+    pub fn run(self) -> (JoinHandle<()>, Receiver<types::CandidateData>) {
         let indexer_stream = self.indexer.streamer();
         self.block_listener.run(indexer_stream)
     }
diff --git a/indexer/src/main.rs b/indexer/src/main.rs
index 130002ff..eb6013b8 100644
--- a/indexer/src/main.rs
+++ b/indexer/src/main.rs
@@ -17,6 +17,7 @@
 mod indexer_wrapper;
 mod metrics;
 mod metrics_server;
 mod rabbit_publisher;
+mod types;
 
 const INDEXER: &str = "indexer";
@@ -59,6 +60,7 @@ fn run(home_dir: std::path::PathBuf, config: RunConfigArgs) -> Result<()> {
         }
 
         rmq_publisher.run(validated_stream);
+        // TODO: handle block_handle whether it was cancelled or panicked. Can handle Ok::<_, Error>(block_handle.await?)
     });
@@ -69,7 +71,7 @@ fn run(home_dir: std::path::PathBuf, config: RunConfigArgs) -> Result<()> {
     // Run until publishing finished
     system.run()?;
 
-    block_res?.map_err(|err| {
+    block_res.map_err(|err| {
         error!(target: INDEXER, "Indexer Error: {}", err);
         err
     })
diff --git a/indexer/src/metrics.rs b/indexer/src/metrics.rs
index 1c17cad4..398e6b3a 100644
--- a/indexer/src/metrics.rs
+++ b/indexer/src/metrics.rs
@@ -1,7 +1,5 @@
-use prometheus::{
-    core::{AtomicF64, GenericCounter},
-    Counter, Histogram, HistogramOpts, Opts, Registry,
-};
+use prometheus::{core::{AtomicF64, GenericCounter}, Counter, Gauge, Histogram, HistogramOpts, Opts, Registry};
+use prometheus::core::GenericGauge;
 
 use crate::errors::Result;
 
@@ -19,6 +17,7 @@ pub struct CandidatesListener {
 #[derive(Clone)]
 pub struct BlockEventListener {
     pub num_candidates: GenericCounter<AtomicF64>,
+    pub current_queued_candidates: GenericGauge<AtomicF64>,
 }
 
 #[derive(Clone)]
@@ -60,7 +59,13 @@ pub(crate) fn make_block_listener_metrics(registry: Registry) -> Result<BlockEventListener> {
+    let current_queued_candidates = Gauge::with_opts(Opts::new(
+        "current_queued_candidates",
+        "Number of candidates currently queued",
+    ))?;
+
+    registry.register(Box::new(current_queued_candidates.clone()))?;
 
-    Ok(BlockEventListener { num_candidates })
+    Ok(BlockEventListener { num_candidates, current_queued_candidates })
 }
diff --git a/indexer/src/metrics_server.rs b/indexer/src/metrics_server.rs
index ba453338..7923758c 100644
--- a/indexer/src/metrics_server.rs
+++ b/indexer/src/metrics_server.rs
@@ -99,7 +99,6 @@ impl MetricsServer {
             ServerState::Shutdown
         } else {
             info!(target: METRICS, "Reconnecting server");
-            retries += 1;
             tokio::time::sleep(RECONNECTION_INTERVAL).await;
             self.reconnect().await
diff --git a/indexer/src/types.rs b/indexer/src/types.rs
new file mode 100644
index 00000000..7552b95e
--- /dev/null
+++ b/indexer/src/types.rs
@@ -0,0 +1,25 @@
+use std::collections::VecDeque;
+use std::fmt::Formatter;
+use std::{fmt, sync};
+use tokio::sync::Mutex;
+
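+/// Queue shared between a producer and a periodic flush task.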
+pub(crate) type ProtectedQueue<T> = sync::Arc<Mutex<VecDeque<T>>>;
+
+#[derive(Clone, Debug)]
+pub(crate) struct CandidateData {
+    pub rollup_id: u32,
+    pub transaction: near_indexer::IndexerTransactionWithOutcome,
+    pub payloads: Vec<Vec<u8>>,
+}
+
+impl fmt::Display for CandidateData {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.write_fmt(format_args!(
+            "rollup_id: {}, id: {}, signer_id: {}, receiver_id {}",
+            self.rollup_id,
+            self.transaction.transaction.hash,
+            self.transaction.transaction.signer_id,
+            self.transaction.transaction.receiver_id
+        ))
+    }
+}