Skip to content

Commit

Permalink
fix(torii/core): handle an edge case with pending block processing
Browse files Browse the repository at this point in the history
commit-id:6f679e25
  • Loading branch information
lambda-0x committed Nov 13, 2024
1 parent 4c5c3c1 commit eb95d49
Showing 1 changed file with 18 additions and 12 deletions.
30 changes: 18 additions & 12 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use dojo_world::contracts::world::WorldContractReader;
use futures_util::future::{join_all, try_join_all};
use hashlink::LinkedHashMap;
use starknet::core::types::{
BlockId, BlockTag, EmittedEvent, Event, EventFilter, EventsPage, MaybePendingBlockWithReceipts,
MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, Transaction, TransactionReceipt,
TransactionWithReceipt,
BlockHashAndNumber, BlockId, BlockTag, EmittedEvent, Event, EventFilter, EventsPage,
MaybePendingBlockWithReceipts, MaybePendingBlockWithTxHashes, PendingBlockWithReceipts,
Transaction, TransactionReceipt, TransactionWithReceipt,
};
use starknet::core::utils::get_selector_from_name;
use starknet::providers::Provider;
Expand Down Expand Up @@ -305,23 +305,23 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {

// TODO: since we now process blocks in chunks we can parallelize the fetching of data
pub async fn fetch_data(&mut self, cursors: &Cursors) -> Result<FetchDataResult> {
let latest_block_number = self.provider.block_hash_and_number().await?.block_number;
let latest_block = self.provider.block_hash_and_number().await?;

let from = cursors.head.unwrap_or(0);
let total_remaining_blocks = latest_block_number - from;
let total_remaining_blocks = latest_block.block_number - from;
let blocks_to_process = total_remaining_blocks.min(self.config.blocks_chunk_size);
let to = from + blocks_to_process;

let instant = Instant::now();
let result = if from < latest_block_number {
let result = if from < latest_block.block_number {
let from = if from == 0 { from } else { from + 1 };
let data = self.fetch_range(from, to, &cursors.cursor_map).await?;
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), from = %from, to = %to, "Fetched data for range.");
FetchDataResult::Range(data)
} else if self.config.index_pending {
let data =
self.fetch_pending(latest_block_number + 1, cursors.last_pending_block_tx).await?;
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), latest_block_number = %latest_block_number, "Fetched pending data.");
self.fetch_pending(latest_block.clone(), cursors.last_pending_block_tx).await?;
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), latest_block_number = %latest_block.block_number, "Fetched pending data.");
if let Some(data) = data {
FetchDataResult::Pending(data)
} else {
Expand Down Expand Up @@ -449,12 +449,18 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {

async fn fetch_pending(
&self,
block_number: u64,
block: BlockHashAndNumber,
last_pending_block_tx: Option<Felt>,
) -> Result<Option<FetchPendingResult>> {
let block = if let MaybePendingBlockWithReceipts::PendingBlock(pending) =
let pending_block = if let MaybePendingBlockWithReceipts::PendingBlock(pending) =
self.provider.get_block_with_receipts(BlockId::Tag(BlockTag::Pending)).await?
{
// if the parent hash is not the hash of the latest block that we fetched, then it means
// a new block got mined just after we fetched the latest block information
if block.block_hash != pending.parent_hash {
return Ok(None);
}

pending
} else {
// TODO: change this to unreachable once katana is updated to return PendingBlockWithTxs
Expand All @@ -464,8 +470,8 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
};

Ok(Some(FetchPendingResult {
pending_block: Box::new(block),
block_number,
pending_block: Box::new(pending_block),
block_number: block.block_number + 1,
last_pending_block_tx,
}))
}
Expand Down

0 comments on commit eb95d49

Please sign in to comment.