diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 1da27038f7..d397d71d07 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -5,9 +5,8 @@ use std::time::Duration; use anyhow::Result; use dojo_world::contracts::world::WorldContractReader; use starknet::core::types::{ - BlockId, BlockTag, Event, EventFilter, Felt, MaybePendingBlockWithTxHashes, - MaybePendingBlockWithTxs, ReceiptBlock, Transaction, TransactionReceipt, - TransactionReceiptWithBlockInfo, + BlockId, BlockTag, Event, EventFilter, Felt, MaybePendingBlockWithTxHashes, Transaction, + TransactionReceipt, TransactionReceiptWithBlockInfo, }; use starknet::core::utils::get_selector_from_name; use starknet::providers::Provider; @@ -102,7 +101,7 @@ impl Engine

{ break Ok(()); } _ = async { - match self.sync_to_head(head, pending_block_tx).await { + match self.sync(head, pending_block_tx).await { Ok((latest_block_number, latest_pending_tx)) => { if erroring_out { erroring_out = false; @@ -128,105 +127,24 @@ impl Engine

{ } } - pub async fn sync_to_head( + pub async fn sync( &mut self, from: u64, - mut pending_block_tx: Option, + pending_block_tx: Option, ) -> Result<(u64, Option)> { + // Process all blocks from current to head. + // If the index pending flag is set, we will process pending events as well. let latest_block_number = self.provider.block_hash_and_number().await?.block_number; - if from < latest_block_number { - // if `from` == 0, then the block may or may not be processed yet. - let from = if from == 0 { from } else { from + 1 }; - pending_block_tx = self.sync_range(from, latest_block_number, pending_block_tx).await?; - } else if self.config.index_pending { - pending_block_tx = self.sync_pending(latest_block_number + 1, pending_block_tx).await?; - } - - Ok((latest_block_number, pending_block_tx)) - } - - pub async fn sync_pending( - &mut self, - block_number: u64, - mut pending_block_tx: Option, - ) -> Result> { - let block = if let MaybePendingBlockWithTxs::PendingBlock(pending) = - self.provider.get_block_with_txs(BlockId::Tag(BlockTag::Pending)).await? - { - pending - } else { - return Ok(None); - }; - - // Skip transactions that have been processed already - // Our cursor is the last processed transaction - let mut pending_block_tx_cursor = pending_block_tx; - for transaction in block.transactions { - if let Some(tx) = pending_block_tx_cursor { - if transaction.transaction_hash() != &tx { - continue; - } - - pending_block_tx_cursor = None; - continue; - } - - match self - .process_transaction_and_receipt( - *transaction.transaction_hash(), - &transaction, - block_number, - block.timestamp, - ) - .await - { - Err(e) => { - match e.to_string().as_str() { - "TransactionHashNotFound" => { - // We failed to fetch the transaction, which is because - // the transaction might not have been processed fast enough by the - // provider. So we can fail silently and try - // again in the next iteration. - warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction.transaction_hash()), "Retrieving pending transaction receipt."); - return Ok(pending_block_tx); - } - _ => { - error!(target: LOG_TARGET, error = %e, transaction_hash = %format!("{:#x}", transaction.transaction_hash()), "Processing pending transaction."); - return Err(e); - } - } - } - Ok(true) => { - pending_block_tx = Some(*transaction.transaction_hash()); - info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction.transaction_hash()), "Processed pending world transaction."); - } - Ok(_) => { - info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction.transaction_hash()), "Processed pending transaction.") - } - } - } - - // Set the head to the last processed pending transaction - // Head block number should still be latest block number - self.db.set_head(block_number - 1, pending_block_tx); - - self.db.execute().await?; - Ok(pending_block_tx) - } - - pub async fn sync_range( - &mut self, - from: u64, - to: u64, - pending_block_tx: Option, - ) -> Result> { - // Process all blocks from current to latest. let get_events = |token: Option| { self.provider.get_events( EventFilter { from_block: Some(BlockId::Number(from)), - to_block: Some(BlockId::Number(to)), + to_block: Some(if self.config.index_pending { + BlockId::Tag(BlockTag::Pending) + } else { + BlockId::Number(latest_block_number) + }), address: Some(self.world.address), keys: None, }, @@ -254,32 +172,20 @@ impl Engine

{ for event in &events_page.events { let block_number = match event.block_number { Some(block_number) => block_number, - // If the block number is not present, try to fetch it from the transaction - // receipt Should not/rarely happen. Thus the additional - // fetch is acceptable. - None => { - let TransactionReceiptWithBlockInfo { receipt, block } = - self.provider.get_transaction_receipt(event.transaction_hash).await?; - - match receipt { - TransactionReceipt::Invoke(_) | TransactionReceipt::L1Handler(_) => { - if let ReceiptBlock::Block { block_number, .. } = block { - block_number - } else { - // If the block is pending, we assume the block number is the - // latest + 1 - to + 1 - } - } - - _ => to + 1, - } - } + // If the block number is not present, we assume we're dealing + // with a pending block event. Thus the block number is the latest + 1 + None => latest_block_number + 1, }; // Keep track of last block number and fetch block timestamp if block_number > last_block { - let block_timestamp = self.get_block_timestamp(block_number).await?; + let block_timestamp = self + .get_block_timestamp(if block_number > latest_block_number { + BlockId::Tag(BlockTag::Pending) + } else { + BlockId::Number(block_number) + }) + .await?; blocks.insert(block_number, block_timestamp); last_block = block_number; @@ -337,6 +243,8 @@ impl Engine

{ self.process_block(block_number, blocks[&block_number]).await?; last_block = block_number; } + + pending_block_tx_cursor = Some(transaction_hash); } // We return None for the pending_block_tx because our sync_range @@ -345,15 +253,15 @@ impl Engine

{ // so once the sync range is done, we assume all of the tx of the block // have been processed. - self.db.set_head(to, None); + self.db.set_head(latest_block_number, pending_block_tx_cursor); self.db.execute().await?; - Ok(None) + Ok((latest_block_number, pending_block_tx_cursor)) } - async fn get_block_timestamp(&self, block_number: u64) -> Result { - match self.provider.get_block_with_tx_hashes(BlockId::Number(block_number)).await? { + async fn get_block_timestamp(&self, block_id: BlockId) -> Result { + match self.provider.get_block_with_tx_hashes(block_id).await? { MaybePendingBlockWithTxHashes::Block(block) => Ok(block.timestamp), MaybePendingBlockWithTxHashes::PendingBlock(block) => Ok(block.timestamp), } @@ -367,7 +275,7 @@ impl Engine

{ transaction: &Transaction, block_number: u64, block_timestamp: u64, - ) -> Result { + ) -> Result<()> { let receipt = self.provider.get_transaction_receipt(transaction_hash).await?; let events = match &receipt.receipt { TransactionReceipt::Invoke(receipt) => Some(&receipt.events), @@ -375,8 +283,8 @@ impl Engine

{ _ => None, }; - let mut world_event = false; if let Some(events) = events { + let mut world_event = false; for (event_idx, event) in events.iter().enumerate() { if event.from_address != self.world.address { continue; @@ -410,7 +318,7 @@ impl Engine

{ } } - Ok(world_event) + Ok(()) } async fn process_block(&mut self, block_number: u64, block_timestamp: u64) -> Result<()> { diff --git a/crates/torii/core/src/sql_test.rs b/crates/torii/core/src/sql_test.rs index e6db7f5d3a..400d8feb28 100644 --- a/crates/torii/core/src/sql_test.rs +++ b/crates/torii/core/src/sql_test.rs @@ -50,7 +50,7 @@ where None, ); - let _ = engine.sync_to_head(0, None).await?; + let _ = engine.sync(0, None).await?; Ok(engine) } diff --git a/crates/torii/graphql/src/tests/mod.rs b/crates/torii/graphql/src/tests/mod.rs index 6dacff739f..96092a752f 100644 --- a/crates/torii/graphql/src/tests/mod.rs +++ b/crates/torii/graphql/src/tests/mod.rs @@ -364,7 +364,7 @@ pub async fn spinup_types_test() -> Result { None, ); - let _ = engine.sync_to_head(0, None).await?; + let _ = engine.sync(0, None).await?; Ok(pool) } diff --git a/crates/torii/grpc/src/server/tests/entities_test.rs b/crates/torii/grpc/src/server/tests/entities_test.rs index 96503034cb..c44b78134b 100644 --- a/crates/torii/grpc/src/server/tests/entities_test.rs +++ b/crates/torii/grpc/src/server/tests/entities_test.rs @@ -110,7 +110,7 @@ async fn test_entities_queries() { None, ); - let _ = engine.sync_to_head(0, None).await.unwrap(); + let _ = engine.sync(0, None).await.unwrap(); let (_, receiver) = tokio::sync::mpsc::channel(1); let grpc = DojoWorld::new(db.pool, receiver, strat.world_address, provider.clone());