diff --git a/Cargo.lock b/Cargo.lock index b7f5845e06..a223b809e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7892,6 +7892,7 @@ dependencies = [ "sqlx", "starknet", "starknet-crypto 0.6.0", + "thiserror", "tokio", "tokio-stream", "tokio-util", diff --git a/crates/torii/core/Cargo.toml b/crates/torii/core/Cargo.toml index 1fef948d88..930d4c550c 100644 --- a/crates/torii/core/Cargo.toml +++ b/crates/torii/core/Cargo.toml @@ -28,6 +28,7 @@ slab = "0.4.2" sqlx = { version = "0.6.2", features = [ "chrono", "macros", "offline", "runtime-actix-rustls", "sqlite", "uuid" ] } starknet-crypto.workspace = true starknet.workspace = true +thiserror.workspace = true tokio = { version = "1.32.0", features = [ "sync" ], default-features = true } tokio-stream = "0.1.11" tokio-util = "0.7.7" diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 88aca228a1..e17b4258de 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -10,7 +10,7 @@ use starknet::providers::Provider; use tokio::sync::mpsc::Sender as BoundedSender; use tokio::time::sleep; use tokio_util::sync::CancellationToken; -use torii_client::contract::world::WorldContractReader; +use torii_client::contract::world::{ContractReaderError, WorldContractReader}; use tracing::{error, info, warn}; use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor}; @@ -28,6 +28,12 @@ impl Default for Processors

{ } } +#[derive(Debug, thiserror::Error)] +pub enum EngineError

{ + #[error(transparent)] + ContractReaderError(ContractReaderError

), +} + #[derive(Debug)] pub struct EngineConfig { pub block_time: Duration, @@ -68,8 +74,9 @@ where } pub async fn start(&mut self, cts: CancellationToken) -> Result<(), Box> { - if self.db.head().await? == 0 { - self.db.set_head(self.config.start_block).await?; + let mut head = self.db.head().await?; + if head == 0 { + head = self.config.start_block; } else if self.config.start_block != 0 { warn!("start block ignored, stored head exists and will be used instead"); } @@ -79,9 +86,8 @@ where break Ok(()); } - let head = self.db.head().await?; match self.sync_to_head(head).await { - Ok(block_with_txs) => block_with_txs, + Ok(latest_block_number) => head = latest_block_number, Err(e) => { error!("getting block: {}", e); continue; @@ -123,7 +129,7 @@ where self.process(block_with_txs).await?; - self.db.set_head(from).await?; + self.db.set_head(from); self.db.execute().await?; from += 1; } @@ -237,7 +243,7 @@ async fn process_event( event: &Event, event_idx: usize, ) -> Result<(), Box> { - db.store_event(event, event_idx, invoke_receipt.transaction_hash).await?; + db.store_event(event, event_idx, invoke_receipt.transaction_hash); for processor in processors { if get_selector_from_name(&processor.event_key())? == event.keys[0] { diff --git a/crates/torii/core/src/lib.rs b/crates/torii/core/src/lib.rs index 668937bca8..6049e63d98 100644 --- a/crates/torii/core/src/lib.rs +++ b/crates/torii/core/src/lib.rs @@ -9,6 +9,7 @@ pub mod simple_broker; pub mod sql; pub mod types; +#[allow(dead_code)] #[derive(FromRow, Deserialize)] pub struct World { #[sqlx(try_from = "String")] diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index eecd6b4ea0..34bca747a3 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -4,7 +4,7 @@ use anyhow::{anyhow, Result}; use chrono::{DateTime, Utc}; use dojo_types::primitive::Primitive; use dojo_types::schema::Ty; -use dojo_world::manifest::{Manifest, System}; +use dojo_world::manifest::System; use sqlx::pool::PoolConnection; use sqlx::{Executor, Pool, Row, Sqlite}; use starknet::core::types::{Event, FieldElement}; @@ -48,30 +48,6 @@ impl Sql { Ok(Self { pool, world_address, query_queue: vec![] }) } - pub async fn load_from_manifest(&mut self, manifest: Manifest) -> Result<()> { - let mut updates = vec![ - format!("world_address = '{:#x}'", self.world_address), - format!("world_class_hash = '{:#x}'", manifest.world.class_hash), - format!("executor_class_hash = '{:#x}'", manifest.executor.class_hash), - ]; - - if let Some(executor_address) = manifest.executor.address { - updates.push(format!("executor_address = '{:#x}'", executor_address)); - } - - self.query_queue.push(format!( - "UPDATE worlds SET {} WHERE id = '{:#x}'", - updates.join(","), - self.world_address - )); - - for system in manifest.systems { - self.register_system(system).await?; - } - - self.execute().await - } - pub async fn head(&self) -> Result { let mut conn: PoolConnection = self.pool.acquire().await?; let indexer: (i64,) = sqlx::query_as(&format!( @@ -83,12 +59,11 @@ impl Sql { Ok(indexer.0.try_into().expect("doesnt fit in u64")) } - pub async fn set_head(&mut self, head: u64) -> Result<()> { + pub fn set_head(&mut self, head: u64) { self.query_queue.push(format!( "UPDATE indexers SET head = {head} WHERE id = '{:#x}'", self.world_address )); - Ok(()) } pub async fn world(&self) -> Result { @@ -101,19 +76,6 @@ impl Sql { Ok(meta) } - pub async fn set_world(&mut self, world: World) -> Result<()> { - self.query_queue.push(format!( - "UPDATE worlds SET world_address='{:#x}', world_class_hash='{:#x}', \ - executor_address='{:#x}', executor_class_hash='{:#x}' WHERE id = '{:#x}'", - world.world_address, - world.world_class_hash, - world.executor_address, - world.executor_class_hash, - world.world_address, - )); - Ok(()) - } - pub async fn register_model( &mut self, model: Ty, @@ -218,10 +180,9 @@ impl Sql { Ok(()) } - pub async fn delete_entity(&mut self, model: String, key: FieldElement) -> Result<()> { + pub fn delete_entity(&mut self, model: String, key: FieldElement) { let query = format!("DELETE FROM {model} WHERE id = {key}"); self.query_queue.push(query); - Ok(()) } pub async fn entity(&self, model: String, key: FieldElement) -> Result> { @@ -239,12 +200,12 @@ impl Sql { Ok(rows.drain(..).map(|row| serde_json::from_str(&row.2).unwrap()).collect()) } - pub async fn store_system_call( + pub fn store_system_call( &mut self, system: String, transaction_hash: FieldElement, calldata: &[FieldElement], - ) -> Result<()> { + ) { let query = format!( "INSERT OR IGNORE INTO system_calls (data, transaction_hash, system_id) VALUES ('{}', \ '{:#x}', '{}')", @@ -253,15 +214,9 @@ impl Sql { system ); self.query_queue.push(query); - Ok(()) } - pub async fn store_event( - &mut self, - event: &Event, - event_idx: usize, - transaction_hash: FieldElement, - ) -> Result<()> { + pub fn store_event(&mut self, event: &Event, event_idx: usize, transaction_hash: FieldElement) { let keys_str = felts_sql_string(&event.keys); let data_str = felts_sql_string(&event.data); @@ -273,7 +228,6 @@ impl Sql { ); self.query_queue.push(query); - Ok(()) } fn build_register_queries_recursive( diff --git a/crates/torii/core/src/sql_test.rs b/crates/torii/core/src/sql_test.rs index 84916a4b42..7ba279366e 100644 --- a/crates/torii/core/src/sql_test.rs +++ b/crates/torii/core/src/sql_test.rs @@ -1,15 +1,11 @@ -use camino::Utf8PathBuf; use dojo_test_utils::migration::prepare_migration; use dojo_test_utils::sequencer::{ get_default_test_starknet_config, SequencerConfig, TestSequencer, }; -use dojo_types::primitive::Primitive; -use dojo_types::schema::{Member, Struct, Ty}; -use dojo_world::manifest::System; use dojo_world::migration::strategy::MigrationStrategy; use scarb_ui::{OutputFormat, Ui, Verbosity}; use sozo::ops::migration::execute_strategy; -use sqlx::sqlite::{SqlitePool, SqlitePoolOptions}; +use sqlx::sqlite::SqlitePoolOptions; use starknet::core::types::{BlockId, BlockTag, Event, FieldElement}; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::JsonRpcClient; @@ -31,14 +27,6 @@ pub async fn bootstrap_engine<'a>( let mut account = sequencer.account(); account.set_block_id(BlockId::Tag(BlockTag::Pending)); - let manifest = dojo_world::manifest::Manifest::load_from_path( - Utf8PathBuf::from_path_buf("../../../examples/ecs/target/dev/manifest.json".into()) - .unwrap(), - ) - .unwrap(); - - db.load_from_manifest(manifest.clone()).await.unwrap(); - let ui = Ui::new(Verbosity::Verbose, OutputFormat::Text); execute_strategy(migration, &account, &ui, None).await.unwrap(); @@ -63,155 +51,52 @@ pub async fn bootstrap_engine<'a>( Ok(engine) } -#[sqlx::test(migrations = "../migrations")] -async fn test_load_from_manifest(pool: SqlitePool) { - let manifest = dojo_world::manifest::Manifest::load_from_path( - Utf8PathBuf::from_path_buf("../../../examples/ecs/target/dev/manifest.json".into()) - .unwrap(), - ) - .unwrap(); +#[tokio::test(flavor = "multi_thread")] +async fn test_load_from_remote() { + let pool = + SqlitePoolOptions::new().max_connections(5).connect("sqlite::memory:").await.unwrap(); + sqlx::migrate!("../migrations").run(&pool).await.unwrap(); + let migration = prepare_migration("../../../examples/ecs/target/dev".into()).unwrap(); + let sequencer = + TestSequencer::start(SequencerConfig::default(), get_default_test_starknet_config()).await; + let provider = JsonRpcClient::new(HttpTransport::new(sequencer.url())); + let world = WorldContractReader::new(migration.world_address().unwrap(), &provider); - let mut state = Sql::new(pool.clone(), FieldElement::ZERO).await.unwrap(); - state.load_from_manifest(manifest.clone()).await.unwrap(); + let mut db = Sql::new(pool.clone(), migration.world_address().unwrap()).await.unwrap(); + let _ = bootstrap_engine(&world, &mut db, &provider, &migration, &sequencer).await; let models = sqlx::query("SELECT * FROM models").fetch_all(&pool).await.unwrap(); - assert_eq!(models.len(), 0); - - let mut world = state.world().await.unwrap(); - - assert_eq!(world.world_address.0, FieldElement::ZERO); - assert_eq!(world.world_class_hash.0, manifest.world.class_hash); - assert_eq!(world.executor_address.0, FieldElement::ZERO); - assert_eq!(world.executor_class_hash.0, manifest.executor.class_hash); - - world.executor_address.0 = FieldElement::ONE; - state.set_world(world).await.unwrap(); - state.execute().await.unwrap(); - - let world = state.world().await.unwrap(); - assert_eq!(world.executor_address.0, FieldElement::ONE); - - let head = state.head().await.unwrap(); - assert_eq!(head, 0); - - state.set_head(1).await.unwrap(); - state.execute().await.unwrap(); - - let head = state.head().await.unwrap(); - assert_eq!(head, 1); - - state - .register_model( - Ty::Struct(Struct { - name: "Position".into(), - children: vec![ - Member { - name: "player".into(), - ty: Ty::Primitive(Primitive::ContractAddress(None)), - key: false, - }, - Member { - name: "x".to_string(), - key: true, - ty: Ty::Primitive(Primitive::U32(None)), - }, - Member { - name: "y".to_string(), - key: true, - ty: Ty::Primitive(Primitive::U32(None)), - }, - ], - }), - vec![], - FieldElement::TWO, - ) - .await - .unwrap(); - state.execute().await.unwrap(); - - let (id, name, class_hash): (String, String, String) = - sqlx::query_as("SELECT id, name, class_hash FROM models WHERE id = 'Position'") + assert_eq!(models.len(), 2); + + let (id, name): (String, String) = + sqlx::query_as("SELECT id, name FROM models WHERE id = 'Position'") .fetch_one(&pool) .await .unwrap(); assert_eq!(id, "Position"); assert_eq!(name, "Position"); - assert_eq!(class_hash, format!("{:#x}", FieldElement::TWO)); - - let position_models = sqlx::query("SELECT * FROM [Position]").fetch_all(&pool).await.unwrap(); - assert_eq!(position_models.len(), 0); - - state - .register_system(System { - name: "Position".into(), - inputs: vec![], - outputs: vec![], - class_hash: FieldElement::THREE, - dependencies: vec![], - ..Default::default() - }) - .await - .unwrap(); - state.execute().await.unwrap(); - - let (id, name, class_hash): (String, String, String) = - sqlx::query_as("SELECT id, name, class_hash FROM systems WHERE id = 'Position'") + + let (id, name): (String, String) = + sqlx::query_as("SELECT id, name FROM models WHERE id = 'Moves'") .fetch_one(&pool) .await .unwrap(); - assert_eq!(id, "Position"); - assert_eq!(name, "Position"); - assert_eq!(class_hash, format!("{:#x}", FieldElement::THREE)); - - state - .set_entity(Ty::Struct(Struct { - name: "Position".to_string(), - children: vec![ - Member { - name: "player".to_string(), - key: true, - ty: Ty::Primitive(Primitive::ContractAddress(Some(FieldElement::ONE))), - }, - Member { - name: "x".to_string(), - key: true, - ty: Ty::Primitive(Primitive::U32(Some(42))), - }, - Member { - name: "y".to_string(), - key: true, - ty: Ty::Primitive(Primitive::U32(Some(69))), - }, - ], - })) - .await - .unwrap(); - - // state - // .store_system_call( - // "Test".into(), - // FieldElement::from_str("0x4").unwrap(), - // &[FieldElement::ONE, FieldElement::TWO, FieldElement::THREE], - // ) - // .await - // .unwrap(); - - state - .store_event( - &Event { - from_address: FieldElement::ONE, - keys: Vec::from([FieldElement::TWO]), - data: Vec::from([FieldElement::TWO, FieldElement::THREE]), - }, - 0, - FieldElement::THREE, - ) - .await - .unwrap(); - - state.execute().await.unwrap(); + assert_eq!(id, "Moves"); + assert_eq!(name, "Moves"); + + db.store_event( + &Event { + from_address: FieldElement::ONE, + keys: Vec::from([FieldElement::TWO]), + data: Vec::from([FieldElement::TWO, FieldElement::THREE]), + }, + 0, + FieldElement::THREE, + ); + + db.execute().await.unwrap(); let keys = format!("{:#x}/", FieldElement::TWO); let query = format!("SELECT data, transaction_hash FROM events WHERE keys = '{}'", keys); @@ -220,18 +105,3 @@ async fn test_load_from_manifest(pool: SqlitePool) { assert_eq!(data, format!("{:#x}/{:#x}/", FieldElement::TWO, FieldElement::THREE)); assert_eq!(tx_hash, format!("{:#x}", FieldElement::THREE)) } - -#[tokio::test(flavor = "multi_thread")] -async fn test_load_from_remote() { - let pool = - SqlitePoolOptions::new().max_connections(5).connect("sqlite::memory:").await.unwrap(); - sqlx::migrate!("../migrations").run(&pool).await.unwrap(); - let mut db = Sql::new(pool.clone(), FieldElement::ZERO).await.unwrap(); - let migration = prepare_migration("../../../examples/ecs/target/dev".into()).unwrap(); - let sequencer = - TestSequencer::start(SequencerConfig::default(), get_default_test_starknet_config()).await; - let provider = JsonRpcClient::new(HttpTransport::new(sequencer.url())); - let world = WorldContractReader::new(migration.world_address().unwrap(), &provider); - - let _ = bootstrap_engine(&world, &mut db, &provider, &migration, &sequencer).await; -} diff --git a/crates/torii/server/src/cli.rs b/crates/torii/server/src/cli.rs index d800d5bdb3..b0718e59b2 100644 --- a/crates/torii/server/src/cli.rs +++ b/crates/torii/server/src/cli.rs @@ -1,14 +1,7 @@ -use std::env; use std::net::SocketAddr; -use std::str::FromStr; use std::sync::Arc; -use anyhow::{anyhow, Context}; -use camino::Utf8PathBuf; use clap::Parser; -use dojo_world::manifest::Manifest; -use dojo_world::metadata::{dojo_metadata_from_workspace, Environment}; -use scarb::core::Config; use sqlx::sqlite::SqlitePoolOptions; use starknet::core::types::FieldElement; use starknet::providers::jsonrpc::HttpTransport; @@ -32,23 +25,20 @@ mod server; struct Args { /// The world to index #[arg(short, long = "world", env = "DOJO_WORLD_ADDRESS")] - world_address: Option, + world_address: FieldElement, /// The rpc endpoint to use #[arg(long, default_value = "http://localhost:5050")] rpc: String, /// Database url #[arg(short, long, default_value = "sqlite::memory:")] database_url: String, - /// Specify a local manifest to intiailize from - #[arg(short, long, env = "DOJO_MANIFEST_FILE")] - manifest: Option, /// Specify a block to start indexing from, ignored if stored head exists #[arg(short, long, default_value = "0")] start_block: u64, - /// Host address for GraphQL/gRPC endpoints + /// Host address for api endpoints #[arg(long, default_value = "0.0.0.0")] host: String, - /// Port number for GraphQL/gRPC endpoints + /// Port number for api endpoints #[arg(long, default_value = "8080")] port: u16, } @@ -80,15 +70,10 @@ async fn main() -> anyhow::Result<()> { let provider: Arc<_> = JsonRpcClient::new(HttpTransport::new(Url::parse(&args.rpc)?)).into(); - let (manifest, env) = get_manifest_and_env(args.manifest.as_ref()) - .with_context(|| "Failed to get manifest file".to_string())?; - // Get world address - let world_address = get_world_address(&args, &manifest, env.as_ref())?; - let world = WorldContractReader::new(world_address, &provider); + let world = WorldContractReader::new(args.world_address, &provider); - let mut db = Sql::new(pool.clone(), world_address).await?; - db.load_from_manifest(manifest.clone()).await?; + let mut db = Sql::new(pool.clone(), args.world_address).await?; let processors = Processors { event: vec![ Box::new(RegisterModelProcessor), @@ -119,7 +104,7 @@ async fn main() -> anyhow::Result<()> { } } - res = server::spawn_server(&addr, &pool, world_address, block_receiver, Arc::clone(&provider)) => { + res = server::spawn_server(&addr, &pool, args.world_address, block_receiver, Arc::clone(&provider)) => { if let Err(e) = res { error!("Server failed with error: {e}"); } @@ -132,69 +117,3 @@ async fn main() -> anyhow::Result<()> { Ok(()) } - -// Tries to find scarb manifest first for env variables -// -// Use manifest path from cli args, -// else uses scarb manifest to derive path of dojo manifest file, -// else try to derive manifest path from scarb manifest -// else try `./target/dev/manifest.json` as dojo manifest path -// -// If neither of this work return an error and exit -fn get_manifest_and_env( - args_path: Option<&Utf8PathBuf>, -) -> anyhow::Result<(Manifest, Option)> { - let config; - let ws = if let Ok(scarb_manifest_path) = scarb::ops::find_manifest_path(None) { - config = Config::builder(scarb_manifest_path) - .log_filter_directive(env::var_os("SCARB_LOG")) - .build() - .with_context(|| "Couldn't build scarb config".to_string())?; - scarb::ops::read_workspace(config.manifest_path(), &config).ok() - } else { - None - }; - - let manifest = if let Some(manifest_path) = args_path { - Manifest::load_from_path(manifest_path)? - } else if let Some(ref ws) = ws { - let target_dir = ws.target_dir().path_existent()?; - let target_dir = target_dir.join(ws.config().profile().as_str()); - let manifest_path = target_dir.join("manifest.json"); - Manifest::load_from_path(manifest_path)? - } else { - return Err(anyhow!( - "Cannot find Scarb manifest file. Either run this command from within a Scarb project \ - or specify it using `--manifest` argument" - )); - }; - let env = if let Some(ws) = ws { - dojo_metadata_from_workspace(&ws).and_then(|inner| inner.env().cloned()) - } else { - None - }; - Ok((manifest, env)) -} - -fn get_world_address( - args: &Args, - manifest: &Manifest, - env_metadata: Option<&Environment>, -) -> anyhow::Result { - if let Some(address) = args.world_address { - return Ok(address); - } - - if let Some(world_address) = env_metadata.and_then(|env| env.world_address()) { - return Ok(FieldElement::from_str(world_address)?); - } - - if let Some(address) = manifest.world.address { - Ok(address) - } else { - Err(anyhow!( - "Could not find World address. Please specify it with --world, or in manifest.json or \ - [tool.dojo.env] in Scarb.toml" - )) - } -}