Skip to content

Commit

Permalink
[Torii] Index more transaction types (#1452)
Browse files Browse the repository at this point in the history
* include l1 handler indexing

* add txn type to table

* fix failing checks

* return unsupported txn type early

* retry failed block processing
  • Loading branch information
jelilat authored Feb 22, 2024
1 parent 401c930 commit 7984fb8
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 71 deletions.
124 changes: 78 additions & 46 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ use std::time::Duration;
use anyhow::Result;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{
BlockId, BlockWithTxs, Event, InvokeTransaction, InvokeTransactionReceipt, InvokeTransactionV1,
MaybePendingBlockWithTxs, MaybePendingTransactionReceipt, Transaction, TransactionReceipt,
BlockId, BlockWithTxs, Event, InvokeTransaction, MaybePendingBlockWithTxs,
MaybePendingTransactionReceipt, Transaction, TransactionReceipt,
};
use starknet::core::utils::get_selector_from_name;
use starknet::providers::Provider;
use starknet_crypto::FieldElement;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::Sender as BoundedSender;
use tokio::time::sleep;
Expand Down Expand Up @@ -135,11 +136,17 @@ impl<'db, P: Provider + Sync> Engine<'db, P> {
block_tx.send(from).await.expect("failed to send block number to gRPC server");
}

self.process(block_with_txs).await?;

self.db.set_head(from);
self.db.execute().await?;
from += 1;
match self.process(block_with_txs).await {
Ok(_) => {
self.db.set_head(from);
self.db.execute().await?;
from += 1;
}
Err(e) => {
error!("processing block: {}", e);
continue;
}
}
}

Ok(())
Expand All @@ -154,35 +161,61 @@ impl<'db, P: Provider + Sync> Engine<'db, P> {
Self::process_block(self, &block).await?;

for (tx_idx, transaction) in block.clone().transactions.iter().enumerate() {
let invoke_transaction = match &transaction {
Transaction::Invoke(invoke_transaction) => invoke_transaction,
let transaction_hash = match transaction {
Transaction::Invoke(invoke_transaction) => {
if let InvokeTransaction::V1(invoke_transaction) = invoke_transaction {
invoke_transaction.transaction_hash
} else {
continue;
}
}
Transaction::L1Handler(l1_handler_transaction) => {
l1_handler_transaction.transaction_hash
}
_ => continue,
};

let invoke_transaction = match invoke_transaction {
InvokeTransaction::V1(invoke_transaction) => invoke_transaction,
_ => continue,
};
self.process_transaction_and_receipt(transaction_hash, transaction, &block, tx_idx)
.await?;
}

let receipt = self
.provider
.get_transaction_receipt(invoke_transaction.transaction_hash)
.await
.ok()
.and_then(|receipt| match receipt {
MaybePendingTransactionReceipt::Receipt(TransactionReceipt::Invoke(
receipt,
)) => Some(receipt),
_ => None,
});

let invoke_receipt = match receipt {
Some(receipt) => receipt,
_ => continue,
info!("processed block: {}", block.block_number);

Ok(())
}

async fn process_transaction_and_receipt(
&mut self,
transaction_hash: FieldElement,
transaction: &Transaction,
block: &BlockWithTxs,
tx_idx: usize,
) -> Result<()> {
let receipt = match self.provider.get_transaction_receipt(transaction_hash).await {
Ok(receipt) => match receipt {
MaybePendingTransactionReceipt::Receipt(TransactionReceipt::Invoke(receipt)) => {
Some(TransactionReceipt::Invoke(receipt))
}
MaybePendingTransactionReceipt::Receipt(TransactionReceipt::L1Handler(receipt)) => {
Some(TransactionReceipt::L1Handler(receipt))
}
_ => None,
},
Err(e) => {
error!("getting transaction receipt: {}", e);
return Err(e.into());
}
};

if let Some(receipt) = receipt {
let events = match &receipt {
TransactionReceipt::Invoke(invoke_receipt) => &invoke_receipt.events,
TransactionReceipt::L1Handler(l1_handler_receipt) => &l1_handler_receipt.events,
_ => return Ok(()),
};

let mut world_event = false;
for (event_idx, event) in invoke_receipt.events.iter().enumerate() {
for (event_idx, event) in events.iter().enumerate() {
if event.from_address != self.world.address {
continue;
}
Expand All @@ -191,25 +224,17 @@ impl<'db, P: Provider + Sync> Engine<'db, P> {
let event_id =
format!("0x{:064x}:0x{:04x}:0x{:04x}", block.block_number, tx_idx, event_idx);

Self::process_event(self, &block, &invoke_receipt, &event_id, event).await?;
Self::process_event(self, block, &receipt, &event_id, event).await?;
}

if world_event {
let transaction_id = format!("0x{:064x}:0x{:04x}", block.block_number, tx_idx);

Self::process_transaction(
self,
&block,
&invoke_receipt,
&transaction_id,
invoke_transaction,
)
.await?;
Self::process_transaction(self, block, &receipt, &transaction_id, transaction)
.await?;
}
}

info!("processed block: {}", block.block_number);

Ok(())
}

Expand All @@ -223,17 +248,17 @@ impl<'db, P: Provider + Sync> Engine<'db, P> {
async fn process_transaction(
&mut self,
block: &BlockWithTxs,
invoke_receipt: &InvokeTransactionReceipt,
transaction_receipt: &TransactionReceipt,
transaction_id: &str,
transaction: &InvokeTransactionV1,
transaction: &Transaction,
) -> Result<()> {
for processor in &self.processors.transaction {
processor
.process(
self.db,
self.provider.as_ref(),
block,
invoke_receipt,
transaction_receipt,
transaction,
transaction_id,
)
Expand All @@ -246,17 +271,24 @@ impl<'db, P: Provider + Sync> Engine<'db, P> {
async fn process_event(
&mut self,
block: &BlockWithTxs,
invoke_receipt: &InvokeTransactionReceipt,
transaction_receipt: &TransactionReceipt,
event_id: &str,
event: &Event,
) -> Result<()> {
self.db.store_event(event_id, event, invoke_receipt.transaction_hash);
let transaction_hash = match transaction_receipt {
TransactionReceipt::Invoke(invoke_receipt) => invoke_receipt.transaction_hash,
TransactionReceipt::L1Handler(l1_handler_receipt) => {
l1_handler_receipt.transaction_hash
}
_ => return Ok(()),
};
self.db.store_event(event_id, event, transaction_hash);
for processor in &self.processors.event {
if get_selector_from_name(&processor.event_key())? == event.keys[0]
&& processor.validate(event)
{
processor
.process(&self.world, self.db, block, invoke_receipt, event_id, event)
.process(&self.world, self.db, block, transaction_receipt, event_id, event)
.await?;
} else {
let unprocessed_event = UnprocessedEvent {
Expand Down
4 changes: 2 additions & 2 deletions crates/torii/core/src/processors/metadata_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use base64::Engine as _;
use dojo_world::contracts::world::WorldContractReader;
use dojo_world::metadata::{Uri, WorldMetadata};
use reqwest::Client;
use starknet::core::types::{BlockWithTxs, Event, InvokeTransactionReceipt};
use starknet::core::types::{BlockWithTxs, Event, TransactionReceipt};
use starknet::core::utils::parse_cairo_short_string;
use starknet::providers::Provider;
use starknet_crypto::FieldElement;
Expand Down Expand Up @@ -49,7 +49,7 @@ where
_world: &WorldContractReader<P>,
db: &mut Sql,
_block: &BlockWithTxs,
_invoke_receipt: &InvokeTransactionReceipt,
_transaction_receipt: &TransactionReceipt,
_event_id: &str,
event: &Event,
) -> Result<(), Error> {
Expand Down
8 changes: 4 additions & 4 deletions crates/torii/core/src/processors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{BlockWithTxs, Event, InvokeTransactionReceipt, InvokeTransactionV1};
use starknet::core::types::{BlockWithTxs, Event, Transaction, TransactionReceipt};
use starknet::providers::Provider;

use crate::sql::Sql;
Expand Down Expand Up @@ -34,7 +34,7 @@ where
world: &WorldContractReader<P>,
db: &mut Sql,
block: &BlockWithTxs,
invoke_receipt: &InvokeTransactionReceipt,
transaction_receipt: &TransactionReceipt,
event_id: &str,
event: &Event,
) -> Result<(), Error>;
Expand All @@ -53,8 +53,8 @@ pub trait TransactionProcessor<P: Provider + Sync> {
db: &mut Sql,
provider: &P,
block: &BlockWithTxs,
invoke_receipt: &InvokeTransactionReceipt,
transaction: &InvokeTransactionV1,
transaction_receipt: &TransactionReceipt,
transaction: &Transaction,
transaction_id: &str,
) -> Result<(), Error>;
}
4 changes: 2 additions & 2 deletions crates/torii/core/src/processors/register_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::model::ModelReader;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{BlockWithTxs, Event, InvokeTransactionReceipt};
use starknet::core::types::{BlockWithTxs, Event, TransactionReceipt};
use starknet::core::utils::parse_cairo_short_string;
use starknet::providers::Provider;
use tracing::{debug, info};
Expand Down Expand Up @@ -39,7 +39,7 @@ where
world: &WorldContractReader<P>,
db: &mut Sql,
_block: &BlockWithTxs,
_invoke_receipt: &InvokeTransactionReceipt,
_transaction_receipt: &TransactionReceipt,
_event_id: &str,
event: &Event,
) -> Result<(), Error> {
Expand Down
4 changes: 2 additions & 2 deletions crates/torii/core/src/processors/store_del_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::model::ModelReader;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{BlockWithTxs, Event, InvokeTransactionReceipt};
use starknet::core::types::{BlockWithTxs, Event, TransactionReceipt};
use starknet::core::utils::parse_cairo_short_string;
use starknet::providers::Provider;
use tracing::info;
Expand Down Expand Up @@ -40,7 +40,7 @@ where
_world: &WorldContractReader<P>,
db: &mut Sql,
_block: &BlockWithTxs,
_transaction_receipt: &InvokeTransactionReceipt,
_transaction_receipt: &TransactionReceipt,
_event_id: &str,
event: &Event,
) -> Result<(), Error> {
Expand Down
4 changes: 2 additions & 2 deletions crates/torii/core/src/processors/store_set_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::model::ModelReader;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{BlockWithTxs, Event, InvokeTransactionReceipt};
use starknet::core::types::{BlockWithTxs, Event, TransactionReceipt};
use starknet::core::utils::parse_cairo_short_string;
use starknet::providers::Provider;
use tracing::info;
Expand Down Expand Up @@ -40,7 +40,7 @@ where
_world: &WorldContractReader<P>,
db: &mut Sql,
_block: &BlockWithTxs,
_transaction_receipt: &InvokeTransactionReceipt,
_transaction_receipt: &TransactionReceipt,
event_id: &str,
event: &Event,
) -> Result<(), Error> {
Expand Down
6 changes: 3 additions & 3 deletions crates/torii/core/src/processors/store_transaction.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use starknet::core::types::{BlockWithTxs, InvokeTransactionReceipt, InvokeTransactionV1};
use starknet::core::types::{BlockWithTxs, Transaction, TransactionReceipt};
use starknet::providers::Provider;

use super::TransactionProcessor;
Expand All @@ -16,8 +16,8 @@ impl<P: Provider + Sync> TransactionProcessor<P> for StoreTransactionProcessor {
db: &mut Sql,
_provider: &P,
_block: &BlockWithTxs,
_receipt: &InvokeTransactionReceipt,
transaction: &InvokeTransactionV1,
_receipt: &TransactionReceipt,
transaction: &Transaction,
transaction_id: &str,
) -> Result<(), Error> {
db.store_transaction(transaction, transaction_id);
Expand Down
50 changes: 40 additions & 10 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use dojo_types::schema::Ty;
use dojo_world::metadata::WorldMetadata;
use sqlx::pool::PoolConnection;
use sqlx::{Pool, Sqlite};
use starknet::core::types::{Event, FieldElement, InvokeTransactionV1};
use starknet::core::types::{Event, FieldElement, InvokeTransaction, Transaction};
use starknet_crypto::poseidon_hash_many;

use super::World;
Expand Down Expand Up @@ -238,19 +238,49 @@ impl Sql {
Ok(rows.drain(..).map(|row| serde_json::from_str(&row.2).unwrap()).collect())
}

pub fn store_transaction(&mut self, transaction: &InvokeTransactionV1, transaction_id: &str) {
pub fn store_transaction(&mut self, transaction: &Transaction, transaction_id: &str) {
let id = Argument::String(transaction_id.to_string());
let transaction_hash = Argument::FieldElement(transaction.transaction_hash);
let sender_address = Argument::FieldElement(transaction.sender_address);
let calldata = Argument::String(felts_sql_string(&transaction.calldata));
let max_fee = Argument::FieldElement(transaction.max_fee);
let signature = Argument::String(felts_sql_string(&transaction.signature));
let nonce = Argument::FieldElement(transaction.nonce);

let transaction_type = match transaction {
Transaction::Invoke(_) => "INVOKE",
Transaction::L1Handler(_) => "L1_HANDLER",
_ => return,
};

let (transaction_hash, sender_address, calldata, max_fee, signature, nonce) =
match transaction {
Transaction::Invoke(InvokeTransaction::V1(invoke_v1_transaction)) => (
Argument::FieldElement(invoke_v1_transaction.transaction_hash),
Argument::FieldElement(invoke_v1_transaction.sender_address),
Argument::String(felts_sql_string(&invoke_v1_transaction.calldata)),
Argument::FieldElement(invoke_v1_transaction.max_fee),
Argument::String(felts_sql_string(&invoke_v1_transaction.signature)),
Argument::FieldElement(invoke_v1_transaction.nonce),
),
Transaction::L1Handler(l1_handler_transaction) => (
Argument::FieldElement(l1_handler_transaction.transaction_hash),
Argument::FieldElement(l1_handler_transaction.contract_address),
Argument::String(felts_sql_string(&l1_handler_transaction.calldata)),
Argument::FieldElement(FieldElement::ZERO), // has no max_fee
Argument::String("".to_string()), // has no signature
Argument::FieldElement((l1_handler_transaction.nonce).into()),
),
_ => return,
};

self.query_queue.enqueue(
"INSERT OR IGNORE INTO transactions (id, transaction_hash, sender_address, calldata, \
max_fee, signature, nonce) VALUES (?, ?, ?, ?, ?, ?, ?)",
vec![id, transaction_hash, sender_address, calldata, max_fee, signature, nonce],
max_fee, signature, nonce, transaction_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
vec![
id,
transaction_hash,
sender_address,
calldata,
max_fee,
signature,
nonce,
Argument::String(transaction_type.to_string()),
],
);
}

Expand Down
4 changes: 4 additions & 0 deletions crates/torii/migrations/20240207171639_add_txn_type.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE
transactions
ADD
COLUMN transaction_type TEXT NOT NULL DEFAULT 'INVOKE';

0 comments on commit 7984fb8

Please sign in to comment.