diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 210fce2b60..a49b9c1220 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -1,10 +1,11 @@ +use std::collections::HashMap; use std::time::Duration; use anyhow::Result; use dojo_world::contracts::world::WorldContractReader; use starknet::core::types::{ - BlockId, EmittedEvent, Event, EventFilter, MaybePendingBlockWithTxHashes, - MaybePendingTransactionReceipt, Transaction, TransactionReceipt, + BlockId, BlockTag, Event, EventFilter, MaybePendingBlockWithTxHashes, MaybePendingBlockWithTxs, + MaybePendingTransactionReceipt, PendingTransactionReceipt, Transaction, TransactionReceipt, }; use starknet::core::utils::get_selector_from_name; use starknet::providers::Provider; @@ -73,7 +74,7 @@ impl Engine

{ } pub async fn start(&mut self) -> Result<()> { - let mut head = self.db.head().await?; + let (mut head, mut pending_block_tx) = self.db.head().await?; if head == 0 { head = self.config.start_block; } else if self.config.start_block != 0 { @@ -91,13 +92,14 @@ impl Engine

{ break Ok(()); } _ = async { - match self.sync_to_head(head).await { - Ok(latest_block_number) => { + match self.sync_to_head(head, pending_block_tx).await { + Ok((latest_block_number, latest_pending_tx)) => { + pending_block_tx = latest_pending_tx; head = latest_block_number; backoff_delay = Duration::from_secs(1); } Err(e) => { - error!(target: LOG_TARGET, error = %e, "Getting block."); + error!(target: LOG_TARGET, error = %e, "Syncing to head."); sleep(backoff_delay).await; if backoff_delay < max_backoff_delay { backoff_delay *= 2; @@ -110,19 +112,97 @@ impl Engine

{ } } - pub async fn sync_to_head(&mut self, from: u64) -> Result { + pub async fn sync_to_head( + &mut self, + from: u64, + mut pending_block_tx: Option, + ) -> Result<(u64, Option)> { let latest_block_number = self.provider.block_hash_and_number().await?.block_number; if from < latest_block_number { // if `from` == 0, then the block may or may not be processed yet. let from = if from == 0 { from } else { from + 1 }; - self.sync_range(from, latest_block_number).await?; + pending_block_tx = self.sync_range(from, latest_block_number, pending_block_tx).await?; + } else { + pending_block_tx = self.sync_pending(latest_block_number + 1, pending_block_tx).await?; + } + + Ok((latest_block_number, pending_block_tx)) + } + + pub async fn sync_pending( + &mut self, + block_number: u64, + mut pending_block_tx: Option, + ) -> Result> { + let block = if let MaybePendingBlockWithTxs::PendingBlock(pending) = + self.provider.get_block_with_txs(BlockId::Tag(BlockTag::Pending)).await? + { + pending + } else { + return Ok(None); }; - Ok(latest_block_number) + // Skip transactions that have been processed already + // Our cursor is the last processed transaction + let mut pending_block_tx_cursor = pending_block_tx; + for transaction in block.transactions { + if let Some(tx) = pending_block_tx_cursor { + if transaction.transaction_hash() != &tx { + continue; + } + + pending_block_tx_cursor = None; + continue; + } + + match self + .process_transaction_and_receipt( + *transaction.transaction_hash(), + &transaction, + block_number, + block.timestamp, + ) + .await + { + Err(e) => { + match e.to_string().as_str() { + "TransactionHashNotFound" => { + warn!(target: LOG_TARGET, error = %e, transaction_hash = %format!("{:#x}", transaction.transaction_hash()), "Processing pending transaction."); + // We failed to fetch the transaction, which might be due to us indexing + // the pending transaction too fast. We will + // fail silently and retry processing the transaction in the next + // iteration. + return Ok(pending_block_tx); + } + _ => { + error!(target: LOG_TARGET, error = %e, transaction_hash = %format!("{:#x}", transaction.transaction_hash()), "Processing pending transaction."); + return Err(e); + } + } + } + Ok(_) => { + info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction.transaction_hash()), "Processed pending transaction.") + } + } + + pending_block_tx = Some(*transaction.transaction_hash()); + } + + // Set the head to the last processed pending transaction + // Head block number should still be latest block number + self.db.set_head(block_number - 1, pending_block_tx); + + self.db.execute().await?; + Ok(pending_block_tx) } - pub async fn sync_range(&mut self, from: u64, to: u64) -> Result<()> { + pub async fn sync_range( + &mut self, + from: u64, + to: u64, + mut pending_block_tx: Option, + ) -> Result> { // Process all blocks from current to latest. let get_events = |token: Option| { self.provider.get_events( @@ -144,70 +224,87 @@ impl Engine

{ events_pages.push(get_events(Some(token.clone())).await?); } - let mut last_block: u64 = 0; - let mut last_transaction_hash: FieldElement = FieldElement::ZERO; - for events_page in events_pages { - for event in events_page.events { - self.process(event, &mut last_block, &mut last_transaction_hash).await?; - } - } + // Transactions & blocks to process + let mut last_block = 0_u64; + let mut blocks = HashMap::new(); + + // Flatten events pages and events according to the pending block cursor + // to array of (block_number, transaction_hash) + let mut transactions = vec![]; + for events_page in &events_pages { + for event in &events_page.events { + let block_number = match event.block_number { + Some(block_number) => block_number, + None => return Err(anyhow::anyhow!("Event without block number.")), + }; - self.db.execute().await?; + // Keep track of last block number and fetch block timestamp + if block_number > last_block { + let block_timestamp = self.get_block_timestamp(block_number).await?; + blocks.insert(block_number, block_timestamp); - Ok(()) - } + last_block = block_number; + } - async fn get_block_timestamp(&self, block_number: u64) -> Result { - match self.provider.get_block_with_tx_hashes(BlockId::Number(block_number)).await? { - MaybePendingBlockWithTxHashes::Block(block) => Ok(block.timestamp), - MaybePendingBlockWithTxHashes::PendingBlock(block) => Ok(block.timestamp), - } - } + // Then we skip all transactions until we reach the last pending processed + // transaction (if any) + if let Some(tx) = pending_block_tx { + if event.transaction_hash != tx { + continue; + } - async fn process( - &mut self, - event: EmittedEvent, - last_block: &mut u64, - last_transaction_hash: &mut FieldElement, - ) -> Result<()> { - let block_number = match event.block_number { - Some(block_number) => block_number, - None => { - error!(target: LOG_TARGET, "Event without block number."); - return Ok(()); - } - }; - let block_timestamp = self.get_block_timestamp(block_number).await?; + // Then we skip that processed transaction + pending_block_tx = None; + continue; + } - if block_number > *last_block { - *last_block = block_number; + if let Some((_, last_tx_hash)) = transactions.last() { + // Dedup transactions + // As me might have multiple events for the same transaction + if *last_tx_hash == event.transaction_hash { + continue; + } + } + transactions.push((block_number, event.transaction_hash)); + } + } + // Process blocks + for (block_number, block_timestamp) in blocks.iter() { if let Some(ref block_tx) = self.block_tx { - block_tx.send(block_number).await?; + block_tx.send(*block_number).await?; } - Self::process_block(self, block_number, block_timestamp, event.block_hash.unwrap()) - .await?; + self.process_block(*block_number, *block_timestamp).await?; info!(target: LOG_TARGET, block_number = %block_number, "Processed block."); - - self.db.set_head(block_number); } - // We index transaction only once for all events in the same transaction - // Events are indexed with the transaction processing - if event.transaction_hash != *last_transaction_hash { - *last_transaction_hash = event.transaction_hash; - let transaction = self.provider.get_transaction_by_hash(event.transaction_hash).await?; + // Process all transactions + for (block_number, transaction_hash) in transactions { + // Process transaction + let transaction = self.provider.get_transaction_by_hash(transaction_hash).await?; + self.process_transaction_and_receipt( - event.transaction_hash, + transaction_hash, &transaction, block_number, - block_timestamp, + blocks[&block_number], ) .await?; } - Ok(()) + self.db.set_head(to, pending_block_tx); + + self.db.execute().await?; + + Ok(pending_block_tx) + } + + async fn get_block_timestamp(&self, block_number: u64) -> Result { + match self.provider.get_block_with_tx_hashes(BlockId::Number(block_number)).await? { + MaybePendingBlockWithTxHashes::Block(block) => Ok(block.timestamp), + MaybePendingBlockWithTxHashes::PendingBlock(block) => Ok(block.timestamp), + } } async fn process_transaction_and_receipt( @@ -217,29 +314,24 @@ impl Engine

{ block_number: u64, block_timestamp: u64, ) -> Result<()> { - let receipt = match self.provider.get_transaction_receipt(transaction_hash).await { - Ok(receipt) => match receipt { - MaybePendingTransactionReceipt::Receipt(TransactionReceipt::Invoke(receipt)) => { - Some(TransactionReceipt::Invoke(receipt)) - } - MaybePendingTransactionReceipt::Receipt(TransactionReceipt::L1Handler(receipt)) => { - Some(TransactionReceipt::L1Handler(receipt)) - } - _ => None, - }, - Err(e) => { - error!(target: LOG_TARGET, error = %e, "Getting transaction receipt."); - return Err(e.into()); + let receipt = self.provider.get_transaction_receipt(transaction_hash).await?; + let events = match &receipt { + MaybePendingTransactionReceipt::Receipt(TransactionReceipt::Invoke(receipt)) => { + Some(&receipt.events) } + MaybePendingTransactionReceipt::Receipt(TransactionReceipt::L1Handler(receipt)) => { + Some(&receipt.events) + } + MaybePendingTransactionReceipt::PendingReceipt(PendingTransactionReceipt::Invoke( + receipt, + )) => Some(&receipt.events), + MaybePendingTransactionReceipt::PendingReceipt( + PendingTransactionReceipt::L1Handler(receipt), + ) => Some(&receipt.events), + _ => None, }; - if let Some(receipt) = receipt { - let events = match &receipt { - TransactionReceipt::Invoke(invoke_receipt) => &invoke_receipt.events, - TransactionReceipt::L1Handler(l1_handler_receipt) => &l1_handler_receipt.events, - _ => return Ok(()), - }; - + if let Some(events) = events { let mut world_event = false; for (event_idx, event) in events.iter().enumerate() { if event.from_address != self.world.address { @@ -277,21 +369,10 @@ impl Engine

{ Ok(()) } - async fn process_block( - &mut self, - block_number: u64, - block_timestamp: u64, - block_hash: FieldElement, - ) -> Result<()> { + async fn process_block(&mut self, block_number: u64, block_timestamp: u64) -> Result<()> { for processor in &self.processors.block { processor - .process( - &mut self.db, - self.provider.as_ref(), - block_number, - block_timestamp, - block_hash, - ) + .process(&mut self.db, self.provider.as_ref(), block_number, block_timestamp) .await?; } Ok(()) @@ -301,7 +382,7 @@ impl Engine

{ &mut self, block_number: u64, block_timestamp: u64, - transaction_receipt: &TransactionReceipt, + transaction_receipt: &MaybePendingTransactionReceipt, transaction_hash: FieldElement, transaction: &Transaction, ) -> Result<()> { @@ -326,18 +407,16 @@ impl Engine

{ &mut self, block_number: u64, block_timestamp: u64, - transaction_receipt: &TransactionReceipt, + transaction_receipt: &MaybePendingTransactionReceipt, event_id: &str, event: &Event, ) -> Result<()> { - let transaction_hash = match transaction_receipt { - TransactionReceipt::Invoke(invoke_receipt) => invoke_receipt.transaction_hash, - TransactionReceipt::L1Handler(l1_handler_receipt) => { - l1_handler_receipt.transaction_hash - } - _ => return Ok(()), - }; - self.db.store_event(event_id, event, transaction_hash, block_timestamp); + self.db.store_event( + event_id, + event, + *transaction_receipt.transaction_hash(), + block_timestamp, + ); for processor in &self.processors.event { // If the processor has no event_key, means it's a catch-all processor. // We also validate the event diff --git a/crates/torii/core/src/processors/event_message.rs b/crates/torii/core/src/processors/event_message.rs index d370d4af9b..f74bc730a9 100644 --- a/crates/torii/core/src/processors/event_message.rs +++ b/crates/torii/core/src/processors/event_message.rs @@ -2,7 +2,7 @@ use anyhow::{Error, Result}; use async_trait::async_trait; use dojo_world::contracts::model::ModelReader; use dojo_world::contracts::world::WorldContractReader; -use starknet::core::types::{Event, TransactionReceipt}; +use starknet::core::types::{Event, MaybePendingTransactionReceipt}; use starknet::providers::Provider; use tracing::info; @@ -42,7 +42,7 @@ where db: &mut Sql, _block_number: u64, block_timestamp: u64, - _transaction_receipt: &TransactionReceipt, + _transaction_receipt: &MaybePendingTransactionReceipt, event_id: &str, event: &Event, ) -> Result<(), Error> { diff --git a/crates/torii/core/src/processors/metadata_update.rs b/crates/torii/core/src/processors/metadata_update.rs index 48d1caad5c..07859743b2 100644 --- a/crates/torii/core/src/processors/metadata_update.rs +++ b/crates/torii/core/src/processors/metadata_update.rs @@ -7,7 +7,7 @@ use base64::Engine as _; use dojo_world::contracts::world::WorldContractReader; use dojo_world::metadata::{Uri, WorldMetadata}; use reqwest::Client; -use starknet::core::types::{Event, TransactionReceipt}; +use starknet::core::types::{Event, MaybePendingTransactionReceipt}; use starknet::core::utils::parse_cairo_short_string; use starknet::providers::Provider; use starknet_crypto::FieldElement; @@ -53,7 +53,7 @@ where db: &mut Sql, _block_number: u64, block_timestamp: u64, - _transaction_receipt: &TransactionReceipt, + _transaction_receipt: &MaybePendingTransactionReceipt, _event_id: &str, event: &Event, ) -> Result<(), Error> { diff --git a/crates/torii/core/src/processors/mod.rs b/crates/torii/core/src/processors/mod.rs index f40ff90fdd..5dfd43c766 100644 --- a/crates/torii/core/src/processors/mod.rs +++ b/crates/torii/core/src/processors/mod.rs @@ -1,7 +1,7 @@ use anyhow::{Error, Result}; use async_trait::async_trait; use dojo_world::contracts::world::WorldContractReader; -use starknet::core::types::{Event, Transaction, TransactionReceipt}; +use starknet::core::types::{Event, MaybePendingTransactionReceipt, Transaction}; use starknet::providers::Provider; use starknet_crypto::FieldElement; @@ -37,7 +37,7 @@ where db: &mut Sql, block_number: u64, block_timestamp: u64, - transaction_receipt: &TransactionReceipt, + transaction_receipt: &MaybePendingTransactionReceipt, event_id: &str, event: &Event, ) -> Result<(), Error>; @@ -52,7 +52,6 @@ pub trait BlockProcessor { provider: &P, block_number: u64, block_timestamp: u64, - block_hash: FieldElement, ) -> Result<(), Error>; } @@ -65,7 +64,7 @@ pub trait TransactionProcessor { provider: &P, block_number: u64, block_timestamp: u64, - transaction_receipt: &TransactionReceipt, + transaction_receipt: &MaybePendingTransactionReceipt, transaction_hash: FieldElement, transaction: &Transaction, ) -> Result<(), Error>; diff --git a/crates/torii/core/src/processors/register_model.rs b/crates/torii/core/src/processors/register_model.rs index 21ab35324d..5b097ab40c 100644 --- a/crates/torii/core/src/processors/register_model.rs +++ b/crates/torii/core/src/processors/register_model.rs @@ -2,7 +2,7 @@ use anyhow::{Error, Ok, Result}; use async_trait::async_trait; use dojo_world::contracts::model::ModelReader; use dojo_world::contracts::world::WorldContractReader; -use starknet::core::types::{Event, TransactionReceipt}; +use starknet::core::types::{Event, MaybePendingTransactionReceipt}; use starknet::core::utils::parse_cairo_short_string; use starknet::providers::Provider; use tracing::{debug, info}; @@ -43,7 +43,7 @@ where db: &mut Sql, _block_number: u64, block_timestamp: u64, - _transaction_receipt: &TransactionReceipt, + _transaction_receipt: &MaybePendingTransactionReceipt, _event_id: &str, event: &Event, ) -> Result<(), Error> { diff --git a/crates/torii/core/src/processors/store_del_record.rs b/crates/torii/core/src/processors/store_del_record.rs index 46fd319cd3..c7802fef77 100644 --- a/crates/torii/core/src/processors/store_del_record.rs +++ b/crates/torii/core/src/processors/store_del_record.rs @@ -2,7 +2,7 @@ use anyhow::{Error, Ok, Result}; use async_trait::async_trait; use dojo_world::contracts::model::ModelReader; use dojo_world::contracts::world::WorldContractReader; -use starknet::core::types::{Event, TransactionReceipt}; +use starknet::core::types::{Event, MaybePendingTransactionReceipt}; use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string}; use starknet::providers::Provider; use tracing::info; @@ -44,7 +44,7 @@ where db: &mut Sql, _block_number: u64, _block_timestamp: u64, - _transaction_receipt: &TransactionReceipt, + _transaction_receipt: &MaybePendingTransactionReceipt, _event_id: &str, event: &Event, ) -> Result<(), Error> { diff --git a/crates/torii/core/src/processors/store_set_record.rs b/crates/torii/core/src/processors/store_set_record.rs index 71cb3ace81..03c9b71e49 100644 --- a/crates/torii/core/src/processors/store_set_record.rs +++ b/crates/torii/core/src/processors/store_set_record.rs @@ -2,7 +2,7 @@ use anyhow::{Error, Ok, Result}; use async_trait::async_trait; use dojo_world::contracts::model::ModelReader; use dojo_world::contracts::world::WorldContractReader; -use starknet::core::types::{Event, TransactionReceipt}; +use starknet::core::types::{Event, MaybePendingTransactionReceipt}; use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string}; use starknet::providers::Provider; use tracing::info; @@ -44,7 +44,7 @@ where db: &mut Sql, _block_number: u64, block_timestamp: u64, - _transaction_receipt: &TransactionReceipt, + _transaction_receipt: &MaybePendingTransactionReceipt, event_id: &str, event: &Event, ) -> Result<(), Error> { diff --git a/crates/torii/core/src/processors/store_transaction.rs b/crates/torii/core/src/processors/store_transaction.rs index 448de5f6ff..7ceaaf0d89 100644 --- a/crates/torii/core/src/processors/store_transaction.rs +++ b/crates/torii/core/src/processors/store_transaction.rs @@ -1,6 +1,6 @@ use anyhow::{Error, Ok, Result}; use async_trait::async_trait; -use starknet::core::types::{Transaction, TransactionReceipt}; +use starknet::core::types::{MaybePendingTransactionReceipt, Transaction}; use starknet::providers::Provider; use starknet_crypto::FieldElement; @@ -18,7 +18,7 @@ impl TransactionProcessor

for StoreTransactionProcessor { _provider: &P, block_number: u64, block_timestamp: u64, - _receipt: &TransactionReceipt, + _receipt: &MaybePendingTransactionReceipt, transaction_hash: FieldElement, transaction: &Transaction, ) -> Result<(), Error> { diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index 6bac2d67db..51e0eb5d60 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -53,20 +53,33 @@ impl Sql { Ok(Self { pool, world_address, query_queue }) } - pub async fn head(&self) -> Result { + pub async fn head(&self) -> Result<(u64, Option)> { let mut conn: PoolConnection = self.pool.acquire().await?; - let indexer_query = sqlx::query_as::<_, (i64,)>("SELECT head FROM indexers WHERE id = ?") - .bind(format!("{:#x}", self.world_address)); - - let indexer: (i64,) = indexer_query.fetch_one(&mut *conn).await?; - Ok(indexer.0.try_into().expect("doesn't fit in u64")) + let indexer_query = sqlx::query_as::<_, (i64, Option)>( + "SELECT head, pending_block_tx FROM indexers WHERE id = ?", + ) + .bind(format!("{:#x}", self.world_address)); + + let indexer: (i64, Option) = indexer_query.fetch_one(&mut *conn).await?; + Ok(( + indexer.0.try_into().expect("doesn't fit in u64"), + indexer.1.map(|f| FieldElement::from_str(&f)).transpose()?, + )) } - pub fn set_head(&mut self, head: u64) { + pub fn set_head(&mut self, head: u64, pending_block_tx: Option) { let head = Argument::Int(head.try_into().expect("doesn't fit in u64")); - let id = Argument::String(format!("{:#x}", self.world_address)); + let id = Argument::FieldElement(self.world_address); + let pending_block_tx = if let Some(f) = pending_block_tx { + Argument::String(format!("{:#x}", f)) + } else { + Argument::Null + }; - self.query_queue.enqueue("UPDATE indexers SET head = ? WHERE id = ?", vec![head, id]); + self.query_queue.enqueue( + "UPDATE indexers SET head = ?, pending_block_tx = ? WHERE id = ?", + vec![head, pending_block_tx, id], + ); } pub async fn world(&self) -> Result { diff --git a/crates/torii/core/src/sql_test.rs b/crates/torii/core/src/sql_test.rs index b441c619f6..dd9c18f6cf 100644 --- a/crates/torii/core/src/sql_test.rs +++ b/crates/torii/core/src/sql_test.rs @@ -46,7 +46,7 @@ where None, ); - let _ = engine.sync_to_head(0).await?; + let _ = engine.sync_to_head(0, None).await?; Ok(engine) } diff --git a/crates/torii/graphql/src/tests/mod.rs b/crates/torii/graphql/src/tests/mod.rs index f5b7aa7511..5f1af308d9 100644 --- a/crates/torii/graphql/src/tests/mod.rs +++ b/crates/torii/graphql/src/tests/mod.rs @@ -347,7 +347,7 @@ pub async fn spinup_types_test() -> Result { None, ); - let _ = engine.sync_to_head(0).await?; + let _ = engine.sync_to_head(0, None).await?; Ok(pool) } diff --git a/crates/torii/grpc/src/server/tests/entities_test.rs b/crates/torii/grpc/src/server/tests/entities_test.rs index e4dcec6977..9c9a871280 100644 --- a/crates/torii/grpc/src/server/tests/entities_test.rs +++ b/crates/torii/grpc/src/server/tests/entities_test.rs @@ -88,7 +88,7 @@ async fn test_entities_queries() { None, ); - let _ = engine.sync_to_head(0).await.unwrap(); + let _ = engine.sync_to_head(0, None).await.unwrap(); let (_, receiver) = tokio::sync::mpsc::channel(1); let grpc = DojoWorld::new(db.pool, receiver, world_address, provider.clone()); diff --git a/crates/torii/migrations/20240426211245_pending_block.sql b/crates/torii/migrations/20240426211245_pending_block.sql new file mode 100644 index 0000000000..2db2edb6be --- /dev/null +++ b/crates/torii/migrations/20240426211245_pending_block.sql @@ -0,0 +1,2 @@ +-- Add the pending block txn cursor to indexers table +ALTER TABLE indexers ADD COLUMN pending_block_tx TEXT NULL DEFAULT NULL; \ No newline at end of file