diff --git a/crates/torii/core/src/query_queue.rs b/crates/torii/core/src/query_queue.rs index d42fdb94b3..130f31378e 100644 --- a/crates/torii/core/src/query_queue.rs +++ b/crates/torii/core/src/query_queue.rs @@ -1,6 +1,8 @@ use std::collections::VecDeque; -use sqlx::{Executor, Pool, Sqlite}; +use anyhow::{Context, Result}; +use dojo_types::schema::Ty; +use sqlx::{FromRow, Pool, Sqlite}; use starknet::core::types::Felt; use crate::simple_broker::SimpleBroker; @@ -29,34 +31,49 @@ pub enum BrokerMessage { #[derive(Debug, Clone)] pub struct QueryQueue { pool: Pool, - pub queue: VecDeque<(String, Vec)>, + pub queue: VecDeque<(String, Vec, QueryType)>, // publishes that are related to queries in the queue, they should be sent // after the queries are executed pub publish_queue: VecDeque, } +#[derive(Debug, Clone)] +pub enum QueryType { + SetEntity(Ty), + Other, +} + impl QueryQueue { pub fn new(pool: Pool) -> Self { QueryQueue { pool, queue: VecDeque::new(), publish_queue: VecDeque::new() } } - pub fn enqueue>(&mut self, statement: S, arguments: Vec) { - self.queue.push_back((statement.into(), arguments)); + pub fn enqueue>( + &mut self, + statement: S, + arguments: Vec, + query_type: QueryType, + ) { + self.queue.push_back((statement.into(), arguments, query_type)); } - pub fn push_front>(&mut self, statement: S, arguments: Vec) { - self.queue.push_front((statement.into(), arguments)); + pub fn push_front>( + &mut self, + statement: S, + arguments: Vec, + query_type: QueryType, + ) { + self.queue.push_front((statement.into(), arguments, query_type)); } pub fn push_publish(&mut self, value: BrokerMessage) { self.publish_queue.push_back(value); } - pub async fn execute_all(&mut self) -> sqlx::Result { - let mut total_affected = 0_u64; + pub async fn execute_all(&mut self) -> Result<()> { let mut tx = self.pool.begin().await?; - while let Some((statement, arguments)) = self.queue.pop_front() { + while let Some((statement, arguments, query_type)) = self.queue.pop_front() { let mut query = sqlx::query(&statement); for arg in &arguments { @@ -69,20 +86,40 @@ impl QueryQueue { } } - total_affected += tx.execute(query).await?.rows_affected(); + match query_type { + QueryType::SetEntity(entity) => { + let row = query.fetch_one(&mut *tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + let mut entity_updated = EntityUpdated::from_row(&row)?; + entity_updated.updated_model = Some(entity); + entity_updated.deleted = false; + let broker_message = BrokerMessage::EntityUpdated(entity_updated); + send_broker_message(broker_message); + } + QueryType::Other => { + query.execute(&mut *tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + } + } } tx.commit().await?; while let Some(message) = self.publish_queue.pop_front() { - match message { - BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model), - BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity), - BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event), - BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event), - } + send_broker_message(message); } - Ok(total_affected) + Ok(()) + } +} + +fn send_broker_message(message: BrokerMessage) { + match message { + BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model), + BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity), + BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event), + BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event), } } diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index f3b6f887d0..cf4e43290c 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -16,7 +16,7 @@ use starknet_crypto::poseidon_hash_many; use tracing::debug; use crate::cache::{Model, ModelCache}; -use crate::query_queue::{Argument, BrokerMessage, QueryQueue}; +use crate::query_queue::{Argument, BrokerMessage, QueryQueue, QueryType}; use crate::types::{ Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, Model as ModelRegistered, @@ -53,6 +53,7 @@ impl Sql { Argument::FieldElement(world_address), Argument::String(WORLD_CONTRACT_TYPE.to_string()), ], + QueryType::Other, ); query_queue.execute_all().await?; @@ -106,6 +107,7 @@ impl Sql { "UPDATE contracts SET head = ?, last_pending_block_world_tx = ?, \ last_pending_block_tx = ? WHERE id = ?", vec![head, last_pending_block_world_tx, last_pending_block_tx, id], + QueryType::Other, ); } @@ -174,26 +176,29 @@ impl Sql { let entity_id = format!("{:#x}", entity_id); let model_id = format!("{:#x}", model_id); + let insert_entities = "INSERT INTO entities (id, keys, event_id, executed_at) VALUES (?, \ + ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \ + updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \ + event_id=EXCLUDED.event_id"; + + self.query_queue.enqueue( + insert_entities, + vec![ + Argument::String(entity_id.clone()), + Argument::String(keys_str.to_string()), + Argument::String(event_id.to_string()), + Argument::String(utc_dt_string_from_timestamp(block_timestamp)), + ], + QueryType::Other, + ); + self.query_queue.enqueue( "INSERT INTO entity_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \ model_id) DO NOTHING", vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())], + QueryType::Other, ); - let insert_entities = "INSERT INTO entities (id, keys, event_id, executed_at) VALUES (?, \ - ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \ - updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \ - event_id=EXCLUDED.event_id RETURNING *"; - let mut entity_updated: EntityUpdated = sqlx::query_as(insert_entities) - .bind(&entity_id) - .bind(keys_str) - .bind(event_id) - .bind(utc_dt_string_from_timestamp(block_timestamp)) - .fetch_one(&self.pool) - .await?; - - entity_updated.updated_model = Some(entity.clone()); - let path = vec![namespaced_name]; self.build_set_entity_queries_recursive( path, @@ -204,7 +209,12 @@ impl Sql { &vec![], ); - self.query_queue.push_publish(BrokerMessage::EntityUpdated(entity_updated)); + let query_entities_for_publish = "SELECT * FROM entities WHERE id = ?"; + self.query_queue.enqueue( + query_entities_for_publish.to_string(), + vec![Argument::String(entity_id.clone())], + QueryType::SetEntity(entity.clone()), + ); Ok(()) } @@ -235,6 +245,7 @@ impl Sql { "INSERT INTO event_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \ model_id) DO NOTHING", vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())], + QueryType::Other, ); let keys_str = felts_sql_string(&keys); @@ -293,18 +304,18 @@ impl Sql { ); self.execute().await?; - let mut update_entity = sqlx::query_as::<_, EntityUpdated>( - "UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, event_id=? WHERE id \ - = ? RETURNING *", - ) - .bind(utc_dt_string_from_timestamp(block_timestamp)) - .bind(event_id) - .bind(entity_id) - .fetch_one(&self.pool) - .await?; + let update_query = "UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, \ + event_id=? WHERE id = ? RETURNING *"; - update_entity.updated_model = Some(wrapped_ty); - self.query_queue.push_publish(BrokerMessage::EntityUpdated(update_entity)); + self.query_queue.enqueue( + update_query.to_string(), + vec![ + Argument::String(utc_dt_string_from_timestamp(block_timestamp)), + Argument::String(event_id.to_string()), + Argument::String(entity_id.clone()), + ], + QueryType::SetEntity(wrapped_ty), + ); Ok(()) } @@ -376,6 +387,7 @@ impl Sql { UPDATE SET id=excluded.id, executed_at=excluded.executed_at, \ updated_at=CURRENT_TIMESTAMP", vec![resource, uri, executed_at], + QueryType::Other, ); } @@ -405,7 +417,7 @@ impl Sql { let statement = format!("UPDATE metadata SET {} WHERE id = ?", update.join(",")); arguments.push(Argument::FieldElement(*resource)); - self.query_queue.enqueue(statement, arguments); + self.query_queue.enqueue(statement, arguments, QueryType::Other); Ok(()) } @@ -520,6 +532,7 @@ impl Sql { Argument::String(transaction_type.to_string()), Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ], + QueryType::Other, ); } @@ -540,6 +553,7 @@ impl Sql { "INSERT OR IGNORE INTO events (id, keys, data, transaction_hash, executed_at) VALUES \ (?, ?, ?, ?, ?)", vec![id, keys, data, hash, executed_at], + QueryType::Other, ); let emitted = EventEmitted { @@ -728,7 +742,7 @@ impl Sql { ) }; - query_queue.enqueue(statement, arguments); + query_queue.enqueue(statement, arguments, QueryType::Other); }; match entity { @@ -826,7 +840,7 @@ impl Sql { let mut arguments = vec![Argument::String(entity_id.to_string())]; arguments.extend(indexes.iter().map(|idx| Argument::Int(*idx))); - self.query_queue.enqueue(query, arguments); + self.query_queue.enqueue(query, arguments, QueryType::Other); // insert the new array elements for (idx, member) in array.iter().enumerate() { @@ -865,8 +879,11 @@ impl Sql { Ty::Struct(s) => { let table_id = path.join("$"); let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?"); - self.query_queue - .push_front(statement, vec![Argument::String(entity_id.to_string())]); + self.query_queue.push_front( + statement, + vec![Argument::String(entity_id.to_string())], + QueryType::Other, + ); for member in s.children.iter() { let mut path_clone = path.clone(); path_clone.push(member.name.clone()); @@ -883,8 +900,11 @@ impl Sql { let table_id = path.join("$"); let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?"); - self.query_queue - .push_front(statement, vec![Argument::String(entity_id.to_string())]); + self.query_queue.push_front( + statement, + vec![Argument::String(entity_id.to_string())], + QueryType::Other, + ); for child in e.options.iter() { if let Ty::Tuple(t) = &child.ty { @@ -901,8 +921,11 @@ impl Sql { Ty::Array(array) => { let table_id = path.join("$"); let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?"); - self.query_queue - .push_front(statement, vec![Argument::String(entity_id.to_string())]); + self.query_queue.push_front( + statement, + vec![Argument::String(entity_id.to_string())], + QueryType::Other, + ); for member in array.iter() { let mut path_clone = path.clone(); @@ -913,8 +936,11 @@ impl Sql { Ty::Tuple(t) => { let table_id = path.join("$"); let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?"); - self.query_queue - .push_front(statement, vec![Argument::String(entity_id.to_string())]); + self.query_queue.push_front( + statement, + vec![Argument::String(entity_id.to_string())], + QueryType::Other, + ); for (idx, member) in t.iter().enumerate() { let mut path_clone = path.clone(); @@ -1028,7 +1054,7 @@ impl Sql { Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ]; - self.query_queue.enqueue(statement, arguments); + self.query_queue.enqueue(statement, arguments, QueryType::Other); } } Ty::Tuple(tuple) => { @@ -1056,7 +1082,7 @@ impl Sql { Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ]; - self.query_queue.enqueue(statement, arguments); + self.query_queue.enqueue(statement, arguments, QueryType::Other); } } Ty::Array(array) => { @@ -1081,7 +1107,7 @@ impl Sql { Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ]; - self.query_queue.enqueue(statement, arguments); + self.query_queue.enqueue(statement, arguments, QueryType::Other); } Ty::Enum(e) => { for (idx, child) in e @@ -1120,7 +1146,7 @@ impl Sql { Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ]; - self.query_queue.enqueue(statement, arguments); + self.query_queue.enqueue(statement, arguments, QueryType::Other); } } _ => {} @@ -1159,10 +1185,10 @@ impl Sql { create_table_query .push_str("FOREIGN KEY (event_message_id) REFERENCES event_messages(id));"); - self.query_queue.enqueue(create_table_query, vec![]); + self.query_queue.enqueue(create_table_query, vec![], QueryType::Other); indices.iter().for_each(|s| { - self.query_queue.enqueue(s, vec![]); + self.query_queue.enqueue(s, vec![], QueryType::Other); }); } diff --git a/crates/torii/libp2p/src/server/mod.rs b/crates/torii/libp2p/src/server/mod.rs index fec2b9cbd9..0d8f7c22b2 100644 --- a/crates/torii/libp2p/src/server/mod.rs +++ b/crates/torii/libp2p/src/server/mod.rs @@ -349,24 +349,23 @@ impl Relay

{ continue; } - if let Err(e) = self - .db - // event id is message id - .set_entity( - ty, - &message_id.to_string(), - Utc::now().timestamp() as u64, - entity_id, - model_id, - &keys_str - ) - .await + if let Err(e) = set_entity( + &mut self.db, + ty, + &message_id.to_string(), + Utc::now().timestamp() as u64, + entity_id, + model_id, + &keys_str, + ) + .await { info!( target: LOG_TARGET, error = %e, "Setting message." ); + continue; } info!( @@ -520,6 +519,20 @@ fn get_identity_from_ty(ty: &Ty) -> Result { Ok(identity) } +async fn set_entity( + db: &mut Sql, + ty: Ty, + message_id: &str, + block_timestamp: u64, + entity_id: Felt, + model_id: Felt, + keys: &str, +) -> anyhow::Result<()> { + db.set_entity(ty, message_id, block_timestamp, entity_id, model_id, keys).await?; + db.execute().await?; + Ok(()) +} + #[cfg(test)] mod tests { use tempfile::tempdir; diff --git a/spawn-and-move-db.tar.gz b/spawn-and-move-db.tar.gz index 76124d426f..ac6788ec41 100644 Binary files a/spawn-and-move-db.tar.gz and b/spawn-and-move-db.tar.gz differ diff --git a/types-test-db.tar.gz b/types-test-db.tar.gz index 234e62017c..b5a45bd0d7 100644 Binary files a/types-test-db.tar.gz and b/types-test-db.tar.gz differ