Skip to content

Commit

Permalink
feat: non-blocking block_listener + expirable messages (#157)
Browse files Browse the repository at this point in the history
* feat: don't hold indexer send

* feat: message expiration

* refactor: some formatting

* refactor: renaming + test cleaning

* feat: add metrics to display number of messages in queue

* fix: attestor timeout used
  • Loading branch information
taco-paco authored May 17, 2024
1 parent 5dcb6f1 commit a82df89
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 84 deletions.
262 changes: 201 additions & 61 deletions indexer/src/block_listener.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,46 @@
use futures::future::join_all;
use near_indexer::near_primitives::{types::AccountId, views::ActionView};
use near_indexer::StreamerMessage;
use near_indexer::{
near_primitives::{types::AccountId, views::ActionView},
StreamerMessage,
};
use prometheus::Registry;
use std::{
collections::HashMap,
fmt::{self, Formatter},
use std::{collections::HashMap, collections::VecDeque, sync, time::Duration};
use tokio::{
sync::{
mpsc::{self, error::TrySendError, Receiver},
oneshot, Mutex,
},
task::JoinHandle,
time,
};
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;
use tokio::task::JoinHandle;
use tracing::info;

use crate::metrics::{make_block_listener_metrics, BlockEventListener, Metricable};
use crate::{errors::Result, INDEXER};
use crate::{
errors::Result,
metrics::{make_block_listener_metrics, BlockEventListener, Metricable},
types,
types::CandidateData,
INDEXER,
};

#[cfg(not(test))]
const EXPIRATION_TIMEOUT: Duration = Duration::from_secs(30);
#[cfg(test)]
const EXPIRATION_TIMEOUT: Duration = Duration::from_millis(200);

#[derive(Clone, Debug)]
pub(crate) struct CandidateData {
pub rollup_id: u32,
pub transaction: near_indexer::IndexerTransactionWithOutcome,
pub payloads: Vec<Vec<u8>>,
#[derive(Clone)]
struct ExpirableCandidateData {
timestamp: time::Instant,
inner: CandidateData,
}

impl fmt::Display for CandidateData {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.write_fmt(format_args!(
"rollup_id: {}, id: {}, signer_id: {}, receiver_id {}",
self.rollup_id,
self.transaction.transaction.hash,
self.transaction.transaction.signer_id,
self.transaction.transaction.receiver_id
))
impl From<ExpirableCandidateData> for CandidateData {
fn from(value: ExpirableCandidateData) -> Self {
value.inner
}
}

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub(crate) struct TransactionWithRollupId {
struct TransactionWithRollupId {
pub(crate) rollup_id: u32,
pub(crate) transaction: near_indexer::IndexerTransactionWithOutcome,
}
Expand Down Expand Up @@ -75,42 +81,127 @@ impl BlockListener {
}
}

async fn ticker(
mut done: oneshot::Receiver<()>,
queue_protected: types::ProtectedQueue<ExpirableCandidateData>,
candidates_sender: mpsc::Sender<CandidateData>,
listener: Option<BlockEventListener>
) {
#[cfg(not(test))]
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
#[cfg(test)]
const FLUSH_INTERVAL: Duration = Duration::from_millis(100);

let mut interval = time::interval(FLUSH_INTERVAL);
loop {
tokio::select! {
_ = interval.tick() => {
let mut queue = queue_protected.lock().await;
let _ = Self::flush(&mut queue, &candidates_sender);
listener.as_ref().map(|l| l.current_queued_candidates.set(queue.len() as f64));

interval.reset();
},
_ = &mut done => {
return
}
}
}
}

fn flush(queue: &mut VecDeque<ExpirableCandidateData>, candidates_sender: &mpsc::Sender<CandidateData>) -> bool {
if queue.is_empty() {
return true;
}

info!(target: INDEXER, "Flushing");

let now = time::Instant::now();
while let Some(candidate) = queue.front() {
if now.duration_since(candidate.timestamp) >= EXPIRATION_TIMEOUT {
queue.pop_front();
continue;
}

match candidates_sender.try_send(candidate.clone().into()) {
Ok(_) => {
let _ = queue.pop_front();
}
// TODO: return TrySendError instead
Err(_) => return false,
}
}

true
}

fn extract_candidates(addresses_to_rollup_ids: &HashMap<AccountId, u32>, streamer_message: StreamerMessage) -> Vec<CandidateData> {
streamer_message
.shards
.into_iter()
.flat_map(|shard| shard.chunk)
.flat_map(|chunk| {
chunk.transactions.into_iter().filter_map(|transaction| {
addresses_to_rollup_ids
.get(&transaction.transaction.receiver_id)
.map(|rollup_id| TransactionWithRollupId {
rollup_id: *rollup_id,
transaction,
})
})
})
.filter_map(Self::transaction_filter_map)
.collect()
}

// TODO: introduce Task struct
async fn process_stream(
self,
mut indexer_stream: Receiver<StreamerMessage>,
candidates_sender: mpsc::Sender<CandidateData>,
) -> Result<()> {
) {
let Self {
addresses_to_rollup_ids,
listener,
} = self;

let queue_protected = sync::Arc::new(Mutex::new(VecDeque::new()));
let (done_sender, done_receiver) = oneshot::channel();
actix::spawn(Self::ticker(
done_receiver,
queue_protected.clone(),
candidates_sender.clone(),
listener.clone(),
));

while let Some(streamer_message) = indexer_stream.recv().await {
info!(target: INDEXER, "Received streamer message");

// TODO: check receipt_receiver is closed?
let candidates_data: Vec<CandidateData> = streamer_message
.shards
.into_iter()
.flat_map(|shard| shard.chunk)
.flat_map(|chunk| {
chunk.transactions.into_iter().filter_map(|transaction| {
addresses_to_rollup_ids
.get(&transaction.transaction.receiver_id)
.map(|rollup_id| TransactionWithRollupId {
rollup_id: *rollup_id,
transaction,
})
})
})
.filter_map(Self::transaction_filter_map)
.collect();

let candidates_data = Self::extract_candidates(&addresses_to_rollup_ids, streamer_message);
if candidates_data.is_empty() {
info!(target: INDEXER, "No candidate data found in the streamer message");
continue;
}

// TODO: attempt flushing even if no new candidates or not?
// Flushing old messages before new one
{
let mut queue = queue_protected.lock().await;
let flushed = Self::flush(&mut queue, &candidates_sender);
listener.as_ref().map(|l| l.current_queued_candidates.set(queue.len() as f64));
if !flushed {
info!(target: INDEXER, "Not flushed, so enqueuing candidate data");

let timestamp = time::Instant::now();
queue.extend(
candidates_data
.into_iter()
.map(|el| ExpirableCandidateData { timestamp, inner: el }),
);
continue;
}
}

{
let candidates_len = candidates_data.len();
info!(target: INDEXER, "Found {} candidate(s)", candidates_len);
Expand All @@ -119,25 +210,49 @@ impl BlockListener {
.map(|listener| listener.num_candidates.inc_by(candidates_len as f64));
}

// TODO: try_send instead checking for capacity. Not to stall
let results = join_all(
candidates_data
.into_iter()
.map(|receipt| candidates_sender.send(receipt)),
)
.await;
results.into_iter().collect::<Result<_, _>>()?;
let mut iter = candidates_data.into_iter();
while let Some(candidate) = iter.next() {
match candidates_sender.try_send(candidate) {
Ok(_) => {}
Err(err) => match err {
TrySendError::Full(candidate) => {
let mut queue = queue_protected.lock().await;

let timestamp = time::Instant::now();
queue.push_back(ExpirableCandidateData {
timestamp,
inner: candidate,
});
queue.extend(iter.map(|el| ExpirableCandidateData { timestamp, inner: el }));
listener.as_ref().map(|l| l.current_queued_candidates.set(queue.len() as f64));

break;
}
TrySendError::Closed(_) => {
return;
}
},
}
}
}

Ok(())
let _ = done_sender.send(());
}

/// Filters indexer stream and returns receiving channel.
pub(crate) fn run(
pub(crate) fn run(&self, indexer_stream: Receiver<StreamerMessage>) -> (JoinHandle<()>, Receiver<CandidateData>) {
let (candidates_sender, candidates_receiver) = mpsc::channel(1000);
let handle = actix::spawn(Self::process_stream(self.clone(), indexer_stream, candidates_sender));

(handle, candidates_receiver)
}

#[cfg(test)]
pub(crate) fn test_run(
&self,
indexer_stream: Receiver<StreamerMessage>,
) -> (JoinHandle<Result<()>>, Receiver<CandidateData>) {
let (candidates_sender, candidates_receiver) = mpsc::channel(1000);
) -> (JoinHandle<()>, Receiver<CandidateData>) {
let (candidates_sender, candidates_receiver) = mpsc::channel(1);
let handle = actix::spawn(Self::process_stream(self.clone(), indexer_stream, candidates_sender));

(handle, candidates_receiver)
Expand All @@ -155,7 +270,8 @@ impl Metricable for BlockListener {

#[cfg(test)]
mod tests {
use crate::block_listener::{BlockListener, CandidateData, TransactionWithRollupId};
use crate::block_listener::{BlockListener, TransactionWithRollupId, EXPIRATION_TIMEOUT};
use crate::types::CandidateData;
use near_crypto::{KeyType, PublicKey, Signature};
use near_indexer::near_primitives::hash::CryptoHash;
use near_indexer::near_primitives::types::AccountId;
Expand Down Expand Up @@ -397,7 +513,7 @@ mod tests {
}

drop(stream_sender);
handle.await.unwrap().unwrap();
handle.await.unwrap();

let mut counter = 0;
while let Some(_) = candidates_receiver.recv().await {
Expand All @@ -417,7 +533,7 @@ mod tests {
let (stream_sender, stream_receiver) = mpsc::channel(10);

let listener = BlockListener::new(HashMap::from([(da_contract_id, rollup_id)]));
let (handle, mut candidates_receiver) = listener.run(stream_receiver);
let (handle, candidates_receiver) = listener.run(stream_receiver);

for (i, el) in streamer_messages.empty.into_iter().enumerate() {
stream_sender.send(el).await.unwrap();
Expand All @@ -430,6 +546,30 @@ mod tests {

drop(candidates_receiver);
// Sender::closed is triggered
assert!(handle.await.unwrap().is_ok());
assert!(handle.await.is_ok());
}

#[actix::test]
async fn test_expiration() {
let rollup_id = 1;
let da_contract_id = "da.test.near".parse().unwrap();

let streamer_messages = StreamerMessagesLoader::load();
let (stream_sender, stream_receiver) = mpsc::channel(10);

let listener = BlockListener::new(HashMap::from([(da_contract_id, rollup_id)]));
let (_, mut candidates_receiver) = listener.test_run(stream_receiver);

for el in streamer_messages.candidates {
stream_sender.send(el).await.unwrap();
}

// Let messages expire
tokio::time::sleep(2 * EXPIRATION_TIMEOUT).await;

// There shall be first message available
assert!(candidates_receiver.try_recv().is_ok(), "Receiver shall have one value");
// The rest shall be expired
assert_eq!(Err(TryRecvError::Empty), candidates_receiver.try_recv());
}
}
Loading

0 comments on commit a82df89

Please sign in to comment.