Skip to content

Commit

Permalink
feat(torii): process pending block (#1798)
Browse files Browse the repository at this point in the history
* feat: process pending block

* err

* feat: process pending block with cursor

* feat: update head

* refactor: tx cursor use hash

* refactor; migration

* feat: optimized sync range with events pages & dedup logic

* feat: correct logic

* fix: skipping

* fix: test

* fix

* refactor: dedup into for loop

* refactor: dont error out if no p[ending block

* fix: pending indexing inf loop

* refactor: pending transaction fetch error feedback

* fmt

* chore: clippy

* refactor: migrations

* refacort: simplify migrations

* chore: cleanup code
  • Loading branch information
Larkooo authored Apr 26, 2024
1 parent 5f421e1 commit 6a3cbaa
Show file tree
Hide file tree
Showing 13 changed files with 219 additions and 126 deletions.
275 changes: 177 additions & 98 deletions crates/torii/core/src/engine.rs

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions crates/torii/core/src/processors/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_world::contracts::model::ModelReader;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{Event, TransactionReceipt};
use starknet::core::types::{Event, MaybePendingTransactionReceipt};
use starknet::providers::Provider;
use tracing::info;

Expand Down Expand Up @@ -42,7 +42,7 @@ where
db: &mut Sql,
_block_number: u64,
block_timestamp: u64,
_transaction_receipt: &TransactionReceipt,
_transaction_receipt: &MaybePendingTransactionReceipt,
event_id: &str,
event: &Event,
) -> Result<(), Error> {
Expand Down
4 changes: 2 additions & 2 deletions crates/torii/core/src/processors/metadata_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use base64::Engine as _;
use dojo_world::contracts::world::WorldContractReader;
use dojo_world::metadata::{Uri, WorldMetadata};
use reqwest::Client;
use starknet::core::types::{Event, TransactionReceipt};
use starknet::core::types::{Event, MaybePendingTransactionReceipt};
use starknet::core::utils::parse_cairo_short_string;
use starknet::providers::Provider;
use starknet_crypto::FieldElement;
Expand Down Expand Up @@ -53,7 +53,7 @@ where
db: &mut Sql,
_block_number: u64,
block_timestamp: u64,
_transaction_receipt: &TransactionReceipt,
_transaction_receipt: &MaybePendingTransactionReceipt,
_event_id: &str,
event: &Event,
) -> Result<(), Error> {
Expand Down
7 changes: 3 additions & 4 deletions crates/torii/core/src/processors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{Event, Transaction, TransactionReceipt};
use starknet::core::types::{Event, MaybePendingTransactionReceipt, Transaction};
use starknet::providers::Provider;
use starknet_crypto::FieldElement;

Expand Down Expand Up @@ -37,7 +37,7 @@ where
db: &mut Sql,
block_number: u64,
block_timestamp: u64,
transaction_receipt: &TransactionReceipt,
transaction_receipt: &MaybePendingTransactionReceipt,
event_id: &str,
event: &Event,
) -> Result<(), Error>;
Expand All @@ -52,7 +52,6 @@ pub trait BlockProcessor<P: Provider + Sync> {
provider: &P,
block_number: u64,
block_timestamp: u64,
block_hash: FieldElement,
) -> Result<(), Error>;
}

Expand All @@ -65,7 +64,7 @@ pub trait TransactionProcessor<P: Provider + Sync> {
provider: &P,
block_number: u64,
block_timestamp: u64,
transaction_receipt: &TransactionReceipt,
transaction_receipt: &MaybePendingTransactionReceipt,
transaction_hash: FieldElement,
transaction: &Transaction,
) -> Result<(), Error>;
Expand Down
4 changes: 2 additions & 2 deletions crates/torii/core/src/processors/register_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::model::ModelReader;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{Event, TransactionReceipt};
use starknet::core::types::{Event, MaybePendingTransactionReceipt};
use starknet::core::utils::parse_cairo_short_string;
use starknet::providers::Provider;
use tracing::{debug, info};
Expand Down Expand Up @@ -43,7 +43,7 @@ where
db: &mut Sql,
_block_number: u64,
block_timestamp: u64,
_transaction_receipt: &TransactionReceipt,
_transaction_receipt: &MaybePendingTransactionReceipt,
_event_id: &str,
event: &Event,
) -> Result<(), Error> {
Expand Down
4 changes: 2 additions & 2 deletions crates/torii/core/src/processors/store_del_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::model::ModelReader;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{Event, TransactionReceipt};
use starknet::core::types::{Event, MaybePendingTransactionReceipt};
use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string};
use starknet::providers::Provider;
use tracing::info;
Expand Down Expand Up @@ -44,7 +44,7 @@ where
db: &mut Sql,
_block_number: u64,
_block_timestamp: u64,
_transaction_receipt: &TransactionReceipt,
_transaction_receipt: &MaybePendingTransactionReceipt,
_event_id: &str,
event: &Event,
) -> Result<(), Error> {
Expand Down
4 changes: 2 additions & 2 deletions crates/torii/core/src/processors/store_set_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::model::ModelReader;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{Event, TransactionReceipt};
use starknet::core::types::{Event, MaybePendingTransactionReceipt};
use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string};
use starknet::providers::Provider;
use tracing::info;
Expand Down Expand Up @@ -44,7 +44,7 @@ where
db: &mut Sql,
_block_number: u64,
block_timestamp: u64,
_transaction_receipt: &TransactionReceipt,
_transaction_receipt: &MaybePendingTransactionReceipt,
event_id: &str,
event: &Event,
) -> Result<(), Error> {
Expand Down
4 changes: 2 additions & 2 deletions crates/torii/core/src/processors/store_transaction.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use starknet::core::types::{Transaction, TransactionReceipt};
use starknet::core::types::{MaybePendingTransactionReceipt, Transaction};
use starknet::providers::Provider;
use starknet_crypto::FieldElement;

Expand All @@ -18,7 +18,7 @@ impl<P: Provider + Sync> TransactionProcessor<P> for StoreTransactionProcessor {
_provider: &P,
block_number: u64,
block_timestamp: u64,
_receipt: &TransactionReceipt,
_receipt: &MaybePendingTransactionReceipt,
transaction_hash: FieldElement,
transaction: &Transaction,
) -> Result<(), Error> {
Expand Down
31 changes: 22 additions & 9 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,33 @@ impl Sql {
Ok(Self { pool, world_address, query_queue })
}

pub async fn head(&self) -> Result<u64> {
pub async fn head(&self) -> Result<(u64, Option<FieldElement>)> {
let mut conn: PoolConnection<Sqlite> = self.pool.acquire().await?;
let indexer_query = sqlx::query_as::<_, (i64,)>("SELECT head FROM indexers WHERE id = ?")
.bind(format!("{:#x}", self.world_address));

let indexer: (i64,) = indexer_query.fetch_one(&mut *conn).await?;
Ok(indexer.0.try_into().expect("doesn't fit in u64"))
let indexer_query = sqlx::query_as::<_, (i64, Option<String>)>(
"SELECT head, pending_block_tx FROM indexers WHERE id = ?",
)
.bind(format!("{:#x}", self.world_address));

let indexer: (i64, Option<String>) = indexer_query.fetch_one(&mut *conn).await?;
Ok((
indexer.0.try_into().expect("doesn't fit in u64"),
indexer.1.map(|f| FieldElement::from_str(&f)).transpose()?,
))
}

pub fn set_head(&mut self, head: u64) {
pub fn set_head(&mut self, head: u64, pending_block_tx: Option<FieldElement>) {
let head = Argument::Int(head.try_into().expect("doesn't fit in u64"));
let id = Argument::String(format!("{:#x}", self.world_address));
let id = Argument::FieldElement(self.world_address);
let pending_block_tx = if let Some(f) = pending_block_tx {
Argument::String(format!("{:#x}", f))
} else {
Argument::Null
};

self.query_queue.enqueue("UPDATE indexers SET head = ? WHERE id = ?", vec![head, id]);
self.query_queue.enqueue(
"UPDATE indexers SET head = ?, pending_block_tx = ? WHERE id = ?",
vec![head, pending_block_tx, id],
);
}

pub async fn world(&self) -> Result<World> {
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/core/src/sql_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ where
None,
);

let _ = engine.sync_to_head(0).await?;
let _ = engine.sync_to_head(0, None).await?;

Ok(engine)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/graphql/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ pub async fn spinup_types_test() -> Result<SqlitePool> {
None,
);

let _ = engine.sync_to_head(0).await?;
let _ = engine.sync_to_head(0, None).await?;

Ok(pool)
}
2 changes: 1 addition & 1 deletion crates/torii/grpc/src/server/tests/entities_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn test_entities_queries() {
None,
);

let _ = engine.sync_to_head(0).await.unwrap();
let _ = engine.sync_to_head(0, None).await.unwrap();

let (_, receiver) = tokio::sync::mpsc::channel(1);
let grpc = DojoWorld::new(db.pool, receiver, world_address, provider.clone());
Expand Down
2 changes: 2 additions & 0 deletions crates/torii/migrations/20240426211245_pending_block.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Add the pending block txn cursor to indexers table
ALTER TABLE indexers ADD COLUMN pending_block_tx TEXT NULL DEFAULT NULL;

0 comments on commit 6a3cbaa

Please sign in to comment.