From 9f0b3f1d77f8e8dfdd81e6fd156904fd15eacdc5 Mon Sep 17 00:00:00 2001 From: Nasr Date: Wed, 24 Apr 2024 12:16:04 -0400 Subject: [PATCH 1/5] feat: spawn event message service --- crates/torii/grpc/src/server/mod.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index 935cb7456a..89345e3185 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -75,6 +75,12 @@ impl DojoWorld { Arc::clone(&entity_manager), Arc::clone(&model_cache), )); + + tokio::task::spawn(subscriptions::event_message::Service::new( + pool.clone(), + Arc::clone(&event_message_manager), + Arc::clone(&model_cache), + )); Self { pool, From 6e112962e8a2ba5f2d027af53795db5927b68f22 Mon Sep 17 00:00:00 2001 From: Nasr Date: Wed, 24 Apr 2024 12:16:25 -0400 Subject: [PATCH 2/5] fmt --- crates/torii/grpc/src/server/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index 89345e3185..b809147696 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -75,7 +75,7 @@ impl DojoWorld { Arc::clone(&entity_manager), Arc::clone(&model_cache), )); - + tokio::task::spawn(subscriptions::event_message::Service::new( pool.clone(), Arc::clone(&event_message_manager), From 6143f60c98e8184152c065dc5454630e3d399973 Mon Sep 17 00:00:00 2001 From: Nasr Date: Wed, 24 Apr 2024 12:47:58 -0400 Subject: [PATCH 3/5] refactor: grpc event message subscriptions and queries --- crates/torii/core/src/model.rs | 8 +++---- crates/torii/grpc/src/server/mod.rs | 21 +++++++++++++++---- .../grpc/src/server/subscriptions/entity.rs | 2 +- .../src/server/subscriptions/event_message.rs | 2 +- .../grpc/src/server/tests/entities_test.rs | 1 + .../migrations/20240314182410_event_model.sql | 1 - 6 files changed, 24 insertions(+), 11 deletions(-) diff --git a/crates/torii/core/src/model.rs b/crates/torii/core/src/model.rs index 59b3fe1044..f194bacab0 100644 --- a/crates/torii/core/src/model.rs +++ b/crates/torii/core/src/model.rs @@ -173,7 +173,7 @@ pub fn parse_sql_model_members(model: &str, model_members_all: &[SqlModelMember] } /// Creates a query that fetches all models and their nested data. -pub fn build_sql_query(model_schemas: &Vec) -> Result { +pub fn build_sql_query(model_schemas: &Vec, entities_table: &str, entity_relation_column: &str) -> Result { fn parse_struct( path: &str, schema: &Struct, @@ -223,11 +223,11 @@ pub fn build_sql_query(model_schemas: &Vec) -> Result { let selections_clause = global_selections.join(", "); let join_clause = global_tables .into_iter() - .map(|table| format!(" JOIN {table} ON entities.id = {table}.entity_id")) + .map(|table| format!(" JOIN {table} ON {entities_table}.id = {table}.{entity_relation_column}")) .collect::>() .join(" "); - Ok(format!("SELECT entities.id, entities.keys, {selections_clause} FROM entities{join_clause}")) + Ok(format!("SELECT {entities_table}.id, {entities_table}.keys, {selections_clause} FROM {entities_table}{join_clause}")) } /// Populate the values of a Ty (schema) from SQLite row. @@ -528,7 +528,7 @@ mod tests { ], }); - let query = build_sql_query(&vec![ty]).unwrap(); + let query = build_sql_query(&vec![ty], "entities", "entity_id").unwrap(); assert_eq!( query, r#"SELECT entities.id, entities.keys, Position.external_name AS "Position.name", Position.external_age AS "Position.age", Position$vec.external_x AS "Position$vec.x", Position$vec.external_y AS "Position$vec.y" FROM entities JOIN Position ON entities.id = Position.entity_id JOIN Position$vec ON entities.id = Position$vec.entity_id"# diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index b809147696..9e8ef28a17 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -143,7 +143,8 @@ impl DojoWorld { limit: u32, offset: u32, ) -> Result<(Vec, u32), Error> { - self.query_by_hashed_keys("entities", "entity_model", None, limit, offset).await + self.query_by_hashed_keys("entities", "entity_model", "entity_id", None, limit, offset) + .await } async fn events_all(&self, limit: u32, offset: u32) -> Result, Error> { @@ -164,6 +165,7 @@ impl DojoWorld { &self, table: &str, model_relation_table: &str, + entity_relation_column: &str, hashed_keys: Option, limit: u32, offset: u32, @@ -218,7 +220,10 @@ impl DojoWorld { let model_ids: Vec<&str> = models_str.split(',').collect(); let schemas = self.model_cache.schemas(model_ids).await?; - let entity_query = format!("{} WHERE {table}.id = ?", build_sql_query(&schemas)?); + let entity_query = format!( + "{} WHERE {table}.id = ?", + build_sql_query(&schemas, table, entity_relation_column)? + ); let row = sqlx::query(&entity_query).bind(&entity_id).fetch_one(&self.pool).await?; let models = schemas @@ -245,6 +250,7 @@ impl DojoWorld { &self, table: &str, model_relation_table: &str, + entity_relation_column: &str, keys_clause: proto::types::KeysClause, limit: u32, offset: u32, @@ -302,7 +308,7 @@ impl DojoWorld { // query to filter with limit and offset let entities_query = format!( "{} WHERE {table}.keys LIKE ? ORDER BY {table}.event_id DESC LIMIT ? OFFSET ?", - build_sql_query(&schemas)? + build_sql_query(&schemas, table, entity_relation_column)? ); let db_entities = sqlx::query(&entities_query) .bind(&keys_pattern) @@ -361,6 +367,7 @@ impl DojoWorld { &self, table: &str, model_relation_table: &str, + entity_relation_column: &str, member_clause: proto::types::MemberClause, _limit: u32, _offset: u32, @@ -408,7 +415,7 @@ impl DojoWorld { let column_name = format!("external_{}", member_clause.member); let member_query = format!( "{} WHERE {table_name}.{column_name} {comparison_operator} ?", - build_sql_query(&schemas)? + build_sql_query(&schemas, table, entity_relation_column)? ); let db_entities = @@ -518,6 +525,7 @@ impl DojoWorld { self.query_by_hashed_keys( "entities", "entity_model", + "entity_id", Some(hashed_keys), query.limit, query.offset, @@ -536,6 +544,7 @@ impl DojoWorld { self.query_by_keys( "entities", "entity_model", + "entity_id", keys, query.limit, query.offset, @@ -546,6 +555,7 @@ impl DojoWorld { self.query_by_member( "entities", "entity_model", + "entity_id", member, query.limit, query.offset, @@ -595,6 +605,7 @@ impl DojoWorld { self.query_by_hashed_keys( "event_messages", "event_model", + "event_message_id", Some(hashed_keys), query.limit, query.offset, @@ -613,6 +624,7 @@ impl DojoWorld { self.query_by_keys( "event_messages", "event_model", + "event_message_id", keys, query.limit, query.offset, @@ -623,6 +635,7 @@ impl DojoWorld { self.query_by_member( "event_messages", "event_model", + "event_message_id", member, query.limit, query.offset, diff --git a/crates/torii/grpc/src/server/subscriptions/entity.rs b/crates/torii/grpc/src/server/subscriptions/entity.rs index 96cfda7245..1e9d851f62 100644 --- a/crates/torii/grpc/src/server/subscriptions/entity.rs +++ b/crates/torii/grpc/src/server/subscriptions/entity.rs @@ -102,7 +102,7 @@ impl Service { let model_ids: Vec<&str> = model_ids.split(',').collect(); let schemas = cache.schemas(model_ids).await?; - let entity_query = format!("{} WHERE entities.id = ?", build_sql_query(&schemas)?); + let entity_query = format!("{} WHERE entities.id = ?", build_sql_query(&schemas, "entities", "entity_id")?); let row = sqlx::query(&entity_query).bind(hashed_keys).fetch_one(&pool).await?; let models = schemas diff --git a/crates/torii/grpc/src/server/subscriptions/event_message.rs b/crates/torii/grpc/src/server/subscriptions/event_message.rs index d8d5c35439..c807a22a00 100644 --- a/crates/torii/grpc/src/server/subscriptions/event_message.rs +++ b/crates/torii/grpc/src/server/subscriptions/event_message.rs @@ -102,7 +102,7 @@ impl Service { let schemas = cache.schemas(model_ids).await?; let entity_query = - format!("{} WHERE event_messages.id = ?", build_sql_query(&schemas)?); + format!("{} WHERE event_messages.id = ?", build_sql_query(&schemas, "event_messages", "event_message_id")?); let row = sqlx::query(&entity_query).bind(hashed_keys).fetch_one(&pool).await?; let models = schemas diff --git a/crates/torii/grpc/src/server/tests/entities_test.rs b/crates/torii/grpc/src/server/tests/entities_test.rs index 9f75782e01..660438d83e 100644 --- a/crates/torii/grpc/src/server/tests/entities_test.rs +++ b/crates/torii/grpc/src/server/tests/entities_test.rs @@ -89,6 +89,7 @@ async fn test_entities_queries() { .query_by_keys( "entities", "entity_model", + "entity_id", KeysClause { model: "Moves".to_string(), keys: vec![account.address()] }.into(), 1, 0, diff --git a/crates/torii/migrations/20240314182410_event_model.sql b/crates/torii/migrations/20240314182410_event_model.sql index 3e2ff3e7d0..dcec9f3bd4 100644 --- a/crates/torii/migrations/20240314182410_event_model.sql +++ b/crates/torii/migrations/20240314182410_event_model.sql @@ -2,7 +2,6 @@ CREATE TABLE event_messages ( id TEXT NOT NULL PRIMARY KEY, keys TEXT, event_id TEXT NOT NULL, - model_names TEXT, executed_at DATETIME NOT NULL, created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP From 37b8d4b0ec94a89d5bf8a71c9b78c17e15be2593 Mon Sep 17 00:00:00 2001 From: Nasr Date: Wed, 24 Apr 2024 12:54:23 -0400 Subject: [PATCH 4/5] refactor: global consts --- crates/torii/grpc/src/server/mod.rs | 66 ++++++++++++++++++----------- 1 file changed, 42 insertions(+), 24 deletions(-) diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index 9e8ef28a17..0a572cc9f8 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -41,6 +41,14 @@ use crate::proto::world::{SubscribeEntitiesRequest, SubscribeEntityResponse}; use crate::proto::{self}; use crate::types::ComparisonOperator; +pub(crate) static ENTITIES_TABLE: &str = "entities"; +pub(crate) static ENTITIES_MODEL_RELATION_TABLE: &str = "entity_model"; +pub(crate) static ENTITIES_ENTITY_RELATION_COLUMN: &str = "entity_id"; + +pub(crate) static EVENTS_MESSAGES_TABLE: &str = "events_messages"; +pub(crate) static EVENTS_MESSAGES_MODEL_RELATION_TABLE: &str = "event_model"; +pub(crate) static EVENTS_MESSAGES_ENTITY_RELATION_COLUMN: &str = "event_message_id"; + #[derive(Clone)] pub struct DojoWorld { pool: Pool, @@ -143,8 +151,15 @@ impl DojoWorld { limit: u32, offset: u32, ) -> Result<(Vec, u32), Error> { - self.query_by_hashed_keys("entities", "entity_model", "entity_id", None, limit, offset) - .await + self.query_by_hashed_keys( + ENTITIES_TABLE, + ENTITIES_MODEL_RELATION_TABLE, + ENTITIES_ENTITY_RELATION_COLUMN, + None, + limit, + offset, + ) + .await } async fn events_all(&self, limit: u32, offset: u32) -> Result, Error> { @@ -433,6 +448,7 @@ impl DojoWorld { &self, _table: &str, _model_relation_table: &str, + _entity_relation_column: &str, _composite: proto::types::CompositeClause, _limit: u32, _offset: u32, @@ -523,9 +539,9 @@ impl DojoWorld { } self.query_by_hashed_keys( - "entities", - "entity_model", - "entity_id", + ENTITIES_TABLE, + ENTITIES_MODEL_RELATION_TABLE, + ENTITIES_ENTITY_RELATION_COLUMN, Some(hashed_keys), query.limit, query.offset, @@ -542,9 +558,9 @@ impl DojoWorld { } self.query_by_keys( - "entities", - "entity_model", - "entity_id", + ENTITIES_TABLE, + ENTITIES_MODEL_RELATION_TABLE, + ENTITIES_ENTITY_RELATION_COLUMN, keys, query.limit, query.offset, @@ -553,9 +569,9 @@ impl DojoWorld { } ClauseType::Member(member) => { self.query_by_member( - "entities", - "entity_model", - "entity_id", + ENTITIES_TABLE, + ENTITIES_MODEL_RELATION_TABLE, + ENTITIES_ENTITY_RELATION_COLUMN, member, query.limit, query.offset, @@ -564,8 +580,9 @@ impl DojoWorld { } ClauseType::Composite(composite) => { self.query_by_composite( - "entities", - "entity_model", + ENTITIES_TABLE, + ENTITIES_MODEL_RELATION_TABLE, + ENTITIES_ENTITY_RELATION_COLUMN, composite, query.limit, query.offset, @@ -603,9 +620,9 @@ impl DojoWorld { } self.query_by_hashed_keys( - "event_messages", - "event_model", - "event_message_id", + EVENTS_MESSAGES_TABLE, + EVENTS_MESSAGES_MODEL_RELATION_TABLE, + EVENTS_MESSAGES_ENTITY_RELATION_COLUMN, Some(hashed_keys), query.limit, query.offset, @@ -622,9 +639,9 @@ impl DojoWorld { } self.query_by_keys( - "event_messages", - "event_model", - "event_message_id", + EVENTS_MESSAGES_TABLE, + EVENTS_MESSAGES_MODEL_RELATION_TABLE, + EVENTS_MESSAGES_ENTITY_RELATION_COLUMN, keys, query.limit, query.offset, @@ -633,9 +650,9 @@ impl DojoWorld { } ClauseType::Member(member) => { self.query_by_member( - "event_messages", - "event_model", - "event_message_id", + EVENTS_MESSAGES_TABLE, + EVENTS_MESSAGES_MODEL_RELATION_TABLE, + EVENTS_MESSAGES_ENTITY_RELATION_COLUMN, member, query.limit, query.offset, @@ -644,8 +661,9 @@ impl DojoWorld { } ClauseType::Composite(composite) => { self.query_by_composite( - "event_messages", - "event_model", + EVENTS_MESSAGES_TABLE, + EVENTS_MESSAGES_MODEL_RELATION_TABLE, + ENTITIES_ENTITY_RELATION_COLUMN, composite, query.limit, query.offset, From 53547b86153d3a365bbf9e22ba71f3ec625b99af Mon Sep 17 00:00:00 2001 From: Nasr Date: Wed, 24 Apr 2024 13:04:59 -0400 Subject: [PATCH 5/5] fmt --- crates/torii/core/src/model.rs | 15 ++++++++++++--- .../torii/grpc/src/server/subscriptions/entity.rs | 5 ++++- .../src/server/subscriptions/event_message.rs | 6 ++++-- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/crates/torii/core/src/model.rs b/crates/torii/core/src/model.rs index f194bacab0..2c60fd344e 100644 --- a/crates/torii/core/src/model.rs +++ b/crates/torii/core/src/model.rs @@ -173,7 +173,11 @@ pub fn parse_sql_model_members(model: &str, model_members_all: &[SqlModelMember] } /// Creates a query that fetches all models and their nested data. -pub fn build_sql_query(model_schemas: &Vec, entities_table: &str, entity_relation_column: &str) -> Result { +pub fn build_sql_query( + model_schemas: &Vec, + entities_table: &str, + entity_relation_column: &str, +) -> Result { fn parse_struct( path: &str, schema: &Struct, @@ -223,11 +227,16 @@ pub fn build_sql_query(model_schemas: &Vec, entities_table: &str, entity_rel let selections_clause = global_selections.join(", "); let join_clause = global_tables .into_iter() - .map(|table| format!(" JOIN {table} ON {entities_table}.id = {table}.{entity_relation_column}")) + .map(|table| { + format!(" JOIN {table} ON {entities_table}.id = {table}.{entity_relation_column}") + }) .collect::>() .join(" "); - Ok(format!("SELECT {entities_table}.id, {entities_table}.keys, {selections_clause} FROM {entities_table}{join_clause}")) + Ok(format!( + "SELECT {entities_table}.id, {entities_table}.keys, {selections_clause} FROM \ + {entities_table}{join_clause}" + )) } /// Populate the values of a Ty (schema) from SQLite row. diff --git a/crates/torii/grpc/src/server/subscriptions/entity.rs b/crates/torii/grpc/src/server/subscriptions/entity.rs index 1e9d851f62..a92b88543d 100644 --- a/crates/torii/grpc/src/server/subscriptions/entity.rs +++ b/crates/torii/grpc/src/server/subscriptions/entity.rs @@ -102,7 +102,10 @@ impl Service { let model_ids: Vec<&str> = model_ids.split(',').collect(); let schemas = cache.schemas(model_ids).await?; - let entity_query = format!("{} WHERE entities.id = ?", build_sql_query(&schemas, "entities", "entity_id")?); + let entity_query = format!( + "{} WHERE entities.id = ?", + build_sql_query(&schemas, "entities", "entity_id")? + ); let row = sqlx::query(&entity_query).bind(hashed_keys).fetch_one(&pool).await?; let models = schemas diff --git a/crates/torii/grpc/src/server/subscriptions/event_message.rs b/crates/torii/grpc/src/server/subscriptions/event_message.rs index c807a22a00..faf3518d0f 100644 --- a/crates/torii/grpc/src/server/subscriptions/event_message.rs +++ b/crates/torii/grpc/src/server/subscriptions/event_message.rs @@ -101,8 +101,10 @@ impl Service { let model_ids: Vec<&str> = model_ids.split(',').collect(); let schemas = cache.schemas(model_ids).await?; - let entity_query = - format!("{} WHERE event_messages.id = ?", build_sql_query(&schemas, "event_messages", "event_message_id")?); + let entity_query = format!( + "{} WHERE event_messages.id = ?", + build_sql_query(&schemas, "event_messages", "event_message_id")? + ); let row = sqlx::query(&entity_query).bind(hashed_keys).fetch_one(&pool).await?; let models = schemas