Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce updated at field in top level torii query #2807

Merged
merged 8 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/torii/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@ pub enum QueryError {
SqliteJoinLimit,
#[error("Invalid namespaced model: {0}")]
InvalidNamespacedModel(String),
#[error("Invalid timestamp: {0}. Expected valid number of seconds since unix epoch.")]
InvalidTimestamp(u64),
}
20 changes: 20 additions & 0 deletions crates/torii/core/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
}

/// Creates a query that fetches all models and their nested data.
#[allow(clippy::too_many_arguments)]
pub fn build_sql_query(
schemas: &Vec<Ty>,
table_name: &str,
Expand All @@ -124,6 +125,7 @@
order_by: Option<&str>,
limit: Option<u32>,
offset: Option<u32>,
internal_updated_at: u64,
) -> Result<(String, String), Error> {
fn collect_columns(table_prefix: &str, path: &str, ty: &Ty, selections: &mut Vec<String>) {
match ty {
Expand Down Expand Up @@ -172,6 +174,7 @@
selections.push(format!("{}.id", table_name));
selections.push(format!("{}.keys", table_name));

let mut internal_updated_at_clause = Vec::with_capacity(schemas.len());
// Process each model schema
for model in schemas {
let model_table = model.name();
Expand All @@ -180,6 +183,10 @@
[{model_table}].{entity_relation_column}",
));

if internal_updated_at > 0 {
internal_updated_at_clause.push(format!("[{model_table}].internal_updated_at >= ?"));

Check warning on line 187 in crates/torii/core/src/model.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/model.rs#L187

Added line #L187 was not covered by tests
Comment on lines +186 to +187
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here there's a potential speedup if we can trust the updated_at that is inside the entities table. Which could avoid having this WHERE clause added for each models.

}

// Collect columns with table prefix
collect_columns(&model_table, "", model, &mut selections);
}
Expand All @@ -197,6 +204,18 @@
count_query += &format!(" WHERE {}", where_clause);
}

if !internal_updated_at_clause.is_empty() {
if where_clause.is_none() {
query += " WHERE ";
count_query += " WHERE ";
} else {
query += " AND ";
count_query += " AND ";
}
query += &format!(" {}", internal_updated_at_clause.join(" AND "));
count_query += &format!(" {}", internal_updated_at_clause.join(" AND "));

Check warning on line 216 in crates/torii/core/src/model.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/model.rs#L208-L216

Added lines #L208 - L216 were not covered by tests
}

// Use custom order by if provided, otherwise default to event_id DESC
if let Some(order_clause) = order_by {
query += &format!(" ORDER BY {}", order_clause);
Expand Down Expand Up @@ -494,6 +513,7 @@
None,
None,
None,
0,
)
.unwrap();

Expand Down
1 change: 1 addition & 0 deletions crates/torii/grpc/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ message Query {
bool dont_include_hashed_keys = 4;
repeated OrderBy order_by = 5;
repeated string entity_models = 6;
uint64 internal_updated_at = 7;
}

message EventQuery {
Expand Down
33 changes: 32 additions & 1 deletion crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use sqlx::prelude::FromRow;
use sqlx::sqlite::SqliteRow;
use sqlx::types::chrono::{DateTime, Utc};
use sqlx::{Pool, Row, Sqlite};
use starknet::core::types::Felt;
use starknet::providers::jsonrpc::HttpTransport;
Expand Down Expand Up @@ -229,6 +230,7 @@
dont_include_hashed_keys: bool,
order_by: Option<&str>,
entity_models: Vec<String>,
internal_updated_at: u64,

Check warning on line 233 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L233

Added line #L233 was not covered by tests
) -> Result<(Vec<proto::types::Entity>, u32), Error> {
self.query_by_hashed_keys(
table,
Expand All @@ -240,6 +242,7 @@
dont_include_hashed_keys,
order_by,
entity_models,
internal_updated_at,

Check warning on line 245 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L245

Added line #L245 was not covered by tests
)
.await
}
Expand All @@ -258,6 +261,7 @@
row_events.iter().map(map_row_to_event).collect()
}

#[allow(clippy::too_many_arguments)]
async fn fetch_entities(
&self,
table: &str,
Expand All @@ -266,6 +270,7 @@
dont_include_hashed_keys: bool,
order_by: Option<&str>,
entity_models: Vec<String>,
internal_updated_at: u64,
) -> Result<Vec<proto::types::Entity>, Error> {
let entity_models =
entity_models.iter().map(|tag| compute_selector_from_tag(tag)).collect::<Vec<Felt>>();
Expand Down Expand Up @@ -354,9 +359,22 @@
order_by,
None,
None,
internal_updated_at,
)?;

let rows = sqlx::query(&entity_query).bind(models_str).fetch_all(&mut *tx).await?;
let mut query = sqlx::query(&entity_query).bind(models_str);
if internal_updated_at > 0 {
for _ in 0..schemas.len() {
let time = DateTime::<Utc>::from_timestamp(internal_updated_at as i64, 0)
.ok_or_else(|| {
Error::from(QueryError::InvalidTimestamp(internal_updated_at))
})?
.to_rfc3339();
Comment on lines +368 to +372
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! Fix incorrect timestamp conversion and handle possible overflow

  • The DateTime::<Utc>::from_timestamp method does not exist. Use Utc.timestamp_opt instead.
  • Casting internal_updated_at from u64 to i64 can cause overflow if the value exceeds i64::MAX. Ensure internal_updated_at is within the valid range before casting.

Apply this diff to fix the code:

+        if internal_updated_at > i64::MAX as u64 {
+            return Err(Error::from(QueryError::InvalidTimestamp(internal_updated_at)));
+        }
-        let time = DateTime::<Utc>::from_timestamp(internal_updated_at as i64, 0)
-            .ok_or_else(|| {
-                Error::from(QueryError::InvalidTimestamp(internal_updated_at))
-            })?
-            .to_rfc3339();
+        let time = Utc.timestamp_opt(internal_updated_at as i64, 0)
+            .single()
+            .ok_or_else(|| Error::from(QueryError::InvalidTimestamp(internal_updated_at)))?
+            .to_rfc3339();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let time = DateTime::<Utc>::from_timestamp(internal_updated_at as i64, 0)
.ok_or_else(|| {
Error::from(QueryError::InvalidTimestamp(internal_updated_at))
})?
.to_rfc3339();
if internal_updated_at > i64::MAX as u64 {
return Err(Error::from(QueryError::InvalidTimestamp(internal_updated_at)));
}
let time = Utc.timestamp_opt(internal_updated_at as i64, 0)
.single()
.ok_or_else(|| Error::from(QueryError::InvalidTimestamp(internal_updated_at)))?
.to_rfc3339();

query = query.bind(time.clone());

Check warning on line 373 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L367-L373

Added lines #L367 - L373 were not covered by tests
}
}
let rows = query.fetch_all(&mut *tx).await?;

let schemas = Arc::new(schemas);

let group_entities: Result<Vec<_>, Error> = rows
Expand Down Expand Up @@ -430,6 +448,7 @@
dont_include_hashed_keys: bool,
order_by: Option<&str>,
entity_models: Vec<String>,
internal_updated_at: u64,

Check warning on line 451 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L451

Added line #L451 was not covered by tests
) -> Result<(Vec<proto::types::Entity>, u32), Error> {
// TODO: use prepared statement for where clause
let filter_ids = match hashed_keys {
Expand Down Expand Up @@ -510,6 +529,7 @@
dont_include_hashed_keys,
order_by,
entity_models,
internal_updated_at,

Check warning on line 532 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L532

Added line #L532 was not covered by tests
)
.await?;
Ok((entities, total_count))
Expand All @@ -527,6 +547,7 @@
dont_include_hashed_keys: bool,
order_by: Option<&str>,
entity_models: Vec<String>,
internal_updated_at: u64,
) -> Result<(Vec<proto::types::Entity>, u32), Error> {
let keys_pattern = build_keys_pattern(keys_clause)?;

Expand Down Expand Up @@ -655,6 +676,7 @@
dont_include_hashed_keys,
order_by,
entity_models,
internal_updated_at,
)
.await?;
Ok((entities, total_count))
Expand Down Expand Up @@ -699,6 +721,7 @@
dont_include_hashed_keys: bool,
order_by: Option<&str>,
entity_models: Vec<String>,
internal_updated_at: u64,

Check warning on line 724 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L724

Added line #L724 was not covered by tests
) -> Result<(Vec<proto::types::Entity>, u32), Error> {
let entity_models =
entity_models.iter().map(|model| compute_selector_from_tag(model)).collect::<Vec<_>>();
Expand Down Expand Up @@ -765,6 +788,7 @@
order_by,
limit,
offset,
internal_updated_at,

Check warning on line 791 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L791

Added line #L791 was not covered by tests
)?;

let total_count = sqlx::query_scalar(&count_query)
Expand Down Expand Up @@ -798,6 +822,7 @@
dont_include_hashed_keys: bool,
order_by: Option<&str>,
entity_models: Vec<String>,
internal_updated_at: u64,

Check warning on line 825 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L825

Added line #L825 was not covered by tests
) -> Result<(Vec<proto::types::Entity>, u32), Error> {
let (where_clause, having_clause, join_clause, bind_values) =
build_composite_clause(table, model_relation_table, &composite)?;
Expand Down Expand Up @@ -868,6 +893,7 @@
dont_include_hashed_keys,
order_by,
entity_models,
internal_updated_at,

Check warning on line 896 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L896

Added line #L896 was not covered by tests
)
.await?;
Ok((entities, total_count))
Expand Down Expand Up @@ -1036,6 +1062,7 @@
query.dont_include_hashed_keys,
order_by,
query.entity_models,
query.internal_updated_at,

Check warning on line 1065 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1065

Added line #L1065 was not covered by tests
)
.await?
}
Expand All @@ -1059,6 +1086,7 @@
query.dont_include_hashed_keys,
order_by,
query.entity_models,
query.internal_updated_at,

Check warning on line 1089 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1089

Added line #L1089 was not covered by tests
)
.await?
}
Expand All @@ -1073,6 +1101,7 @@
query.dont_include_hashed_keys,
order_by,
query.entity_models,
query.internal_updated_at,

Check warning on line 1104 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1104

Added line #L1104 was not covered by tests
)
.await?
}
Expand All @@ -1087,6 +1116,7 @@
query.dont_include_hashed_keys,
order_by,
query.entity_models,
query.internal_updated_at,

Check warning on line 1119 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1119

Added line #L1119 was not covered by tests
)
.await?
}
Expand All @@ -1101,6 +1131,7 @@
query.dont_include_hashed_keys,
order_by,
query.entity_models,
query.internal_updated_at,

Check warning on line 1134 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1134

Added line #L1134 was not covered by tests
)
.await?
}
Expand Down
1 change: 1 addition & 0 deletions crates/torii/grpc/src/server/tests/entities_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ async fn test_entities_queries(sequencer: &RunnerCtx) {
false,
None,
vec![],
0,
)
.await
.unwrap()
Expand Down
6 changes: 6 additions & 0 deletions crates/torii/grpc/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,12 @@
pub offset: u32,
pub dont_include_hashed_keys: bool,
pub order_by: Vec<OrderBy>,
/// If the array is not empty, only the given models are retrieved.
/// All entities that don't have a model in the array are excluded.
pub entity_models: Vec<String>,
/// The internal updated at timestamp in seconds (unix timestamp) from which entities are
/// retrieved (inclusive). Use 0 to retrieve all entities.
pub internal_updated_at: u64,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)]
Expand Down Expand Up @@ -271,6 +276,7 @@
dont_include_hashed_keys: value.dont_include_hashed_keys,
order_by: value.order_by.into_iter().map(|o| o.into()).collect(),
entity_models: value.entity_models,
internal_updated_at: value.internal_updated_at,

Check warning on line 279 in crates/torii/grpc/src/types/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/types/mod.rs#L279

Added line #L279 was not covered by tests
}
}
}
Expand Down
Loading