diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs
index a12420d5a0..55d8754893 100644
--- a/crates/torii/core/src/engine.rs
+++ b/crates/torii/core/src/engine.rs
@@ -1,9 +1,11 @@
 use std::collections::{BTreeMap, HashMap};
 use std::fmt::Debug;
+use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::Result;
 use dojo_world::contracts::world::WorldContractReader;
+use futures_util::future::try_join_all;
 use hashlink::LinkedHashMap;
 use starknet::core::types::{
     BlockId, BlockTag, EmittedEvent, Event, EventFilter, Felt, MaybePendingBlockWithReceipts,
@@ -11,6 +13,7 @@ use starknet::core::types::{
     TransactionReceiptWithBlockInfo, TransactionWithReceipt,
 };
 use starknet::providers::Provider;
+use starknet_crypto::poseidon_hash_many;
 use tokio::sync::broadcast::Sender;
 use tokio::sync::mpsc::Sender as BoundedSender;
 use tokio::time::sleep;
@@ -24,7 +27,7 @@ use crate::sql::Sql;
 
 pub struct Processors<P: Provider + Sync> {
     pub block: Vec<Box<dyn BlockProcessor<P>>>,
     pub transaction: Vec<Box<dyn TransactionProcessor<P>>>,
-    pub event: HashMap<Felt, Box<dyn EventProcessor<P>>>,
+    pub event: HashMap<Felt, Arc<dyn EventProcessor<P>>>,
     pub catch_all_event: Box<dyn EventProcessor<P>>,
 }
 
@@ -85,13 +88,14 @@ pub struct FetchPendingResult {
 
 #[allow(missing_debug_implementations)]
 pub struct Engine<P: Provider + Send + Sync> {
-    world: WorldContractReader<P>,
-    db: Sql,
+    world: Arc<WorldContractReader<P>>,
+    db: Arc<Sql>,
     provider: Box<P>,
-    processors: Processors<P>,
+    processors: Arc<Processors<P>>,
     config: EngineConfig,
     shutdown_tx: Sender<()>,
     block_tx: Option<BoundedSender<u64>>,
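+    // Queued (event_id, event) pairs, grouped by task ID: events sharing a task
+    // ID are replayed in order within a single task, while distinct task IDs are
+    // processed concurrently (see `process_event` for how the ID is derived).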
+    tasks: HashMap<Felt, Vec<(String, Event)>>,
 }
 
 struct UnprocessedEvent {
@@ -109,7 +113,7 @@ impl<P: Provider + Send + Sync> Engine<P> {
         shutdown_tx: Sender<()>,
         block_tx: Option<BoundedSender<u64>>,
     ) -> Self {
-        Self { world, db, provider: Box::new(provider), processors, config, shutdown_tx, block_tx }
+        Self { world: Arc::new(world), db, provider: Box::new(provider), processors: Arc::new(processors), config, shutdown_tx, block_tx, tasks: HashMap::new() }
     }
 
     pub async fn start(&mut self) -> Result<()> {
@@ -436,11 +440,30 @@ impl<P: Provider + Send + Sync> Engine<P> {
             }
         }
 
-        // We return None for the pending_block_tx because our process_range
-        // gets only specific events from the world. so some transactions
-        // might get ignored and wont update the cursor.
-        // so once the sync range is done, we assume all of the tx of the block
-        // have been processed.
+        // Process queued tasks in parallel
+        let tasks: Vec<_> = self.tasks.drain().map(|(task_id, events)| {
+            let world = self.world.clone();
+            let db = self.db.clone();
+            let processors = self.processors.clone();
+            let block_timestamp = data.blocks[&last_block];
+
+            tokio::spawn(async move {
+                // Each task mutates its own local db handle; this assumes the
+                // `Sql: Clone` impl sketched (commented out) in sql.rs below.
+                let mut local_db = (*db).clone();
+                for (event_id, event) in events {
+                    if let Some(processor) = processors.event.get(&event.keys[0]) {
+                        if let Err(e) = processor
+                            .process(&world, &mut local_db, last_block, block_timestamp, &event_id, &event)
+                            .await
+                        {
+                            error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing queued event.");
+                        }
+                    }
+                }
+                Ok::<_, anyhow::Error>(local_db)
+            })
+        }).collect();
+
+        // We wait for all tasks to complete processing
+        let results = try_join_all(tasks).await?;
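+        // NOTE (draft): `results` holds the locally-mutated `Sql` from each task
+        // and is currently unused; the commented-out `Sql::merge` in sql.rs below
+        // sketches how those local query queues could be folded back into `self.db`.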
         self.db.set_head(data.latest_block_number, None, None);
         self.db.execute().await?;
@@ -589,7 +612,7 @@ impl<P: Provider + Send + Sync> Engine<P> {
         event: &Event,
         transaction_hash: Felt,
     ) -> Result<()> {
-        self.db.store_event(event_id, event, transaction_hash, block_timestamp);
+        // self.db.store_event(event_id, event, transaction_hash, block_timestamp);
 
         let event_key = event.keys[0];
 
         let Some(processor) = self.processors.event.get(&event_key) else {
@@ -627,14 +650,28 @@ impl<P: Provider + Send + Sync> Engine<P> {
             return Ok(());
         };
 
-        // if processor.validate(event) {
-        if let Err(e) = processor
-            .process(&self.world, &mut self.db, block_number, block_timestamp, event_id, event)
-            .await
-        {
-            error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event.");
+        let task_identifier = match processor.event_key().as_str() {
+            "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
+                poseidon_hash_many(&[event.data[0], event.data[1]])
+            }
+            _ => Felt::ZERO,
+        };
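+        // NOTE (draft): for these store events, data[0]/data[1] are presumably the
+        // model selector and entity ID, so the poseidon hash yields one task per
+        // entity: updates to the same entity stay ordered while different entities
+        // are free to run in parallel.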
+
+        // if we have a task identifier, we queue the event to be parallelized
+        if task_identifier != Felt::ZERO {
+            self.tasks
+                .entry(task_identifier)
+                .or_insert(vec![])
+                .push((event_id.to_string(), event.clone()));
+        } else {
+            // if we dont have a task identifier, we process the event immediately
+            if let Err(e) = processor
+                .process(&self.world, &mut self.db, block_number, block_timestamp, event_id, event)
+                .await
+            {
+                error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event.");
+            }
         }
-        // }
 
         Ok(())
     }
diff --git a/crates/torii/core/src/processors/mod.rs b/crates/torii/core/src/processors/mod.rs
index c4a02da631..272513b09d 100644
--- a/crates/torii/core/src/processors/mod.rs
+++ b/crates/torii/core/src/processors/mod.rs
@@ -23,7 +23,7 @@ const ENTITY_ID_INDEX: usize = 1;
 const NUM_KEYS_INDEX: usize = 2;
 
 #[async_trait]
-pub trait EventProcessor<P>
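+// NOTE: processor trait objects are shared with spawned tokio tasks through
+// Arc<Processors<P>>, which requires them to be Send + Sync.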
+pub trait EventProcessor<P>: Send + Sync
 where
     P: Provider + Sync,
 {
@@ -48,7 +48,7 @@ where
 }
 
 #[async_trait]
-pub trait BlockProcessor<P: Provider + Sync> {
+pub trait BlockProcessor<P: Provider + Sync>: Send + Sync {
     fn get_block_number(&self) -> String;
     async fn process(
         &self,
@@ -60,7 +60,7 @@ pub trait BlockProcessor<P: Provider + Sync> {
 }
 
 #[async_trait]
-pub trait TransactionProcessor<P: Provider + Sync> {
+pub trait TransactionProcessor<P: Provider + Sync>: Send + Sync {
     #[allow(clippy::too_many_arguments)]
     async fn process(
         &self,
diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs
index e53a116889..c05cfd09c1 100644
--- a/crates/torii/core/src/sql.rs
+++ b/crates/torii/core/src/sql.rs
@@ -13,7 +13,7 @@ use sqlx::pool::PoolConnection;
 use sqlx::{Pool, Sqlite};
 use starknet::core::types::{Event, Felt, InvokeTransaction, Transaction};
 use starknet_crypto::poseidon_hash_many;
-use tracing::debug;
+use tracing::{debug, warn};
 
 use crate::cache::{Model, ModelCache};
 use crate::query_queue::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType};
@@ -40,6 +40,12 @@ pub struct Sql {
     model_cache: Arc<ModelCache>,
 }
 
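+// NOTE (draft): this Clone would share the pool and model cache but start a
+// fresh QueryQueue, so queries still queued on the original handle are not
+// carried over to the clone.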
+// impl Clone for Sql {
+//     fn clone(&self) -> Self {
+//         Self { world_address: self.world_address, pool: self.pool.clone(), query_queue: QueryQueue::new(self.pool.clone()), model_cache: self.model_cache.clone() }
+//     }
+// }
+
 impl Sql {
     pub async fn new(pool: Pool<Sqlite>, world_address: Felt) -> Result<Self> {
         let mut query_queue = QueryQueue::new(pool.clone());
@@ -65,6 +71,22 @@ impl Sql {
         })
     }
 
+    // pub fn merge(&mut self, other: Sql) -> Result<()> {
+    //     // Merge query queue
+    //     self.query_queue.queue.extend(other.query_queue.queue);
+    //     self.query_queue.publish_queue.extend(other.query_queue.publish_queue);
+
+    //     // This should never happen
+    //     if self.world_address != other.world_address {
+    //         warn!(
+    //             "Merging Sql instances with different world addresses: {} and {}",
+    //             self.world_address, other.world_address
+    //         );
+    //     }
+
+    //     Ok(())
+    // }
+
     pub async fn head(&self) -> Result<(u64, Option<Felt>, Option<Felt>)> {
         let mut conn: PoolConnection<Sqlite> = self.pool.acquire().await?;
         let indexer_query =