From eb95d49966a42dc55093e6c86625edaeb0b2c3c6 Mon Sep 17 00:00:00 2001
From: lambda-0x <0xlambda@protonmail.com>
Date: Wed, 30 Oct 2024 23:51:28 +0530
Subject: [PATCH] fix(torii/core): handle an edge case with pending block processing

commit-id:6f679e25
---
 crates/torii/core/src/engine.rs | 30 ++++++++++++++++++------------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs
index 95d0ccf418..c7dbb99cee 100644
--- a/crates/torii/core/src/engine.rs
+++ b/crates/torii/core/src/engine.rs
@@ -10,9 +10,9 @@ use dojo_world::contracts::world::WorldContractReader;
 use futures_util::future::{join_all, try_join_all};
 use hashlink::LinkedHashMap;
 use starknet::core::types::{
-    BlockId, BlockTag, EmittedEvent, Event, EventFilter, EventsPage, MaybePendingBlockWithReceipts,
-    MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, Transaction, TransactionReceipt,
-    TransactionWithReceipt,
+    BlockHashAndNumber, BlockId, BlockTag, EmittedEvent, Event, EventFilter, EventsPage,
+    MaybePendingBlockWithReceipts, MaybePendingBlockWithTxHashes, PendingBlockWithReceipts,
+    Transaction, TransactionReceipt, TransactionWithReceipt,
 };
 use starknet::core::utils::get_selector_from_name;
 use starknet::providers::Provider;
@@ -305,23 +305,23 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
 
     // TODO: since we now process blocks in chunks we can parallelize the fetching of data
     pub async fn fetch_data(&mut self, cursors: &Cursors) -> Result<FetchDataResult> {
-        let latest_block_number = self.provider.block_hash_and_number().await?.block_number;
+        let latest_block = self.provider.block_hash_and_number().await?;
 
         let from = cursors.head.unwrap_or(0);
-        let total_remaining_blocks = latest_block_number - from;
+        let total_remaining_blocks = latest_block.block_number - from;
         let blocks_to_process = total_remaining_blocks.min(self.config.blocks_chunk_size);
         let to = from + blocks_to_process;
 
         let instant = Instant::now();
-        let result = if from < latest_block_number {
+        let result = if from < latest_block.block_number {
             let from = if from == 0 { from } else { from + 1 };
             let data = self.fetch_range(from, to, &cursors.cursor_map).await?;
             debug!(target: LOG_TARGET, duration = ?instant.elapsed(), from = %from, to = %to, "Fetched data for range.");
             FetchDataResult::Range(data)
         } else if self.config.index_pending {
             let data =
-                self.fetch_pending(latest_block_number + 1, cursors.last_pending_block_tx).await?;
-            debug!(target: LOG_TARGET, duration = ?instant.elapsed(), latest_block_number = %latest_block_number, "Fetched pending data.");
+                self.fetch_pending(latest_block.clone(), cursors.last_pending_block_tx).await?;
+            debug!(target: LOG_TARGET, duration = ?instant.elapsed(), latest_block_number = %latest_block.block_number, "Fetched pending data.");
             if let Some(data) = data {
                 FetchDataResult::Pending(data)
             } else {
@@ -449,12 +449,18 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
 
     async fn fetch_pending(
         &self,
-        block_number: u64,
+        block: BlockHashAndNumber,
         last_pending_block_tx: Option<Felt>,
     ) -> Result<Option<FetchPendingResult>> {
-        let block = if let MaybePendingBlockWithReceipts::PendingBlock(pending) =
+        let pending_block = if let MaybePendingBlockWithReceipts::PendingBlock(pending) =
             self.provider.get_block_with_receipts(BlockId::Tag(BlockTag::Pending)).await?
         {
+            // if the parent hash is not the hash of the latest block that we fetched, then it means
+            // a new block got mined just after we fetched the latest block information
+            if block.block_hash != pending.parent_hash {
+                return Ok(None);
+            }
+
             pending
         } else {
             // TODO: change this to unreachable once katana is updated to return PendingBlockWithTxs
@@ -464,8 +470,8 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
         };
 
         Ok(Some(FetchPendingResult {
-            pending_block: Box::new(block),
-            block_number,
+            pending_block: Box::new(pending_block),
+            block_number: block.block_number + 1,
             last_pending_block_tx,
         }))
     }
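
For reference, the edge case this patch guards against can be reduced to a small standalone check. The sketch below uses hypothetical stand-in types (LatestBlock, PendingBlock, a u64-backed Hash) rather than the starknet-rs structs in the diff; it only illustrates the parent-hash comparison and the block_number + 1 bookkeeping introduced above, under those assumptions.

// Standalone sketch of the race guard; `LatestBlock` and `PendingBlock` are
// illustrative stand-ins, not the starknet-rs types used in the patch.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct Hash(u64);

struct LatestBlock {
    block_hash: Hash,
    block_number: u64,
}

struct PendingBlock {
    parent_hash: Hash,
}

// Mirrors the new check in fetch_pending: only treat the pending block as
// usable when it builds directly on the head fetched earlier; otherwise a
// block was mined between the two provider calls and this cycle is skipped.
fn accept_pending(latest: &LatestBlock, pending: PendingBlock) -> Option<(PendingBlock, u64)> {
    if pending.parent_hash != latest.block_hash {
        return None;
    }
    // The pending block extends the fetched head, so it will be sealed as
    // block `latest.block_number + 1`.
    Some((pending, latest.block_number + 1))
}

fn main() {
    let latest = LatestBlock { block_hash: Hash(7), block_number: 100 };

    // Consistent case: the pending block points back at the head we saw.
    assert!(accept_pending(&latest, PendingBlock { parent_hash: Hash(7) }).is_some());

    // Raced case: a new head was mined after we fetched block 100, so the
    // pending block no longer builds on hash 7 and is skipped.
    assert!(accept_pending(&latest, PendingBlock { parent_hash: Hash(8) }).is_none());
}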