Skip to content

Commit

Permalink
feat: introduce updated at field in top level torii query (#2807)
Browse files Browse the repository at this point in the history
* introduce updated at field in top level query

* fmt

* fmt + clippy

* clippy

* fix timestamp issue

* fix query count

* remove query count

* docs: add comments to explain fields

---------

Co-authored-by: glihm <[email protected]>
  • Loading branch information
edisontim and glihm authored Dec 16, 2024
1 parent 4e8f254 commit 8111b6d
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 1 deletion.
2 changes: 2 additions & 0 deletions crates/torii/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@ pub enum QueryError {
SqliteJoinLimit,
#[error("Invalid namespaced model: {0}")]
InvalidNamespacedModel(String),
#[error("Invalid timestamp: {0}. Expected valid number of seconds since unix epoch.")]
InvalidTimestamp(u64),
}
20 changes: 20 additions & 0 deletions crates/torii/core/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl ModelReader<Error> for ModelSQLReader {
}

/// Creates a query that fetches all models and their nested data.
#[allow(clippy::too_many_arguments)]
pub fn build_sql_query(
schemas: &Vec<Ty>,
table_name: &str,
Expand All @@ -124,6 +125,7 @@ pub fn build_sql_query(
order_by: Option<&str>,
limit: Option<u32>,
offset: Option<u32>,
internal_updated_at: u64,
) -> Result<(String, String), Error> {
fn collect_columns(table_prefix: &str, path: &str, ty: &Ty, selections: &mut Vec<String>) {
match ty {
Expand Down Expand Up @@ -172,6 +174,7 @@ pub fn build_sql_query(
selections.push(format!("{}.id", table_name));
selections.push(format!("{}.keys", table_name));

let mut internal_updated_at_clause = Vec::with_capacity(schemas.len());
// Process each model schema
for model in schemas {
let model_table = model.name();
Expand All @@ -180,6 +183,10 @@ pub fn build_sql_query(
[{model_table}].{entity_relation_column}",
));

if internal_updated_at > 0 {
internal_updated_at_clause.push(format!("[{model_table}].internal_updated_at >= ?"));
}

// Collect columns with table prefix
collect_columns(&model_table, "", model, &mut selections);
}
Expand All @@ -197,6 +204,18 @@ pub fn build_sql_query(
count_query += &format!(" WHERE {}", where_clause);
}

if !internal_updated_at_clause.is_empty() {
if where_clause.is_none() {
query += " WHERE ";
count_query += " WHERE ";
} else {
query += " AND ";
count_query += " AND ";
}
query += &format!(" {}", internal_updated_at_clause.join(" AND "));
count_query += &format!(" {}", internal_updated_at_clause.join(" AND "));
}

// Use custom order by if provided, otherwise default to event_id DESC
if let Some(order_clause) = order_by {
query += &format!(" ORDER BY {}", order_clause);
Expand Down Expand Up @@ -494,6 +513,7 @@ mod tests {
None,
None,
None,
0,
)
.unwrap();

Expand Down
1 change: 1 addition & 0 deletions crates/torii/grpc/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ message Query {
bool dont_include_hashed_keys = 4;
repeated OrderBy order_by = 5;
repeated string entity_models = 6;
uint64 internal_updated_at = 7;
}

message EventQuery {
Expand Down
33 changes: 32 additions & 1 deletion crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use proto::world::{
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use sqlx::prelude::FromRow;
use sqlx::sqlite::SqliteRow;
use sqlx::types::chrono::{DateTime, Utc};
use sqlx::{Pool, Row, Sqlite};
use starknet::core::types::Felt;
use starknet::providers::jsonrpc::HttpTransport;
Expand Down Expand Up @@ -229,6 +230,7 @@ impl DojoWorld {
dont_include_hashed_keys: bool,
order_by: Option<&str>,
entity_models: Vec<String>,
internal_updated_at: u64,
) -> Result<(Vec<proto::types::Entity>, u32), Error> {
self.query_by_hashed_keys(
table,
Expand All @@ -240,6 +242,7 @@ impl DojoWorld {
dont_include_hashed_keys,
order_by,
entity_models,
internal_updated_at,
)
.await
}
Expand All @@ -258,6 +261,7 @@ impl DojoWorld {
row_events.iter().map(map_row_to_event).collect()
}

#[allow(clippy::too_many_arguments)]
async fn fetch_entities(
&self,
table: &str,
Expand All @@ -266,6 +270,7 @@ impl DojoWorld {
dont_include_hashed_keys: bool,
order_by: Option<&str>,
entity_models: Vec<String>,
internal_updated_at: u64,
) -> Result<Vec<proto::types::Entity>, Error> {
let entity_models =
entity_models.iter().map(|tag| compute_selector_from_tag(tag)).collect::<Vec<Felt>>();
Expand Down Expand Up @@ -354,9 +359,22 @@ impl DojoWorld {
order_by,
None,
None,
internal_updated_at,
)?;

let rows = sqlx::query(&entity_query).bind(models_str).fetch_all(&mut *tx).await?;
let mut query = sqlx::query(&entity_query).bind(models_str);
if internal_updated_at > 0 {
for _ in 0..schemas.len() {
let time = DateTime::<Utc>::from_timestamp(internal_updated_at as i64, 0)
.ok_or_else(|| {
Error::from(QueryError::InvalidTimestamp(internal_updated_at))
})?
.to_rfc3339();
query = query.bind(time.clone());
}
}
let rows = query.fetch_all(&mut *tx).await?;

let schemas = Arc::new(schemas);

let group_entities: Result<Vec<_>, Error> = rows
Expand Down Expand Up @@ -430,6 +448,7 @@ impl DojoWorld {
dont_include_hashed_keys: bool,
order_by: Option<&str>,
entity_models: Vec<String>,
internal_updated_at: u64,
) -> Result<(Vec<proto::types::Entity>, u32), Error> {
// TODO: use prepared statement for where clause
let filter_ids = match hashed_keys {
Expand Down Expand Up @@ -510,6 +529,7 @@ impl DojoWorld {
dont_include_hashed_keys,
order_by,
entity_models,
internal_updated_at,
)
.await?;
Ok((entities, total_count))
Expand All @@ -527,6 +547,7 @@ impl DojoWorld {
dont_include_hashed_keys: bool,
order_by: Option<&str>,
entity_models: Vec<String>,
internal_updated_at: u64,
) -> Result<(Vec<proto::types::Entity>, u32), Error> {
let keys_pattern = build_keys_pattern(keys_clause)?;

Expand Down Expand Up @@ -655,6 +676,7 @@ impl DojoWorld {
dont_include_hashed_keys,
order_by,
entity_models,
internal_updated_at,
)
.await?;
Ok((entities, total_count))
Expand Down Expand Up @@ -699,6 +721,7 @@ impl DojoWorld {
dont_include_hashed_keys: bool,
order_by: Option<&str>,
entity_models: Vec<String>,
internal_updated_at: u64,
) -> Result<(Vec<proto::types::Entity>, u32), Error> {
let entity_models =
entity_models.iter().map(|model| compute_selector_from_tag(model)).collect::<Vec<_>>();
Expand Down Expand Up @@ -765,6 +788,7 @@ impl DojoWorld {
order_by,
limit,
offset,
internal_updated_at,
)?;

let total_count = sqlx::query_scalar(&count_query)
Expand Down Expand Up @@ -798,6 +822,7 @@ impl DojoWorld {
dont_include_hashed_keys: bool,
order_by: Option<&str>,
entity_models: Vec<String>,
internal_updated_at: u64,
) -> Result<(Vec<proto::types::Entity>, u32), Error> {
let (where_clause, having_clause, join_clause, bind_values) =
build_composite_clause(table, model_relation_table, &composite)?;
Expand Down Expand Up @@ -868,6 +893,7 @@ impl DojoWorld {
dont_include_hashed_keys,
order_by,
entity_models,
internal_updated_at,
)
.await?;
Ok((entities, total_count))
Expand Down Expand Up @@ -1036,6 +1062,7 @@ impl DojoWorld {
query.dont_include_hashed_keys,
order_by,
query.entity_models,
query.internal_updated_at,
)
.await?
}
Expand All @@ -1059,6 +1086,7 @@ impl DojoWorld {
query.dont_include_hashed_keys,
order_by,
query.entity_models,
query.internal_updated_at,
)
.await?
}
Expand All @@ -1073,6 +1101,7 @@ impl DojoWorld {
query.dont_include_hashed_keys,
order_by,
query.entity_models,
query.internal_updated_at,
)
.await?
}
Expand All @@ -1087,6 +1116,7 @@ impl DojoWorld {
query.dont_include_hashed_keys,
order_by,
query.entity_models,
query.internal_updated_at,
)
.await?
}
Expand All @@ -1101,6 +1131,7 @@ impl DojoWorld {
query.dont_include_hashed_keys,
order_by,
query.entity_models,
query.internal_updated_at,
)
.await?
}
Expand Down
1 change: 1 addition & 0 deletions crates/torii/grpc/src/server/tests/entities_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ async fn test_entities_queries(sequencer: &RunnerCtx) {
false,
None,
vec![],
0,
)
.await
.unwrap()
Expand Down
6 changes: 6 additions & 0 deletions crates/torii/grpc/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,12 @@ pub struct Query {
pub offset: u32,
pub dont_include_hashed_keys: bool,
pub order_by: Vec<OrderBy>,
/// If the array is not empty, only the given models are retrieved.
/// All entities that don't have a model in the array are excluded.
pub entity_models: Vec<String>,
/// The internal updated at timestamp in seconds (unix timestamp) from which entities are
/// retrieved (inclusive). Use 0 to retrieve all entities.
pub internal_updated_at: u64,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)]
Expand Down Expand Up @@ -271,6 +276,7 @@ impl From<Query> for proto::types::Query {
dont_include_hashed_keys: value.dont_include_hashed_keys,
order_by: value.order_by.into_iter().map(|o| o.into()).collect(),
entity_models: value.entity_models,
internal_updated_at: value.internal_updated_at,
}
}
}
Expand Down

0 comments on commit 8111b6d

Please sign in to comment.