diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index 3d6d748025..9e776f3bfc 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -114,6 +114,10 @@ struct Args { #[arg(long, default_value = "1024")] events_chunk_size: u64, + /// Number of blocks to process before commiting to DB + #[arg(long, default_value = "10240")] + blocks_chunk_size: u64, + /// Enable indexing pending blocks #[arg(long, action = ArgAction::Set, default_value_t = true)] index_pending: bool, @@ -239,6 +243,7 @@ async fn main() -> anyhow::Result<()> { EngineConfig { max_concurrent_tasks: args.max_concurrent_tasks, start_block: 0, + blocks_chunk_size: args.blocks_chunk_size, events_chunk_size: args.events_chunk_size, index_pending: args.index_pending, polling_interval: Duration::from_millis(args.polling_interval), diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index eddb41594a..cb816fe6dc 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -131,6 +131,7 @@ bitflags! { pub struct EngineConfig { pub polling_interval: Duration, pub start_block: u64, + pub blocks_chunk_size: u64, pub events_chunk_size: u64, pub index_pending: bool, pub max_concurrent_tasks: usize, @@ -142,6 +143,7 @@ impl Default for EngineConfig { Self { polling_interval: Duration::from_millis(500), start_block: 0, + blocks_chunk_size: 10240, events_chunk_size: 1024, index_pending: true, max_concurrent_tasks: 100, @@ -290,15 +292,20 @@ impl Engine

{ } } + // TODO: since we now process blocks in chunks we can parallelize the fetching of data pub async fn fetch_data(&mut self, cursors: &Cursors) -> Result { let latest_block_number = self.provider.block_hash_and_number().await?.block_number; + let from = cursors.head.unwrap_or(0); + let total_remaining_blocks = latest_block_number - from; + let blocks_to_process = total_remaining_blocks.min(self.config.blocks_chunk_size); + let to = from + blocks_to_process; let instant = Instant::now(); let result = if from < latest_block_number { let from = if from == 0 { from } else { from + 1 }; - let data = self.fetch_range(from, latest_block_number, &cursors.cursor_map).await?; - debug!(target: LOG_TARGET, duration = ?instant.elapsed(), from = %from, to = %latest_block_number, "Fetched data for range."); + let data = self.fetch_range(from, to, &cursors.cursor_map).await?; + debug!(target: LOG_TARGET, duration = ?instant.elapsed(), from = %from, to = %to, "Fetched data for range."); FetchDataResult::Range(data) } else if self.config.index_pending { let data =