Skip to content

Commit

Permalink
feat(torii-core): bitflags for indexing (#2450)
Browse files Browse the repository at this point in the history
* feat(torii-core): bitflags for indexing

* Disabled by default

* index raw events by default
  • Loading branch information
Larkooo authored Sep 19, 2024
1 parent 5777c00 commit 82a23a5
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 51 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 19 additions & 2 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use starknet::providers::JsonRpcClient;
use tokio::sync::broadcast;
use tokio::sync::broadcast::Sender;
use tokio_stream::StreamExt;
use torii_core::engine::{Engine, EngineConfig, Processors};
use torii_core::engine::{Engine, EngineConfig, IndexingFlags, Processors};
use torii_core::processors::event_message::EventMessageProcessor;
use torii_core::processors::generate_event_processors_map;
use torii_core::processors::metadata_update::MetadataUpdateProcessor;
Expand Down Expand Up @@ -132,6 +132,14 @@ struct Args {
/// Max concurrent tasks
#[arg(long, default_value = "100")]
max_concurrent_tasks: usize,

/// Whether or not to index world transactions
#[arg(long, action = ArgAction::Set, default_value_t = false)]
index_transactions: bool,

/// Whether or not to index raw events
#[arg(long, action = ArgAction::Set, default_value_t = true)]
index_raw_events: bool,
}

#[tokio::main]
Expand Down Expand Up @@ -195,7 +203,15 @@ async fn main() -> anyhow::Result<()> {

let (block_tx, block_rx) = tokio::sync::mpsc::channel(100);

let mut engine = Engine::new(
let mut flags = IndexingFlags::empty();
if args.index_transactions {
flags.insert(IndexingFlags::TRANSACTIONS);
}
if args.index_raw_events {
flags.insert(IndexingFlags::RAW_EVENTS);
}

let mut engine: Engine<Arc<JsonRpcClient<HttpTransport>>> = Engine::new(
world,
db.clone(),
provider.clone(),
Expand All @@ -206,6 +222,7 @@ async fn main() -> anyhow::Result<()> {
events_chunk_size: args.events_chunk_size,
index_pending: args.index_pending,
polling_interval: Duration::from_millis(args.polling_interval),
flags,
},
shutdown_tx.clone(),
Some(block_tx),
Expand Down
2 changes: 2 additions & 0 deletions crates/torii/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ tokio = { version = "1.32.0", features = [ "sync" ], default-features = true }
tokio-stream = "0.1.11"
tokio-util = "0.7.7"
tracing.workspace = true
clap.workspace = true
bitflags = "2.6.0"

[dev-dependencies]
camino.workspace = true
Expand Down
119 changes: 70 additions & 49 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use bitflags::bitflags;
use dojo_world::contracts::world::WorldContractReader;
use hashlink::LinkedHashMap;
use starknet::core::types::{
BlockId, BlockTag, EmittedEvent, Event, EventFilter, Felt, MaybePendingBlockWithReceipts,
MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, ReceiptBlock, TransactionReceipt,
TransactionReceiptWithBlockInfo, TransactionWithReceipt,
MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, ReceiptBlock, Transaction,
TransactionReceipt, TransactionReceiptWithBlockInfo, TransactionWithReceipt,
};
use starknet::providers::Provider;
use tokio::sync::broadcast::Sender;
Expand Down Expand Up @@ -46,13 +47,22 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Default for Processo
pub(crate) const LOG_TARGET: &str = "torii_core::engine";
pub const QUERY_QUEUE_BATCH_SIZE: usize = 1000;

bitflags! {
#[derive(Debug, Clone)]
pub struct IndexingFlags: u32 {
const TRANSACTIONS = 0b00000001;
const RAW_EVENTS = 0b00000010;
}
}

#[derive(Debug)]
pub struct EngineConfig {
pub polling_interval: Duration,
pub start_block: u64,
pub events_chunk_size: u64,
pub index_pending: bool,
pub max_concurrent_tasks: usize,
pub flags: IndexingFlags,
}

impl Default for EngineConfig {
Expand All @@ -63,6 +73,7 @@ impl Default for EngineConfig {
events_chunk_size: 1024,
index_pending: true,
max_concurrent_tasks: 100,
flags: IndexingFlags::empty(),
}
}
}
Expand Down Expand Up @@ -447,13 +458,18 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
for ((block_number, transaction_hash), events) in data.transactions {
debug!("Processing transaction hash: {:#x}", transaction_hash);
// Process transaction
// let transaction = self.provider.get_transaction_by_hash(transaction_hash).await?;
let transaction = if self.config.flags.contains(IndexingFlags::TRANSACTIONS) {
Some(self.provider.get_transaction_by_hash(transaction_hash).await?)
} else {
None
};

self.process_transaction_with_events(
transaction_hash,
events.as_slice(),
block_number,
data.blocks[&block_number],
transaction,
)
.await?;

Expand Down Expand Up @@ -537,6 +553,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
events: &[EmittedEvent],
block_number: u64,
block_timestamp: u64,
transaction: Option<Transaction>,
) -> Result<()> {
for (event_idx, event) in events.iter().enumerate() {
let event_id =
Expand All @@ -553,24 +570,25 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
block_timestamp,
&event_id,
&event,
// transaction_hash,
transaction_hash,
)
.await?;
}

// Commented out this transaction processor because it requires an RPC call for each
// transaction which is slowing down the sync process by alot.
// Self::process_transaction(
// self,
// block_number,
// block_timestamp,
// transaction_hash,
// transaction,
// )
// .await?;
if let Some(ref transaction) = transaction {
Self::process_transaction(
self,
block_number,
block_timestamp,
transaction_hash,
transaction,
)
.await?;
}

Ok(())
}

// Process a transaction and events from its receipt.
// Returns whether the transaction has a world event.
async fn process_transaction_with_receipt(
Expand Down Expand Up @@ -603,21 +621,21 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
block_timestamp,
&event_id,
event,
// *transaction_hash,
*transaction_hash,
)
.await?;
}

// if world_event {
// Self::process_transaction(
// self,
// block_number,
// block_timestamp,
// transaction_hash,
// transaction,
// )
// .await?;
// }
if world_event && self.config.flags.contains(IndexingFlags::TRANSACTIONS) {
Self::process_transaction(
self,
block_number,
block_timestamp,
*transaction_hash,
&transaction_with_receipt.transaction,
)
.await?;
}
}

Ok(world_event)
Expand All @@ -634,38 +652,41 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
Ok(())
}

// async fn process_transaction(
// &mut self,
// block_number: u64,
// block_timestamp: u64,
// transaction_hash: Felt,
// transaction: &Transaction,
// ) -> Result<()> {
// for processor in &self.processors.transaction {
// processor
// .process(
// &mut self.db,
// self.provider.as_ref(),
// block_number,
// block_timestamp,
// transaction_hash,
// transaction,
// )
// .await?
// }

// Ok(())
// }
async fn process_transaction(
&mut self,
block_number: u64,
block_timestamp: u64,
transaction_hash: Felt,
transaction: &Transaction,
) -> Result<()> {
for processor in &self.processors.transaction {
processor
.process(
&mut self.db,
self.provider.as_ref(),
block_number,
block_timestamp,
transaction_hash,
transaction,
)
.await?
}

Ok(())
}

async fn process_event(
&mut self,
block_number: u64,
block_timestamp: u64,
event_id: &str,
event: &Event,
// transaction_hash: Felt,
transaction_hash: Felt,
) -> Result<()> {
// self.db.store_event(event_id, event, transaction_hash, block_timestamp);
if self.config.flags.contains(IndexingFlags::RAW_EVENTS) {
self.db.store_event(event_id, event, transaction_hash, block_timestamp);
}

let event_key = event.keys[0];

let Some(processor) = self.processors.event.get(&event_key) else {
Expand Down

0 comments on commit 82a23a5

Please sign in to comment.