Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt(torii): global "sync" for pending & range syncing #2320

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 31 additions & 123 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
use anyhow::Result;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{
BlockId, BlockTag, Event, EventFilter, Felt, MaybePendingBlockWithTxHashes,
MaybePendingBlockWithTxs, ReceiptBlock, Transaction, TransactionReceipt,
TransactionReceiptWithBlockInfo,
BlockId, BlockTag, Event, EventFilter, Felt, MaybePendingBlockWithTxHashes, Transaction,
TransactionReceipt, TransactionReceiptWithBlockInfo,
};
use starknet::core::utils::get_selector_from_name;
use starknet::providers::Provider;
Expand Down Expand Up @@ -102,7 +101,7 @@
break Ok(());
}
_ = async {
match self.sync_to_head(head, pending_block_tx).await {
match self.sync(head, pending_block_tx).await {

Check warning on line 104 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L104

Added line #L104 was not covered by tests
Ok((latest_block_number, latest_pending_tx)) => {
if erroring_out {
erroring_out = false;
Expand All @@ -128,105 +127,24 @@
}
}

pub async fn sync_to_head(
pub async fn sync(
&mut self,
from: u64,
mut pending_block_tx: Option<Felt>,
pending_block_tx: Option<Felt>,
) -> Result<(u64, Option<Felt>)> {
// Process all blocks from current to head.
// If the index pending flag is set, we will process pending events as well.
let latest_block_number = self.provider.block_hash_and_number().await?.block_number;

if from < latest_block_number {
// if `from` == 0, then the block may or may not be processed yet.
let from = if from == 0 { from } else { from + 1 };
pending_block_tx = self.sync_range(from, latest_block_number, pending_block_tx).await?;
} else if self.config.index_pending {
pending_block_tx = self.sync_pending(latest_block_number + 1, pending_block_tx).await?;
}

Ok((latest_block_number, pending_block_tx))
}

pub async fn sync_pending(
&mut self,
block_number: u64,
mut pending_block_tx: Option<Felt>,
) -> Result<Option<Felt>> {
let block = if let MaybePendingBlockWithTxs::PendingBlock(pending) =
self.provider.get_block_with_txs(BlockId::Tag(BlockTag::Pending)).await?
{
pending
} else {
return Ok(None);
};

// Skip transactions that have been processed already
// Our cursor is the last processed transaction
let mut pending_block_tx_cursor = pending_block_tx;
for transaction in block.transactions {
if let Some(tx) = pending_block_tx_cursor {
if transaction.transaction_hash() != &tx {
continue;
}

pending_block_tx_cursor = None;
continue;
}

match self
.process_transaction_and_receipt(
*transaction.transaction_hash(),
&transaction,
block_number,
block.timestamp,
)
.await
{
Err(e) => {
match e.to_string().as_str() {
"TransactionHashNotFound" => {
// We failed to fetch the transaction, which is because
// the transaction might not have been processed fast enough by the
// provider. So we can fail silently and try
// again in the next iteration.
warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction.transaction_hash()), "Retrieving pending transaction receipt.");
return Ok(pending_block_tx);
}
_ => {
error!(target: LOG_TARGET, error = %e, transaction_hash = %format!("{:#x}", transaction.transaction_hash()), "Processing pending transaction.");
return Err(e);
}
}
}
Ok(true) => {
pending_block_tx = Some(*transaction.transaction_hash());
info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction.transaction_hash()), "Processed pending world transaction.");
}
Ok(_) => {
info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction.transaction_hash()), "Processed pending transaction.")
}
}
}

// Set the head to the last processed pending transaction
// Head block number should still be latest block number
self.db.set_head(block_number - 1, pending_block_tx);

self.db.execute().await?;
Ok(pending_block_tx)
}

pub async fn sync_range(
&mut self,
from: u64,
to: u64,
pending_block_tx: Option<Felt>,
) -> Result<Option<Felt>> {
// Process all blocks from current to latest.
let get_events = |token: Option<String>| {
self.provider.get_events(
EventFilter {
from_block: Some(BlockId::Number(from)),
to_block: Some(BlockId::Number(to)),
to_block: Some(if self.config.index_pending {
BlockId::Tag(BlockTag::Pending)

Check warning on line 144 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L144

Added line #L144 was not covered by tests
} else {
BlockId::Number(latest_block_number)
}),
address: Some(self.world.address),
keys: None,
},
Expand Down Expand Up @@ -254,32 +172,20 @@
for event in &events_page.events {
let block_number = match event.block_number {
Some(block_number) => block_number,
// If the block number is not present, try to fetch it from the transaction
// receipt Should not/rarely happen. Thus the additional
// fetch is acceptable.
None => {
let TransactionReceiptWithBlockInfo { receipt, block } =
self.provider.get_transaction_receipt(event.transaction_hash).await?;

match receipt {
TransactionReceipt::Invoke(_) | TransactionReceipt::L1Handler(_) => {
if let ReceiptBlock::Block { block_number, .. } = block {
block_number
} else {
// If the block is pending, we assume the block number is the
// latest + 1
to + 1
}
}

_ => to + 1,
}
}
// If the block number is not present, we assume we're dealing
// with a pending block event. Thus the block number is the latest + 1
None => latest_block_number + 1,

Check warning on line 177 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L177

Added line #L177 was not covered by tests
};

// Keep track of last block number and fetch block timestamp
if block_number > last_block {
let block_timestamp = self.get_block_timestamp(block_number).await?;
let block_timestamp = self
.get_block_timestamp(if block_number > latest_block_number {
BlockId::Tag(BlockTag::Pending)

Check warning on line 184 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L184

Added line #L184 was not covered by tests
} else {
BlockId::Number(block_number)
})
.await?;
blocks.insert(block_number, block_timestamp);

last_block = block_number;
Expand Down Expand Up @@ -337,6 +243,8 @@
self.process_block(block_number, blocks[&block_number]).await?;
last_block = block_number;
}

pending_block_tx_cursor = Some(transaction_hash);
}

// We return None for the pending_block_tx because our sync_range
Expand All @@ -345,15 +253,15 @@
// so once the sync range is done, we assume all of the tx of the block
// have been processed.

self.db.set_head(to, None);
self.db.set_head(latest_block_number, pending_block_tx_cursor);

self.db.execute().await?;

Ok(None)
Ok((latest_block_number, pending_block_tx_cursor))
}

async fn get_block_timestamp(&self, block_number: u64) -> Result<u64> {
match self.provider.get_block_with_tx_hashes(BlockId::Number(block_number)).await? {
async fn get_block_timestamp(&self, block_id: BlockId) -> Result<u64> {
match self.provider.get_block_with_tx_hashes(block_id).await? {
MaybePendingBlockWithTxHashes::Block(block) => Ok(block.timestamp),
MaybePendingBlockWithTxHashes::PendingBlock(block) => Ok(block.timestamp),
}
Expand All @@ -367,16 +275,16 @@
transaction: &Transaction,
block_number: u64,
block_timestamp: u64,
) -> Result<bool> {
) -> Result<()> {
let receipt = self.provider.get_transaction_receipt(transaction_hash).await?;
let events = match &receipt.receipt {
TransactionReceipt::Invoke(receipt) => Some(&receipt.events),
TransactionReceipt::L1Handler(receipt) => Some(&receipt.events),
_ => None,
};

let mut world_event = false;
if let Some(events) = events {
let mut world_event = false;
for (event_idx, event) in events.iter().enumerate() {
if event.from_address != self.world.address {
continue;
Expand Down Expand Up @@ -410,7 +318,7 @@
}
}

Ok(world_event)
Ok(())
}

async fn process_block(&mut self, block_number: u64, block_timestamp: u64) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/core/src/sql_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ where
None,
);

let _ = engine.sync_to_head(0, None).await?;
let _ = engine.sync(0, None).await?;

Ok(engine)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/graphql/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ pub async fn spinup_types_test() -> Result<SqlitePool> {
None,
);

let _ = engine.sync_to_head(0, None).await?;
let _ = engine.sync(0, None).await?;

Ok(pool)
}
2 changes: 1 addition & 1 deletion crates/torii/grpc/src/server/tests/entities_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async fn test_entities_queries() {
None,
);

let _ = engine.sync_to_head(0, None).await.unwrap();
let _ = engine.sync(0, None).await.unwrap();

let (_, receiver) = tokio::sync::mpsc::channel(1);
let grpc = DojoWorld::new(db.pool, receiver, strat.world_address, provider.clone());
Expand Down
Loading