diff --git a/crates/torii/core/src/processors/event_message.rs b/crates/torii/core/src/processors/event_message.rs index a3c06fc762..090bda3e16 100644 --- a/crates/torii/core/src/processors/event_message.rs +++ b/crates/torii/core/src/processors/event_message.rs @@ -66,7 +66,7 @@ where let mut entity = model.schema().await?; entity.deserialize(&mut keys_and_unpacked)?; - db.set_event_message(entity, event_id, block_timestamp).await?; + db.set_entity(entity, true, event_id, block_timestamp).await?; Ok(()) } } diff --git a/crates/torii/core/src/processors/store_set_record.rs b/crates/torii/core/src/processors/store_set_record.rs index 6641e19b16..c4f7c5a9d2 100644 --- a/crates/torii/core/src/processors/store_set_record.rs +++ b/crates/torii/core/src/processors/store_set_record.rs @@ -75,7 +75,7 @@ where let mut entity = model.schema().await?; entity.deserialize(&mut keys_and_unpacked)?; - db.set_entity(entity, event_id, block_timestamp).await?; + db.set_entity(entity, false, event_id, block_timestamp).await?; Ok(()) } } diff --git a/crates/torii/core/src/processors/store_update_record.rs b/crates/torii/core/src/processors/store_update_record.rs index 9fdc3a03c7..0e34792090 100644 --- a/crates/torii/core/src/processors/store_update_record.rs +++ b/crates/torii/core/src/processors/store_update_record.rs @@ -78,7 +78,7 @@ where let mut entity = model.schema().await?; entity.deserialize(&mut keys_and_unpacked)?; - db.set_entity(entity, event_id, block_timestamp).await?; + db.set_entity(entity, false, event_id, block_timestamp).await?; Ok(()) } } diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index 69edbc3d70..53f04cdc9b 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -17,10 +17,7 @@ use super::World; use crate::model::ModelSQLReader; use crate::query_queue::{Argument, QueryQueue}; use crate::simple_broker::SimpleBroker; -use crate::types::{ - Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, - Model as ModelRegistered, -}; +use crate::types::{Entity as EntityUpdated, Event as EventEmitted, Model as ModelRegistered}; use crate::utils::{must_utc_datetime_from_timestamp, utc_dt_string_from_timestamp}; pub const FELT_DELIMITER: &str = "/"; @@ -156,6 +153,7 @@ impl Sql { pub async fn set_entity( &mut self, entity: Ty, + is_event_message: bool, event_id: &str, block_timestamp: u64, ) -> Result<()> { @@ -172,7 +170,11 @@ impl Sql { let namespaced_name = entity.name(); let (model_namespace, model_name) = namespaced_name.split_once('-').unwrap(); - let entity_id = format!("{:#x}", poseidon_hash_many(&keys)); + let entity_id = format!( + "{}{:#x}", + if is_event_message { "event:" } else { "" }, + poseidon_hash_many(&keys) + ); let model_id = format!("{:#x}", compute_selector_from_names(model_namespace, model_name)); self.query_queue.enqueue( @@ -186,7 +188,7 @@ impl Sql { ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \ updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \ event_id=EXCLUDED.event_id RETURNING *"; - let mut entity_updated: EntityUpdated = sqlx::query_as(insert_entities) + let mut entity_updated: EntityUpdated = sqlx::query_as(&insert_entities) .bind(&entity_id) .bind(&keys_str) .bind(event_id) @@ -200,7 +202,7 @@ impl Sql { self.build_set_entity_queries_recursive( path, event_id, - (&entity_id, false), + &entity_id, &entity, block_timestamp, &vec![], @@ -208,66 +210,6 @@ impl Sql { self.query_queue.execute_all().await?; SimpleBroker::publish(entity_updated); - - Ok(()) - } - - pub async fn set_event_message( - &mut self, - entity: Ty, - event_id: &str, - block_timestamp: u64, - ) -> Result<()> { - let keys = if let Ty::Struct(s) = &entity { - let mut keys = Vec::new(); - for m in s.keys() { - keys.extend(m.serialize()?); - } - keys - } else { - return Err(anyhow!("Entity is not a struct")); - }; - - let namespaced_name = entity.name(); - let (model_namespace, model_name) = namespaced_name.split_once('-').unwrap(); - - let entity_id = format!("{:#x}", poseidon_hash_many(&keys)); - let model_id = format!("{:#x}", compute_selector_from_names(model_namespace, model_name)); - - self.query_queue.enqueue( - "INSERT INTO event_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \ - model_id) DO NOTHING", - vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())], - ); - - let keys_str = felts_sql_string(&keys); - let insert_entities = "INSERT INTO event_messages (id, keys, event_id, executed_at) \ - VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \ - updated_at=CURRENT_TIMESTAMP, event_id=EXCLUDED.event_id RETURNING \ - *"; - let mut event_message_updated: EventMessageUpdated = sqlx::query_as(insert_entities) - .bind(&entity_id) - .bind(&keys_str) - .bind(event_id) - .bind(utc_dt_string_from_timestamp(block_timestamp)) - .fetch_one(&self.pool) - .await?; - - event_message_updated.updated_model = Some(entity.clone()); - - let path = vec![namespaced_name]; - self.build_set_entity_queries_recursive( - path, - event_id, - (&entity_id, true), - &entity, - block_timestamp, - &vec![], - ); - self.query_queue.execute_all().await?; - - SimpleBroker::publish(event_message_updated); - Ok(()) } @@ -561,13 +503,11 @@ impl Sql { path: Vec, event_id: &str, // The id of the entity and if the entity is an event message - entity_id: (&str, bool), + entity_id: &str, entity: &Ty, block_timestamp: u64, indexes: &Vec, ) { - let (entity_id, is_event_message) = entity_id; - let update_members = |members: &[Member], query_queue: &mut QueryQueue, indexes: &Vec| { let table_id = path.join("$"); @@ -576,19 +516,11 @@ impl Sql { "event_id".to_string(), "executed_at".to_string(), "updated_at".to_string(), - if is_event_message { - "event_message_id".to_string() - } else { - "entity_id".to_string() - }, + "entity_id".to_string(), ]; let mut arguments = vec![ - Argument::String(if is_event_message { - "event:".to_string() + entity_id - } else { - entity_id.to_string() - }), + Argument::String(entity_id.to_string()), Argument::String(event_id.to_string()), Argument::String(utc_dt_string_from_timestamp(block_timestamp)), Argument::String(chrono::Utc::now().to_rfc3339()), @@ -648,7 +580,7 @@ impl Sql { self.build_set_entity_queries_recursive( path_clone, event_id, - (entity_id, is_event_message), + entity_id, &member.ty, block_timestamp, indexes, @@ -658,7 +590,11 @@ impl Sql { Ty::Enum(e) => { if e.options.iter().all( |o| { - if let Ty::Tuple(t) = &o.ty { t.is_empty() } else { false } + if let Ty::Tuple(t) = &o.ty { + t.is_empty() + } else { + false + } }, ) { return; @@ -684,7 +620,7 @@ impl Sql { self.build_set_entity_queries_recursive( path_clone, event_id, - (entity_id, is_event_message), + entity_id, &option.ty, block_timestamp, indexes, @@ -713,7 +649,7 @@ impl Sql { self.build_set_entity_queries_recursive( path_clone, event_id, - (entity_id, is_event_message), + entity_id, member, block_timestamp, indexes, @@ -751,7 +687,7 @@ impl Sql { self.build_set_entity_queries_recursive( path_clone, event_id, - (entity_id, is_event_message), + entity_id, member, block_timestamp, &indexes, diff --git a/crates/torii/core/src/types.rs b/crates/torii/core/src/types.rs index 00f2d47f11..b69a62ed6a 100644 --- a/crates/torii/core/src/types.rs +++ b/crates/torii/core/src/types.rs @@ -43,21 +43,6 @@ pub struct Entity { pub updated_model: Option, } -#[derive(FromRow, Deserialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -pub struct EventMessage { - pub id: String, - pub keys: String, - pub event_id: String, - pub executed_at: DateTime, - pub created_at: DateTime, - pub updated_at: DateTime, - - // this should never be None. as a EventMessage cannot be deleted - #[sqlx(skip)] - pub updated_model: Option, -} - #[derive(FromRow, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Model {