From 975f3e4334707df176debc59391e2c37a94e1da4 Mon Sep 17 00:00:00 2001 From: lambda-0x <0xlambda@protonmail.com> Date: Tue, 8 Oct 2024 16:03:21 +0530 Subject: [PATCH] feat(torii): limit number of blocks processed in one go --- bin/torii/src/main.rs | 5 +++++ crates/torii/core/src/engine.rs | 11 +++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index 3d6d748025..9e776f3bfc 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -114,6 +114,10 @@ struct Args { #[arg(long, default_value = "1024")] events_chunk_size: u64, + /// Number of blocks to process before commiting to DB + #[arg(long, default_value = "10240")] + blocks_chunk_size: u64, + /// Enable indexing pending blocks #[arg(long, action = ArgAction::Set, default_value_t = true)] index_pending: bool, @@ -239,6 +243,7 @@ async fn main() -> anyhow::Result<()> { EngineConfig { max_concurrent_tasks: args.max_concurrent_tasks, start_block: 0, + blocks_chunk_size: args.blocks_chunk_size, events_chunk_size: args.events_chunk_size, index_pending: args.index_pending, polling_interval: Duration::from_millis(args.polling_interval), diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 6fe09063db..b8f398f135 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -131,6 +131,7 @@ bitflags! { pub struct EngineConfig { pub polling_interval: Duration, pub start_block: u64, + pub blocks_chunk_size: u64, pub events_chunk_size: u64, pub index_pending: bool, pub max_concurrent_tasks: usize, @@ -142,6 +143,7 @@ impl Default for EngineConfig { Self { polling_interval: Duration::from_millis(500), start_block: 0, + blocks_chunk_size: 10240, events_chunk_size: 1024, index_pending: true, max_concurrent_tasks: 100, @@ -286,15 +288,20 @@ impl Engine

{ } } + // TODO: since we now process blocks in chunks we can parallelize the fetching of data pub async fn fetch_data(&mut self, cursors: &Cursors) -> Result { let latest_block_number = self.provider.block_hash_and_number().await?.block_number; + let from = cursors.head.unwrap_or(0); + let total_remaining_blocks = latest_block_number - from; + let blocks_to_process = total_remaining_blocks.min(self.config.blocks_chunk_size); + let to = from + blocks_to_process; let instant = Instant::now(); let result = if from < latest_block_number { let from = if from == 0 { from } else { from + 1 }; - let data = self.fetch_range(from, latest_block_number, &cursors.cursor_map).await?; - debug!(target: LOG_TARGET, duration = ?instant.elapsed(), from = %from, to = %latest_block_number, "Fetched data for range."); + let data = self.fetch_range(from, to, &cursors.cursor_map).await?; + debug!(target: LOG_TARGET, duration = ?instant.elapsed(), from = %from, to = %to, "Fetched data for range."); FetchDataResult::Range(data) } else if self.config.index_pending { let data =