Skip to content

Commit

Permalink
feat: spawn event message subscription service (#1876)
Browse files Browse the repository at this point in the history
* feat: spawn event message service

* fmt

* refactor: grpc event message subscriptions and queries

* refactor: global consts

* fmt
  • Loading branch information
Larkooo authored Apr 24, 2024
1 parent 77df165 commit 6fde452
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 28 deletions.
17 changes: 13 additions & 4 deletions crates/torii/core/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,11 @@ pub fn parse_sql_model_members(model: &str, model_members_all: &[SqlModelMember]
}

/// Creates a query that fetches all models and their nested data.
pub fn build_sql_query(model_schemas: &Vec<Ty>) -> Result<String, Error> {
pub fn build_sql_query(
model_schemas: &Vec<Ty>,
entities_table: &str,
entity_relation_column: &str,
) -> Result<String, Error> {
fn parse_struct(
path: &str,
schema: &Struct,
Expand Down Expand Up @@ -223,11 +227,16 @@ pub fn build_sql_query(model_schemas: &Vec<Ty>) -> Result<String, Error> {
let selections_clause = global_selections.join(", ");
let join_clause = global_tables
.into_iter()
.map(|table| format!(" JOIN {table} ON entities.id = {table}.entity_id"))
.map(|table| {
format!(" JOIN {table} ON {entities_table}.id = {table}.{entity_relation_column}")
})
.collect::<Vec<_>>()
.join(" ");

Ok(format!("SELECT entities.id, entities.keys, {selections_clause} FROM entities{join_clause}"))
Ok(format!(
"SELECT {entities_table}.id, {entities_table}.keys, {selections_clause} FROM \
{entities_table}{join_clause}"
))
}

/// Populate the values of a Ty (schema) from SQLite row.
Expand Down Expand Up @@ -528,7 +537,7 @@ mod tests {
],
});

let query = build_sql_query(&vec![ty]).unwrap();
let query = build_sql_query(&vec![ty], "entities", "entity_id").unwrap();
assert_eq!(
query,
r#"SELECT entities.id, entities.keys, Position.external_name AS "Position.name", Position.external_age AS "Position.age", Position$vec.external_x AS "Position$vec.x", Position$vec.external_y AS "Position$vec.y" FROM entities JOIN Position ON entities.id = Position.entity_id JOIN Position$vec ON entities.id = Position$vec.entity_id"#
Expand Down
77 changes: 57 additions & 20 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ use crate::proto::world::{SubscribeEntitiesRequest, SubscribeEntityResponse};
use crate::proto::{self};
use crate::types::ComparisonOperator;

pub(crate) static ENTITIES_TABLE: &str = "entities";
pub(crate) static ENTITIES_MODEL_RELATION_TABLE: &str = "entity_model";
pub(crate) static ENTITIES_ENTITY_RELATION_COLUMN: &str = "entity_id";

pub(crate) static EVENTS_MESSAGES_TABLE: &str = "events_messages";
pub(crate) static EVENTS_MESSAGES_MODEL_RELATION_TABLE: &str = "event_model";
pub(crate) static EVENTS_MESSAGES_ENTITY_RELATION_COLUMN: &str = "event_message_id";

#[derive(Clone)]
pub struct DojoWorld {
pool: Pool<Sqlite>,
Expand Down Expand Up @@ -76,6 +84,12 @@ impl DojoWorld {
Arc::clone(&model_cache),
));

tokio::task::spawn(subscriptions::event_message::Service::new(
pool.clone(),
Arc::clone(&event_message_manager),
Arc::clone(&model_cache),
));

Self {
pool,
world_address,
Expand Down Expand Up @@ -137,7 +151,15 @@ impl DojoWorld {
limit: u32,
offset: u32,
) -> Result<(Vec<proto::types::Entity>, u32), Error> {
self.query_by_hashed_keys("entities", "entity_model", None, limit, offset).await
self.query_by_hashed_keys(
ENTITIES_TABLE,
ENTITIES_MODEL_RELATION_TABLE,
ENTITIES_ENTITY_RELATION_COLUMN,
None,
limit,
offset,
)
.await
}

async fn events_all(&self, limit: u32, offset: u32) -> Result<Vec<proto::types::Event>, Error> {
Expand All @@ -158,6 +180,7 @@ impl DojoWorld {
&self,
table: &str,
model_relation_table: &str,
entity_relation_column: &str,
hashed_keys: Option<proto::types::HashedKeysClause>,
limit: u32,
offset: u32,
Expand Down Expand Up @@ -212,7 +235,10 @@ impl DojoWorld {
let model_ids: Vec<&str> = models_str.split(',').collect();
let schemas = self.model_cache.schemas(model_ids).await?;

let entity_query = format!("{} WHERE {table}.id = ?", build_sql_query(&schemas)?);
let entity_query = format!(
"{} WHERE {table}.id = ?",
build_sql_query(&schemas, table, entity_relation_column)?
);
let row = sqlx::query(&entity_query).bind(&entity_id).fetch_one(&self.pool).await?;

let models = schemas
Expand All @@ -239,6 +265,7 @@ impl DojoWorld {
&self,
table: &str,
model_relation_table: &str,
entity_relation_column: &str,
keys_clause: proto::types::KeysClause,
limit: u32,
offset: u32,
Expand Down Expand Up @@ -296,7 +323,7 @@ impl DojoWorld {
// query to filter with limit and offset
let entities_query = format!(
"{} WHERE {table}.keys LIKE ? ORDER BY {table}.event_id DESC LIMIT ? OFFSET ?",
build_sql_query(&schemas)?
build_sql_query(&schemas, table, entity_relation_column)?
);
let db_entities = sqlx::query(&entities_query)
.bind(&keys_pattern)
Expand Down Expand Up @@ -355,6 +382,7 @@ impl DojoWorld {
&self,
table: &str,
model_relation_table: &str,
entity_relation_column: &str,
member_clause: proto::types::MemberClause,
_limit: u32,
_offset: u32,
Expand Down Expand Up @@ -402,7 +430,7 @@ impl DojoWorld {
let column_name = format!("external_{}", member_clause.member);
let member_query = format!(
"{} WHERE {table_name}.{column_name} {comparison_operator} ?",
build_sql_query(&schemas)?
build_sql_query(&schemas, table, entity_relation_column)?
);

let db_entities =
Expand All @@ -420,6 +448,7 @@ impl DojoWorld {
&self,
_table: &str,
_model_relation_table: &str,
_entity_relation_column: &str,
_composite: proto::types::CompositeClause,
_limit: u32,
_offset: u32,
Expand Down Expand Up @@ -510,8 +539,9 @@ impl DojoWorld {
}

self.query_by_hashed_keys(
"entities",
"entity_model",
ENTITIES_TABLE,
ENTITIES_MODEL_RELATION_TABLE,
ENTITIES_ENTITY_RELATION_COLUMN,
Some(hashed_keys),
query.limit,
query.offset,
Expand All @@ -528,8 +558,9 @@ impl DojoWorld {
}

self.query_by_keys(
"entities",
"entity_model",
ENTITIES_TABLE,
ENTITIES_MODEL_RELATION_TABLE,
ENTITIES_ENTITY_RELATION_COLUMN,
keys,
query.limit,
query.offset,
Expand All @@ -538,8 +569,9 @@ impl DojoWorld {
}
ClauseType::Member(member) => {
self.query_by_member(
"entities",
"entity_model",
ENTITIES_TABLE,
ENTITIES_MODEL_RELATION_TABLE,
ENTITIES_ENTITY_RELATION_COLUMN,
member,
query.limit,
query.offset,
Expand All @@ -548,8 +580,9 @@ impl DojoWorld {
}
ClauseType::Composite(composite) => {
self.query_by_composite(
"entities",
"entity_model",
ENTITIES_TABLE,
ENTITIES_MODEL_RELATION_TABLE,
ENTITIES_ENTITY_RELATION_COLUMN,
composite,
query.limit,
query.offset,
Expand Down Expand Up @@ -587,8 +620,9 @@ impl DojoWorld {
}

self.query_by_hashed_keys(
"event_messages",
"event_model",
EVENTS_MESSAGES_TABLE,
EVENTS_MESSAGES_MODEL_RELATION_TABLE,
EVENTS_MESSAGES_ENTITY_RELATION_COLUMN,
Some(hashed_keys),
query.limit,
query.offset,
Expand All @@ -605,8 +639,9 @@ impl DojoWorld {
}

self.query_by_keys(
"event_messages",
"event_model",
EVENTS_MESSAGES_TABLE,
EVENTS_MESSAGES_MODEL_RELATION_TABLE,
EVENTS_MESSAGES_ENTITY_RELATION_COLUMN,
keys,
query.limit,
query.offset,
Expand All @@ -615,8 +650,9 @@ impl DojoWorld {
}
ClauseType::Member(member) => {
self.query_by_member(
"event_messages",
"event_model",
EVENTS_MESSAGES_TABLE,
EVENTS_MESSAGES_MODEL_RELATION_TABLE,
EVENTS_MESSAGES_ENTITY_RELATION_COLUMN,
member,
query.limit,
query.offset,
Expand All @@ -625,8 +661,9 @@ impl DojoWorld {
}
ClauseType::Composite(composite) => {
self.query_by_composite(
"event_messages",
"event_model",
EVENTS_MESSAGES_TABLE,
EVENTS_MESSAGES_MODEL_RELATION_TABLE,
ENTITIES_ENTITY_RELATION_COLUMN,
composite,
query.limit,
query.offset,
Expand Down
5 changes: 4 additions & 1 deletion crates/torii/grpc/src/server/subscriptions/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ impl Service {
let model_ids: Vec<&str> = model_ids.split(',').collect();
let schemas = cache.schemas(model_ids).await?;

let entity_query = format!("{} WHERE entities.id = ?", build_sql_query(&schemas)?);
let entity_query = format!(
"{} WHERE entities.id = ?",
build_sql_query(&schemas, "entities", "entity_id")?
);
let row = sqlx::query(&entity_query).bind(hashed_keys).fetch_one(&pool).await?;

let models = schemas
Expand Down
6 changes: 4 additions & 2 deletions crates/torii/grpc/src/server/subscriptions/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ impl Service {
let model_ids: Vec<&str> = model_ids.split(',').collect();
let schemas = cache.schemas(model_ids).await?;

let entity_query =
format!("{} WHERE event_messages.id = ?", build_sql_query(&schemas)?);
let entity_query = format!(
"{} WHERE event_messages.id = ?",
build_sql_query(&schemas, "event_messages", "event_message_id")?
);
let row = sqlx::query(&entity_query).bind(hashed_keys).fetch_one(&pool).await?;

let models = schemas
Expand Down
1 change: 1 addition & 0 deletions crates/torii/grpc/src/server/tests/entities_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ async fn test_entities_queries() {
.query_by_keys(
"entities",
"entity_model",
"entity_id",
KeysClause { model: "Moves".to_string(), keys: vec![account.address()] }.into(),
1,
0,
Expand Down
1 change: 0 additions & 1 deletion crates/torii/migrations/20240314182410_event_model.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ CREATE TABLE event_messages (
id TEXT NOT NULL PRIMARY KEY,
keys TEXT,
event_id TEXT NOT NULL,
model_names TEXT,
executed_at DATETIME NOT NULL,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
Expand Down

0 comments on commit 6fde452

Please sign in to comment.