Skip to content

Commit

Permalink
feat(torii): limit number of blocks processed in one go
Browse files Browse the repository at this point in the history
  • Loading branch information
lambda-0x committed Oct 8, 2024
1 parent 20d1065 commit b17d0fd
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
5 changes: 5 additions & 0 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ struct Args {
#[arg(long, default_value = "1024")]
events_chunk_size: u64,

/// Number of blocks to process before commiting to DB
#[arg(long, default_value = "10240")]
blocks_chunk_size: u64,

Check warning on line 119 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L119

Added line #L119 was not covered by tests

/// Enable indexing pending blocks
#[arg(long, action = ArgAction::Set, default_value_t = true)]
index_pending: bool,
Expand Down Expand Up @@ -239,6 +243,7 @@ async fn main() -> anyhow::Result<()> {
EngineConfig {
max_concurrent_tasks: args.max_concurrent_tasks,
start_block: 0,
blocks_chunk_size: args.blocks_chunk_size,

Check warning on line 246 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L246

Added line #L246 was not covered by tests
events_chunk_size: args.events_chunk_size,
index_pending: args.index_pending,
polling_interval: Duration::from_millis(args.polling_interval),
Expand Down
11 changes: 9 additions & 2 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ bitflags! {
pub struct EngineConfig {
pub polling_interval: Duration,
pub start_block: u64,
pub blocks_chunk_size: u64,
pub events_chunk_size: u64,
pub index_pending: bool,
pub max_concurrent_tasks: usize,
Expand All @@ -142,6 +143,7 @@ impl Default for EngineConfig {
Self {
polling_interval: Duration::from_millis(500),
start_block: 0,
blocks_chunk_size: 10240,
events_chunk_size: 1024,
index_pending: true,
max_concurrent_tasks: 100,
Expand Down Expand Up @@ -290,15 +292,20 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
}
}

// TODO: since we now process blocks in chunks we can parallelize the fetching of data
pub async fn fetch_data(&mut self, cursors: &Cursors) -> Result<FetchDataResult> {
let latest_block_number = self.provider.block_hash_and_number().await?.block_number;

let from = cursors.head.unwrap_or(0);
let total_remaining_blocks = latest_block_number - from;
let blocks_to_process = total_remaining_blocks.min(self.config.blocks_chunk_size);
let to = from + blocks_to_process;

Check warning on line 302 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L300-L302

Added lines #L300 - L302 were not covered by tests

let instant = Instant::now();
let result = if from < latest_block_number {
let from = if from == 0 { from } else { from + 1 };
let data = self.fetch_range(from, latest_block_number, &cursors.cursor_map).await?;
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), from = %from, to = %latest_block_number, "Fetched data for range.");
let data = self.fetch_range(from, to, &cursors.cursor_map).await?;
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), from = %from, to = %to, "Fetched data for range.");

Check warning on line 308 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L307-L308

Added lines #L307 - L308 were not covered by tests
FetchDataResult::Range(data)
} else if self.config.index_pending {
let data =
Expand Down

0 comments on commit b17d0fd

Please sign in to comment.