Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce updated at field in top level torii query #2807

Merged
merged 8 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/torii/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@ pub enum QueryError {
SqliteJoinLimit,
#[error("Invalid namespaced model: {0}")]
InvalidNamespacedModel(String),
#[error("Invalid timestamp: {0}. Expected valid number of seconds since unix epoch.")]
InvalidTimestamp(u64),
}
20 changes: 20 additions & 0 deletions crates/torii/core/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
}

/// Creates a query that fetches all models and their nested data.
#[allow(clippy::too_many_arguments)]
pub fn build_sql_query(
schemas: &Vec<Ty>,
table_name: &str,
Expand All @@ -124,6 +125,7 @@
order_by: Option<&str>,
limit: Option<u32>,
offset: Option<u32>,
internal_updated_at: u64,
) -> Result<(String, String), Error> {
fn collect_columns(table_prefix: &str, path: &str, ty: &Ty, selections: &mut Vec<String>) {
match ty {
Expand Down Expand Up @@ -172,6 +174,7 @@
selections.push(format!("{}.id", table_name));
selections.push(format!("{}.keys", table_name));

let mut internal_updated_at_clause = Vec::with_capacity(schemas.len());
// Process each model schema
for model in schemas {
let model_table = model.name();
Expand All @@ -180,6 +183,10 @@
[{model_table}].{entity_relation_column}",
));

if internal_updated_at > 0 {
internal_updated_at_clause.push(format!("[{model_table}].internal_updated_at >= ?"));

Check warning on line 187 in crates/torii/core/src/model.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/model.rs#L187

Added line #L187 was not covered by tests
Comment on lines +186 to +187
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here there's a potential speedup if we can trust the updated_at that is inside the entities table. Which could avoid having this WHERE clause added for each models.

}

// Collect columns with table prefix
collect_columns(&model_table, "", model, &mut selections);
}
Expand All @@ -197,6 +204,18 @@
count_query += &format!(" WHERE {}", where_clause);
}

if !internal_updated_at_clause.is_empty() {
if where_clause.is_none() {
query += " WHERE ";
count_query += " WHERE ";
} else {
query += " AND ";
count_query += " AND ";
}
query += &format!(" {}", internal_updated_at_clause.join(" AND "));
count_query += &format!(" {}", internal_updated_at_clause.join(" AND "));

Check warning on line 216 in crates/torii/core/src/model.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/model.rs#L208-L216

Added lines #L208 - L216 were not covered by tests
}

// Use custom order by if provided, otherwise default to event_id DESC
if let Some(order_clause) = order_by {
query += &format!(" ORDER BY {}", order_clause);
Expand Down Expand Up @@ -494,6 +513,7 @@
None,
None,
None,
0,
)
.unwrap();

Expand Down
1 change: 1 addition & 0 deletions crates/torii/grpc/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ message Query {
bool dont_include_hashed_keys = 4;
repeated OrderBy order_by = 5;
repeated string entity_models = 6;
uint64 internal_updated_at = 7;
}

message EventQuery {
Expand Down
58 changes: 48 additions & 10 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use sqlx::prelude::FromRow;
use sqlx::sqlite::SqliteRow;
use sqlx::types::chrono::{DateTime, Utc};
use sqlx::{Pool, Row, Sqlite};
use starknet::core::types::Felt;
use starknet::providers::jsonrpc::HttpTransport;
Expand Down Expand Up @@ -229,6 +230,7 @@
dont_include_hashed_keys: bool,
order_by: Option<&str>,
entity_models: Vec<String>,
internal_updated_at: u64,

Check warning on line 233 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L233

Added line #L233 was not covered by tests
) -> Result<(Vec<proto::types::Entity>, u32), Error> {
self.query_by_hashed_keys(
table,
Expand All @@ -240,6 +242,7 @@
dont_include_hashed_keys,
order_by,
entity_models,
internal_updated_at,

Check warning on line 245 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L245

Added line #L245 was not covered by tests
)
.await
}
Expand All @@ -258,6 +261,7 @@
row_events.iter().map(map_row_to_event).collect()
}

#[allow(clippy::too_many_arguments)]
async fn fetch_entities(
&self,
table: &str,
Expand All @@ -266,7 +270,8 @@
dont_include_hashed_keys: bool,
order_by: Option<&str>,
entity_models: Vec<String>,
) -> Result<Vec<proto::types::Entity>, Error> {
internal_updated_at: u64,
) -> Result<(Vec<proto::types::Entity>, usize), Error> {
let entity_models =
entity_models.iter().map(|tag| compute_selector_from_tag(tag)).collect::<Vec<Felt>>();

Expand Down Expand Up @@ -319,6 +324,8 @@
insert_start.elapsed()
);

let mut query_result_count: usize = 0;

let query_start = std::time::Instant::now();
for (models_str, entity_ids) in &model_groups {
tracing::debug!("Processing model group with {} entities", entity_ids.len());
Expand All @@ -344,7 +351,7 @@
continue;
}

let (entity_query, _) = build_sql_query(
let (entity_query, count_query) = build_sql_query(
&schemas,
table,
entity_relation_column,
Expand All @@ -354,9 +361,27 @@
order_by,
None,
None,
internal_updated_at,
)?;

let rows = sqlx::query(&entity_query).bind(models_str).fetch_all(&mut *tx).await?;
let mut query = sqlx::query(&entity_query).bind(models_str);
let mut count_query = sqlx::query(&count_query).bind(models_str);
if internal_updated_at > 0 {
for _ in 0..schemas.len() {
let time = DateTime::<Utc>::from_timestamp(internal_updated_at as i64, 0)
.ok_or_else(|| {
Error::from(QueryError::InvalidTimestamp(internal_updated_at))
})?
.to_rfc3339();
Comment on lines +368 to +372
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! Fix incorrect timestamp conversion and handle possible overflow

  • The DateTime::<Utc>::from_timestamp method does not exist. Use Utc.timestamp_opt instead.
  • Casting internal_updated_at from u64 to i64 can cause overflow if the value exceeds i64::MAX. Ensure internal_updated_at is within the valid range before casting.

Apply this diff to fix the code:

+        if internal_updated_at > i64::MAX as u64 {
+            return Err(Error::from(QueryError::InvalidTimestamp(internal_updated_at)));
+        }
-        let time = DateTime::<Utc>::from_timestamp(internal_updated_at as i64, 0)
-            .ok_or_else(|| {
-                Error::from(QueryError::InvalidTimestamp(internal_updated_at))
-            })?
-            .to_rfc3339();
+        let time = Utc.timestamp_opt(internal_updated_at as i64, 0)
+            .single()
+            .ok_or_else(|| Error::from(QueryError::InvalidTimestamp(internal_updated_at)))?
+            .to_rfc3339();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let time = DateTime::<Utc>::from_timestamp(internal_updated_at as i64, 0)
.ok_or_else(|| {
Error::from(QueryError::InvalidTimestamp(internal_updated_at))
})?
.to_rfc3339();
if internal_updated_at > i64::MAX as u64 {
return Err(Error::from(QueryError::InvalidTimestamp(internal_updated_at)));
}
let time = Utc.timestamp_opt(internal_updated_at as i64, 0)
.single()
.ok_or_else(|| Error::from(QueryError::InvalidTimestamp(internal_updated_at)))?
.to_rfc3339();

query = query.bind(time.clone());
count_query = count_query.bind(time);

Check warning on line 377 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L370-L377

Added lines #L370 - L377 were not covered by tests
}
}
let rows = query.fetch_all(&mut *tx).await?;
let fetch_one_result = count_query.fetch_one(&mut *tx).await?;
let query_count: u32 = fetch_one_result.get(0);
query_result_count += query_count as usize;

let schemas = Arc::new(schemas);

let group_entities: Result<Vec<_>, Error> = rows
Expand All @@ -376,7 +401,7 @@

tracing::debug!("Total fetch_entities operation took {:?}", start.elapsed());

Ok(all_entities)
Ok((all_entities, query_result_count))
}

async fn fetch_historical_event_messages(
Expand Down Expand Up @@ -430,6 +455,7 @@
dont_include_hashed_keys: bool,
order_by: Option<&str>,
entity_models: Vec<String>,
internal_updated_at: u64,

Check warning on line 458 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L458

Added line #L458 was not covered by tests
) -> Result<(Vec<proto::types::Entity>, u32), Error> {
// TODO: use prepared statement for where clause
let filter_ids = match hashed_keys {
Expand Down Expand Up @@ -502,17 +528,18 @@
let db_entities: Vec<(String, String)> =
sqlx::query_as(&query).bind(limit).bind(offset).fetch_all(&self.pool).await?;

let entities = self
let (entities, query_result_count) = self

Check warning on line 531 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L531

Added line #L531 was not covered by tests
.fetch_entities(
table,
entity_relation_column,
db_entities,
dont_include_hashed_keys,
order_by,
entity_models,
internal_updated_at,

Check warning on line 539 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L539

Added line #L539 was not covered by tests
)
.await?;
Ok((entities, total_count))
Ok((entities, query_result_count as u32))

Check warning on line 542 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L542

Added line #L542 was not covered by tests
}

#[allow(clippy::too_many_arguments)]
Expand All @@ -527,6 +554,7 @@
dont_include_hashed_keys: bool,
order_by: Option<&str>,
entity_models: Vec<String>,
internal_updated_at: u64,
) -> Result<(Vec<proto::types::Entity>, u32), Error> {
let keys_pattern = build_keys_pattern(keys_clause)?;

Expand Down Expand Up @@ -647,17 +675,18 @@
.fetch_all(&self.pool)
.await?;

let entities = self
let (entities, query_result_count) = self
.fetch_entities(
table,
entity_relation_column,
db_entities,
dont_include_hashed_keys,
order_by,
entity_models,
internal_updated_at,
)
.await?;
Ok((entities, total_count))
Ok((entities, query_result_count as u32))
}

pub(crate) async fn events_by_keys(
Expand Down Expand Up @@ -699,6 +728,7 @@
dont_include_hashed_keys: bool,
order_by: Option<&str>,
entity_models: Vec<String>,
internal_updated_at: u64,

Check warning on line 731 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L731

Added line #L731 was not covered by tests
) -> Result<(Vec<proto::types::Entity>, u32), Error> {
let entity_models =
entity_models.iter().map(|model| compute_selector_from_tag(model)).collect::<Vec<_>>();
Expand Down Expand Up @@ -765,6 +795,7 @@
order_by,
limit,
offset,
internal_updated_at,

Check warning on line 798 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L798

Added line #L798 was not covered by tests
)?;

let total_count = sqlx::query_scalar(&count_query)
Expand Down Expand Up @@ -798,6 +829,7 @@
dont_include_hashed_keys: bool,
order_by: Option<&str>,
entity_models: Vec<String>,
internal_updated_at: u64,

Check warning on line 832 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L832

Added line #L832 was not covered by tests
) -> Result<(Vec<proto::types::Entity>, u32), Error> {
let (where_clause, having_clause, join_clause, bind_values) =
build_composite_clause(table, model_relation_table, &composite)?;
Expand Down Expand Up @@ -860,17 +892,18 @@

let db_entities: Vec<(String, String)> = db_query.fetch_all(&self.pool).await?;

let entities = self
let (entities, query_result_count) = self

Check warning on line 895 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L895

Added line #L895 was not covered by tests
.fetch_entities(
table,
entity_relation_column,
db_entities,
dont_include_hashed_keys,
order_by,
entity_models,
internal_updated_at,

Check warning on line 903 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L903

Added line #L903 was not covered by tests
)
.await?;
Ok((entities, total_count))
Ok((entities, query_result_count as u32))

Check warning on line 906 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L906

Added line #L906 was not covered by tests
}

pub async fn model_metadata(
Expand Down Expand Up @@ -1036,6 +1069,7 @@
query.dont_include_hashed_keys,
order_by,
query.entity_models,
query.internal_updated_at,

Check warning on line 1072 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1072

Added line #L1072 was not covered by tests
)
.await?
}
Expand All @@ -1059,6 +1093,7 @@
query.dont_include_hashed_keys,
order_by,
query.entity_models,
query.internal_updated_at,

Check warning on line 1096 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1096

Added line #L1096 was not covered by tests
)
.await?
}
Expand All @@ -1073,6 +1108,7 @@
query.dont_include_hashed_keys,
order_by,
query.entity_models,
query.internal_updated_at,

Check warning on line 1111 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1111

Added line #L1111 was not covered by tests
)
.await?
}
Expand All @@ -1087,6 +1123,7 @@
query.dont_include_hashed_keys,
order_by,
query.entity_models,
query.internal_updated_at,

Check warning on line 1126 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1126

Added line #L1126 was not covered by tests
)
.await?
}
Expand All @@ -1101,6 +1138,7 @@
query.dont_include_hashed_keys,
order_by,
query.entity_models,
query.internal_updated_at,

Check warning on line 1141 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1141

Added line #L1141 was not covered by tests
)
.await?
}
Expand Down
1 change: 1 addition & 0 deletions crates/torii/grpc/src/server/tests/entities_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ async fn test_entities_queries(sequencer: &RunnerCtx) {
false,
None,
vec![],
0,
)
.await
.unwrap()
Expand Down
2 changes: 2 additions & 0 deletions crates/torii/grpc/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
pub dont_include_hashed_keys: bool,
pub order_by: Vec<OrderBy>,
pub entity_models: Vec<String>,
pub internal_updated_at: u64,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)]
Expand Down Expand Up @@ -271,6 +272,7 @@
dont_include_hashed_keys: value.dont_include_hashed_keys,
order_by: value.order_by.into_iter().map(|o| o.into()).collect(),
entity_models: value.entity_models,
internal_updated_at: value.internal_updated_at,

Check warning on line 275 in crates/torii/grpc/src/types/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/types/mod.rs#L275

Added line #L275 was not covered by tests
}
}
}
Expand Down
Loading