From b0d998fa278e6f537c794a053fbfe3e9f21eb488 Mon Sep 17 00:00:00 2001 From: Tarrence van As Date: Mon, 25 Sep 2023 16:24:30 -0400 Subject: [PATCH] Torii integration test work --- .vscode/settings.json | 6 + Cargo.lock | 4 + crates/dojo-test-utils/src/lib.rs | 1 + crates/dojo-test-utils/src/migration.rs | 15 ++ crates/dojo-types/src/component.rs | 4 +- crates/sozo/src/commands/completions.rs | 2 +- crates/sozo/src/lib.rs | 3 + crates/sozo/src/main.rs | 9 +- .../sozo/src/ops/migration/migration_test.rs | 36 ++- crates/sozo/src/ops/migration/mod.rs | 2 +- crates/torii/core/src/engine.rs | 106 +++++---- crates/torii/core/src/processors/mod.rs | 2 +- .../core/src/processors/register_model.rs | 4 +- .../core/src/processors/register_system.rs | 2 +- .../core/src/processors/store_set_record.rs | 2 +- .../core/src/processors/store_system_call.rs | 2 +- crates/torii/core/src/sql.rs | 148 ++++++------- crates/torii/core/src/sql_test.rs | 3 +- crates/torii/graphql/Cargo.toml | 4 + crates/torii/graphql/src/tests/common/mod.rs | 154 ------------- .../torii/graphql/src/tests/entities_test.rs | 37 +++- crates/torii/graphql/src/tests/mod.rs | 205 +++++++++++++++++- crates/torii/graphql/src/tests/models_test.rs | 4 +- .../graphql/src/tests/subscription_test.rs | 9 +- .../torii/migrations/20230316154230_setup.sql | 12 +- crates/torii/server/src/cli.rs | 6 +- 26 files changed, 435 insertions(+), 347 deletions(-) create mode 100644 .vscode/settings.json create mode 100644 crates/dojo-test-utils/src/migration.rs create mode 100644 crates/sozo/src/lib.rs delete mode 100644 crates/torii/graphql/src/tests/common/mod.rs diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000000..d0624a80d6 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,6 @@ +{ + "rust-analyzer.linkedProjects": [ + "./crates/sozo/Cargo.toml", + "./crates/sozo/Cargo.toml" + ] +} \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index b5bb0d7e8b..8e4008272a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6924,17 +6924,21 @@ dependencies = [ "base64 0.21.4", "camino", "chrono", + "dojo-test-utils", "dojo-types", "dojo-world", "indexmap 1.9.3", + "scarb-ui", "serde", "serde_json", + "sozo", "sqlx", "starknet", "starknet-crypto 0.6.0", "tokio", "tokio-stream", "tokio-util", + "torii-client", "torii-core", "tracing", "url", diff --git a/crates/dojo-test-utils/src/lib.rs b/crates/dojo-test-utils/src/lib.rs index e2f7b4af16..f8f7c63e82 100644 --- a/crates/dojo-test-utils/src/lib.rs +++ b/crates/dojo-test-utils/src/lib.rs @@ -1,4 +1,5 @@ pub mod compiler; +pub mod migration; pub mod rpc; pub mod sequencer; diff --git a/crates/dojo-test-utils/src/migration.rs b/crates/dojo-test-utils/src/migration.rs new file mode 100644 index 0000000000..4e5c001183 --- /dev/null +++ b/crates/dojo-test-utils/src/migration.rs @@ -0,0 +1,15 @@ +use std::path::PathBuf; + +use anyhow::Result; +use camino::Utf8PathBuf; +use dojo_world::manifest::Manifest; +use dojo_world::migration::strategy::{prepare_for_migration, MigrationStrategy}; +use dojo_world::migration::world::WorldDiff; +use starknet::macros::felt; + +pub fn prepare_migration(path: PathBuf) -> Result { + let target_dir = Utf8PathBuf::from_path_buf(path).unwrap(); + let manifest = Manifest::load_from_path(target_dir.join("manifest.json")).unwrap(); + let world = WorldDiff::compute(manifest, None); + prepare_for_migration(None, Some(felt!("0x12345")), target_dir, world) +} diff --git a/crates/dojo-types/src/component.rs b/crates/dojo-types/src/component.rs index 4773d97457..a1ed74fcbe 100644 --- a/crates/dojo-types/src/component.rs +++ b/crates/dojo-types/src/component.rs @@ -25,7 +25,9 @@ impl Ty { } pub fn flatten(&self) -> Vec { - Ty::flatten_ty(self.clone()) + let mut flattened = Ty::flatten_ty(self.clone()); + flattened.reverse(); + flattened } fn flatten_ty(ty: Ty) -> Vec { diff --git a/crates/sozo/src/commands/completions.rs b/crates/sozo/src/commands/completions.rs index 123cb04659..65c99ac2bb 100644 --- a/crates/sozo/src/commands/completions.rs +++ b/crates/sozo/src/commands/completions.rs @@ -4,7 +4,7 @@ use anyhow::Result; use clap::{Args, CommandFactory}; use clap_complete::{generate, Shell}; -use crate::SozoArgs; +use crate::args::SozoArgs; #[derive(Args, Debug)] pub struct CompletionsArgs { diff --git a/crates/sozo/src/lib.rs b/crates/sozo/src/lib.rs new file mode 100644 index 0000000000..fc9ec51d87 --- /dev/null +++ b/crates/sozo/src/lib.rs @@ -0,0 +1,3 @@ +pub mod args; +pub mod commands; +pub mod ops; diff --git a/crates/sozo/src/main.rs b/crates/sozo/src/main.rs index b21d47e172..ac5d6c212d 100644 --- a/crates/sozo/src/main.rs +++ b/crates/sozo/src/main.rs @@ -8,12 +8,7 @@ use dojo_lang::plugin::CairoPluginRepository; use scarb::compiler::CompilerRepository; use scarb::core::Config; use scarb_ui::{OutputFormat, Ui}; - -mod args; -mod commands; -mod ops; - -use args::{Commands, SozoArgs}; +use sozo::args::{Commands, SozoArgs}; fn main() { let args = SozoArgs::parse(); @@ -46,5 +41,5 @@ fn cli_main(args: SozoArgs) -> Result<()> { .compilers(compilers) .build()?; - commands::run(args.command, &config) + sozo::commands::run(args.command, &config) } diff --git a/crates/sozo/src/ops/migration/migration_test.rs b/crates/sozo/src/ops/migration/migration_test.rs index e422912681..94d6975146 100644 --- a/crates/sozo/src/ops/migration/migration_test.rs +++ b/crates/sozo/src/ops/migration/migration_test.rs @@ -1,10 +1,10 @@ -use anyhow::Result; use camino::Utf8PathBuf; +use dojo_test_utils::migration::prepare_migration; use dojo_test_utils::sequencer::{ get_default_test_starknet_config, SequencerConfig, StarknetConfig, TestSequencer, }; use dojo_world::manifest::Manifest; -use dojo_world::migration::strategy::{prepare_for_migration, MigrationStrategy}; +use dojo_world::migration::strategy::prepare_for_migration; use dojo_world::migration::world::WorldDiff; use scarb_ui::{OutputFormat, Ui, Verbosity}; use starknet::accounts::{ExecutionEncoding, SingleOwnerAccount}; @@ -18,17 +18,10 @@ use starknet::signers::{LocalWallet, SigningKey}; use crate::commands::options::transaction::TransactionOptions; use crate::ops::migration::execute_strategy; -pub fn prepare_example_ecs_migration() -> Result { - let target_dir = Utf8PathBuf::from_path_buf("../../examples/ecs/target/dev".into()).unwrap(); - let manifest = Manifest::load_from_path(target_dir.join("manifest.json")).unwrap(); - let world = WorldDiff::compute(manifest, None); - prepare_for_migration(None, Some(felt!("0x12345")), target_dir, world) -} - #[tokio::test(flavor = "multi_thread")] async fn migrate_with_auto_mine() { let ui = Ui::new(Verbosity::Verbose, OutputFormat::Text); - let migration = prepare_example_ecs_migration().unwrap(); + let migration = prepare_migration("../../examples/ecs/target/dev".into()).unwrap(); let sequencer = TestSequencer::start(SequencerConfig::default(), get_default_test_starknet_config()).await; @@ -37,13 +30,14 @@ async fn migrate_with_auto_mine() { account.set_block_id(BlockId::Tag(BlockTag::Pending)); execute_strategy(&migration, &account, &ui, None).await.unwrap(); + sequencer.stop().unwrap(); } #[tokio::test(flavor = "multi_thread")] async fn migrate_with_block_time() { let ui = Ui::new(Verbosity::Verbose, OutputFormat::Text); - let migration = prepare_example_ecs_migration().unwrap(); + let migration = prepare_migration("../../examples/ecs/target/dev".into()).unwrap(); let sequencer = TestSequencer::start( SequencerConfig { block_time: Some(1000), ..Default::default() }, @@ -61,7 +55,7 @@ async fn migrate_with_block_time() { #[tokio::test(flavor = "multi_thread")] async fn migrate_with_small_fee_multiplier_will_fail() { let ui = Ui::new(Verbosity::Verbose, OutputFormat::Text); - let migration = prepare_example_ecs_migration().unwrap(); + let migration = prepare_migration("../../examples/ecs/target/dev".into()).unwrap(); let sequencer = TestSequencer::start( Default::default(), @@ -79,14 +73,16 @@ async fn migrate_with_small_fee_multiplier_will_fail() { ExecutionEncoding::Legacy, ); - assert!(execute_strategy( - &migration, - &account, - &ui, - Some(TransactionOptions { fee_estimate_multiplier: Some(0.2f64) }), - ) - .await - .is_err()); + assert!( + execute_strategy( + &migration, + &account, + &ui, + Some(TransactionOptions { fee_estimate_multiplier: Some(0.2f64) }), + ) + .await + .is_err() + ); sequencer.stop().unwrap(); } diff --git a/crates/sozo/src/ops/migration/mod.rs b/crates/sozo/src/ops/migration/mod.rs index 22fefe5640..62dcf9de00 100644 --- a/crates/sozo/src/ops/migration/mod.rs +++ b/crates/sozo/src/ops/migration/mod.rs @@ -240,7 +240,7 @@ where // returns the Some(block number) at which migration world is deployed, returns none if world was // not redeployed -async fn execute_strategy( +pub async fn execute_strategy( strategy: &MigrationStrategy, migrator: &SingleOwnerAccount, ui: &Ui, diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index e920ee59b1..ff3d7e7ed4 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -7,10 +7,9 @@ use starknet::core::types::{ }; use starknet::core::utils::get_selector_from_name; use starknet::providers::Provider; -use starknet_crypto::FieldElement; use tokio::time::sleep; +use tokio_util::sync::CancellationToken; use torii_client::contract::world::WorldContractReader; - use tracing::{error, info, warn}; use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor}; @@ -31,21 +30,19 @@ impl Default for Processors

{ #[derive(Debug)] pub struct EngineConfig { pub block_time: Duration, - pub world_address: FieldElement, pub start_block: u64, } impl Default for EngineConfig { fn default() -> Self { - Self { - block_time: Duration::from_secs(1), - world_address: FieldElement::ZERO, - start_block: 0, - } + Self { block_time: Duration::from_secs(1), start_block: 0 } } } -pub struct Engine<'a, P: Provider + Sync + Send> { +pub struct Engine<'a, P: Provider + Sync + Send> +where + P::Error: 'static, +{ world: &'a WorldContractReader<'a, P>, db: &'a Sql, provider: &'a P, @@ -53,7 +50,10 @@ pub struct Engine<'a, P: Provider + Sync + Send> { config: EngineConfig, } -impl<'a, P: Provider + Sync + Send> Engine<'a, P> { +impl<'a, P: Provider + Sync + Send> Engine<'a, P> +where + P::Error: 'static, +{ pub fn new( world: &'a WorldContractReader<'a, P>, db: &'a Sql, @@ -64,10 +64,10 @@ impl<'a, P: Provider + Sync + Send> Engine<'a, P> { Self { world, db, provider, processors, config } } - pub async fn start(&self) -> Result<(), Box> { + pub async fn start(&self, cts: CancellationToken) -> Result<(), Box> { let db_head = self.db.head().await?; - let mut current_block_number = match db_head { + let current_block_number = match db_head { 0 => self.config.start_block, _ => { if self.config.start_block != 0 { @@ -78,45 +78,57 @@ impl<'a, P: Provider + Sync + Send> Engine<'a, P> { }; loop { + if cts.is_cancelled() { + break Ok(()); + } + sleep(self.config.block_time).await; + match self.sync_to_head(current_block_number).await { + Ok(block_with_txs) => block_with_txs, + Err(e) => { + error!("getting block: {}", e); + continue; + } + }; + } + } - let latest_block_with_txs = - match self.provider.get_block_with_txs(BlockId::Tag(BlockTag::Latest)).await { - Ok(block_with_txs) => block_with_txs, - Err(e) => { - error!("getting block: {}", e); - continue; - } - }; + pub async fn sync_to_head(&self, from: u64) -> Result> { + let latest_block_with_txs = + self.provider.get_block_with_txs(BlockId::Tag(BlockTag::Latest)).await?; + + let latest_block_number = match latest_block_with_txs { + MaybePendingBlockWithTxs::Block(latest_block_with_txs) => { + latest_block_with_txs.block_number + } + _ => return Err(anyhow::anyhow!("Getting latest block number").into()), + }; + + self.sync_range(from, latest_block_number).await?; + + Ok(latest_block_number) + } - let latest_block_number = match latest_block_with_txs { - MaybePendingBlockWithTxs::Block(latest_block_with_txs) => { - latest_block_with_txs.block_number + pub async fn sync_range(&self, mut from: u64, to: u64) -> Result<(), Box> { + // Process all blocks from current to latest. + while from <= to { + let block_with_txs = match self.provider.get_block_with_txs(BlockId::Number(from)).await + { + Ok(block_with_txs) => block_with_txs, + Err(e) => { + error!("getting block: {}", e); + continue; } - _ => continue, }; - // Process all blocks from current to latest. - while current_block_number <= latest_block_number { - let block_with_txs = match self - .provider - .get_block_with_txs(BlockId::Number(current_block_number)) - .await - { - Ok(block_with_txs) => block_with_txs, - Err(e) => { - error!("getting block: {}", e); - continue; - } - }; - - self.process(block_with_txs).await?; + self.process(block_with_txs).await?; - self.db.set_head(current_block_number).await?; - self.db.execute().await?; - current_block_number += 1; - } + self.db.set_head(from).await?; + self.db.execute().await?; + from += 1; } + + Ok(()) } async fn process(&self, block: MaybePendingBlockWithTxs) -> Result<(), Box> { @@ -154,7 +166,7 @@ impl<'a, P: Provider + Sync + Send> Engine<'a, P> { if let TransactionReceipt::Invoke(invoke_receipt) = receipt.clone() { for (event_idx, event) in invoke_receipt.events.iter().enumerate() { - if event.from_address != self.config.world_address { + if event.from_address != self.world.address { continue; } @@ -188,7 +200,7 @@ impl<'a, P: Provider + Sync + Send> Engine<'a, P> { } } -async fn process_block( +async fn process_block( db: &Sql, provider: &P, processors: &[Box>], @@ -200,7 +212,7 @@ async fn process_block( Ok(()) } -async fn process_transaction( +async fn process_transaction( db: &Sql, provider: &P, processors: &[Box>], @@ -215,7 +227,7 @@ async fn process_transaction( } #[allow(clippy::too_many_arguments)] -async fn process_event( +async fn process_event( world: &WorldContractReader<'_, P>, db: &Sql, provider: &P, diff --git a/crates/torii/core/src/processors/mod.rs b/crates/torii/core/src/processors/mod.rs index 85c006005b..71cf21d5ff 100644 --- a/crates/torii/core/src/processors/mod.rs +++ b/crates/torii/core/src/processors/mod.rs @@ -9,7 +9,7 @@ use crate::sql::Sql; pub mod register_model; pub mod register_system; pub mod store_set_record; -pub mod store_system_call; +// pub mod store_system_call; #[async_trait] pub trait EventProcessor { diff --git a/crates/torii/core/src/processors/register_model.rs b/crates/torii/core/src/processors/register_model.rs index d20db2b4c7..95bcac0166 100644 --- a/crates/torii/core/src/processors/register_model.rs +++ b/crates/torii/core/src/processors/register_model.rs @@ -13,7 +13,7 @@ use crate::sql::Sql; pub struct RegisterModelProcessor; #[async_trait] -impl EventProcessor

for RegisterModelProcessor { +impl EventProcessor

for RegisterModelProcessor { fn event_key(&self) -> String { "ComponentRegistered".to_string() } @@ -31,7 +31,7 @@ impl EventProcessor

for RegisterModelProcessor let model = world.component(&name, BlockId::Tag(BlockTag::Latest)).await?; let schema = model.schema(BlockId::Tag(BlockTag::Latest)).await?; let layout = model.layout(BlockId::Tag(BlockTag::Latest)).await?; - info!("registered model: {}", name); + info!("Registered model: {}", name); db.register_model(schema, layout, event.data[1]).await?; diff --git a/crates/torii/core/src/processors/register_system.rs b/crates/torii/core/src/processors/register_system.rs index a7e29b9f5d..6f117518d2 100644 --- a/crates/torii/core/src/processors/register_system.rs +++ b/crates/torii/core/src/processors/register_system.rs @@ -14,7 +14,7 @@ use crate::sql::Sql; pub struct RegisterSystemProcessor; #[async_trait] -impl EventProcessor

for RegisterSystemProcessor { +impl EventProcessor

for RegisterSystemProcessor { fn event_key(&self) -> String { "SystemRegistered".to_string() } diff --git a/crates/torii/core/src/processors/store_set_record.rs b/crates/torii/core/src/processors/store_set_record.rs index f67f04b652..9cc14b9285 100644 --- a/crates/torii/core/src/processors/store_set_record.rs +++ b/crates/torii/core/src/processors/store_set_record.rs @@ -17,7 +17,7 @@ const COMPONENT_INDEX: usize = 0; const NUM_KEYS_INDEX: usize = 1; #[async_trait] -impl EventProcessor

for StoreSetRecordProcessor { +impl EventProcessor

for StoreSetRecordProcessor { fn event_key(&self) -> String { "StoreSetRecord".to_string() } diff --git a/crates/torii/core/src/processors/store_system_call.rs b/crates/torii/core/src/processors/store_system_call.rs index b0cfdb8c20..cfbdd5de71 100644 --- a/crates/torii/core/src/processors/store_system_call.rs +++ b/crates/torii/core/src/processors/store_system_call.rs @@ -20,7 +20,7 @@ const EXECUTE_ENTRYPOINT: &str = "0x240060cdb34fcc260f41eac7474ee1d7c80b7e3607daff9ac67c7ea2ebb1c44"; #[async_trait] -impl TransactionProcessor

for StoreSystemCallProcessor { +impl TransactionProcessor

for StoreSystemCallProcessor { async fn process( &self, db: &Sql, diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index adc2622a30..1c0d3b713e 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -22,7 +22,7 @@ use crate::types::{Entity, Model as ModelType}; mod test; lazy_static! { - static ref SQL_TYPES: HashMap = { + static ref CAIRO_TO_SQL_TYPE: HashMap = { let mut m = HashMap::new(); m.insert("u8".to_string(), "INTEGER".to_string()); m.insert("u16".to_string(), "INTEGER".to_string()); @@ -32,10 +32,10 @@ lazy_static! { m.insert("u256".to_string(), "TEXT".to_string()); m.insert("usize".to_string(), "INTEGER".to_string()); m.insert("bool".to_string(), "INTEGER".to_string()); - m.insert("Cursor".to_string(), "TEXT".to_string()); + // m.insert("Cursor".to_string(), "TEXT".to_string()); m.insert("ContractAddress".to_string(), "TEXT".to_string()); m.insert("ClassHash".to_string(), "TEXT".to_string()); - m.insert("DateTime".to_string(), "TEXT".to_string()); + // m.insert("DateTime".to_string(), "TEXT".to_string()); m.insert("felt252".to_string(), "TEXT".to_string()); m }; @@ -156,6 +156,7 @@ impl Sql { let root = types.first().unwrap(); let root_name = root.name(); + println!("root {}", root_name); let layout_blob = layout.iter().map(|x| (*x).try_into().unwrap()).collect::>(); let mut queries = vec![format!( "INSERT INTO models (id, name, class_hash, layout) VALUES ('{}', '{}', '{:#x}', '{}') \ @@ -166,10 +167,10 @@ impl Sql { hex::encode(&layout_blob), class_hash )]; - queries.extend(build_model_query(root, None)); + queries.extend(build_model_query(root, 0, None)); - for ty in &types[1..] { - queries.extend(build_model_query(ty, Some(root_name.clone()))); + for (model_idx, ty) in types[1..].iter().enumerate() { + queries.extend(build_model_query(ty, model_idx + 1, Some(root_name.clone()))); } self.queue(queries).await; @@ -219,37 +220,45 @@ impl Sql { let keys_str = felts_sql_string(&keys); let model_names = model_names_sql_string(entity_result, &model)?; - let insert_entities = format!( + let mut queries = vec![format!( "INSERT INTO entities (id, keys, model_names) VALUES ('{}', '{}', '{}') ON \ - CONFLICT(id) DO UPDATE SET - model_names=excluded.model_names, + CONFLICT(id) DO UPDATE SET model_names=excluded.model_names, \ updated_at=CURRENT_TIMESTAMP", entity_id, keys_str, model_names - ); + )]; - let member_names_result = - sqlx::query("SELECT * FROM model_members WHERE model_id = ? ORDER BY id ASC") - .bind(model.clone()) - .fetch_all(&self.pool) - .await?; + let members: Vec<(String, String, String)> = sqlx::query_as( + "SELECT id, name, type FROM model_members WHERE model_id = ? ORDER BY model_idx, \ + member_idx ASC", + ) + .bind(model.clone()) + .fetch_all(&self.pool) + .await?; + + let (primitive_members, _): (Vec<_>, Vec<_>) = + members.into_iter().partition(|member| CAIRO_TO_SQL_TYPE.contains_key(&member.2)); // keys are part of model members, so combine keys and model values array let mut member_values: Vec = Vec::new(); member_values.extend(keys); member_values.extend(values); - let names_str = members_sql_string(&member_names_result)?; - let values_str = values_sql_string(&member_names_result, &member_values)?; - - let insert_models = format!( - "INSERT OR REPLACE INTO external_{} (entity_id{}) VALUES ('{}' {})", - model, names_str, entity_id, values_str - ); + let insert_models: Vec<_> = primitive_members + .into_iter() + .zip(member_values.into_iter()) + .map(|((id, name, ty), value)| { + format!( + "INSERT OR REPLACE INTO [{id}] (entity_id, external_{name}) VALUES \ + ('{entity_id}' {})", + format_value(&ty, &value).unwrap() + ) + }) + .collect(); - println!("{insert_models}"); + queries.extend(insert_models); // tx commit required - self.queue(vec![insert_entities, insert_models]).await; + self.queue(queries).await; self.execute().await?; let query_result = sqlx::query("SELECT created_at FROM entities WHERE id = ?") @@ -342,10 +351,7 @@ impl Executable for Sql { query_queue.clear(); } - println!("{:?}", queries); - let mut tx = self.pool.begin().await?; - for query in queries { tx.execute(sqlx::query(&query)).await?; } @@ -372,89 +378,61 @@ fn model_names_sql_string(entity_result: Option, new_model: &str) -> Ok(model_names) } -fn values_sql_string(member_results: &[SqliteRow], values: &[FieldElement]) -> Result { - let types: Result> = - member_results.iter().map(|row| Ok(row.try_get::("type")?)).collect(); - // format according to type - let values: Result> = values - .iter() - .zip(types?.iter()) - .map(|(value, ty)| { - let sql_type = SQL_TYPES - .get(ty) - .ok_or_else(|| anyhow::anyhow!("SQL type not found for: {}", ty))?; - - match sql_type.as_str() { - "INTEGER" => Ok(format!(",'{}'", value)), - "TEXT" => Ok(format!(",'{:#x}'", value)), - _ => Err(anyhow::anyhow!("Format not supported for type: {}", ty)), - } - }) - .collect(); - - Ok(values?.join("")) -} - -fn members_sql_string(member_results: &[SqliteRow]) -> Result { - let names: Result> = member_results - .iter() - .map(|row| { - let name = row.try_get::("name")?; - Ok(format!(", external_{}", name)) - }) - .collect(); - - Ok(names?.join("")) +fn format_value(ty: &str, value: &FieldElement) -> Result { + match CAIRO_TO_SQL_TYPE.get(ty) { + Some(sql_type) => match sql_type.as_str() { + "INTEGER" => Ok(format!(", '{}'", value)), + "TEXT" => Ok(format!(", '{:#x}'", value)), + _ => Err(anyhow::anyhow!("Format not supported for type: {}", ty)), + }, + _ => Err(anyhow::anyhow!("Format not supported for type: {}", ty)), + } } fn felts_sql_string(felts: &[FieldElement]) -> String { felts.iter().map(|k| format!("{:#x}", k)).collect::>().join("/") + "/" } -fn build_model_query(model: &Ty, parent_id: Option) -> Vec { - let model_id = if let Some(parent_id) = parent_id.clone() { - format!("{parent_id}:{}", model.name()) +fn build_model_query(model: &Ty, model_idx: usize, parent_id: Option) -> Vec { + let name = if let Some(parent_id) = parent_id.clone() { + format!("{parent_id}${}", model.name()) } else { model.name() }; + let model_id = if let Some(parent_id) = parent_id.clone() { parent_id } else { model.name() }; let mut queries = vec![]; - let mut query = format!( - "CREATE TABLE IF NOT EXISTS external_{} (entity_id TEXT NOT NULL PRIMARY KEY, ", - model_id - ); + let mut query = + format!("CREATE TABLE IF NOT EXISTS [{}] (entity_id TEXT NOT NULL PRIMARY KEY, ", name); match model { Ty::Struct(s) => { - for (i, member) in s.children.iter().enumerate() { - if let Some(sql_type) = SQL_TYPES.get(&member.ty.name()) { - queries.push(format!( - "INSERT OR IGNORE INTO model_members (id, model_id, idx, name, type, key) \ - VALUES ('{model_id}', '{model_id}', '{i}', '{}', '{}', {})", - member.name, - member.ty.name(), - member.key, - )); - + for (member_idx, member) in s.children.iter().enumerate() { + if let Some(sql_type) = CAIRO_TO_SQL_TYPE.get(&member.ty.name()) { query.push_str(&format!("external_{} {}, ", member.name, sql_type)); }; + + queries.push(format!( + "INSERT OR IGNORE INTO model_members (id, model_id, model_idx, member_idx, \ + name, type, key) VALUES ('{name}', '{model_id}', '{model_idx}', \ + '{member_idx}', '{}', '{}', {})", + member.name, + member.ty.name(), + member.key, + )); } } Ty::Enum(_) => {} _ => {} } + query.push_str("created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, "); + if let Some(id) = parent_id { - query.push_str(&format!( - "parent_id TEXT NOT NULL DEFAULT {id}, FOREIGN KEY (parent_id) REFERENCES \ - external_{id} (id)), " - )); + query.push_str(&format!("FOREIGN KEY (entity_id) REFERENCES {id} (entity_id), ")); }; - query.push_str( - "created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (entity_id) REFERENCES entities(id));", - ); + query.push_str("FOREIGN KEY (entity_id) REFERENCES entities(id));"); queries.push(query); queries } diff --git a/crates/torii/core/src/sql_test.rs b/crates/torii/core/src/sql_test.rs index 2f89794f7a..72111069f1 100644 --- a/crates/torii/core/src/sql_test.rs +++ b/crates/torii/core/src/sql_test.rs @@ -70,8 +70,7 @@ async fn test_load_from_manifest(pool: SqlitePool) { assert_eq!(name, "Position"); assert_eq!(class_hash, format!("{:#x}", FieldElement::TWO)); - let position_models = - sqlx::query("SELECT * FROM external_Position").fetch_all(&pool).await.unwrap(); + let position_models = sqlx::query("SELECT * FROM [Position]").fetch_all(&pool).await.unwrap(); assert_eq!(position_models.len(), 0); state diff --git a/crates/torii/graphql/Cargo.toml b/crates/torii/graphql/Cargo.toml index 5847608b98..05e33083c2 100644 --- a/crates/torii/graphql/Cargo.toml +++ b/crates/torii/graphql/Cargo.toml @@ -16,8 +16,10 @@ async-trait.workspace = true base64 = "0.21.2" chrono.workspace = true indexmap = "1.9.3" +scarb-ui.workspace = true serde.workspace = true serde_json.workspace = true +sozo = { path = "../../sozo" } sqlx = { version = "0.6.2", features = [ "chrono", "macros", "offline", "runtime-actix-rustls", "sqlite", "uuid" ] } tokio-stream = "0.1.11" tokio-util = "0.7.7" @@ -29,7 +31,9 @@ warp.workspace = true [dev-dependencies] camino.workspace = true +dojo-test-utils = { path = "../../dojo-test-utils" } dojo-types = { path = "../../dojo-types" } dojo-world = { path = "../../dojo-world" } starknet-crypto.workspace = true starknet.workspace = true +torii-client = { path = "../client" } diff --git a/crates/torii/graphql/src/tests/common/mod.rs b/crates/torii/graphql/src/tests/common/mod.rs deleted file mode 100644 index 614c8c99d7..0000000000 --- a/crates/torii/graphql/src/tests/common/mod.rs +++ /dev/null @@ -1,154 +0,0 @@ -use camino::Utf8PathBuf; -use dojo_types::component::{Member, Struct, Ty}; -use serde::Deserialize; -use serde_json::Value; -use sqlx::SqlitePool; -use starknet::core::types::FieldElement; -use tokio_stream::StreamExt; -use torii_core::sql::{Executable, Sql}; - -use crate::schema::build_schema; - -#[derive(Deserialize, Debug)] -#[serde(rename_all = "camelCase")] -pub struct Connection { - pub total_count: i64, - pub edges: Vec>, -} - -#[derive(Deserialize, Debug)] -#[serde(rename_all = "camelCase")] -pub struct Edge { - pub node: T, - pub cursor: String, -} - -#[derive(Deserialize, Debug)] -#[serde(rename_all = "camelCase")] -pub struct Entity { - pub model_names: String, - pub keys: Option>, - pub created_at: Option, -} - -#[derive(Deserialize, Debug)] -pub struct Moves { - pub __typename: String, - pub remaining: u32, - pub last_direction: u8, - pub entity: Option, -} - -#[derive(Deserialize, Debug)] -pub struct Position { - pub __typename: String, - pub x: u32, - pub y: u32, - pub entity: Option, -} - -pub enum Paginate { - Forward, - Backward, -} - -#[allow(dead_code)] -pub async fn run_graphql_query(pool: &SqlitePool, query: &str) -> Value { - let schema = build_schema(pool).await.unwrap(); - let res = schema.execute(query).await; - - assert!(res.errors.is_empty(), "GraphQL query returned errors: {:?}", res.errors); - serde_json::to_value(res.data).expect("Failed to serialize GraphQL response") -} - -pub async fn run_graphql_subscription( - pool: &SqlitePool, - subscription: &str, -) -> async_graphql::Value { - // Build dynamic schema - let schema = build_schema(pool).await.unwrap(); - schema.execute_stream(subscription).next().await.unwrap().into_result().unwrap().data - // fn subscribe() is called from inside dynamic subscription -} - -pub async fn entity_fixtures(pool: &SqlitePool) { - let state = init(pool).await; - - // Set entity with one moves model - // remaining: 10, last_direction: 0 - let key = vec![FieldElement::ONE]; - let moves_values = vec![FieldElement::from_hex_be("0xa").unwrap(), FieldElement::ZERO]; - state.set_entity("Moves".to_string(), key, moves_values.clone()).await.unwrap(); - - // Set entity with one position model - // x: 42 - // y: 69 - let key = vec![FieldElement::TWO]; - let position_values = vec![ - FieldElement::from_hex_be("0x2a").unwrap(), - FieldElement::from_hex_be("0x45").unwrap(), - ]; - state.set_entity("Position".to_string(), key, position_values.clone()).await.unwrap(); - - // Set an entity with both moves and position models - // remaining: 1, last_direction: 0 - // x: 69 - // y: 42 - let key = vec![FieldElement::THREE]; - let moves_values = vec![FieldElement::from_hex_be("0x1").unwrap(), FieldElement::ZERO]; - let position_values = vec![ - FieldElement::from_hex_be("0x45").unwrap(), - FieldElement::from_hex_be("0x2a").unwrap(), - ]; - state.set_entity("Moves".to_string(), key.clone(), moves_values).await.unwrap(); - state.set_entity("Position".to_string(), key, position_values).await.unwrap(); - - state.execute().await.unwrap(); -} - -pub async fn init(pool: &SqlitePool) -> Sql { - let manifest = dojo_world::manifest::Manifest::load_from_path( - Utf8PathBuf::from_path_buf("../../../examples/ecs/target/dev/manifest.json".into()) - .unwrap(), - ) - .unwrap(); - - let state = Sql::new(pool.clone(), FieldElement::ZERO).await.unwrap(); - state.load_from_manifest(manifest).await.unwrap(); - - state -} - -pub async fn paginate( - pool: &SqlitePool, - cursor: Option, - direction: Paginate, - page_size: usize, -) -> Connection { - let (first_last, before_after) = match direction { - Paginate::Forward => ("first", "after"), - Paginate::Backward => ("last", "before"), - }; - - let cursor = cursor.map_or(String::new(), |c| format!(", {before_after}: \"{c}\"")); - let query = format!( - " - {{ - entities ({first_last}: {page_size} {cursor}) - {{ - totalCount - edges {{ - cursor - node {{ - modelNames - }} - }} - }} - }} - " - ); - - let value = run_graphql_query(pool, &query).await; - let entities = value.get("entities").ok_or("entities not found").unwrap(); - serde_json::from_value(entities.clone()).unwrap() -} diff --git a/crates/torii/graphql/src/tests/entities_test.rs b/crates/torii/graphql/src/tests/entities_test.rs index 3acf973c7f..6ea97362e9 100644 --- a/crates/torii/graphql/src/tests/entities_test.rs +++ b/crates/torii/graphql/src/tests/entities_test.rs @@ -1,15 +1,36 @@ #[cfg(test)] mod tests { + use dojo_test_utils::migration::prepare_migration; + use dojo_test_utils::sequencer::{ + get_default_test_starknet_config, SequencerConfig, TestSequencer, + }; use sqlx::SqlitePool; + use starknet::providers::jsonrpc::HttpTransport; + use starknet::providers::JsonRpcClient; use starknet_crypto::{poseidon_hash_many, FieldElement}; + use torii_client::contract::world::WorldContractReader; + use torii_core::sql::Sql; - use crate::tests::common::{ - entity_fixtures, paginate, run_graphql_query, Entity, Moves, Paginate, Position, + use crate::tests::{ + bootstrap_engine, create_pool, entity_fixtures, paginate, run_graphql_query, Entity, Moves, + Paginate, Position, }; - #[sqlx::test(migrations = "../migrations")] - async fn test_entity(pool: SqlitePool) { - entity_fixtures(&pool).await; + #[tokio::test(flavor = "multi_thread")] + async fn test_entity() { + let pool = create_pool().await; + let db = Sql::new(pool.clone(), FieldElement::ZERO).await.unwrap(); + let migration = prepare_migration("../../../examples/ecs/target/dev".into()).unwrap(); + let sequencer = + TestSequencer::start(SequencerConfig::default(), get_default_test_starknet_config()) + .await; + let provider = JsonRpcClient::new(HttpTransport::new(sequencer.url())); + let world = WorldContractReader::new(migration.world_address().unwrap(), &provider); + + let _ = bootstrap_engine(&world, &db, &provider, &migration, &sequencer).await; + + entity_fixtures(&db).await; + let entity_id = poseidon_hash_many(&[FieldElement::ONE]); let query = format!( r#" @@ -31,7 +52,8 @@ mod tests { #[ignore] #[sqlx::test(migrations = "../migrations")] async fn test_entity_models(pool: SqlitePool) { - entity_fixtures(&pool).await; + let db = Sql::new(pool.clone(), FieldElement::ZERO).await.unwrap(); + entity_fixtures(&db).await; let entity_id = poseidon_hash_many(&[FieldElement::THREE]); let query = format!( @@ -70,7 +92,8 @@ mod tests { #[sqlx::test(migrations = "../migrations")] async fn test_entities_pagination(pool: SqlitePool) { - entity_fixtures(&pool).await; + let db = Sql::new(pool.clone(), FieldElement::ZERO).await.unwrap(); + entity_fixtures(&db).await; let page_size = 2; diff --git a/crates/torii/graphql/src/tests/mod.rs b/crates/torii/graphql/src/tests/mod.rs index b578891237..e5ed72f2c5 100644 --- a/crates/torii/graphql/src/tests/mod.rs +++ b/crates/torii/graphql/src/tests/mod.rs @@ -1,4 +1,203 @@ -mod common; +use camino::Utf8PathBuf; +use dojo_test_utils::sequencer::TestSequencer; +use dojo_world::migration::strategy::MigrationStrategy; +use scarb_ui::{OutputFormat, Ui, Verbosity}; +use serde::Deserialize; +use serde_json::Value; +use sozo::ops::migration::execute_strategy; +use sqlx::sqlite::SqlitePoolOptions; +use sqlx::SqlitePool; +use starknet::core::types::{BlockId, BlockTag, FieldElement}; +use starknet::providers::jsonrpc::HttpTransport; +use starknet::providers::JsonRpcClient; +use tokio_stream::StreamExt; +use torii_client::contract::world::WorldContractReader; +use torii_core::engine::{Engine, EngineConfig, Processors}; +use torii_core::processors::register_model::RegisterModelProcessor; +use torii_core::processors::register_system::RegisterSystemProcessor; +use torii_core::processors::store_set_record::StoreSetRecordProcessor; +use torii_core::sql::{Executable, Sql}; + mod entities_test; -mod models_test; -mod subscription_test; +// mod models_test; +// mod subscription_test; + +use crate::schema::build_schema; + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Connection { + pub total_count: i64, + pub edges: Vec>, +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Edge { + pub node: T, + pub cursor: String, +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Entity { + pub model_names: String, + pub keys: Option>, + pub created_at: Option, +} + +#[derive(Deserialize, Debug)] +pub struct Moves { + pub __typename: String, + pub remaining: u32, + pub last_direction: u8, + pub entity: Option, +} + +#[derive(Deserialize, Debug)] +pub struct Position { + pub __typename: String, + pub x: u32, + pub y: u32, + pub entity: Option, +} + +pub enum Paginate { + Forward, + Backward, +} + +#[allow(dead_code)] +pub async fn run_graphql_query(pool: &SqlitePool, query: &str) -> Value { + let schema = build_schema(pool).await.unwrap(); + let res = schema.execute(query).await; + + assert!(res.errors.is_empty(), "GraphQL query returned errors: {:?}", res.errors); + serde_json::to_value(res.data).expect("Failed to serialize GraphQL response") +} + +pub async fn create_pool() -> SqlitePool { + let pool = + SqlitePoolOptions::new().max_connections(5).connect("sqlite::memory:").await.unwrap(); + sqlx::migrate!("../migrations").run(&pool).await.unwrap(); + pool +} + +pub async fn bootstrap_engine<'a>( + world: &'a WorldContractReader<'a, JsonRpcClient>, + db: &'a Sql, + provider: &'a JsonRpcClient, + migration: &MigrationStrategy, + sequencer: &TestSequencer, +) -> Result>, Box> { + let mut account = sequencer.account(); + account.set_block_id(BlockId::Tag(BlockTag::Pending)); + + let manifest = dojo_world::manifest::Manifest::load_from_path( + Utf8PathBuf::from_path_buf("../../../examples/ecs/target/dev/manifest.json".into()) + .unwrap(), + ) + .unwrap(); + + db.load_from_manifest(manifest.clone()).await.unwrap(); + + let ui = Ui::new(Verbosity::Verbose, OutputFormat::Text); + execute_strategy(migration, &account, &ui, None).await.unwrap(); + + let engine = Engine::new( + world, + db, + provider, + Processors { + event: vec![ + Box::new(RegisterModelProcessor), + Box::new(RegisterSystemProcessor), + Box::new(StoreSetRecordProcessor), + ], + ..Processors::default() + }, + EngineConfig::default(), + ); + + let _ = engine.sync_to_head(0).await?; + + Ok(engine) +} + +#[allow(dead_code)] +pub async fn run_graphql_subscription( + pool: &SqlitePool, + subscription: &str, +) -> async_graphql::Value { + // Build dynamic schema + let schema = build_schema(pool).await.unwrap(); + schema.execute_stream(subscription).next().await.unwrap().into_result().unwrap().data + // fn subscribe() is called from inside dynamic subscription +} + +pub async fn entity_fixtures(db: &Sql) { + // Set entity with one moves model + // remaining: 10, last_direction: 0 + let key = vec![FieldElement::ONE]; + let moves_values = vec![FieldElement::from_hex_be("0xa").unwrap(), FieldElement::ZERO]; + db.set_entity("Moves".to_string(), key, moves_values.clone()).await.unwrap(); + + // Set entity with one position model + // x: 42 + // y: 69 + let key = vec![FieldElement::TWO]; + let position_values = vec![ + FieldElement::from_hex_be("0x2a").unwrap(), + FieldElement::from_hex_be("0x45").unwrap(), + ]; + db.set_entity("Position".to_string(), key, position_values.clone()).await.unwrap(); + + // Set an entity with both moves and position models + // remaining: 1, last_direction: 0 + // x: 69 + // y: 42 + let key = vec![FieldElement::THREE]; + let moves_values = vec![FieldElement::from_hex_be("0x1").unwrap(), FieldElement::ZERO]; + let position_values = vec![ + FieldElement::from_hex_be("0x45").unwrap(), + FieldElement::from_hex_be("0x2a").unwrap(), + ]; + db.set_entity("Moves".to_string(), key.clone(), moves_values).await.unwrap(); + db.set_entity("Position".to_string(), key, position_values).await.unwrap(); + + db.execute().await.unwrap(); +} + +pub async fn paginate( + pool: &SqlitePool, + cursor: Option, + direction: Paginate, + page_size: usize, +) -> Connection { + let (first_last, before_after) = match direction { + Paginate::Forward => ("first", "after"), + Paginate::Backward => ("last", "before"), + }; + + let cursor = cursor.map_or(String::new(), |c| format!(", {before_after}: \"{c}\"")); + let query = format!( + " + {{ + entities ({first_last}: {page_size} {cursor}) + {{ + totalCount + edges {{ + cursor + node {{ + modelNames + }} + }} + }} + }} + " + ); + + let value = run_graphql_query(pool, &query).await; + let entities = value.get("entities").ok_or("entities not found").unwrap(); + serde_json::from_value(entities.clone()).unwrap() +} diff --git a/crates/torii/graphql/src/tests/models_test.rs b/crates/torii/graphql/src/tests/models_test.rs index 824e8487be..4d61e5a08d 100644 --- a/crates/torii/graphql/src/tests/models_test.rs +++ b/crates/torii/graphql/src/tests/models_test.rs @@ -2,9 +2,7 @@ mod tests { use sqlx::SqlitePool; - use crate::tests::common::{ - entity_fixtures, run_graphql_query, Connection, Edge, Moves, Position, - }; + use crate::tests::{entity_fixtures, run_graphql_query, Connection, Edge, Moves, Position}; type OrderTestFn = dyn Fn(&Vec>) -> bool; diff --git a/crates/torii/graphql/src/tests/subscription_test.rs b/crates/torii/graphql/src/tests/subscription_test.rs index 00dea4314b..dea292bdb2 100644 --- a/crates/torii/graphql/src/tests/subscription_test.rs +++ b/crates/torii/graphql/src/tests/subscription_test.rs @@ -7,15 +7,17 @@ mod tests { use sqlx::SqlitePool; use starknet_crypto::{poseidon_hash_many, FieldElement}; use tokio::sync::mpsc; + use tokio_util::sync::CancellationToken; use torii_core::sql::Sql; - use crate::tests::common::{init, run_graphql_subscription}; + use crate::tests::{init, run_graphql_subscription}; #[sqlx::test(migrations = "../migrations")] async fn test_entity_subscription(pool: SqlitePool) { // Sleep in order to run this test in a single thread tokio::time::sleep(Duration::from_secs(1)).await; - let state = init(&pool).await; + let cts = CancellationToken::new(); + let state = init(cts, &pool).await; // 0. Preprocess expected entity value let key = vec![FieldElement::ONE]; let entity_id = format!("{:#x}", poseidon_hash_many(&key)); @@ -59,7 +61,8 @@ mod tests { async fn test_entity_subscription_with_id(pool: SqlitePool) { // Sleep in order to run this test in a single thread tokio::time::sleep(Duration::from_secs(1)).await; - let state = init(&pool).await; + let cts = CancellationToken::new(); + let state = init(cts, &pool).await; // 0. Preprocess expected entity value let key = vec![FieldElement::ONE]; let entity_id = format!("{:#x}", poseidon_hash_many(&key)); diff --git a/crates/torii/migrations/20230316154230_setup.sql b/crates/torii/migrations/20230316154230_setup.sql index 4fc74863b3..6d06ca0aef 100644 --- a/crates/torii/migrations/20230316154230_setup.sql +++ b/crates/torii/migrations/20230316154230_setup.sql @@ -25,15 +25,15 @@ CREATE TABLE models ( CREATE INDEX idx_models_created_at ON models (created_at); CREATE TABLE model_members( - id TEXT PRIMARY KEY, - idx INTEGER NOT NULL, + id TEXT NOT NULL, + model_idx INTEGER NOT NULL, + member_idx INTEGER NOT NULL, model_id TEXT NOT NULL, name TEXT NOT NULL, type TEXT NOT NULL, key BOOLEAN NOT NULL, created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (model_id) REFERENCES models(id) - UNIQUE (model_id, name) + PRIMARY KEY (id, model_idx) FOREIGN KEY (model_id) REFERENCES models(id) UNIQUE (id, member_idx) ); CREATE INDEX idx_model_members_model_id ON model_members (model_id); @@ -46,7 +46,7 @@ CREATE TABLE system_calls ( created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (system_id) REFERENCES systems(id), UNIQUE (transaction_hash) -); +); CREATE INDEX idx_system_calls_created_at ON system_calls (created_at); @@ -69,6 +69,7 @@ CREATE TABLE entities ( ); CREATE INDEX idx_entities_keys ON entities (keys); + CREATE INDEX idx_entities_keys_create_on ON entities (keys, created_at); CREATE TABLE events ( @@ -80,4 +81,5 @@ CREATE TABLE events ( ); CREATE INDEX idx_events_keys ON events (keys); + CREATE INDEX idx_events_created_at ON events (created_at); \ No newline at end of file diff --git a/crates/torii/server/src/cli.rs b/crates/torii/server/src/cli.rs index 61710e46c8..80f73dd460 100644 --- a/crates/torii/server/src/cli.rs +++ b/crates/torii/server/src/cli.rs @@ -23,6 +23,8 @@ use tracing::error; use tracing_subscriber::fmt; use url::Url; +mod server; + /// Dojo World Indexer #[derive(Parser, Debug)] #[command(name = "torii", author, version, about, long_about = None)] @@ -101,7 +103,7 @@ async fn main() -> anyhow::Result<()> { &db, &provider, processors, - EngineConfig { world_address, start_block: args.start_block, ..Default::default() }, + EngineConfig { start_block: args.start_block, ..Default::default() }, ); let addr = format!("{}:{}", args.host, args.port) @@ -109,7 +111,7 @@ async fn main() -> anyhow::Result<()> { .expect("able to parse address"); tokio::select! { - res = engine.start() => { + res = engine.start(cts) => { if let Err(e) = res { error!("Indexer failed with error: {e}"); }