Skip to content

Commit

Permalink
Torii grpc handle entities_all query (#1240)
Browse files Browse the repository at this point in the history
  • Loading branch information
broody authored Dec 6, 2023
1 parent c6d8297 commit 48d04d1
Show file tree
Hide file tree
Showing 14 changed files with 649 additions and 139 deletions.
561 changes: 507 additions & 54 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ serde = { version = "1.0.156", features = [ "derive" ] }
serde_json = "1.0"
serde_with = "2.3.1"
smol_str = { version = "0.2.0", features = [ "serde" ] }
sqlx = { version = "0.6.2", features = [ "chrono", "macros", "offline", "runtime-actix-rustls", "sqlite", "uuid" ] }
sqlx = { version = "0.7.2", features = [ "chrono", "macros", "runtime-async-std", "runtime-tokio", "sqlite", "uuid", "regexp" ] }
starknet = "0.7.0"
starknet-crypto = "0.6.1"
starknet_api = { git = "https://github.com/starkware-libs/starknet-api", rev = "ecc9b6946ef13003da202838e4124a9ad2efabb0" }
Expand Down
10 changes: 5 additions & 5 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ impl Client {
self.subscribed_entities.entities_keys.read()
}

/// Retrieves entities matching specified keys and/or model name in query parameter.
/// Retrieves entities matching query parameter.
///
/// The query can include keys and a model name, both optional. Without parameters, it fetches
/// all entities, which is less efficient as it requires additional queries for each
/// entity's model data. Specifying a model name optimizes the process by limiting the
/// retrieval to entities with that model, requiring just one query.
/// The query param includes an optional clause for filtering. Without clause, it fetches ALL
/// entities, this is less efficient as it requires an additional query for each entity's
/// model data. Specifying a clause can optimize the query by limiting the retrieval to specific
/// type of entites matching keys and/or models.
pub async fn entities(&self, query: Query) -> Result<Vec<Entity>, Error> {
let mut grpc_client = self.inner.write().await;
let RetrieveEntitiesResponse { entities } = grpc_client.retrieve_entities(query).await?;
Expand Down
23 changes: 16 additions & 7 deletions crates/torii/core/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,27 @@ type ModelName = String;

pub struct ModelCache {
pool: SqlitePool,
schemas: RwLock<HashMap<ModelName, Ty>>,
cache: RwLock<HashMap<ModelName, Ty>>,
}

impl ModelCache {
pub fn new(pool: SqlitePool) -> Self {
Self { pool, schemas: RwLock::new(HashMap::new()) }
Self { pool, cache: RwLock::new(HashMap::new()) }
}

pub async fn schemas(&self, models: Vec<&str>) -> Result<Vec<Ty>, Error> {
let mut schemas = Vec::with_capacity(models.len());
for model in models {
schemas.push(self.schema(model).await?);
}

Ok(schemas)
}

pub async fn schema(&self, model: &str) -> Result<Ty, Error> {
{
let schemas = self.schemas.read().await;
if let Some(schema) = schemas.get(model) {
let cache = self.cache.read().await;
if let Some(schema) = cache.get(model) {
return Ok(schema.clone());
}
}
Expand All @@ -44,13 +53,13 @@ impl ModelCache {
}

let ty = parse_sql_model_members(model, &model_members);
let mut schemas = self.schemas.write().await;
schemas.insert(model.into(), ty.clone());
let mut cache = self.cache.write().await;
cache.insert(model.into(), ty.clone());

Ok(ty)
}

pub async fn clear(&self) {
self.schemas.write().await.clear();
self.cache.write().await.clear();
}
}
2 changes: 2 additions & 0 deletions crates/torii/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub enum ParseError {
pub enum QueryError {
#[error("unsupported query")]
UnsupportedQuery,
#[error("missing param: {0}")]
MissingParam(String),
#[error("model not found: {0}")]
ModelNotFound(String),
#[error("exceeds sqlite `JOIN` limit (64)")]
Expand Down
11 changes: 3 additions & 8 deletions crates/torii/core/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,11 @@ pub fn build_sql_query(model_schemas: &Vec<Ty>) -> Result<String, Error> {
let selections_clause = global_selections.join(", ");
let join_clause = global_tables
.into_iter()
.map(|table| format!(" LEFT JOIN {table} ON entities.id = {table}.entity_id"))
.map(|table| format!(" JOIN {table} ON entities.id = {table}.entity_id"))
.collect::<Vec<_>>()
.join(" ");

Ok(format!("SELECT entities.keys, {selections_clause} FROM entities{join_clause}"))
Ok(format!("SELECT entities.id, entities.keys, {selections_clause} FROM entities{join_clause}"))
}

/// Populate the values of a Ty (schema) from SQLite row.
Expand Down Expand Up @@ -504,14 +504,9 @@ mod tests {
});

let query = build_sql_query(&vec![ty]).unwrap();
println!("{query}");
assert_eq!(
query,
"SELECT entities.keys, Position.external_name AS \"Position.name\", \
Position.external_age AS \"Position.age\", Position$Vec2.external_x AS \
\"Position$Vec2.x\", Position$Vec2.external_y AS \"Position$Vec2.y\" FROM entities \
LEFT JOIN Position ON entities.id = Position.entity_id LEFT JOIN Position$Vec2 ON \
entities.id = Position$Vec2.entity_id"
r#"SELECT entities.id, entities.keys, Position.external_name AS "Position.name", Position.external_age AS "Position.age", Position$Vec2.external_x AS "Position$Vec2.x", Position$Vec2.external_y AS "Position$Vec2.y" FROM entities JOIN Position ON entities.id = Position.entity_id JOIN Position$Vec2 ON entities.id = Position$Vec2.entity_id"#
);
}
}
8 changes: 4 additions & 4 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl Sql {
let indexer_query = sqlx::query_as::<_, (i64,)>("SELECT head FROM indexers WHERE id = ?")
.bind(format!("{:#x}", self.world_address));

let indexer: (i64,) = indexer_query.fetch_one(&mut conn).await?;
let indexer: (i64,) = indexer_query.fetch_one(&mut *conn).await?;
Ok(indexer.0.try_into().expect("doesn't fit in u64"))
}

Expand All @@ -68,7 +68,7 @@ impl Sql {
let mut conn: PoolConnection<Sqlite> = self.pool.acquire().await?;
let meta: World = sqlx::query_as("SELECT * FROM worlds WHERE id = ?")
.bind(format!("{:#x}", self.world_address))
.fetch_one(&mut conn)
.fetch_one(&mut *conn)
.await?;

Ok(meta)
Expand Down Expand Up @@ -223,14 +223,14 @@ impl Sql {
.bind(format!("{:#x}", key));

let mut conn: PoolConnection<Sqlite> = self.pool.acquire().await?;
let row: (i32, String, String) = query.fetch_one(&mut conn).await?;
let row: (i32, String, String) = query.fetch_one(&mut *conn).await?;
Ok(serde_json::from_str(&row.2).unwrap())
}

pub async fn entities(&self, model: String) -> Result<Vec<Vec<FieldElement>>> {
let query = sqlx::query_as::<_, (i32, String, String)>("SELECT * FROM ?").bind(model);
let mut conn: PoolConnection<Sqlite> = self.pool.acquire().await?;
let mut rows = query.fetch_all(&mut conn).await?;
let mut rows = query.fetch_all(&mut *conn).await?;
Ok(rows.drain(..).map(|row| serde_json::from_str(&row.2).unwrap()).collect())
}

Expand Down
2 changes: 1 addition & 1 deletion crates/torii/graphql/src/object/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ fn model_union_field() -> Field {
let model_ids: Vec<(String,)> =
sqlx::query_as("SELECT model_id from entity_model WHERE entity_id = ?")
.bind(&entity_id)
.fetch_all(&mut conn)
.fetch_all(&mut *conn)
.await?;

let mut results: Vec<FieldValue<'_>> = Vec::new();
Expand Down
9 changes: 4 additions & 5 deletions crates/torii/graphql/src/query/data.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use async_graphql::connection::PageInfo;
use sqlx::pool::PoolConnection;
use sqlx::sqlite::SqliteRow;
use sqlx::{Result, Row, Sqlite};
use sqlx::{Result, Row, SqliteConnection};

use super::filter::{Filter, FilterValue};
use super::order::{CursorDirection, Direction, Order};
use crate::constants::DEFAULT_LIMIT;
use crate::object::connection::{cursor, ConnectionArguments};

pub async fn count_rows(
conn: &mut PoolConnection<Sqlite>,
conn: &mut SqliteConnection,
table_name: &str,
keys: &Option<Vec<String>>,
filters: &Option<Vec<Filter>>,
Expand All @@ -26,7 +25,7 @@ pub async fn count_rows(
}

pub async fn fetch_single_row(
conn: &mut PoolConnection<Sqlite>,
conn: &mut SqliteConnection,
table_name: &str,
id_column: &str,
id: &str,
Expand All @@ -37,7 +36,7 @@ pub async fn fetch_single_row(

#[allow(clippy::too_many_arguments)]
pub async fn fetch_multiple_rows(
conn: &mut PoolConnection<Sqlite>,
conn: &mut SqliteConnection,
table_name: &str,
id_column: &str,
keys: &Option<Vec<String>>,
Expand Down
7 changes: 3 additions & 4 deletions crates/torii/graphql/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ use async_graphql::dynamic::TypeRef;
use async_graphql::{Name, Value};
use convert_case::{Case, Casing};
use dojo_types::primitive::{Primitive, SqlType};
use sqlx::pool::PoolConnection;
use sqlx::sqlite::SqliteRow;
use sqlx::{Row, Sqlite};
use sqlx::{Row, SqliteConnection};
use torii_core::sql::FELT_DELIMITER;

use crate::constants::{BOOLEAN_TRUE, ENTITY_ID_COLUMN, INTERNAL_ENTITY_ID_KEY};
Expand All @@ -18,7 +17,7 @@ pub mod filter;
pub mod order;

pub async fn type_mapping_query(
conn: &mut PoolConnection<Sqlite>,
conn: &mut SqliteConnection,
model_id: &str,
) -> sqlx::Result<TypeMapping> {
let model_members = fetch_model_members(conn, model_id).await?;
Expand All @@ -29,7 +28,7 @@ pub async fn type_mapping_query(
}

async fn fetch_model_members(
conn: &mut PoolConnection<Sqlite>,
conn: &mut SqliteConnection,
model_id: &str,
) -> sqlx::Result<Vec<ModelMember>> {
sqlx::query_as(
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/graphql/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub async fn build_schema(pool: &SqlitePool) -> Result<Schema> {

async fn build_objects(pool: &SqlitePool) -> Result<(Vec<Box<dyn ObjectTrait>>, Union)> {
let mut conn = pool.acquire().await?;
let models: Vec<Model> = sqlx::query_as("SELECT * FROM models").fetch_all(&mut conn).await?;
let models: Vec<Model> = sqlx::query_as("SELECT * FROM models").fetch_all(&mut *conn).await?;

// predefined objects
let mut objects: Vec<Box<dyn ObjectTrait>> = vec![
Expand Down
Loading

0 comments on commit 48d04d1

Please sign in to comment.