From 13ca28618d3883672c83241c075be0e10121e852 Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 13 Sep 2024 13:40:51 -0400 Subject: [PATCH 01/21] feat: first pass at parallelization --- crates/torii/core/src/engine.rs | 73 +++++++++++++++++++------ crates/torii/core/src/processors/mod.rs | 6 +- crates/torii/core/src/sql.rs | 24 +++++++- 3 files changed, 81 insertions(+), 22 deletions(-) diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index a12420d5a0..55d8754893 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -1,9 +1,11 @@ use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; +use std::sync::Arc; use std::time::Duration; use anyhow::Result; use dojo_world::contracts::world::WorldContractReader; +use futures_util::future::try_join_all; use hashlink::LinkedHashMap; use starknet::core::types::{ BlockId, BlockTag, EmittedEvent, Event, EventFilter, Felt, MaybePendingBlockWithReceipts, @@ -11,6 +13,7 @@ use starknet::core::types::{ TransactionReceiptWithBlockInfo, TransactionWithReceipt, }; use starknet::providers::Provider; +use starknet_crypto::poseidon_hash_many; use tokio::sync::broadcast::Sender; use tokio::sync::mpsc::Sender as BoundedSender; use tokio::time::sleep; @@ -24,7 +27,7 @@ use crate::sql::Sql; pub struct Processors { pub block: Vec>>, pub transaction: Vec>>, - pub event: HashMap>>, + pub event: HashMap>>, pub catch_all_event: Box>, } @@ -85,13 +88,14 @@ pub struct FetchPendingResult { #[allow(missing_debug_implementations)] pub struct Engine { - world: WorldContractReader

, - db: Sql, + world: Arc>, + db: Arc, provider: Box

, - processors: Processors

, + processors: Arc>, config: EngineConfig, shutdown_tx: Sender<()>, block_tx: Option>, + tasks: HashMap>, } struct UnprocessedEvent { @@ -109,7 +113,7 @@ impl Engine

{ shutdown_tx: Sender<()>, block_tx: Option>, ) -> Self { - Self { world, db, provider: Box::new(provider), processors, config, shutdown_tx, block_tx } + Self { world: Arc::new(world), db, provider: Box::new(provider), processors: Arc::new(processors), config, shutdown_tx, block_tx, tasks: HashMap::new() } } pub async fn start(&mut self) -> Result<()> { @@ -436,11 +440,30 @@ impl Engine

{ } } - // We return None for the pending_block_tx because our process_range - // gets only specific events from the world. so some transactions - // might get ignored and wont update the cursor. - // so once the sync range is done, we assume all of the tx of the block - // have been processed. + // Process queued tasks in parallel + let tasks: Vec<_> = self.tasks.drain().map(|(task_id, events)| { + let world = self.world.clone(); + let db = self.db.clone(); + let processors = self.processors.clone(); + let block_timestamp = data.blocks[&last_block]; + + tokio::spawn(async move { + for (event_id, event) in events { + if let Some(processor) = processors.event.get(&event.keys[0]) { + if let Err(e) = processor + .process(&world, &mut local_db, last_block, block_timestamp, &event_id, &event) + .await + { + error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing queued event."); + } + } + } + Ok::<_, anyhow::Error>(local_db) + }) + }).collect(); + + // We wait for all tasks to complete processing + let results = try_join_all(tasks).await?; self.db.set_head(data.latest_block_number, None, None); self.db.execute().await?; @@ -589,7 +612,7 @@ impl Engine

{ event: &Event, transaction_hash: Felt, ) -> Result<()> { - self.db.store_event(event_id, event, transaction_hash, block_timestamp); + // self.db.store_event(event_id, event, transaction_hash, block_timestamp); let event_key = event.keys[0]; let Some(processor) = self.processors.event.get(&event_key) else { @@ -627,14 +650,28 @@ impl Engine

{ return Ok(()); }; - // if processor.validate(event) { - if let Err(e) = processor - .process(&self.world, &mut self.db, block_number, block_timestamp, event_id, event) - .await - { - error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event."); + let task_identifier = match processor.event_key().as_str() { + "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => { + poseidon_hash_many(&[event.data[0], event.data[1]]) + } + _ => Felt::ZERO, + }; + + // if we have a task identifier, we queue the event to be parallelized + if task_identifier != Felt::ZERO { + self.tasks + .entry(task_identifier) + .or_insert(vec![]) + .push((event_id.to_string(), event.clone())); + } else { + // if we dont have a task identifier, we process the event immediately + if let Err(e) = processor + .process(&self.world, &mut self.db, block_number, block_timestamp, event_id, event) + .await + { + error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event."); + } } - // } Ok(()) } diff --git a/crates/torii/core/src/processors/mod.rs b/crates/torii/core/src/processors/mod.rs index c4a02da631..272513b09d 100644 --- a/crates/torii/core/src/processors/mod.rs +++ b/crates/torii/core/src/processors/mod.rs @@ -23,7 +23,7 @@ const ENTITY_ID_INDEX: usize = 1; const NUM_KEYS_INDEX: usize = 2; #[async_trait] -pub trait EventProcessor

+pub trait EventProcessor

: Send + Sync where P: Provider + Sync, { @@ -48,7 +48,7 @@ where } #[async_trait] -pub trait BlockProcessor { +pub trait BlockProcessor: Send + Sync { fn get_block_number(&self) -> String; async fn process( &self, @@ -60,7 +60,7 @@ pub trait BlockProcessor { } #[async_trait] -pub trait TransactionProcessor { +pub trait TransactionProcessor: Send + Sync { #[allow(clippy::too_many_arguments)] async fn process( &self, diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index e53a116889..c05cfd09c1 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -13,7 +13,7 @@ use sqlx::pool::PoolConnection; use sqlx::{Pool, Sqlite}; use starknet::core::types::{Event, Felt, InvokeTransaction, Transaction}; use starknet_crypto::poseidon_hash_many; -use tracing::debug; +use tracing::{debug, warn}; use crate::cache::{Model, ModelCache}; use crate::query_queue::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType}; @@ -40,6 +40,12 @@ pub struct Sql { model_cache: Arc, } +// impl Clone for Sql { +// fn clone(&self) -> Self { +// Self { world_address: self.world_address, pool: self.pool.clone(), query_queue: QueryQueue::new(self.pool.clone()), model_cache: self.model_cache.clone() } +// } +// } + impl Sql { pub async fn new(pool: Pool, world_address: Felt) -> Result { let mut query_queue = QueryQueue::new(pool.clone()); @@ -65,6 +71,22 @@ impl Sql { }) } + // pub fn merge(&mut self, other: Sql) -> Result<()> { + // // Merge query queue + // self.query_queue.queue.extend(other.query_queue.queue); + // self.query_queue.publish_queue.extend(other.query_queue.publish_queue); + + // // This should never happen + // if self.world_address != other.world_address { + // warn!( + // "Merging Sql instances with different world addresses: {} and {}", + // self.world_address, other.world_address + // ); + // } + + // Ok(()) + // } + pub async fn head(&self) -> Result<(u64, Option, Option)> { let mut conn: PoolConnection = 
self.pool.acquire().await?; let indexer_query = From 4d04d4792514feddc18823819c9f9508e60292cb Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 13 Sep 2024 19:39:55 -0400 Subject: [PATCH 02/21] fix: traits --- bin/torii/src/main.rs | 18 +++++------ crates/torii/core/src/engine.rs | 17 ++++++---- crates/torii/core/src/processors/mod.rs | 5 +-- crates/torii/core/src/sql.rs | 42 ++++++++++++------------- 4 files changed, 44 insertions(+), 38 deletions(-) diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index 40a3514fd9..1cdbdc78e6 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -170,19 +170,19 @@ async fn main() -> anyhow::Result<()> { let provider: Arc<_> = JsonRpcClient::new(HttpTransport::new(args.rpc)).into(); // Get world address - let world = WorldContractReader::new(args.world_address, &provider); + let world = WorldContractReader::new(args.world_address, provider.clone()); let db = Sql::new(pool.clone(), args.world_address).await?; let processors = Processors { event: generate_event_processors_map(vec![ - Box::new(RegisterModelProcessor), - Box::new(StoreSetRecordProcessor), - Box::new(MetadataUpdateProcessor), - Box::new(StoreDelRecordProcessor), - Box::new(EventMessageProcessor), - Box::new(StoreUpdateRecordProcessor), - Box::new(StoreUpdateMemberProcessor), + Arc::new(RegisterModelProcessor), + Arc::new(StoreSetRecordProcessor), + Arc::new(MetadataUpdateProcessor), + Arc::new(StoreDelRecordProcessor), + Arc::new(EventMessageProcessor), + Arc::new(StoreUpdateRecordProcessor), + Arc::new(StoreUpdateMemberProcessor), ])?, transaction: vec![Box::new(StoreTransactionProcessor)], ..Processors::default() @@ -193,7 +193,7 @@ async fn main() -> anyhow::Result<()> { let mut engine = Engine::new( world, db.clone(), - &provider, + provider.clone(), processors, EngineConfig { start_block: args.start_block, diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 55d8754893..38d40adf40 100644 --- 
a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -24,14 +24,14 @@ use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor}; use crate::sql::Sql; #[allow(missing_debug_implementations)] -pub struct Processors { +pub struct Processors { pub block: Vec>>, pub transaction: Vec>>, pub event: HashMap>>, pub catch_all_event: Box>, } -impl Default for Processors

{ +impl Default for Processors

{ fn default() -> Self { Self { block: vec![], @@ -87,9 +87,9 @@ pub struct FetchPendingResult { } #[allow(missing_debug_implementations)] -pub struct Engine { +pub struct Engine { world: Arc>, - db: Arc, + db: Sql, provider: Box

, processors: Arc>, config: EngineConfig, @@ -103,7 +103,7 @@ struct UnprocessedEvent { data: Vec, } -impl Engine

{ +impl Engine

{ pub fn new( world: WorldContractReader

, db: Sql, @@ -442,12 +442,13 @@ impl Engine

{ // Process queued tasks in parallel let tasks: Vec<_> = self.tasks.drain().map(|(task_id, events)| { - let world = self.world.clone(); let db = self.db.clone(); + let world = self.world.clone(); let processors = self.processors.clone(); let block_timestamp = data.blocks[&last_block]; tokio::spawn(async move { + let mut local_db = db.clone(); for (event_id, event) in events { if let Some(processor) = processors.event.get(&event.keys[0]) { if let Err(e) = processor @@ -464,6 +465,10 @@ impl Engine

{ // We wait for all tasks to complete processing let results = try_join_all(tasks).await?; + for local_db in results { + // We merge the query queues of each task into the main db + self.db.merge(local_db?)?; + } self.db.set_head(data.latest_block_number, None, None); self.db.execute().await?; diff --git a/crates/torii/core/src/processors/mod.rs b/crates/torii/core/src/processors/mod.rs index 272513b09d..c6a8f13af5 100644 --- a/crates/torii/core/src/processors/mod.rs +++ b/crates/torii/core/src/processors/mod.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::sync::Arc; use anyhow::{Error, Result}; use async_trait::async_trait; @@ -75,8 +76,8 @@ pub trait TransactionProcessor: Send + Sync { /// Given a list of event processors, generate a map of event keys to the event processor pub fn generate_event_processors_map( - event_processor: Vec>>, -) -> Result>>> { + event_processor: Vec>>, +) -> Result>>> { let mut event_processors = HashMap::new(); for processor in event_processor { diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index c05cfd09c1..70fd1f5681 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -32,7 +32,7 @@ pub const FELT_DELIMITER: &str = "/"; #[path = "sql_test.rs"] mod test; -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Sql { world_address: Felt, pub pool: Pool, @@ -40,11 +40,11 @@ pub struct Sql { model_cache: Arc, } -// impl Clone for Sql { -// fn clone(&self) -> Self { -// Self { world_address: self.world_address, pool: self.pool.clone(), query_queue: QueryQueue::new(self.pool.clone()), model_cache: self.model_cache.clone() } -// } -// } +impl Clone for Sql { + fn clone(&self) -> Self { + Self { world_address: self.world_address, pool: self.pool.clone(), query_queue: QueryQueue::new(self.pool.clone()), model_cache: self.model_cache.clone() } + } +} impl Sql { pub async fn new(pool: Pool, world_address: Felt) -> Result { @@ -71,21 +71,21 @@ impl Sql { }) } - // pub fn 
merge(&mut self, other: Sql) -> Result<()> { - // // Merge query queue - // self.query_queue.queue.extend(other.query_queue.queue); - // self.query_queue.publish_queue.extend(other.query_queue.publish_queue); - - // // This should never happen - // if self.world_address != other.world_address { - // warn!( - // "Merging Sql instances with different world addresses: {} and {}", - // self.world_address, other.world_address - // ); - // } - - // Ok(()) - // } + pub fn merge(&mut self, other: Sql) -> Result<()> { + // Merge query queue + self.query_queue.queue.extend(other.query_queue.queue); + self.query_queue.publish_queue.extend(other.query_queue.publish_queue); + + // This should never happen + if self.world_address != other.world_address { + warn!( + "Merging Sql instances with different world addresses: {} and {}", + self.world_address, other.world_address + ); + } + + Ok(()) + } pub async fn head(&self) -> Result<(u64, Option, Option)> { let mut conn: PoolConnection = self.pool.acquire().await?; From 42b80ceb4b14f018764f2ea2993ab9712eb6f2e9 Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 13 Sep 2024 20:14:17 -0400 Subject: [PATCH 03/21] use semaphore for max concurrent tasks --- bin/torii/src/main.rs | 5 ++++ crates/torii/core/src/engine.rs | 44 +++++++++++++++++++-------------- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index 1cdbdc78e6..79152d73ae 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -125,6 +125,10 @@ struct Args { /// Polling interval in ms #[arg(long, default_value = "500")] polling_interval: u64, + + /// Max concurrent tasks + #[arg(long, default_value = "100")] + max_concurrent_tasks: usize, } #[tokio::main] @@ -196,6 +200,7 @@ async fn main() -> anyhow::Result<()> { provider.clone(), processors, EngineConfig { + max_concurrent_tasks: args.max_concurrent_tasks, start_block: args.start_block, events_chunk_size: args.events_chunk_size, index_pending: 
args.index_pending, diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 38d40adf40..e31b70e077 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -5,7 +5,6 @@ use std::time::Duration; use anyhow::Result; use dojo_world::contracts::world::WorldContractReader; -use futures_util::future::try_join_all; use hashlink::LinkedHashMap; use starknet::core::types::{ BlockId, BlockTag, EmittedEvent, Event, EventFilter, Felt, MaybePendingBlockWithReceipts, @@ -14,10 +13,10 @@ use starknet::core::types::{ }; use starknet::providers::Provider; use starknet_crypto::poseidon_hash_many; -use tokio::sync::broadcast::Sender; -use tokio::sync::mpsc::Sender as BoundedSender; +use tokio::sync::{broadcast::Sender, mpsc::Sender as BoundedSender, Semaphore}; use tokio::time::sleep; use tracing::{debug, error, info, trace, warn}; +use tokio::task::JoinSet; use crate::processors::event_message::EventMessageProcessor; use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor}; @@ -51,6 +50,7 @@ pub struct EngineConfig { pub start_block: u64, pub events_chunk_size: u64, pub index_pending: bool, + pub max_concurrent_tasks: usize, } impl Default for EngineConfig { @@ -60,6 +60,7 @@ impl Default for EngineConfig { start_block: 0, events_chunk_size: 1024, index_pending: true, + max_concurrent_tasks: 100, } } } @@ -440,34 +441,41 @@ impl Engine

{ } } - // Process queued tasks in parallel - let tasks: Vec<_> = self.tasks.drain().map(|(task_id, events)| { + // We use a semaphore to limit the number of concurrent tasks + let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks)); + + // Run all tasks concurrently + let mut set = JoinSet::new(); + for (task_id, events) in self.tasks.drain() { let db = self.db.clone(); let world = self.world.clone(); let processors = self.processors.clone(); let block_timestamp = data.blocks[&last_block]; + let semaphore = semaphore.clone(); - tokio::spawn(async move { + set.spawn(async move { + let _permit = semaphore.acquire().await.unwrap(); let mut local_db = db.clone(); for (event_id, event) in events { if let Some(processor) = processors.event.get(&event.keys[0]) { + debug!(target: LOG_TARGET, event_name = processor.event_key(), task_id = %task_id, "Processing parallelized event."); + if let Err(e) = processor .process(&world, &mut local_db, last_block, block_timestamp, &event_id, &event) .await { - error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing queued event."); + error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing parallelized event."); } } } Ok::<_, anyhow::Error>(local_db) - }) - }).collect(); - - // We wait for all tasks to complete processing - let results = try_join_all(tasks).await?; - for local_db in results { - // We merge the query queues of each task into the main db - self.db.merge(local_db?)?; + }); + } + + // Join all tasks + while let Some(result) = set.join_next().await { + let local_db = result??; + self.db.merge(local_db)?; } self.db.set_head(data.latest_block_number, None, None); @@ -505,7 +513,7 @@ impl Engine

{ block_timestamp, &event_id, &event, - transaction_hash, + // transaction_hash, ) .await?; } @@ -555,7 +563,7 @@ impl Engine

{ block_timestamp, &event_id, event, - *transaction_hash, + // *transaction_hash, ) .await?; } @@ -615,7 +623,7 @@ impl Engine

{ block_timestamp: u64, event_id: &str, event: &Event, - transaction_hash: Felt, + // transaction_hash: Felt, ) -> Result<()> { // self.db.store_event(event_id, event, transaction_hash, block_timestamp); let event_key = event.keys[0]; From ff2d7a32e870f394d3406bf8041af45945224dc0 Mon Sep 17 00:00:00 2001 From: Nasr Date: Sun, 15 Sep 2024 13:02:48 -0400 Subject: [PATCH 04/21] update tests --- crates/torii/core/src/sql_test.rs | 11 ++++++----- crates/torii/graphql/src/tests/mod.rs | 7 ++++--- crates/torii/grpc/src/server/tests/entities_test.rs | 4 ++-- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/crates/torii/core/src/sql_test.rs b/crates/torii/core/src/sql_test.rs index db60d738ec..f1373fcf3f 100644 --- a/crates/torii/core/src/sql_test.rs +++ b/crates/torii/core/src/sql_test.rs @@ -1,4 +1,5 @@ use std::str::FromStr; +use std::sync::Arc; use cainome::cairo_serde::ContractAddress; use camino::Utf8PathBuf; @@ -42,11 +43,11 @@ where provider, Processors { event: generate_event_processors_map(vec![ - Box::new(RegisterModelProcessor), - Box::new(StoreSetRecordProcessor), - Box::new(StoreUpdateRecordProcessor), - Box::new(StoreUpdateMemberProcessor), - Box::new(StoreDelRecordProcessor), + Arc::new(RegisterModelProcessor), + Arc::new(StoreSetRecordProcessor), + Arc::new(StoreUpdateRecordProcessor), + Arc::new(StoreUpdateMemberProcessor), + Arc::new(StoreDelRecordProcessor), ])?, ..Processors::default() }, diff --git a/crates/torii/graphql/src/tests/mod.rs b/crates/torii/graphql/src/tests/mod.rs index 26ff6870df..d5751402a1 100644 --- a/crates/torii/graphql/src/tests/mod.rs +++ b/crates/torii/graphql/src/tests/mod.rs @@ -1,4 +1,5 @@ use std::str::FromStr; +use std::sync::Arc; use anyhow::Result; use async_graphql::dynamic::Schema; @@ -354,9 +355,9 @@ pub async fn spinup_types_test() -> Result { account.provider(), Processors { event: generate_event_processors_map(vec![ - Box::new(RegisterModelProcessor), - Box::new(StoreSetRecordProcessor), - 
Box::new(StoreDelRecordProcessor), + Arc::new(RegisterModelProcessor), + Arc::new(StoreSetRecordProcessor), + Arc::new(StoreDelRecordProcessor), ]) .unwrap(), ..Processors::default() diff --git a/crates/torii/grpc/src/server/tests/entities_test.rs b/crates/torii/grpc/src/server/tests/entities_test.rs index f1d60f80c8..ba35e24b8a 100644 --- a/crates/torii/grpc/src/server/tests/entities_test.rs +++ b/crates/torii/grpc/src/server/tests/entities_test.rs @@ -104,8 +104,8 @@ async fn test_entities_queries() { Arc::clone(&provider), Processors { event: generate_event_processors_map(vec![ - Box::new(RegisterModelProcessor), - Box::new(StoreSetRecordProcessor), + Arc::new(RegisterModelProcessor), + Arc::new(StoreSetRecordProcessor), ]) .unwrap(), ..Processors::default() From ec4a477c1d8e9ddc5a7f98618e248aea9c12e553 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 16 Sep 2024 13:05:16 -0400 Subject: [PATCH 05/21] dont execute model registers --- crates/torii/core/src/sql.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index 70fd1f5681..b73a891616 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -177,7 +177,7 @@ impl Sql { &mut 0, &mut 0, ); - self.execute().await?; + self.query_queue.push_publish(BrokerMessage::ModelRegistered(model_registered)); Ok(()) From fc694d413df478f922820120c679413978acfc00 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 16 Sep 2024 13:30:16 -0400 Subject: [PATCH 06/21] fix: tests (from lambda) --- crates/torii/core/src/engine.rs | 2 +- crates/torii/core/src/sql_test.rs | 58 ++++---- .../torii/graphql/src/tests/entities_test.rs | 13 +- crates/torii/graphql/src/tests/mod.rs | 14 +- crates/torii/graphql/src/tests/models_test.rs | 130 ++++++++++++++---- 5 files changed, 149 insertions(+), 68 deletions(-) diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index e31b70e077..63562ca74b 100644 --- 
a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -674,7 +674,7 @@ impl Engine

{ if task_identifier != Felt::ZERO { self.tasks .entry(task_identifier) - .or_insert(vec![]) + .or_default() .push((event_id.to_string(), event.clone())); } else { // if we dont have a task identifier, we process the event immediately diff --git a/crates/torii/core/src/sql_test.rs b/crates/torii/core/src/sql_test.rs index f1373fcf3f..934e844e3a 100644 --- a/crates/torii/core/src/sql_test.rs +++ b/crates/torii/core/src/sql_test.rs @@ -11,10 +11,11 @@ use dojo_world::contracts::world::{WorldContract, WorldContractReader}; use katana_runner::{KatanaRunner, KatanaRunnerConfig}; use scarb::compiler::Profile; use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; -use starknet::accounts::{Account, ConnectedAccount}; +use starknet::accounts::Account; use starknet::core::types::{Call, Felt}; use starknet::core::utils::{get_contract_address, get_selector_from_name}; -use starknet::providers::Provider; +use starknet::providers::jsonrpc::HttpTransport; +use starknet::providers::{JsonRpcClient, Provider}; use starknet_crypto::poseidon_hash_many; use tokio::sync::broadcast; @@ -33,7 +34,7 @@ pub async fn bootstrap_engine

( provider: P, ) -> Result, Box> where - P: Provider + Send + Sync + core::fmt::Debug, + P: Provider + Send + Sync + core::fmt::Debug + Clone + 'static, { let (shutdown_tx, _) = broadcast::channel(1); let to = provider.block_hash_and_number().await?.block_number; @@ -81,6 +82,7 @@ async fn test_load_from_remote() { let sequencer = KatanaRunner::new_with_config(seq_config).expect("Failed to start runner."); let account = sequencer.account(0); + let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(sequencer.url()))); let (strat, _) = prepare_migration_with_world_and_seed( manifest_path, @@ -107,7 +109,7 @@ async fn test_load_from_remote() { .await .unwrap(); - TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); + TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap(); // spawn let tx = &account @@ -120,13 +122,13 @@ async fn test_load_from_remote() { .await .unwrap(); - TransactionWaiter::new(tx.transaction_hash, &account.provider()).await.unwrap(); + TransactionWaiter::new(tx.transaction_hash, &provider).await.unwrap(); - let world_reader = WorldContractReader::new(strat.world_address, account.provider()); + let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); let mut db = Sql::new(pool.clone(), world_reader.address).await.unwrap(); - let _ = bootstrap_engine(world_reader, db.clone(), account.provider()).await.unwrap(); + let _ = bootstrap_engine(world_reader, db.clone(), provider).await.unwrap(); let _block_timestamp = 1710754478_u64; let models = sqlx::query("SELECT * FROM models").fetch_all(&pool).await.unwrap(); @@ -215,6 +217,7 @@ async fn test_load_from_remote_del() { let sequencer = KatanaRunner::new_with_config(seq_config).expect("Failed to start runner."); let account = sequencer.account(0); + let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(sequencer.url()))); let (strat, _) = prepare_migration_with_world_and_seed( manifest_path, @@ -240,7 +243,7 @@ 
async fn test_load_from_remote_del() { .await .unwrap(); - TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); + TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap(); // spawn let res = account @@ -253,7 +256,7 @@ async fn test_load_from_remote_del() { .await .unwrap(); - TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); + TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap(); // Set player config. let res = account @@ -267,7 +270,7 @@ async fn test_load_from_remote_del() { .await .unwrap(); - TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); + TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap(); let res = account .execute_v1(vec![Call { @@ -279,13 +282,13 @@ async fn test_load_from_remote_del() { .await .unwrap(); - TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); + TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap(); - let world_reader = WorldContractReader::new(strat.world_address, account.provider()); + let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); let mut db = Sql::new(pool.clone(), world_reader.address).await.unwrap(); - let _ = bootstrap_engine(world_reader, db.clone(), account.provider()).await; + let _ = bootstrap_engine(world_reader, db.clone(), provider).await; assert_eq!(count_table("dojo_examples-PlayerConfig", &pool).await, 0); assert_eq!(count_table("dojo_examples-PlayerConfig$favorite_item", &pool).await, 0); @@ -297,16 +300,13 @@ async fn test_load_from_remote_del() { db.execute().await.unwrap(); } -// Start of Selection #[tokio::test(flavor = "multi_thread")] async fn test_update_with_set_record() { - // Initialize the SQLite in-memory database let options = SqliteConnectOptions::from_str("sqlite::memory:").unwrap().create_if_missing(true); let pool = 
SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap(); sqlx::migrate!("../migrations").run(&pool).await.unwrap(); - // Set up the compiler test environment let setup = CompilerTestSetup::from_examples("../../dojo-core", "../../../examples/"); let config = setup.build_test_config("spawn-and-move", Profile::DEV); @@ -314,14 +314,10 @@ async fn test_update_with_set_record() { let manifest_path = Utf8PathBuf::from(config.manifest_path().parent().unwrap()); let target_dir = Utf8PathBuf::from(ws.target_dir().to_string()).join("dev"); - // Configure and start the KatanaRunner let seq_config = KatanaRunnerConfig { n_accounts: 10, ..Default::default() } .with_db_dir(copy_spawn_and_move_db().as_str()); - let sequencer = KatanaRunner::new_with_config(seq_config).expect("Failed to start runner."); - let account = sequencer.account(0); - // Prepare migration with world and seed let (strat, _) = prepare_migration_with_world_and_seed( manifest_path, target_dir, @@ -339,16 +335,18 @@ async fn test_update_with_set_record() { strat.world_address, ); + let account = sequencer.account(0); + let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(sequencer.url()))); + let world = WorldContract::new(strat.world_address, &account); - // Grant writer permissions let res = world .grant_writer(&compute_bytearray_hash("dojo_examples"), &ContractAddress(actions_address)) .send_with_cfg(&TxnConfig::init_wait()) .await .unwrap(); - TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); + TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap(); // Send spawn transaction let spawn_res = account @@ -361,7 +359,7 @@ async fn test_update_with_set_record() { .await .unwrap(); - TransactionWaiter::new(spawn_res.transaction_hash, &account.provider()).await.unwrap(); + TransactionWaiter::new(spawn_res.transaction_hash, &provider).await.unwrap(); // Send move transaction let move_res = account @@ -374,15 +372,15 @@ async fn 
test_update_with_set_record() { .await .unwrap(); - TransactionWaiter::new(move_res.transaction_hash, &account.provider()).await.unwrap(); + TransactionWaiter::new(move_res.transaction_hash, &provider).await.unwrap(); - let world_reader = WorldContractReader::new(strat.world_address, account.provider()); + let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); - let db = Sql::new(pool.clone(), world_reader.address).await.unwrap(); + let mut db = Sql::new(pool.clone(), world_reader.address).await.unwrap(); + + let _ = bootstrap_engine(world_reader, db.clone(), Arc::clone(&provider)).await.unwrap(); - // Expect bootstrap_engine to error out due to the existing bug - let result = bootstrap_engine(world_reader, db.clone(), account.provider()).await; - assert!(result.is_ok(), "bootstrap_engine should not fail"); + db.execute().await.unwrap(); } /// Count the number of rows in a table. @@ -398,4 +396,4 @@ async fn count_table(table_name: &str, pool: &sqlx::Pool) -> i64 { let count: (i64,) = sqlx::query_as(&count_query).fetch_one(pool).await.unwrap(); count.0 -} +} \ No newline at end of file diff --git a/crates/torii/graphql/src/tests/entities_test.rs b/crates/torii/graphql/src/tests/entities_test.rs index 4722a7a26c..f3012c637f 100644 --- a/crates/torii/graphql/src/tests/entities_test.rs +++ b/crates/torii/graphql/src/tests/entities_test.rs @@ -106,8 +106,15 @@ mod tests { let last_entity = connection.edges.last().unwrap(); assert_eq!(connection.edges.len(), 2); assert_eq!(connection.total_count, 2); - assert_eq!(first_entity.node.keys.clone().unwrap(), vec!["0x0", "0x1"]); - assert_eq!(last_entity.node.keys.clone().unwrap(), vec!["0x0"]); + // due to parallelization order is nondeterministic + assert!( + first_entity.node.keys.clone().unwrap() == vec!["0x0", "0x1"] + || first_entity.node.keys.clone().unwrap() == vec!["0x0"] + ); + assert!( + last_entity.node.keys.clone().unwrap() == vec!["0x0", "0x1"] + || 
last_entity.node.keys.clone().unwrap() == vec!["0x0"] + ); // double key param - returns all entities with `0x0` as first key and `0x1` as second key let entities = entities_query(&schema, "(keys: [\"0x0\", \"0x1\"])").await; @@ -248,4 +255,4 @@ mod tests { assert_eq!(subrecord.subrecord_id, 1); Ok(()) } -} +} \ No newline at end of file diff --git a/crates/torii/graphql/src/tests/mod.rs b/crates/torii/graphql/src/tests/mod.rs index d5751402a1..e3814c12cf 100644 --- a/crates/torii/graphql/src/tests/mod.rs +++ b/crates/torii/graphql/src/tests/mod.rs @@ -22,7 +22,8 @@ use sqlx::SqlitePool; use starknet::accounts::{Account, ConnectedAccount}; use starknet::core::types::{Call, Felt, InvokeTransactionResult}; use starknet::macros::selector; -use starknet::providers::Provider; +use starknet::providers::jsonrpc::HttpTransport; +use starknet::providers::{JsonRpcClient, Provider}; use tokio::sync::broadcast; use tokio_stream::StreamExt; use torii_core::engine::{Engine, EngineConfig, Processors}; @@ -291,6 +292,7 @@ pub async fn spinup_types_test() -> Result { let sequencer = KatanaRunner::new_with_config(seq_config).expect("Failed to start runner."); let account = sequencer.account(0); + let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(sequencer.url()))); let (strat, _) = prepare_migration_with_world_and_seed( manifest_path, @@ -329,7 +331,7 @@ pub async fn spinup_types_test() -> Result { .await .unwrap(); - TransactionWaiter::new(transaction_hash, &account.provider()).await?; + TransactionWaiter::new(transaction_hash, &provider).await?; // Execute `delete` and delete Record with id 20 let InvokeTransactionResult { transaction_hash } = account @@ -342,9 +344,9 @@ pub async fn spinup_types_test() -> Result { .await .unwrap(); - TransactionWaiter::new(transaction_hash, &account.provider()).await?; + TransactionWaiter::new(transaction_hash, &provider).await?; - let world = WorldContractReader::new(strat.world_address, account.provider()); + let world = 
WorldContractReader::new(strat.world_address, Arc::clone(&provider)); let db = Sql::new(pool.clone(), strat.world_address).await.unwrap(); @@ -352,7 +354,7 @@ pub async fn spinup_types_test() -> Result { let mut engine = Engine::new( world, db, - account.provider(), + Arc::clone(&provider), Processors { event: generate_event_processors_map(vec![ Arc::new(RegisterModelProcessor), @@ -372,4 +374,4 @@ pub async fn spinup_types_test() -> Result { engine.process_range(data).await.unwrap(); Ok(pool) -} +} \ No newline at end of file diff --git a/crates/torii/graphql/src/tests/models_test.rs b/crates/torii/graphql/src/tests/models_test.rs index 66bda6902b..b7f5f45349 100644 --- a/crates/torii/graphql/src/tests/models_test.rs +++ b/crates/torii/graphql/src/tests/models_test.rs @@ -169,8 +169,11 @@ mod tests { let pool = spinup_types_test().await?; let schema = build_schema(&pool).await.unwrap(); + // we need to order all the records because insertions are done in parallel + // which can have random order // default params, test entity relationship, test nested types - let records = records_model_query(&schema, "").await; + let records = + records_model_query(&schema, "(order: { direction: DESC, field: RECORD_ID })").await; let connection: Connection = serde_json::from_value(records).unwrap(); let record = connection.edges.last().unwrap(); let entity = record.node.entity.as_ref().unwrap(); @@ -193,25 +196,41 @@ mod tests { // *** WHERE FILTER TESTING *** // where filter EQ on record_id - let records = records_model_query(&schema, "(where: { record_id: 0 })").await; + let records = records_model_query( + &schema, + "(where: { record_id: 0 }, order: { direction: DESC, field: RECORD_ID })", + ) + .await; let connection: Connection = serde_json::from_value(records).unwrap(); let first_record = connection.edges.first().unwrap(); assert_eq!(connection.total_count, 1); assert_eq!(first_record.node.type_u8, 0); // where filter GTE on u16 - let records = records_model_query(&schema, 
"(where: { type_u16GTE: 5 })").await; + let records = records_model_query( + &schema, + "(where: { type_u16GTE: 5 }, order: { direction: DESC, field: RECORD_ID })", + ) + .await; let connection: Connection = serde_json::from_value(records).unwrap(); assert_eq!(connection.total_count, 5); // where filter LTE on u32 - let records = records_model_query(&schema, "(where: { type_u32LTE: 4 })").await; + let records = records_model_query( + &schema, + "(where: { type_u32LTE: 4 }, order: { direction: DESC, field: RECORD_ID })", + ) + .await; let connection: Connection = serde_json::from_value(records).unwrap(); assert_eq!(connection.total_count, 5); // where filter LT and GT - let records = - records_model_query(&schema, "(where: { type_u32GT: 2, type_u16LT: 4 })").await; + let records = records_model_query( + &schema, + "(where: { type_u32GT: 2, type_u16LT: 4 }, order: { direction: DESC, field: RECORD_ID \ + })", + ) + .await; let connection: Connection = serde_json::from_value(records).unwrap(); let first_record = connection.edges.first().unwrap(); assert_eq!(first_record.node.type_u16, 3); @@ -224,7 +243,8 @@ mod tests { let records = records_model_query( &schema, &format!( - "(where: {{ type_class_hash: \"{}\", type_contract_address: \"{}\" }})", + "(where: {{ type_class_hash: \"{}\", type_contract_address: \"{}\" }}, order: {{ \ + direction: DESC, field: RECORD_ID }})", felt_str_0x5, felt_int_5 ), ) @@ -234,9 +254,14 @@ mod tests { assert_eq!(first_record.node.type_class_hash, "0x5"); // where filter EQ on u64 (string) - let records = - records_model_query(&schema, &format!("(where: {{ type_u64: \"{}\" }})", felt_str_0x5)) - .await; + let records = records_model_query( + &schema, + &format!( + "(where: {{ type_u64: \"{}\" }}, order: {{ direction: DESC, field: RECORD_ID }})", + felt_str_0x5 + ), + ) + .await; let connection: Connection = serde_json::from_value(records).unwrap(); let first_record = connection.edges.first().unwrap(); 
assert_eq!(first_record.node.type_u64, "0x5"); @@ -244,7 +269,11 @@ mod tests { // where filter GTE on u128 (string) let records = records_model_query( &schema, - &format!("(where: {{ type_u128GTE: \"{}\" }})", felt_str_0x5), + &format!( + "(where: {{ type_u128GTE: \"{}\" }}, order: {{ direction: DESC, field: RECORD_ID \ + }})", + felt_str_0x5 + ), ) .await; let connection: Connection = serde_json::from_value(records).unwrap(); @@ -257,7 +286,11 @@ mod tests { // where filter LT on u256 (string) let records = records_model_query( &schema, - &format!("(where: {{ type_u256LT: \"{}\" }})", felt_int_5), + &format!( + "(where: {{ type_u256LT: \"{}\" }}, order: {{ direction: DESC, field: RECORD_ID \ + }})", + felt_int_5 + ), ) .await; let connection: Connection = serde_json::from_value(records).unwrap(); @@ -268,30 +301,42 @@ mod tests { assert_eq!(last_record.node.type_u256, "0x0"); // where filter on true bool - let records = records_model_query(&schema, "(where: { type_bool: true })").await; + let records = records_model_query( + &schema, + "(where: { type_bool: true }, order: { direction: DESC, field: RECORD_ID })", + ) + .await; let connection: Connection = serde_json::from_value(records).unwrap(); let first_record = connection.edges.first().unwrap(); assert_eq!(connection.total_count, 5); assert!(first_record.node.type_bool, "should be true"); // where filter on false bool - let records = records_model_query(&schema, "(where: { type_bool: false })").await; + let records = records_model_query( + &schema, + "(where: { type_bool: false }, order: { direction: DESC, field: RECORD_ID })", + ) + .await; let connection: Connection = serde_json::from_value(records).unwrap(); let first_record = connection.edges.first().unwrap(); assert_eq!(connection.total_count, 5); assert!(!first_record.node.type_bool, "should be false"); // where filter on In - let records = - records_model_query(&schema, "(where: { type_feltIN: [\"0x5\", \"0x6\", \"0x7\"] })") - .await; + let records = 
records_model_query( + &schema, + "(where: { type_feltIN: [\"0x5\", \"0x6\", \"0x7\"] }, order: { direction: DESC, \ + field: RECORD_ID })", + ) + .await; let connection: Connection = serde_json::from_value(records).unwrap(); assert_eq!(connection.total_count, 3); // where filter on NotIn let records = records_model_query( &schema, - "(where: { type_feltNOTIN: [\"0x5\", \"0x6\", \"0x7\"] })", + "(where: { type_feltNOTIN: [\"0x5\", \"0x6\", \"0x7\"] }, order: { direction: DESC, \ + field: RECORD_ID })", ) .await; let connection: Connection = serde_json::from_value(records).unwrap(); @@ -339,7 +384,11 @@ mod tests { // *** WHERE FILTER + PAGINATION TESTING *** - let records = records_model_query(&schema, "(where: { type_u8GTE: 5 })").await; + let records = records_model_query( + &schema, + "(where: { type_u8GTE: 5 }, order: { field: TYPE_U8, direction: DESC })", + ) + .await; let connection: Connection = serde_json::from_value(records).unwrap(); let one = connection.edges.first().unwrap(); let two = connection.edges.get(1).unwrap(); @@ -348,7 +397,11 @@ mod tests { let five = connection.edges.get(4).unwrap(); // cursor based pagination - let records = records_model_query(&schema, "(where: { type_u8GTE: 5 }, first: 2)").await; + let records = records_model_query( + &schema, + "(where: { type_u8GTE: 5 }, first: 2, order: { field: TYPE_U8, direction: DESC })", + ) + .await; let connection: Connection = serde_json::from_value(records).unwrap(); let first_record = connection.edges.first().unwrap(); let last_record = connection.edges.last().unwrap(); @@ -359,7 +412,11 @@ mod tests { let records = records_model_query( &schema, - &format!("(where: {{ type_u8GTE: 5 }}, first: 3, after: \"{}\")", last_record.cursor), + &format!( + "(where: {{ type_u8GTE: 5 }}, first: 3, after: \"{}\", order: {{ field: TYPE_U8, \ + direction: DESC }})", + last_record.cursor + ), ) .await; let connection: Connection = serde_json::from_value(records).unwrap(); @@ -371,8 +428,12 @@ mod tests { 
assert_eq!(second_record, five); // offset/limit base pagination - let records = - records_model_query(&schema, "(where: { type_u8GTE: 5 }, limit: 2, offset: 2)").await; + let records = records_model_query( + &schema, + "(where: { type_u8GTE: 5 }, limit: 2, offset: 2, order: { field: TYPE_U8, direction: \ + DESC })", + ) + .await; let connection: Connection = serde_json::from_value(records).unwrap(); let first_record = connection.edges.first().unwrap(); let last_record = connection.edges.last().unwrap(); @@ -425,7 +486,8 @@ mod tests { assert_eq!(connection.total_count, 10); // *** SUBRECORD TESTING *** - let subrecord = subrecord_model_query(&schema, "").await; + let subrecord = + subrecord_model_query(&schema, "(order: { direction: DESC, field: RECORD_ID })").await; let connection: Connection = serde_json::from_value(subrecord).unwrap(); let last_record = connection.edges.first().unwrap(); assert_eq!(last_record.node.record_id, 18); @@ -433,20 +495,32 @@ mod tests { // *** DELETE TESTING *** // where filter EQ on record_id, test Record with id 20 is deleted - let records = records_model_query(&schema, "(where: { record_id: 20 })").await; + let records = records_model_query( + &schema, + "(where: { record_id: 20 }, order: { direction: DESC, field: RECORD_ID })", + ) + .await; let connection: Connection = serde_json::from_value(records).unwrap(); assert_eq!(connection.edges.len(), 0); // where filter GTE on record_id, test Sibling with id 20 is deleted - let sibling = record_sibling_query(&schema, "(where: { record_id: 20 })").await; + let sibling = record_sibling_query( + &schema, + "(where: { record_id: 20 }, order: { direction: DESC, field: RECORD_ID })", + ) + .await; let connection: Connection = serde_json::from_value(sibling).unwrap(); assert_eq!(connection.edges.len(), 0); // where filter GTE on record_id, test Subrecord with id 20 is deleted - let subrecord = subrecord_model_query(&schema, "(where: { record_id: 20 })").await; + let subrecord = 
subrecord_model_query( + &schema, + "(where: { record_id: 20 }, order: { direction: DESC, field: RECORD_ID })", + ) + .await; let connection: Connection = serde_json::from_value(subrecord).unwrap(); assert_eq!(connection.edges.len(), 0); Ok(()) } -} +} \ No newline at end of file From 0503e0763c699b3aa7d7aec07b3c9dc1daaa2b1f Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 16 Sep 2024 13:31:14 -0400 Subject: [PATCH 07/21] fmt --- crates/torii/core/src/engine.rs | 19 +++++++++++++++---- crates/torii/core/src/sql.rs | 9 +++++++-- crates/torii/core/src/sql_test.rs | 2 +- .../torii/graphql/src/tests/entities_test.rs | 2 +- crates/torii/graphql/src/tests/mod.rs | 2 +- crates/torii/graphql/src/tests/models_test.rs | 2 +- 6 files changed, 26 insertions(+), 10 deletions(-) diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 63562ca74b..2f4c379247 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -13,10 +13,12 @@ use starknet::core::types::{ }; use starknet::providers::Provider; use starknet_crypto::poseidon_hash_many; -use tokio::sync::{broadcast::Sender, mpsc::Sender as BoundedSender, Semaphore}; +use tokio::sync::broadcast::Sender; +use tokio::sync::mpsc::Sender as BoundedSender; +use tokio::sync::Semaphore; +use tokio::task::JoinSet; use tokio::time::sleep; use tracing::{debug, error, info, trace, warn}; -use tokio::task::JoinSet; use crate::processors::event_message::EventMessageProcessor; use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor}; @@ -114,7 +116,16 @@ impl Engine

{ shutdown_tx: Sender<()>, block_tx: Option>, ) -> Self { - Self { world: Arc::new(world), db, provider: Box::new(provider), processors: Arc::new(processors), config, shutdown_tx, block_tx, tasks: HashMap::new() } + Self { + world: Arc::new(world), + db, + provider: Box::new(provider), + processors: Arc::new(processors), + config, + shutdown_tx, + block_tx, + tasks: HashMap::new(), + } } pub async fn start(&mut self) -> Result<()> { @@ -452,7 +463,7 @@ impl Engine

{ let processors = self.processors.clone(); let block_timestamp = data.blocks[&last_block]; let semaphore = semaphore.clone(); - + set.spawn(async move { let _permit = semaphore.acquire().await.unwrap(); let mut local_db = db.clone(); diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index b73a891616..d1c7708e33 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -42,7 +42,12 @@ pub struct Sql { impl Clone for Sql { fn clone(&self) -> Self { - Self { world_address: self.world_address, pool: self.pool.clone(), query_queue: QueryQueue::new(self.pool.clone()), model_cache: self.model_cache.clone() } + Self { + world_address: self.world_address, + pool: self.pool.clone(), + query_queue: QueryQueue::new(self.pool.clone()), + model_cache: self.model_cache.clone(), + } } } @@ -177,7 +182,7 @@ impl Sql { &mut 0, &mut 0, ); - + self.query_queue.push_publish(BrokerMessage::ModelRegistered(model_registered)); Ok(()) diff --git a/crates/torii/core/src/sql_test.rs b/crates/torii/core/src/sql_test.rs index 934e844e3a..b60ea3de36 100644 --- a/crates/torii/core/src/sql_test.rs +++ b/crates/torii/core/src/sql_test.rs @@ -396,4 +396,4 @@ async fn count_table(table_name: &str, pool: &sqlx::Pool) -> i64 { let count: (i64,) = sqlx::query_as(&count_query).fetch_one(pool).await.unwrap(); count.0 -} \ No newline at end of file +} diff --git a/crates/torii/graphql/src/tests/entities_test.rs b/crates/torii/graphql/src/tests/entities_test.rs index f3012c637f..6138aac846 100644 --- a/crates/torii/graphql/src/tests/entities_test.rs +++ b/crates/torii/graphql/src/tests/entities_test.rs @@ -255,4 +255,4 @@ mod tests { assert_eq!(subrecord.subrecord_id, 1); Ok(()) } -} \ No newline at end of file +} diff --git a/crates/torii/graphql/src/tests/mod.rs b/crates/torii/graphql/src/tests/mod.rs index e3814c12cf..503c2cb37e 100644 --- a/crates/torii/graphql/src/tests/mod.rs +++ b/crates/torii/graphql/src/tests/mod.rs @@ -374,4 +374,4 @@ pub async fn 
spinup_types_test() -> Result { engine.process_range(data).await.unwrap(); Ok(pool) -} \ No newline at end of file +} diff --git a/crates/torii/graphql/src/tests/models_test.rs b/crates/torii/graphql/src/tests/models_test.rs index b7f5f45349..163d9afc41 100644 --- a/crates/torii/graphql/src/tests/models_test.rs +++ b/crates/torii/graphql/src/tests/models_test.rs @@ -523,4 +523,4 @@ mod tests { Ok(()) } -} \ No newline at end of file +} From c852ad0e38d52f77fe106e7eed2e792a9a588e05 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 16 Sep 2024 13:57:53 -0400 Subject: [PATCH 08/21] update cache before db exec --- crates/torii/core/src/cache.rs | 5 +++++ crates/torii/core/src/sql.rs | 11 +++++++++++ 2 files changed, 16 insertions(+) diff --git a/crates/torii/core/src/cache.rs b/crates/torii/core/src/cache.rs index a77c88642f..f5afab2103 100644 --- a/crates/torii/core/src/cache.rs +++ b/crates/torii/core/src/cache.rs @@ -113,6 +113,11 @@ impl ModelCache { Ok(model) } + pub async fn set(&self, selector: Felt, model: Model) { + let mut cache = self.cache.write().await; + cache.insert(selector, model); + } + pub async fn clear(&self) { self.cache.write().await.clear(); } diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index d1c7708e33..2dc4419a26 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -183,6 +183,17 @@ impl Sql { &mut 0, ); + self.model_cache.set(selector, Model { + namespace: namespace.to_string(), + name: model.name().to_string(), + selector, + class_hash: format!("{:#x}", class_hash), + contract_address: format!("{:#x}", contract_address), + packed_size, + unpacked_size, + layout, + schema: model + }); self.query_queue.push_publish(BrokerMessage::ModelRegistered(model_registered)); Ok(()) From d24866431a719dfc2dc6083fed63263e79adecf8 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 16 Sep 2024 13:59:08 -0400 Subject: [PATCH 09/21] set model --- crates/torii/core/src/sql.rs | 6 +++--- 1 file changed, 3 
insertions(+), 3 deletions(-) diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index 2dc4419a26..85eae9f932 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -187,13 +187,13 @@ impl Sql { namespace: namespace.to_string(), name: model.name().to_string(), selector, - class_hash: format!("{:#x}", class_hash), - contract_address: format!("{:#x}", contract_address), + class_hash, + contract_address, packed_size, unpacked_size, layout, schema: model - }); + }).await; self.query_queue.push_publish(BrokerMessage::ModelRegistered(model_registered)); Ok(()) From 52bb33f31a518102121c43e92764b027aa8d2a81 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 16 Sep 2024 14:11:35 -0400 Subject: [PATCH 10/21] refacotr: cache --- crates/torii/core/src/cache.rs | 5 +++++ crates/torii/core/src/sql.rs | 29 +++++++++++++++++------------ 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/crates/torii/core/src/cache.rs b/crates/torii/core/src/cache.rs index f5afab2103..fe184f4407 100644 --- a/crates/torii/core/src/cache.rs +++ b/crates/torii/core/src/cache.rs @@ -115,6 +115,11 @@ impl ModelCache { pub async fn set(&self, selector: Felt, model: Model) { let mut cache = self.cache.write().await; + + // we need to update the name of the struct to include the namespace + let mut schema = model.schema.clone().as_struct().unwrap(); + schema.name = format!("{}-{}", model.namespace, model.name); + model.schema = Ty::Struct(schema); cache.insert(selector, model); } diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index 85eae9f932..11f5090191 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use anyhow::{anyhow, Result}; use chrono::Utc; use dojo_types::primitive::Primitive; -use dojo_types::schema::{EnumOption, Member, Ty}; +use dojo_types::schema::{EnumOption, Member, Struct, Ty}; use dojo_world::contracts::abi::model::Layout; use 
dojo_world::contracts::naming::compute_selector_from_names; use dojo_world::metadata::WorldMetadata; @@ -183,17 +183,22 @@ impl Sql { &mut 0, ); - self.model_cache.set(selector, Model { - namespace: namespace.to_string(), - name: model.name().to_string(), - selector, - class_hash, - contract_address, - packed_size, - unpacked_size, - layout, - schema: model - }).await; + self.model_cache + .set( + selector, + Model { + namespace: namespace.to_string(), + name: model.name().to_string(), + selector, + class_hash, + contract_address, + packed_size, + unpacked_size, + layout, + schema: model, + }, + ) + .await; self.query_queue.push_publish(BrokerMessage::ModelRegistered(model_registered)); Ok(()) From f826735f164974704d4ab7873bb7af9f6409b621 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 16 Sep 2024 14:15:00 -0400 Subject: [PATCH 11/21] fmt --- crates/torii/core/src/cache.rs | 5 ----- crates/torii/core/src/sql.rs | 14 +++++++++++--- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/crates/torii/core/src/cache.rs b/crates/torii/core/src/cache.rs index fe184f4407..f5afab2103 100644 --- a/crates/torii/core/src/cache.rs +++ b/crates/torii/core/src/cache.rs @@ -115,11 +115,6 @@ impl ModelCache { pub async fn set(&self, selector: Felt, model: Model) { let mut cache = self.cache.write().await; - - // we need to update the name of the struct to include the namespace - let mut schema = model.schema.clone().as_struct().unwrap(); - schema.name = format!("{}-{}", model.namespace, model.name); - model.schema = Ty::Struct(schema); cache.insert(selector, model); } diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index 11f5090191..19423043be 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -149,7 +149,8 @@ impl Sql { unpacked_size: u32, block_timestamp: u64, ) -> Result<()> { - let selector = compute_selector_from_names(namespace, &model.name()); + let namespaced_name = format!("{}-{}", namespace, model.name()); + 
let selector = compute_selector_from_names(namespace, &namespaced_name); let insert_models = "INSERT INTO models (id, namespace, name, class_hash, contract_address, layout, \ @@ -176,13 +177,15 @@ impl Sql { self.build_register_queries_recursive( selector, &model, - vec![format!("{}-{}", namespace, model.name())], + vec![namespaced_name], &mut model_idx, block_timestamp, &mut 0, &mut 0, ); + // we set the model in the cache directly + // because entities might be using it before the query queue is processed self.model_cache .set( selector, @@ -195,7 +198,11 @@ impl Sql { packed_size, unpacked_size, layout, - schema: model, + // we need to update the name of the struct to include the namespace + schema: Ty::Struct(Struct { + name: namespaced_name, + children: model.as_struct().unwrap().children.clone(), + }), }, ) .await; @@ -277,6 +284,7 @@ impl Sql { return Err(anyhow!("Entity is not a struct")); }; + println!("entity: {:?}", entity); let namespaced_name = entity.name(); let (model_namespace, model_name) = namespaced_name.split_once('-').unwrap(); From 543f3cc4e7b887c6039e35fcff5497ab69e2d1e3 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 16 Sep 2024 14:15:46 -0400 Subject: [PATCH 12/21] c --- crates/torii/core/src/sql.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index 19423043be..eb63893b4d 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -177,7 +177,7 @@ impl Sql { self.build_register_queries_recursive( selector, &model, - vec![namespaced_name], + vec![namespaced_name.clone()], &mut model_idx, block_timestamp, &mut 0, @@ -284,7 +284,6 @@ impl Sql { return Err(anyhow!("Entity is not a struct")); }; - println!("entity: {:?}", entity); let namespaced_name = entity.name(); let (model_namespace, model_name) = namespaced_name.split_once('-').unwrap(); From 59102ad9103dcd18bf7e16f2e9bc8df7fceaf145 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 16 Sep 
2024 14:19:32 -0400 Subject: [PATCH 13/21] fix --- crates/torii/core/src/sql.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index eb63893b4d..bb518b1bc2 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -149,8 +149,8 @@ impl Sql { unpacked_size: u32, block_timestamp: u64, ) -> Result<()> { + let selector = compute_selector_from_names(namespace, &model.name()); let namespaced_name = format!("{}-{}", namespace, model.name()); - let selector = compute_selector_from_names(namespace, &namespaced_name); let insert_models = "INSERT INTO models (id, namespace, name, class_hash, contract_address, layout, \ From 29a9812135fa0656bb30c18e33ee2ba0b07873f1 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 16 Sep 2024 15:35:10 -0400 Subject: [PATCH 14/21] fix: test --- crates/torii/graphql/src/tests/subscription_test.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/torii/graphql/src/tests/subscription_test.rs b/crates/torii/graphql/src/tests/subscription_test.rs index 363082878a..4cfa28cf4c 100644 --- a/crates/torii/graphql/src/tests/subscription_test.rs +++ b/crates/torii/graphql/src/tests/subscription_test.rs @@ -24,6 +24,7 @@ mod tests { let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap(); model_fixtures(&mut db).await; + db.execute().await.unwrap(); // 0. 
Preprocess expected entity value let namespace = "types_test".to_string(); let model_name = "Record".to_string(); From 6e9469b808292447deafde68601997c35d2d87b9 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 16 Sep 2024 16:29:33 -0400 Subject: [PATCH 15/21] fix: tests --- crates/torii/graphql/src/tests/mod.rs | 2 ++ crates/torii/graphql/src/tests/subscription_test.rs | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/torii/graphql/src/tests/mod.rs b/crates/torii/graphql/src/tests/mod.rs index 503c2cb37e..133b46075e 100644 --- a/crates/torii/graphql/src/tests/mod.rs +++ b/crates/torii/graphql/src/tests/mod.rs @@ -270,6 +270,8 @@ pub async fn model_fixtures(db: &mut Sql) { ) .await .unwrap(); + + db.execute().await.unwrap(); } pub async fn spinup_types_test() -> Result { diff --git a/crates/torii/graphql/src/tests/subscription_test.rs b/crates/torii/graphql/src/tests/subscription_test.rs index 4cfa28cf4c..363082878a 100644 --- a/crates/torii/graphql/src/tests/subscription_test.rs +++ b/crates/torii/graphql/src/tests/subscription_test.rs @@ -24,7 +24,6 @@ mod tests { let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap(); model_fixtures(&mut db).await; - db.execute().await.unwrap(); // 0. 
Preprocess expected entity value let namespace = "types_test".to_string(); let model_name = "Record".to_string(); From ac7f1b8397d061f23eedc921e30efc9976c51fb2 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 16 Sep 2024 16:32:08 -0400 Subject: [PATCH 16/21] execute register model --- crates/torii/libp2p/src/tests.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/torii/libp2p/src/tests.rs b/crates/torii/libp2p/src/tests.rs index 552b240590..7ef1472068 100644 --- a/crates/torii/libp2p/src/tests.rs +++ b/crates/torii/libp2p/src/tests.rs @@ -588,6 +588,7 @@ mod test { ) .await .unwrap(); + db.execute().await.unwrap(); // Initialize the relay server let mut relay_server = Relay::new(db, provider, 9900, 9901, 9902, None, None)?; From e3a2757d025ea5e7ba8a143cd08a2fa7bb95ed7c Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 16 Sep 2024 16:56:32 -0400 Subject: [PATCH 17/21] pending block & timestamp fix --- crates/torii/core/src/engine.rs | 43 ++++++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 2f4c379247..b9fa3e0901 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -89,6 +89,14 @@ pub struct FetchPendingResult { pub block_number: u64, } +#[derive(Debug)] +pub struct ParallelizedEvent { + pub block_number: u64, + pub block_timestamp: u64, + pub event_id: String, + pub event: Event, +} + #[allow(missing_debug_implementations)] pub struct Engine { world: Arc>, @@ -98,7 +106,7 @@ pub struct Engine { config: EngineConfig, shutdown_tx: Sender<()>, block_tx: Option>, - tasks: HashMap>, + tasks: HashMap>, } struct UnprocessedEvent { @@ -413,11 +421,14 @@ impl Engine

{ } } + // Process parallelized events + self.process_tasks().await?; + // Set the head to the last processed pending transaction // Head block number should still be latest block number self.db.set_head(data.block_number - 1, last_pending_block_world_tx, last_pending_block_tx); - self.db.execute().await?; + Ok(()) } @@ -452,6 +463,16 @@ impl Engine

{ } } + // Process parallelized events + self.process_tasks().await?; + + self.db.set_head(data.latest_block_number, None, None); + self.db.execute().await?; + + Ok(()) + } + + async fn process_tasks(&mut self) -> Result<()> { // We use a semaphore to limit the number of concurrent tasks let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks)); @@ -461,18 +482,17 @@ impl Engine

{ let db = self.db.clone(); let world = self.world.clone(); let processors = self.processors.clone(); - let block_timestamp = data.blocks[&last_block]; let semaphore = semaphore.clone(); set.spawn(async move { let _permit = semaphore.acquire().await.unwrap(); let mut local_db = db.clone(); - for (event_id, event) in events { + for ParallelizedEvent { event_id, event, block_number, block_timestamp } in events { if let Some(processor) = processors.event.get(&event.keys[0]) { debug!(target: LOG_TARGET, event_name = processor.event_key(), task_id = %task_id, "Processing parallelized event."); if let Err(e) = processor - .process(&world, &mut local_db, last_block, block_timestamp, &event_id, &event) + .process(&world, &mut local_db, block_number, block_timestamp, &event_id, &event) .await { error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing parallelized event."); @@ -489,9 +509,6 @@ impl Engine

{ self.db.merge(local_db)?; } - self.db.set_head(data.latest_block_number, None, None); - self.db.execute().await?; - Ok(()) } @@ -683,10 +700,12 @@ impl Engine

{ // if we have a task identifier, we queue the event to be parallelized if task_identifier != Felt::ZERO { - self.tasks - .entry(task_identifier) - .or_default() - .push((event_id.to_string(), event.clone())); + self.tasks.entry(task_identifier).or_default().push(ParallelizedEvent { + event_id: event_id.to_string(), + event: event.clone(), + block_number, + block_timestamp, + }); } else { // if we dont have a task identifier, we process the event immediately if let Err(e) = processor From f60ef4fae199ebef36583c27cc3433967503dca7 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 16 Sep 2024 20:55:43 -0400 Subject: [PATCH 18/21] use WAL journal mode & switch to normal synchronous mode --- bin/torii/src/main.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index 79152d73ae..00a8bdd40f 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -161,13 +161,10 @@ async fn main() -> anyhow::Result<()> { .connect_with(options) .await?; - if args.database == ":memory:" { - // Disable auto-vacuum - sqlx::query("PRAGMA auto_vacuum = NONE;").execute(&pool).await?; - - // Switch DELETE journal mode - sqlx::query("PRAGMA journal_mode=DELETE;").execute(&pool).await?; - } + // Disable auto-vacuum + sqlx::query("PRAGMA auto_vacuum = NONE;").execute(&pool).await?; + sqlx::query("PRAGMA journal_mode = WAL;").execute(&pool).await?; + sqlx::query("PRAGMA synchronous = NORMAL;").execute(&pool).await?; sqlx::migrate!("../../crates/torii/migrations").run(&pool).await?; From 5ffe50e9362d1d9d7642ba132815074ed99c3ea9 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 16 Sep 2024 21:09:23 -0400 Subject: [PATCH 19/21] set number of threads to cpu count --- bin/torii/src/main.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index 00a8bdd40f..53a027fbff 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -166,6 +166,10 @@ async fn main() -> 
anyhow::Result<()> { sqlx::query("PRAGMA journal_mode = WAL;").execute(&pool).await?; sqlx::query("PRAGMA synchronous = NORMAL;").execute(&pool).await?; + // Set the number of threads based on CPU count + let cpu_count = std::thread::available_parallelism().unwrap().get(); + sqlx::query(&format!("PRAGMA threads = {};", cpu_count)).execute(&pool).await?; + sqlx::migrate!("../../crates/torii/migrations").run(&pool).await?; let provider: Arc<_> = JsonRpcClient::new(HttpTransport::new(args.rpc)).into(); From 3e297f96807e8877f5d1b3278fdb971f68a2162e Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 16 Sep 2024 21:14:08 -0400 Subject: [PATCH 20/21] cap threads to 8 --- bin/torii/src/main.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index 53a027fbff..cdac62dc54 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -10,6 +10,7 @@ //! documentation for usage details. This is **not recommended on Windows**. See [here](https://rust-lang.github.io/rfcs/1974-global-allocators.html#jemalloc) //! for more info. 
+use std::cmp; use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; @@ -168,7 +169,8 @@ async fn main() -> anyhow::Result<()> { // Set the number of threads based on CPU count let cpu_count = std::thread::available_parallelism().unwrap().get(); - sqlx::query(&format!("PRAGMA threads = {};", cpu_count)).execute(&pool).await?; + let thread_count = cmp::min(cpu_count, 8); + sqlx::query(&format!("PRAGMA threads = {};", thread_count)).execute(&pool).await?; sqlx::migrate!("../../crates/torii/migrations").run(&pool).await?; From 663ed6a3f73e1f84e7e7ae3c446c2c467ba24b01 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 16 Sep 2024 21:25:19 -0400 Subject: [PATCH 21/21] refactor: use DefaultHasher in favor of poseidon --- crates/torii/core/src/engine.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index b9fa3e0901..8b2f5685e9 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -1,5 +1,6 @@ use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; +use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; use std::time::Duration; @@ -12,7 +13,6 @@ use starknet::core::types::{ TransactionReceiptWithBlockInfo, TransactionWithReceipt, }; use starknet::providers::Provider; -use starknet_crypto::poseidon_hash_many; use tokio::sync::broadcast::Sender; use tokio::sync::mpsc::Sender as BoundedSender; use tokio::sync::Semaphore; @@ -106,7 +106,7 @@ pub struct Engine { config: EngineConfig, shutdown_tx: Sender<()>, block_tx: Option>, - tasks: HashMap>, + tasks: HashMap>, } struct UnprocessedEvent { @@ -693,13 +693,16 @@ impl Engine

{ let task_identifier = match processor.event_key().as_str() { "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => { - poseidon_hash_many(&[event.data[0], event.data[1]]) + let mut hasher = DefaultHasher::new(); + event.data[0].hash(&mut hasher); + event.data[1].hash(&mut hasher); + hasher.finish() } - _ => Felt::ZERO, + _ => 0, }; // if we have a task identifier, we queue the event to be parallelized - if task_identifier != Felt::ZERO { + if task_identifier != 0 { self.tasks.entry(task_identifier).or_default().push(ParallelizedEvent { event_id: event_id.to_string(), event: event.clone(),