diff --git a/crates/torii/client/src/client/mod.rs b/crates/torii/client/src/client/mod.rs index 63cab8a523..3d6f369a21 100644 --- a/crates/torii/client/src/client/mod.rs +++ b/crates/torii/client/src/client/mod.rs @@ -10,6 +10,8 @@ use dojo_types::packing::unpack; use dojo_types::schema::Ty; use dojo_types::WorldMetadata; use dojo_world::contracts::WorldContractReader; +use futures::lock::Mutex; +use futures::Future; use parking_lot::{RwLock, RwLockReadGuard}; use starknet::core::utils::cairo_short_string_to_felt; use starknet::providers::jsonrpc::HttpTransport; @@ -20,6 +22,7 @@ use torii_grpc::client::{EntityUpdateStreaming, ModelDiffsStreaming}; use torii_grpc::proto::world::RetrieveEntitiesResponse; use torii_grpc::types::schema::Entity; use torii_grpc::types::{KeysClause, Query}; +use torii_relay::client::EventLoop; use torii_relay::types::Message; use crate::client::error::{Error, ParseError}; @@ -99,9 +102,10 @@ impl Client { }) } - /// Waits for the relay to be ready and listening for messages. - pub async fn wait_for_relay(&self) -> Result<(), Error> { - self.relay_client.command_sender.wait_for_relay().await.map_err(Error::RelayClient) + /// Starts the relay client event loop. + /// This is a blocking call. Spawn this on a separate task. + pub fn relay_runner(&self) -> Arc> { + self.relay_client.event_loop.clone() } /// Publishes a message to a topic. diff --git a/crates/torii/core/src/cache.rs b/crates/torii/core/src/cache.rs index 0a3ae7c1ec..b795a6f1bf 100644 --- a/crates/torii/core/src/cache.rs +++ b/crates/torii/core/src/cache.rs @@ -40,6 +40,10 @@ impl ModelCache { } async fn update_schema(&self, model: &str) -> Result { + let model_name: String = sqlx::query_scalar("SELECT name FROM models WHERE id = ?") + .bind(model) + .fetch_one(&self.pool) + .await?; let model_members: Vec = sqlx::query_as( "SELECT id, model_idx, member_idx, name, type, type_enum, enum_options, key FROM \ model_members WHERE model_id = ? ORDER BY model_idx ASC, member_idx ASC", @@ -52,7 +56,7 @@ impl ModelCache { return Err(QueryError::ModelNotFound(model.into()).into()); } - let ty = parse_sql_model_members(model, &model_members); + let ty = parse_sql_model_members(&model_name, &model_members); let mut cache = self.cache.write().await; cache.insert(model.into(), ty.clone()); diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index b5332acb27..99b8918dfb 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -16,7 +16,7 @@ use proto::world::{ }; use sqlx::sqlite::SqliteRow; use sqlx::{Pool, Row, Sqlite}; -use starknet::core::utils::cairo_short_string_to_felt; +use starknet::core::utils::{cairo_short_string_to_felt, get_selector_from_name}; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::JsonRpcClient; use starknet_crypto::FieldElement; @@ -99,9 +99,9 @@ impl DojoWorld { .fetch_one(&self.pool) .await?; - let models: Vec<(String, String, String, u32, u32, String)> = sqlx::query_as( - "SELECT name, class_hash, contract_address, packed_size, unpacked_size, layout FROM \ - models", + let models: Vec<(String, String, String, String, u32, u32, String)> = sqlx::query_as( + "SELECT id, name, class_hash, contract_address, packed_size, unpacked_size, layout \ + FROM models", ) .fetch_all(&self.pool) .await?; @@ -110,12 +110,12 @@ impl DojoWorld { for model in models { let schema = self.model_cache.schema(&model.0).await?; models_metadata.push(proto::types::ModelMetadata { - name: model.0, - class_hash: model.1, - contract_address: model.2, - packed_size: model.3, - unpacked_size: model.4, - layout: hex::decode(&model.5).unwrap(), + name: model.1, + class_hash: model.2, + contract_address: model.3, + packed_size: model.4, + unpacked_size: model.5, + layout: hex::decode(&model.6).unwrap(), schema: serde_json::to_vec(&schema).unwrap(), }); } @@ -191,7 +191,7 @@ impl DojoWorld { // query to filter with limit and offset let query = format!( r#" - SELECT {table}.id, group_concat({model_relation_table}.model_id) as model_names + SELECT {table}.id, group_concat({model_relation_table}.model_id) as model_ids FROM {table} JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id {filter_ids} @@ -206,8 +206,8 @@ impl DojoWorld { let mut entities = Vec::with_capacity(db_entities.len()); for (entity_id, models_str) in db_entities { - let model_names: Vec<&str> = models_str.split(',').collect(); - let schemas = self.model_cache.schemas(model_names).await?; + let model_ids: Vec<&str> = models_str.split(',').collect(); + let schemas = self.model_cache.schemas(model_ids).await?; let entity_query = format!("{} WHERE {table}.id = ?", build_sql_query(&schemas)?); let row = sqlx::query(&entity_query).bind(&entity_id).fetch_one(&self.pool).await?; @@ -261,7 +261,7 @@ impl DojoWorld { JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id WHERE {model_relation_table}.model_id = '{}' and {table}.keys LIKE ? "#, - keys_clause.model + get_selector_from_name(&keys_clause.model).map_err(ParseError::NonAsciiName)?, ); // total count of rows that matches keys_pattern without limit and offset @@ -270,21 +270,21 @@ impl DojoWorld { let models_query = format!( r#" - SELECT group_concat({model_relation_table}.model_id) as model_names + SELECT group_concat({model_relation_table}.model_id) as model_ids FROM {table} JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id WHERE {table}.keys LIKE ? GROUP BY {table}.id - HAVING model_names REGEXP '(^|,){}(,|$)' + HAVING model_ids REGEXP '(^|,){}(,|$)' LIMIT 1 "#, - keys_clause.model + get_selector_from_name(&keys_clause.model).map_err(ParseError::NonAsciiName)?, ); let (models_str,): (String,) = sqlx::query_as(&models_query).bind(&keys_pattern).fetch_one(&self.pool).await?; - let model_names = models_str.split(',').collect::>(); - let schemas = self.model_cache.schemas(model_names).await?; + let model_ids = models_str.split(',').collect::>(); + let schemas = self.model_cache.schemas(model_ids).await?; // query to filter with limit and offset let entities_query = format!( @@ -377,19 +377,19 @@ impl DojoWorld { let models_query = format!( r#" - SELECT group_concat({model_relation_table}.model_id) as model_names + SELECT group_concat({model_relation_table}.model_id) as model_ids FROM {table} JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id GROUP BY {table}.id - HAVING model_names REGEXP '(^|,){}(,|$)' + HAVING model_ids REGEXP '(^|,){}(,|$)' LIMIT 1 "#, - member_clause.model + get_selector_from_name(&member_clause.model).map_err(ParseError::NonAsciiName)?, ); let (models_str,): (String,) = sqlx::query_as(&models_query).fetch_one(&self.pool).await?; - let model_names = models_str.split(',').collect::>(); - let schemas = self.model_cache.schemas(model_names).await?; + let model_ids = models_str.split(',').collect::>(); + let schemas = self.model_cache.schemas(model_ids).await?; let table_name = member_clause.model; let column_name = format!("external_{}", member_clause.member); @@ -422,6 +422,9 @@ impl DojoWorld { } pub async fn model_metadata(&self, model: &str) -> Result { + // selector + let model = get_selector_from_name(model).map_err(ParseError::NonAsciiName)?; + let (name, class_hash, contract_address, packed_size, unpacked_size, layout): ( String, String, @@ -433,11 +436,11 @@ impl DojoWorld { "SELECT name, class_hash, contract_address, packed_size, unpacked_size, layout FROM \ models WHERE id = ?", ) - .bind(model) + .bind(format!("{:#x}", model)) .fetch_one(&self.pool) .await?; - let schema = self.model_cache.schema(model).await?; + let schema = self.model_cache.schema(&format!("{:#x}", model)).await?; let layout = hex::decode(&layout).unwrap(); Ok(proto::types::ModelMetadata { diff --git a/crates/torii/grpc/src/server/subscriptions/entity.rs b/crates/torii/grpc/src/server/subscriptions/entity.rs index 639c6be6c9..96cfda7245 100644 --- a/crates/torii/grpc/src/server/subscriptions/entity.rs +++ b/crates/torii/grpc/src/server/subscriptions/entity.rs @@ -91,16 +91,16 @@ impl Service { // publish all updates if ids is empty or only ids that are subscribed to if sub.hashed_keys.is_empty() || sub.hashed_keys.contains(&hashed) { let models_query = r#" - SELECT group_concat(entity_model.model_id) as model_names + SELECT group_concat(entity_model.model_id) as model_ids FROM entities JOIN entity_model ON entities.id = entity_model.entity_id WHERE entities.id = ? GROUP BY entities.id "#; - let (model_names,): (String,) = + let (model_ids,): (String,) = sqlx::query_as(models_query).bind(hashed_keys).fetch_one(&pool).await?; - let model_names: Vec<&str> = model_names.split(',').collect(); - let schemas = cache.schemas(model_names).await?; + let model_ids: Vec<&str> = model_ids.split(',').collect(); + let schemas = cache.schemas(model_ids).await?; let entity_query = format!("{} WHERE entities.id = ?", build_sql_query(&schemas)?); let row = sqlx::query(&entity_query).bind(hashed_keys).fetch_one(&pool).await?; diff --git a/crates/torii/grpc/src/server/subscriptions/event_message.rs b/crates/torii/grpc/src/server/subscriptions/event_message.rs index a6c205f8a5..d8d5c35439 100644 --- a/crates/torii/grpc/src/server/subscriptions/event_message.rs +++ b/crates/torii/grpc/src/server/subscriptions/event_message.rs @@ -90,16 +90,16 @@ impl Service { // publish all updates if ids is empty or only ids that are subscribed to if sub.hashed_keys.is_empty() || sub.hashed_keys.contains(&hashed) { let models_query = r#" - SELECT group_concat(event_model.model_id) as model_names + SELECT group_concat(event_model.model_id) as model_ids FROM event_messages JOIN event_model ON event_messages.id = event_model.entity_id WHERE event_messages.id = ? GROUP BY event_messages.id "#; - let (model_names,): (String,) = + let (model_ids,): (String,) = sqlx::query_as(models_query).bind(hashed_keys).fetch_one(&pool).await?; - let model_names: Vec<&str> = model_names.split(',').collect(); - let schemas = cache.schemas(model_names).await?; + let model_ids: Vec<&str> = model_ids.split(',').collect(); + let schemas = cache.schemas(model_ids).await?; let entity_query = format!("{} WHERE event_messages.id = ?", build_sql_query(&schemas)?); diff --git a/crates/torii/libp2p/src/client/mod.rs b/crates/torii/libp2p/src/client/mod.rs index 23af75b3cc..5412c0b9ec 100644 --- a/crates/torii/libp2p/src/client/mod.rs +++ b/crates/torii/libp2p/src/client/mod.rs @@ -41,7 +41,6 @@ pub struct EventLoop { #[derive(Debug)] enum Command { Publish(Message, oneshot::Sender>), - WaitForRelay(oneshot::Sender>), } impl RelayClient { @@ -162,37 +161,36 @@ impl CommandSender { rx.await.expect("Failed to receive response") } - - pub async fn wait_for_relay(&self) -> Result<(), Error> { - let (tx, rx) = oneshot::channel(); - - self.sender.unbounded_send(Command::WaitForRelay(tx)).expect("Failed to send command"); - - rx.await.expect("Failed to receive response") - } } impl EventLoop { + async fn handle_command( + &mut self, + command: Command, + is_relay_ready: bool, + commands_queue: Arc>>, + ) { + match command { + Command::Publish(data, sender) => { + // if the relay is not ready yet, add the message to the queue + if !is_relay_ready { + commands_queue.lock().await.push(Command::Publish(data, sender)); + } else { + sender.send(self.publish(&data)).expect("Failed to send response"); + } + } + } + } + pub async fn run(&mut self) { let mut is_relay_ready = false; - let mut relay_ready_tx = None; + let commands_queue = Arc::new(Mutex::new(Vec::new())); loop { // Poll the swarm for new events. select! { command = self.command_receiver.select_next_some() => { - match command { - Command::Publish(data, sender) => { - sender.send(self.publish(&data)).expect("Failed to send response"); - } - Command::WaitForRelay(sender) => { - if is_relay_ready { - sender.send(Ok(())).expect("Failed to send response"); - } else { - relay_ready_tx = Some(sender); - } - } - } + self.handle_command(command, is_relay_ready, commands_queue.clone()).await; }, event = self.swarm.select_next_some() => { match event { @@ -200,9 +198,13 @@ impl EventLoop { // Handle behaviour events. info!(target: LOG_TARGET, topic = ?topic, "Relay ready. Received subscription confirmation."); - is_relay_ready = true; - if let Some(tx) = relay_ready_tx.take() { - tx.send(Ok(())).expect("Failed to send response"); + if !is_relay_ready { + is_relay_ready = true; + + // Execute all the commands that were queued while the relay was not ready. + for command in commands_queue.lock().await.drain(..) { + self.handle_command(command, is_relay_ready, commands_queue.clone()).await; + } } } SwarmEvent::ConnectionClosed { cause: Some(cause), .. } => { diff --git a/crates/torii/libp2p/src/tests.rs b/crates/torii/libp2p/src/tests.rs index dd9bac463b..2aa0e9268f 100644 --- a/crates/torii/libp2p/src/tests.rs +++ b/crates/torii/libp2p/src/tests.rs @@ -49,7 +49,6 @@ mod test { client.event_loop.lock().await.run().await; }); - client.command_sender.wait_for_relay().await?; let mut data = Struct { name: "Message".to_string(), children: vec![] }; data.children.push(Member {