Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt(torii): global "sync" for pending & range syncing #2320

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 29 additions & 105 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
use anyhow::Result;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{
BlockId, BlockTag, Event, EventFilter, Felt, MaybePendingBlockWithTxHashes,
MaybePendingBlockWithTxs, ReceiptBlock, Transaction, TransactionReceipt,
TransactionReceiptWithBlockInfo,
BlockId, BlockTag, Event, EventFilter, Felt, MaybePendingBlockWithTxHashes, ReceiptBlock,
Transaction, TransactionReceipt, TransactionReceiptWithBlockInfo,
};
use starknet::core::utils::get_selector_from_name;
use starknet::providers::Provider;
Expand Down Expand Up @@ -102,7 +101,7 @@
break Ok(());
}
_ = async {
match self.sync_to_head(head, pending_block_tx).await {
match self.sync(head, pending_block_tx).await {

Check warning on line 104 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L104

Added line #L104 was not covered by tests
Ok((latest_block_number, latest_pending_tx)) => {
if erroring_out {
erroring_out = false;
Expand All @@ -128,105 +127,24 @@
}
}

pub async fn sync_to_head(
pub async fn sync(
&mut self,
from: u64,
mut pending_block_tx: Option<Felt>,
pending_block_tx: Option<Felt>,
) -> Result<(u64, Option<Felt>)> {
// Process all blocks from current to head.
// If the index pending flag is set, we will process pending events as well.
let latest_block_number = self.provider.block_hash_and_number().await?.block_number;

if from < latest_block_number {
// if `from` == 0, then the block may or may not be processed yet.
let from = if from == 0 { from } else { from + 1 };
pending_block_tx = self.sync_range(from, latest_block_number, pending_block_tx).await?;
} else if self.config.index_pending {
pending_block_tx = self.sync_pending(latest_block_number + 1, pending_block_tx).await?;
}

Ok((latest_block_number, pending_block_tx))
}

pub async fn sync_pending(
&mut self,
block_number: u64,
mut pending_block_tx: Option<Felt>,
) -> Result<Option<Felt>> {
let block = if let MaybePendingBlockWithTxs::PendingBlock(pending) =
self.provider.get_block_with_txs(BlockId::Tag(BlockTag::Pending)).await?
{
pending
} else {
return Ok(None);
};

// Skip transactions that have been processed already
// Our cursor is the last processed transaction
let mut pending_block_tx_cursor = pending_block_tx;
for transaction in block.transactions {
if let Some(tx) = pending_block_tx_cursor {
if transaction.transaction_hash() != &tx {
continue;
}

pending_block_tx_cursor = None;
continue;
}

match self
.process_transaction_and_receipt(
*transaction.transaction_hash(),
&transaction,
block_number,
block.timestamp,
)
.await
{
Err(e) => {
match e.to_string().as_str() {
"TransactionHashNotFound" => {
// We failed to fetch the transaction, which is because
// the transaction might not have been processed fast enough by the
// provider. So we can fail silently and try
// again in the next iteration.
warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction.transaction_hash()), "Retrieving pending transaction receipt.");
return Ok(pending_block_tx);
}
_ => {
error!(target: LOG_TARGET, error = %e, transaction_hash = %format!("{:#x}", transaction.transaction_hash()), "Processing pending transaction.");
return Err(e);
}
}
}
Ok(true) => {
pending_block_tx = Some(*transaction.transaction_hash());
info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction.transaction_hash()), "Processed pending world transaction.");
}
Ok(_) => {
info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction.transaction_hash()), "Processed pending transaction.")
}
}
}

// Set the head to the last processed pending transaction
// Head block number should still be latest block number
self.db.set_head(block_number - 1, pending_block_tx);

self.db.execute().await?;
Ok(pending_block_tx)
}

pub async fn sync_range(
&mut self,
from: u64,
to: u64,
pending_block_tx: Option<Felt>,
) -> Result<Option<Felt>> {
// Process all blocks from current to latest.
let get_events = |token: Option<String>| {
self.provider.get_events(
EventFilter {
from_block: Some(BlockId::Number(from)),
to_block: Some(BlockId::Number(to)),
to_block: Some(if self.config.index_pending {
BlockId::Tag(BlockTag::Pending)

Check warning on line 144 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L144

Added line #L144 was not covered by tests
} else {
BlockId::Number(latest_block_number)
}),
address: Some(self.world.address),
keys: None,
},
Expand Down Expand Up @@ -268,11 +186,11 @@
} else {
// If the block is pending, we assume the block number is the
// latest + 1
to + 1
latest_block_number + 1

Check warning on line 189 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L189

Added line #L189 was not covered by tests
}
}

_ => to + 1,
_ => latest_block_number + 1,

Check warning on line 193 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L193

Added line #L193 was not covered by tests
}
}
};
Larkooo marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -320,13 +238,14 @@
// Process transaction
let transaction = self.provider.get_transaction_by_hash(transaction_hash).await?;

self.process_transaction_and_receipt(
transaction_hash,
&transaction,
block_number,
blocks[&block_number],
)
.await?;
let has_world_event = self
.process_transaction_and_receipt(
transaction_hash,
&transaction,
block_number,
blocks[&block_number],
)
.await?;

// Process block
if block_number > last_block {
Expand All @@ -337,6 +256,11 @@
self.process_block(block_number, blocks[&block_number]).await?;
last_block = block_number;
}

// If the transaction has a world event, we update the cursor
if has_world_event {
pending_block_tx_cursor = Some(transaction_hash);
}
Larkooo marked this conversation as resolved.
Show resolved Hide resolved
}

// We return None for the pending_block_tx because our sync_range
Expand All @@ -345,11 +269,11 @@
// so once the sync range is done, we assume all of the tx of the block
// have been processed.

self.db.set_head(to, None);
self.db.set_head(latest_block_number, pending_block_tx_cursor);

self.db.execute().await?;

Ok(None)
Ok((latest_block_number, pending_block_tx_cursor))
}

async fn get_block_timestamp(&self, block_number: u64) -> Result<u64> {
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/core/src/sql_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ where
None,
);

let _ = engine.sync_to_head(0, None).await?;
let _ = engine.sync(0, None).await?;

Ok(engine)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/graphql/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ pub async fn spinup_types_test() -> Result<SqlitePool> {
None,
);

let _ = engine.sync_to_head(0, None).await?;
let _ = engine.sync(0, None).await?;

Ok(pool)
}
2 changes: 1 addition & 1 deletion crates/torii/grpc/src/server/tests/entities_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async fn test_entities_queries() {
None,
);

let _ = engine.sync_to_head(0, None).await.unwrap();
let _ = engine.sync(0, None).await.unwrap();

let (_, receiver) = tokio::sync::mpsc::channel(1);
let grpc = DojoWorld::new(db.pool, receiver, strat.world_address, provider.clone());
Expand Down
Loading