From d1990c0dce3233f162c45035250605d8f374b4ca Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 24 Sep 2024 13:28:25 -0400 Subject: [PATCH] refactor(torii-core): enqueue models & events --- crates/torii/core/src/query_queue.rs | 26 +++++++++++ crates/torii/core/src/sql.rs | 70 +++++++++++----------------- 2 files changed, 53 insertions(+), 43 deletions(-) diff --git a/crates/torii/core/src/query_queue.rs b/crates/torii/core/src/query_queue.rs index 5dfad77113..c971617c84 100644 --- a/crates/torii/core/src/query_queue.rs +++ b/crates/torii/core/src/query_queue.rs @@ -49,6 +49,9 @@ pub struct DeleteEntityQuery { pub enum QueryType { SetEntity(Ty), DeleteEntity(DeleteEntityQuery), + EventMessage(Ty), + RegisterModel, + StoreEvent, Other, } @@ -137,6 +140,29 @@ impl QueryQueue { let broker_message = BrokerMessage::EntityUpdated(entity_updated); self.push_publish(broker_message); } + QueryType::RegisterModel => { + let row = query.fetch_one(&mut *tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + let model_registered = ModelRegistered::from_row(&row)?; + self.push_publish(BrokerMessage::ModelRegistered(model_registered)); + } + QueryType::EventMessage(entity) => { + let row = query.fetch_one(&mut *tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + let mut event_message = EventMessageUpdated::from_row(&row)?; + event_message.updated_model = Some(entity); + let broker_message = BrokerMessage::EventMessageUpdated(event_message); + self.push_publish(broker_message); + } + QueryType::StoreEvent => { + let row = query.fetch_one(&mut *tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + let event = EventEmitted::from_row(&row)?; + self.push_publish(BrokerMessage::EventEmitted(event)); + } QueryType::Other => { query.execute(&mut *tx).await.with_context(|| { format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index 249a3c4fef..e9ad5e56f3 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -3,7 +3,6 @@ use std::str::FromStr; use std::sync::Arc; use anyhow::{anyhow, Result}; -use chrono::Utc; use dojo_types::primitive::Primitive; use dojo_types::schema::{EnumOption, Member, Struct, Ty}; use dojo_world::contracts::abi::model::Layout; @@ -16,11 +15,8 @@ use starknet_crypto::poseidon_hash_many; use tracing::{debug, warn}; use crate::cache::{Model, ModelCache}; -use crate::query_queue::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType}; -use crate::types::{ - Event as EventEmitted, EventMessage as EventMessageUpdated, Model as ModelRegistered, -}; -use crate::utils::{must_utc_datetime_from_timestamp, utc_dt_string_from_timestamp}; +use crate::query_queue::{Argument, DeleteEntityQuery, QueryQueue, QueryType}; +use crate::utils::utc_dt_string_from_timestamp; type IsEventMessage = bool; type IsStoreUpdate = bool; @@ -173,19 +169,20 @@ impl Sql { class_hash=EXCLUDED.class_hash, layout=EXCLUDED.layout, \ packed_size=EXCLUDED.packed_size, unpacked_size=EXCLUDED.unpacked_size, \ executed_at=EXCLUDED.executed_at RETURNING *"; - let model_registered: ModelRegistered = sqlx::query_as(insert_models) - // this is temporary until the model hash is precomputed - .bind(format!("{:#x}", selector)) - .bind(namespace) - .bind(model.name()) - .bind(format!("{class_hash:#x}")) - .bind(format!("{contract_address:#x}")) - .bind(serde_json::to_string(&layout)?) - .bind(packed_size) - .bind(unpacked_size) - .bind(utc_dt_string_from_timestamp(block_timestamp)) - .fetch_one(&self.pool) - .await?; + + let arguments = vec![ + Argument::String(format!("{:#x}", selector)), + Argument::String(namespace.to_string()), + Argument::String(model.name().to_string()), + Argument::String(format!("{class_hash:#x}")), + Argument::String(format!("{contract_address:#x}")), + Argument::String(serde_json::to_string(&layout)?), + Argument::Int(packed_size as i64), + Argument::Int(unpacked_size as i64), + Argument::String(utc_dt_string_from_timestamp(block_timestamp)), + ]; + + self.query_queue.enqueue(insert_models, arguments, QueryType::RegisterModel); let mut model_idx = 0_i64; self.build_register_queries_recursive( @@ -220,7 +217,6 @@ impl Sql { }, ) .await; - self.query_queue.push_publish(BrokerMessage::ModelRegistered(model_registered)); Ok(()) } @@ -316,15 +312,16 @@ impl Sql { VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \ updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \ event_id=EXCLUDED.event_id RETURNING *"; - let mut event_message_updated: EventMessageUpdated = sqlx::query_as(insert_entities) - .bind(&entity_id) - .bind(&keys_str) - .bind(event_id) - .bind(utc_dt_string_from_timestamp(block_timestamp)) - .fetch_one(&self.pool) - .await?; - - event_message_updated.updated_model = Some(entity.clone()); + self.query_queue.enqueue( + insert_entities, + vec![ + Argument::String(entity_id.clone()), + Argument::String(keys_str), + Argument::String(event_id.to_string()), + Argument::String(utc_dt_string_from_timestamp(block_timestamp)), + ], + QueryType::Other, + ); let path = vec![namespaced_name]; self.build_set_entity_queries_recursive( @@ -336,8 +333,6 @@ impl Sql { &vec![], ); - self.query_queue.push_publish(BrokerMessage::EventMessageUpdated(event_message_updated)); - Ok(()) } @@ -504,19 +499,8 @@ impl Sql { "INSERT OR IGNORE INTO events (id, keys, data, transaction_hash, executed_at) VALUES \ (?, ?, ?, ?, ?)", vec![id, keys, data, hash, executed_at], - QueryType::Other, + QueryType::StoreEvent, ); - - let emitted = EventEmitted { - id: event_id.to_string(), - keys: felts_sql_string(&event.keys), - data: felts_sql_string(&event.data), - transaction_hash: format!("{:#x}", transaction_hash), - created_at: Utc::now(), - executed_at: must_utc_datetime_from_timestamp(block_timestamp), - }; - - self.query_queue.push_publish(BrokerMessage::EventEmitted(emitted)); } #[allow(clippy::too_many_arguments)]