Skip to content

Commit

Permalink
refactor(torii-grpc): chunk schema joins to avoid sqlite join limit (#…
Browse files Browse the repository at this point in the history
…2839)

* refactor(torii-grpc): chunk schema joins to avoid sqlite join limit

* update member & composite

* fmt

* fix

* clippy
  • Loading branch information
Larkooo authored Dec 23, 2024
1 parent 7b37e22 commit 6be2d0c
Show file tree
Hide file tree
Showing 2 changed files with 263 additions and 115 deletions.
149 changes: 149 additions & 0 deletions crates/torii/core/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,155 @@ pub fn map_row_to_ty(
Ok(())
}

/// Fetches entity rows from `table_name`, left-joined with the table of every model in
/// `schemas`, splitting the work into several queries so that no single statement exceeds
/// SQLite's limit on the number of tables in a join.
///
/// For each chunk of schemas two statements are executed:
///
/// 1. a `COUNT(*)` over the grouped/filtered entities, and
/// 2. if that count is non-zero, the main `SELECT` returning `id`, `keys`, a
///    `group_concat` of the related model ids (aliased `model_ids`) and one column per
///    leaf member of every model in the chunk, aliased as `"<model table>.<member path>"`.
///
/// # Arguments
///
/// * `pool` - SQLite connection pool used for every statement.
/// * `schemas` - Model schemas whose tables are joined; `model.name()` is used as the
///   table name.
/// * `table_name` - Base entity table.
/// * `model_relation_table` - Table joined on `entity_id` that provides `model_id`.
/// * `entity_relation_column` - Column of each model table that references
///   `{table_name}.id`.
/// * `where_clause` / `having_clause` / `order_by` - Optional SQL fragments (without the
///   leading keyword). They are interpolated verbatim into the statement text, so they
///   must come from trusted code; user-supplied values belong in `bind_values`.
/// * `limit` / `offset` - Optional `LIMIT` / `OFFSET` appended to each chunk's main query.
/// * `bind_values` - Values bound, in order, to the placeholders of both the count query
///   and the main query of every chunk.
///
/// # Returns
///
/// All rows of all chunks, appended in chunk order, together with the sum of the
/// per-chunk counts. An empty `schemas` slice produces no queries and yields
/// `(vec![], 0)`.
///
/// NOTE(review): `limit`/`offset` are applied to every chunk independently and the count
/// is summed across chunks, so with more than one chunk the same entity can be counted
/// and returned once per chunk. Callers are presumably expected to merge rows by entity
/// id - confirm against the call sites.
///
/// # Errors
///
/// Returns an error if any count or main query fails.
#[allow(clippy::too_many_arguments)]
pub async fn fetch_entities(
    pool: &Pool<sqlx::Sqlite>,
    schemas: &[Ty],
    table_name: &str,
    model_relation_table: &str,
    entity_relation_column: &str,
    where_clause: Option<&str>,
    having_clause: Option<&str>,
    order_by: Option<&str>,
    limit: Option<u32>,
    offset: Option<u32>,
    bind_values: Vec<String>,
) -> Result<(Vec<sqlx::sqlite::SqliteRow>, u32), Error> {
    /// Recursively walks `ty` and pushes one `SELECT` expression per leaf column.
    ///
    /// `path` is the dot-separated member path accumulated so far (empty at the root);
    /// every pushed expression has the form `[table].[path] as "table.path"`.
    fn collect_columns(table_prefix: &str, path: &str, ty: &Ty, selections: &mut Vec<String>) {
        match ty {
            Ty::Struct(s) => {
                for child in &s.children {
                    let new_path = if path.is_empty() {
                        child.name.clone()
                    } else {
                        format!("{}.{}", path, child.name)
                    };
                    collect_columns(table_prefix, &new_path, &child.ty, selections);
                }
            }
            Ty::Tuple(t) => {
                // Tuple members are addressed by their positional index.
                for (i, child) in t.iter().enumerate() {
                    let new_path =
                        if path.is_empty() { i.to_string() } else { format!("{}.{}", path, i) };
                    collect_columns(table_prefix, &new_path, child, selections);
                }
            }
            Ty::Enum(e) => {
                // The enum itself is stored in a column holding the selected variant.
                selections.push(format!("[{table_prefix}].[{path}] as \"{table_prefix}.{path}\""));

                // Each variant carrying data contributes its own nested columns; variants
                // whose payload is the empty tuple have nothing to select.
                for option in &e.options {
                    if let Ty::Tuple(t) = &option.ty {
                        if t.is_empty() {
                            continue;
                        }
                    }
                    let variant_path = format!("{}.{}", path, option.name);
                    collect_columns(table_prefix, &variant_path, &option.ty, selections);
                }
            }
            Ty::Array(_) | Ty::Primitive(_) | Ty::ByteArray(_) => {
                selections.push(format!("[{table_prefix}].[{path}] as \"{table_prefix}.{path}\""));
            }
        }
    }

    // SQLite refuses any join that involves more than 64 tables.
    const SQLITE_MAX_JOIN_TABLES: usize = 64;
    // Besides the model tables, every query also references the base entity table (FROM)
    // and the model relation table (JOIN); both count towards the limit.
    const RESERVED_JOIN_TABLES: usize = 2;
    const MAX_MODELS_PER_QUERY: usize = SQLITE_MAX_JOIN_TABLES - RESERVED_JOIN_TABLES;

    // These fragments are identical for every chunk, so build them once.
    let where_sql = where_clause.map_or_else(String::new, |w| format!(" WHERE {}", w));
    let having_sql = having_clause.map_or_else(String::new, |h| format!(" HAVING {}", h));

    let mut total_count = 0;
    let mut all_rows = Vec::new();

    for chunk in schemas.chunks(MAX_MODELS_PER_QUERY) {
        let mut selections = Vec::new();
        let mut joins = Vec::new();

        // Columns of the base table plus the aggregated ids of the related models.
        selections.push(format!("{}.id", table_name));
        selections.push(format!("{}.keys", table_name));
        selections.push(format!("group_concat({model_relation_table}.model_id) as model_ids"));

        // One LEFT JOIN and one set of columns per model in this chunk.
        for model in chunk {
            let model_table = model.name();
            joins.push(format!(
                "LEFT JOIN [{model_table}] ON {table_name}.id = \
                 [{model_table}].{entity_relation_column}"
            ));
            collect_columns(&model_table, "", model, &mut selections);
        }

        joins.push(format!(
            "JOIN {model_relation_table} ON {table_name}.id = {model_relation_table}.entity_id"
        ));

        let selections_clause = selections.join(", ");
        let joins_clause = joins.join(" ");

        // Count the grouped entities first so the (much wider) main query can be skipped
        // when this chunk matches nothing.
        let count_query = format!(
            "SELECT COUNT(*) FROM (SELECT {}.id, group_concat({}.model_id) as model_ids FROM [{}] \
             {} {} GROUP BY {}.id {})",
            table_name,
            model_relation_table,
            table_name,
            joins_clause,
            where_sql,
            table_name,
            having_sql
        );

        let mut count_stmt = sqlx::query_scalar(&count_query);
        for value in &bind_values {
            count_stmt = count_stmt.bind(value);
        }
        let chunk_count: u32 = count_stmt.fetch_one(pool).await?;
        total_count += chunk_count;

        if chunk_count > 0 {
            let mut query =
                format!("SELECT {} FROM [{}] {}", selections_clause, table_name, joins_clause);

            query.push_str(&where_sql);
            query += &format!(" GROUP BY {table_name}.id");
            query.push_str(&having_sql);

            // Without an explicit ordering, rows are sorted by event id, descending.
            if let Some(order_clause) = order_by {
                query += &format!(" ORDER BY {}", order_clause);
            } else {
                query += &format!(" ORDER BY {}.event_id DESC", table_name);
            }

            if let Some(limit) = limit {
                query += &format!(" LIMIT {}", limit);
            }

            if let Some(offset) = offset {
                query += &format!(" OFFSET {}", offset);
            }

            // The main query uses the same placeholders as the count query, so the same
            // values are bound again in the same order.
            let mut stmt = sqlx::query(&query);
            for value in &bind_values {
                stmt = stmt.bind(value);
            }
            let chunk_rows = stmt.fetch_all(pool).await?;
            all_rows.extend(chunk_rows);
        }
    }

    Ok((all_rows, total_count))
}

#[cfg(test)]
mod tests {
use dojo_types::schema::{Enum, EnumOption, Member, Struct, Ty};
Expand Down
Loading

0 comments on commit 6be2d0c

Please sign in to comment.