Skip to content

Commit

Permalink
refactor(iota-indexer): Do not panic on errors in iota_indexer::frame…
Browse files Browse the repository at this point in the history
…work::runner::run
  • Loading branch information
tomxey committed Aug 14, 2024
1 parent 77de6db commit b79d712
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 19 deletions.
4 changes: 2 additions & 2 deletions crates/iota-analytics-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ async fn main() -> Result<(), AnalyticsIndexerError> {
.rest_url(&rest_url)
.handler(processor)
.run()
.await;
Ok(())
.await
.map_err(|e| AnalyticsIndexerError::GenericError(e.to_string()))
}
6 changes: 4 additions & 2 deletions crates/iota-indexer/src/framework/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use iota_types::messages_checkpoint::CheckpointSequenceNumber;
use super::{fetcher::CheckpointFetcher, Handler};
use crate::metrics::IndexerMetrics;

use anyhow::Result;

pub struct IndexerBuilder {
rest_url: Option<String>,
handlers: Vec<Box<dyn Handler>>,
Expand Down Expand Up @@ -52,7 +54,7 @@ impl IndexerBuilder {
self
}

pub async fn run(self) {
pub async fn run(self) -> Result<()> {
let (downloaded_checkpoint_data_sender, downloaded_checkpoint_data_receiver) =
mysten_metrics::metered_channel::channel(
self.checkpoint_buffer_size,
Expand Down Expand Up @@ -82,6 +84,6 @@ impl IndexerBuilder {
self.handlers,
self.metrics.clone(),
)
.await;
.await
}
}
41 changes: 29 additions & 12 deletions crates/iota-indexer/src/framework/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,20 @@

use super::{fetcher::CheckpointDownloadData, interface::Handler};
use crate::metrics::IndexerMetrics;
use anyhow::Result;
use iota_rest_api::CheckpointData;

// Limit indexing parallelism on big checkpoints to avoid OOM,
// by limiting the total size of batch checkpoints to ~50MB.
// On testnet, most checkpoints are < 200KB, some can go up to 50MB.
const CHECKPOINT_PROCESSING_BATCH_DATA_LIMIT: usize = 50000000;
const CHECKPOINT_PROCESSING_BATCH_SIZE: usize = 100;

pub async fn run<S>(stream: S, mut handlers: Vec<Box<dyn Handler>>, metrics: IndexerMetrics)
pub async fn run<S>(
stream: S,
mut handlers: Vec<Box<dyn Handler>>,
metrics: IndexerMetrics,
) -> Result<()>
where
S: futures::Stream<Item = CheckpointDownloadData> + std::marker::Unpin,
{
Expand All @@ -31,24 +37,35 @@ where
cp_batch_total_size += checkpoint.size;
cp_batch.push(checkpoint.data.clone());
if cp_batch_total_size >= CHECKPOINT_PROCESSING_BATCH_DATA_LIMIT {
futures::future::join_all(handlers.iter_mut().map(|handler| async {
handler.process_checkpoints(&cp_batch).await.unwrap()
}))
.await;

call_handlers_on_checkpoints_batch(&mut handlers, &cp_batch).await?;
metrics.indexing_batch_size.set(cp_batch_total_size as i64);
cp_batch = vec![];
cp_batch_total_size = 0;
}
}
if !cp_batch.is_empty() {
futures::future::join_all(
handlers
.iter_mut()
.map(|handler| async { handler.process_checkpoints(&cp_batch).await.unwrap() }),
)
.await;
call_handlers_on_checkpoints_batch(&mut handlers, &cp_batch).await?;
metrics.indexing_batch_size.set(cp_batch_total_size as i64);
}
}

Ok(())
}

async fn call_handlers_on_checkpoints_batch(
handlers: &mut Vec<Box<dyn Handler>>,
cp_batch: &Vec<CheckpointData>,
) -> Result<()> {
use futures::StreamExt;

let mut handlers_futures: futures::stream::FuturesUnordered<_> = handlers
.iter_mut()
.map(|handler| async { handler.process_checkpoints(&cp_batch).await })
.collect();

while let Some(handler_result) = handlers_futures.next().await {
handler_result?;
}

Ok(())
}
5 changes: 2 additions & 3 deletions crates/iota-indexer/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,8 @@ impl Indexer {
vec![Box::new(checkpoint_handler)],
metrics,
)
.await;

Ok(())
.await
.map_err(|e| IndexerError::UncategorizedError(e))
}

pub async fn start_reader(
Expand Down

0 comments on commit b79d712

Please sign in to comment.