diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index 29e222e26e..6634b964e2 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -138,6 +138,9 @@ async fn main() -> anyhow::Result<()> { if args.events.raw { flags.insert(IndexingFlags::RAW_EVENTS); } + if args.indexing.pending { + flags.insert(IndexingFlags::PENDING_BLOCKS); + } let mut engine: Engine>> = Engine::new( world, @@ -146,10 +149,8 @@ async fn main() -> anyhow::Result<()> { processors, EngineConfig { max_concurrent_tasks: args.indexing.max_concurrent_tasks, - start_block: 0, blocks_chunk_size: args.indexing.blocks_chunk_size, events_chunk_size: args.indexing.events_chunk_size, - index_pending: args.indexing.pending, polling_interval: Duration::from_millis(args.indexing.polling_interval), flags, event_processor_config: EventProcessorConfig { diff --git a/crates/torii/cli/src/args.rs b/crates/torii/cli/src/args.rs index 13c43655b0..8b8a73175b 100644 --- a/crates/torii/cli/src/args.rs +++ b/crates/torii/cli/src/args.rs @@ -257,7 +257,7 @@ mod test { db_dir = "/tmp/torii-test" [events] - raw = false + raw = true historical = [ "ns-E", "ns-EH" @@ -291,7 +291,7 @@ mod test { assert_eq!(torii_args.world_address, Some(Felt::from_str("0x1234").unwrap())); assert_eq!(torii_args.rpc, Url::parse("http://0.0.0.0:2222").unwrap()); assert_eq!(torii_args.db_dir, Some(PathBuf::from("/tmp/torii-test"))); - assert!(!torii_args.events.raw); + assert!(torii_args.events.raw); assert_eq!(torii_args.events.historical, vec!["ns-E".to_string(), "ns-EH".to_string()]); assert_eq!(torii_args.indexing.events_chunk_size, 9999); assert_eq!(torii_args.indexing.blocks_chunk_size, 10240); diff --git a/crates/torii/cli/src/options.rs b/crates/torii/cli/src/options.rs index 008478b39a..48fc62915d 100644 --- a/crates/torii/cli/src/options.rs +++ b/crates/torii/cli/src/options.rs @@ -210,11 +210,11 @@ impl IndexingOptions { } } -#[derive(Debug, clap::Args, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug, clap::Args, Clone, Serialize, Deserialize, PartialEq, Default)] #[command(next_help_heading = "Events indexing options")] pub struct EventsOptions { /// Whether or not to index raw events - #[arg(long = "events.raw", action = ArgAction::Set, default_value_t = true, help = "Whether or not to index raw events.")] + #[arg(long = "events.raw", action = ArgAction::Set, default_value_t = false, help = "Whether or not to index raw events.")] #[serde(default)] pub raw: bool, @@ -229,12 +229,6 @@ pub struct EventsOptions { pub historical: Vec, } -impl Default for EventsOptions { - fn default() -> Self { - Self { raw: true, historical: vec![] } - } -} - #[derive(Debug, clap::Args, Clone, Serialize, Deserialize, PartialEq)] #[command(next_help_heading = "HTTP server options")] pub struct ServerOptions { diff --git a/crates/torii/core/src/constants.rs b/crates/torii/core/src/constants.rs index 8248b09f7d..ce46d77d31 100644 --- a/crates/torii/core/src/constants.rs +++ b/crates/torii/core/src/constants.rs @@ -1,3 +1,16 @@ pub const TOKEN_BALANCE_TABLE: &str = "token_balances"; pub const TOKEN_TRANSFER_TABLE: &str = "token_transfers"; pub const TOKENS_TABLE: &str = "tokens"; + +pub(crate) const LOG_TARGET: &str = "torii_core::engine"; +pub const QUERY_QUEUE_BATCH_SIZE: usize = 1000; + +pub const IPFS_URL: &str = "https://ipfs.io/ipfs/"; +pub const IPFS_CLIENT_MAX_RETRY: u8 = 3; + +pub const IPFS_CLIENT_URL: &str = "https://ipfs.infura.io:5001"; +pub const IPFS_CLIENT_USERNAME: &str = "2EBrzr7ZASQZKH32sl2xWauXPSA"; +pub const IPFS_CLIENT_PASSWORD: &str = "12290b883db9138a8ae3363b6739d220"; + +pub const WORLD_CONTRACT_TYPE: &str = "WORLD"; +pub const SQL_FELT_DELIMITER: &str = "/"; diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 0d91e95ed3..86730665f3 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -24,6 +24,7 @@ use tokio::task::JoinSet; use tokio::time::{sleep, Instant}; use tracing::{debug, error, info, trace, warn}; +use crate::constants::LOG_TARGET; use crate::processors::erc20_legacy_transfer::Erc20LegacyTransferProcessor; use crate::processors::erc20_transfer::Erc20TransferProcessor; use crate::processors::erc721_legacy_transfer::Erc721LegacyTransferProcessor; @@ -128,24 +129,21 @@ impl Processors

{ self.event_processors.get(&contract_type).unwrap() } } -pub(crate) const LOG_TARGET: &str = "torii_core::engine"; -pub const QUERY_QUEUE_BATCH_SIZE: usize = 1000; bitflags! { #[derive(Debug, Clone)] pub struct IndexingFlags: u32 { const TRANSACTIONS = 0b00000001; const RAW_EVENTS = 0b00000010; + const PENDING_BLOCKS = 0b00000100; } } #[derive(Debug)] pub struct EngineConfig { pub polling_interval: Duration, - pub start_block: u64, pub blocks_chunk_size: u64, pub events_chunk_size: u64, - pub index_pending: bool, pub max_concurrent_tasks: usize, pub flags: IndexingFlags, pub event_processor_config: EventProcessorConfig, @@ -155,10 +153,8 @@ impl Default for EngineConfig { fn default() -> Self { Self { polling_interval: Duration::from_millis(500), - start_block: 0, blocks_chunk_size: 10240, events_chunk_size: 1024, - index_pending: true, max_concurrent_tasks: 100, flags: IndexingFlags::empty(), event_processor_config: EventProcessorConfig::default(), @@ -245,14 +241,6 @@ impl Engine

{ } pub async fn start(&mut self) -> Result<()> { - // use the start block provided by user if head is 0 - let (head, _, _) = self.db.head(self.world.address).await?; - if head == 0 { - self.db.set_head(self.config.start_block, 0, 0, self.world.address).await?; - } else if self.config.start_block != 0 { - warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead."); - } - let mut backoff_delay = Duration::from_secs(1); let max_backoff_delay = Duration::from_secs(60); @@ -324,7 +312,7 @@ impl Engine

{ let data = self.fetch_range(from, to, &cursors.cursor_map).await?; debug!(target: LOG_TARGET, duration = ?instant.elapsed(), from = %from, to = %to, "Fetched data for range."); FetchDataResult::Range(data) - } else if self.config.index_pending { + } else if self.config.flags.contains(IndexingFlags::PENDING_BLOCKS) { let data = self.fetch_pending(latest_block.clone(), cursors.last_pending_block_tx).await?; debug!(target: LOG_TARGET, duration = ?instant.elapsed(), latest_block_number = %latest_block.block_number, "Fetched pending data."); diff --git a/crates/torii/core/src/executor/erc.rs b/crates/torii/core/src/executor/erc.rs index eb4a3c19bc..afa39c89c0 100644 --- a/crates/torii/core/src/executor/erc.rs +++ b/crates/torii/core/src/executor/erc.rs @@ -13,11 +13,10 @@ use starknet_crypto::Felt; use tracing::{debug, trace}; use super::{ApplyBalanceDiffQuery, Executor}; -use crate::constants::TOKEN_BALANCE_TABLE; +use crate::constants::{IPFS_CLIENT_MAX_RETRY, SQL_FELT_DELIMITER, TOKEN_BALANCE_TABLE}; use crate::sql::utils::{felt_to_sql_string, sql_string_to_u256, u256_to_sql_string, I256}; -use crate::sql::FELT_DELIMITER; use crate::types::ContractType; -use crate::utils::{fetch_content_from_ipfs, MAX_RETRY}; +use crate::utils::fetch_content_from_ipfs; #[derive(Debug, Clone)] pub struct RegisterErc721TokenQuery { @@ -50,7 +49,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { ) -> Result<()> { let erc_cache = apply_balance_diff.erc_cache; for ((contract_type, id_str), balance) in erc_cache.iter() { - let id = id_str.split(FELT_DELIMITER).collect::>(); + let id = id_str.split(SQL_FELT_DELIMITER).collect::>(); match contract_type { ContractType::WORLD => unreachable!(), ContractType::ERC721 => { @@ -228,7 +227,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { uri if uri.starts_with("ipfs") => { let cid = uri.strip_prefix("ipfs://").unwrap(); debug!(cid = %cid, "Fetching metadata from IPFS"); - let response = fetch_content_from_ipfs(cid, MAX_RETRY) + let response = fetch_content_from_ipfs(cid, IPFS_CLIENT_MAX_RETRY) .await .context("Failed to fetch metadata from IPFS")?; diff --git a/crates/torii/core/src/model.rs b/crates/torii/core/src/model.rs index e2d06c10c8..2266993e9b 100644 --- a/crates/torii/core/src/model.rs +++ b/crates/torii/core/src/model.rs @@ -3,6 +3,7 @@ use std::str::FromStr; use async_trait::async_trait; use crypto_bigint::U256; +use dojo_types::naming::get_tag; use dojo_types::primitive::Primitive; use dojo_types::schema::{Enum, EnumOption, Member, Struct, Ty}; use dojo_world::contracts::abigen::model::Layout; @@ -210,10 +211,10 @@ pub fn parse_sql_model_members( } Ty::Struct(Struct { - name: format!("{}-{}", namespace, model), + name: get_tag(namespace, model), children: model_members_all .iter() - .filter(|m| m.id == format!("{}-{}", namespace, model)) + .filter(|m| m.id == get_tag(namespace, model)) .map(|m| Member { key: m.key, name: m.name.to_owned(), diff --git a/crates/torii/core/src/processors/event_message.rs b/crates/torii/core/src/processors/event_message.rs index 2fbb982284..9d36368d3b 100644 --- a/crates/torii/core/src/processors/event_message.rs +++ b/crates/torii/core/src/processors/event_message.rs @@ -1,6 +1,7 @@ use anyhow::{Error, Result}; use async_trait::async_trait; use dojo_world::contracts::abigen::world::Event as WorldEvent; +use dojo_world::contracts::naming::get_tag; use dojo_world::contracts::world::WorldContractReader; use starknet::core::types::{Event, Felt}; use starknet::providers::Provider; @@ -65,16 +66,12 @@ where "Store event message." ); - // TODO: check historical and keep the internal counter. - let mut keys_and_unpacked = [event.keys, event.values].concat(); let mut entity = model.schema.clone(); entity.deserialize(&mut keys_and_unpacked)?; - // TODO: this must come from some torii's configuration. - let historical = - config.historical_events.contains(&format!("{}-{}", model.namespace, model.name)); + let historical = config.is_historical(&get_tag(&model.namespace, &model.name)); db.set_event_message(entity, event_id, block_timestamp, historical).await?; Ok(()) } diff --git a/crates/torii/core/src/processors/metadata_update.rs b/crates/torii/core/src/processors/metadata_update.rs index 00cedd800a..f9cff569f8 100644 --- a/crates/torii/core/src/processors/metadata_update.rs +++ b/crates/torii/core/src/processors/metadata_update.rs @@ -12,8 +12,9 @@ use starknet::providers::Provider; use tracing::{error, info}; use super::{EventProcessor, EventProcessorConfig}; +use crate::constants::IPFS_CLIENT_MAX_RETRY; use crate::sql::Sql; -use crate::utils::{fetch_content_from_ipfs, MAX_RETRY}; +use crate::utils::fetch_content_from_ipfs; pub(crate) const LOG_TARGET: &str = "torii_core::processors::metadata_update"; @@ -106,7 +107,7 @@ async fn metadata(uri_str: String) -> Result<(WorldMetadata, Option, Opt let uri = Uri::Ipfs(uri_str); let cid = uri.cid().ok_or("Uri is malformed").map_err(Error::msg)?; - let bytes = fetch_content_from_ipfs(cid, MAX_RETRY).await?; + let bytes = fetch_content_from_ipfs(cid, IPFS_CLIENT_MAX_RETRY).await?; let metadata: WorldMetadata = serde_json::from_str(std::str::from_utf8(&bytes)?)?; let icon_img = fetch_image(&metadata.icon_uri).await; @@ -117,7 +118,7 @@ async fn metadata(uri_str: String) -> Result<(WorldMetadata, Option, Opt async fn fetch_image(image_uri: &Option) -> Option { if let Some(uri) = image_uri { - let data = fetch_content_from_ipfs(uri.cid()?, MAX_RETRY).await.ok()?; + let data = fetch_content_from_ipfs(uri.cid()?, IPFS_CLIENT_MAX_RETRY).await.ok()?; let encoded = general_purpose::STANDARD.encode(data); return Some(encoded); } diff --git a/crates/torii/core/src/processors/mod.rs b/crates/torii/core/src/processors/mod.rs index 377d853f4d..4d4c5c637d 100644 --- a/crates/torii/core/src/processors/mod.rs +++ b/crates/torii/core/src/processors/mod.rs @@ -34,6 +34,16 @@ pub struct EventProcessorConfig { pub namespaces: HashSet, } +impl EventProcessorConfig { + pub fn is_historical(&self, tag: &str) -> bool { + self.historical_events.contains(tag) + } + + pub fn should_index(&self, namespace: &str) -> bool { + self.namespaces.is_empty() || self.namespaces.contains(namespace) + } +} + #[async_trait] pub trait EventProcessor

: Send + Sync where diff --git a/crates/torii/core/src/processors/register_event.rs b/crates/torii/core/src/processors/register_event.rs index 3ed132e866..be9919200c 100644 --- a/crates/torii/core/src/processors/register_event.rs +++ b/crates/torii/core/src/processors/register_event.rs @@ -60,7 +60,7 @@ where // If the namespace is not in the list of namespaces to index, silently ignore it. // If our config is empty, we index all namespaces. - if !config.namespaces.is_empty() && !config.namespaces.contains(&namespace) { + if !config.should_index(&namespace) { return Ok(()); } diff --git a/crates/torii/core/src/processors/register_model.rs b/crates/torii/core/src/processors/register_model.rs index 14e7a559d9..ed0f710f2e 100644 --- a/crates/torii/core/src/processors/register_model.rs +++ b/crates/torii/core/src/processors/register_model.rs @@ -60,7 +60,7 @@ where // If the namespace is not in the list of namespaces to index, silently ignore it. // If our config is empty, we index all namespaces. - if !config.namespaces.is_empty() && !config.namespaces.contains(&namespace) { + if !config.should_index(&namespace) { return Ok(()); } diff --git a/crates/torii/core/src/sql/erc.rs b/crates/torii/core/src/sql/erc.rs index cef58f281a..cf94e005f0 100644 --- a/crates/torii/core/src/sql/erc.rs +++ b/crates/torii/core/src/sql/erc.rs @@ -8,7 +8,7 @@ use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string}; use starknet::providers::Provider; use super::utils::{u256_to_sql_string, I256}; -use super::{Sql, FELT_DELIMITER}; +use super::{Sql, SQL_FELT_DELIMITER}; use crate::constants::TOKEN_TRANSFER_TABLE; use crate::executor::{ ApplyBalanceDiffQuery, Argument, QueryMessage, QueryType, RegisterErc20TokenQuery, @@ -108,7 +108,7 @@ impl Sql { // from_address/contract_address:id if from_address != Felt::ZERO { let from_balance_id = - format!("{}{FELT_DELIMITER}{}", felt_to_sql_string(&from_address), &token_id); + format!("{}{SQL_FELT_DELIMITER}{}", felt_to_sql_string(&from_address), &token_id); let from_balance = self .local_cache .erc_cache @@ -119,7 +119,7 @@ impl Sql { if to_address != Felt::ZERO { let to_balance_id = - format!("{}{FELT_DELIMITER}{}", felt_to_sql_string(&to_address), &token_id); + format!("{}{SQL_FELT_DELIMITER}{}", felt_to_sql_string(&to_address), &token_id); let to_balance = self .local_cache .erc_cache diff --git a/crates/torii/core/src/sql/mod.rs b/crates/torii/core/src/sql/mod.rs index 9c61405d99..5f7a8f5093 100644 --- a/crates/torii/core/src/sql/mod.rs +++ b/crates/torii/core/src/sql/mod.rs @@ -4,6 +4,7 @@ use std::str::FromStr; use std::sync::Arc; use anyhow::{anyhow, Context, Result}; +use dojo_types::naming::get_tag; use dojo_types::primitive::Primitive; use dojo_types::schema::{EnumOption, Member, Struct, Ty}; use dojo_world::config::WorldMetadata; @@ -16,6 +17,7 @@ use starknet_crypto::poseidon_hash_many; use tokio::sync::mpsc::UnboundedSender; use utils::felts_to_sql_string; +use crate::constants::SQL_FELT_DELIMITER; use crate::executor::{ Argument, DeleteEntityQuery, EventMessageQuery, QueryMessage, QueryType, ResetCursorsQuery, SetHeadQuery, UpdateCursorsQuery, @@ -26,12 +28,8 @@ use crate::utils::utc_dt_string_from_timestamp; type IsEventMessage = bool; type IsStoreUpdate = bool; -pub const WORLD_CONTRACT_TYPE: &str = "WORLD"; -pub const FELT_DELIMITER: &str = "/"; - pub mod cache; pub mod erc; -pub mod query_queue; #[cfg(test)] #[path = "test.rs"] mod test; @@ -260,7 +258,7 @@ impl Sql { upgrade_diff: Option<&Ty>, ) -> Result<()> { let selector = compute_selector_from_names(namespace, &model.name()); - let namespaced_name = format!("{}-{}", namespace, model.name()); + let namespaced_name = get_tag(namespace, &model.name()); let insert_models = "INSERT INTO models (id, namespace, name, class_hash, contract_address, layout, \ @@ -751,7 +749,7 @@ impl Sql { std::iter::once(entity_id.to_string()) .chain(indexes.iter().map(|i| i.to_string())) .collect::>() - .join(FELT_DELIMITER), + .join(SQL_FELT_DELIMITER), )); } diff --git a/crates/torii/core/src/sql/query_queue.rs b/crates/torii/core/src/sql/query_queue.rs deleted file mode 100644 index 774f8fb6dd..0000000000 --- a/crates/torii/core/src/sql/query_queue.rs +++ /dev/null @@ -1,188 +0,0 @@ -use std::collections::VecDeque; - -use anyhow::{Context, Result}; -use dojo_types::schema::{Struct, Ty}; -use sqlx::{FromRow, Pool, Sqlite}; -use starknet::core::types::Felt; - -use super::utils::felt_to_sql_string; -use crate::simple_broker::SimpleBroker; -use crate::types::{ - Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, - Model as ModelRegistered, -}; - -#[derive(Debug, Clone)] -pub enum Argument { - Null, - Int(i64), - Bool(bool), - String(String), - FieldElement(Felt), -} - -#[derive(Debug, Clone)] -pub enum BrokerMessage { - ModelRegistered(ModelRegistered), - EntityUpdated(EntityUpdated), - EventMessageUpdated(EventMessageUpdated), - EventEmitted(EventEmitted), -} - -#[derive(Debug, Clone)] -pub struct QueryQueue { - pool: Pool, - pub queue: VecDeque<(String, Vec, QueryType)>, -} - -#[derive(Debug, Clone)] -pub struct DeleteEntityQuery { - pub entity_id: String, - pub event_id: String, - pub block_timestamp: String, - pub ty: Ty, -} - -#[derive(Debug, Clone)] -pub enum QueryType { - SetEntity(Ty), - DeleteEntity(DeleteEntityQuery), - EventMessage(Ty), - RegisterModel, - StoreEvent, - Other, -} - -impl QueryQueue { - pub fn new(pool: Pool) -> Self { - QueryQueue { pool, queue: VecDeque::new() } - } - - pub fn enqueue>( - &mut self, - statement: S, - arguments: Vec, - query_type: QueryType, - ) { - self.queue.push_back((statement.into(), arguments, query_type)); - } - - pub async fn execute_all(&mut self) -> Result<()> { - let mut tx = self.pool.begin().await?; - // publishes that are related to queries in the queue, they should be sent - // after the queries are executed - let mut publish_queue = VecDeque::new(); - - while let Some((statement, arguments, query_type)) = self.queue.pop_front() { - let mut query = sqlx::query(&statement); - - for arg in &arguments { - query = match arg { - Argument::Null => query.bind(None::), - Argument::Int(integer) => query.bind(integer), - Argument::Bool(bool) => query.bind(bool), - Argument::String(string) => query.bind(string), - Argument::FieldElement(felt) => query.bind(felt_to_sql_string(felt)), - } - } - - match query_type { - QueryType::SetEntity(entity) => { - let row = query.fetch_one(&mut *tx).await.with_context(|| { - format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) - })?; - let mut entity_updated = EntityUpdated::from_row(&row)?; - entity_updated.updated_model = Some(entity); - entity_updated.deleted = false; - let broker_message = BrokerMessage::EntityUpdated(entity_updated); - publish_queue.push_back(broker_message); - } - QueryType::DeleteEntity(entity) => { - let delete_model = query.execute(&mut *tx).await.with_context(|| { - format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) - })?; - if delete_model.rows_affected() == 0 { - continue; - } - - let row = sqlx::query( - "UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, \ - event_id=? WHERE id = ? RETURNING *", - ) - .bind(entity.block_timestamp) - .bind(entity.event_id) - .bind(entity.entity_id) - .fetch_one(&mut *tx) - .await?; - let mut entity_updated = EntityUpdated::from_row(&row)?; - entity_updated.updated_model = - Some(Ty::Struct(Struct { name: entity.ty.name(), children: vec![] })); - - let count = sqlx::query_scalar::<_, i64>( - "SELECT count(*) FROM entity_model WHERE entity_id = ?", - ) - .bind(entity_updated.id.clone()) - .fetch_one(&mut *tx) - .await?; - - // Delete entity if all of its models are deleted - if count == 0 { - sqlx::query("DELETE FROM entities WHERE id = ?") - .bind(entity_updated.id.clone()) - .execute(&mut *tx) - .await?; - entity_updated.deleted = true; - } - - let broker_message = BrokerMessage::EntityUpdated(entity_updated); - publish_queue.push_back(broker_message); - } - QueryType::RegisterModel => { - let row = query.fetch_one(&mut *tx).await.with_context(|| { - format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) - })?; - let model_registered = ModelRegistered::from_row(&row)?; - publish_queue.push_back(BrokerMessage::ModelRegistered(model_registered)); - } - QueryType::EventMessage(entity) => { - let row = query.fetch_one(&mut *tx).await.with_context(|| { - format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) - })?; - let mut event_message = EventMessageUpdated::from_row(&row)?; - event_message.updated_model = Some(entity); - let broker_message = BrokerMessage::EventMessageUpdated(event_message); - publish_queue.push_back(broker_message); - } - QueryType::StoreEvent => { - let row = query.fetch_one(&mut *tx).await.with_context(|| { - format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) - })?; - let event = EventEmitted::from_row(&row)?; - publish_queue.push_back(BrokerMessage::EventEmitted(event)); - } - QueryType::Other => { - query.execute(&mut *tx).await.with_context(|| { - format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) - })?; - } - } - } - - tx.commit().await?; - - while let Some(message) = publish_queue.pop_front() { - send_broker_message(message); - } - - Ok(()) - } -} - -fn send_broker_message(message: BrokerMessage) { - match message { - BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model), - BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity), - BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event), - BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event), - } -} diff --git a/crates/torii/core/src/sql/utils.rs b/crates/torii/core/src/sql/utils.rs index 9b7d2dad33..d5a6c5cd75 100644 --- a/crates/torii/core/src/sql/utils.rs +++ b/crates/torii/core/src/sql/utils.rs @@ -5,11 +5,11 @@ use std::str::FromStr; use starknet::core::types::U256; use starknet_crypto::Felt; -use super::FELT_DELIMITER; +use crate::constants::SQL_FELT_DELIMITER; pub fn felts_to_sql_string(felts: &[Felt]) -> String { - felts.iter().map(|k| format!("{:#x}", k)).collect::>().join(FELT_DELIMITER) - + FELT_DELIMITER + felts.iter().map(|k| format!("{:#x}", k)).collect::>().join(SQL_FELT_DELIMITER) + + SQL_FELT_DELIMITER } pub fn felt_to_sql_string(felt: &Felt) -> String { @@ -30,7 +30,7 @@ pub fn sql_string_to_u256(sql_string: &str) -> U256 { } pub fn sql_string_to_felts(sql_string: &str) -> Vec { - sql_string.split(FELT_DELIMITER).map(|felt| Felt::from_str(felt).unwrap()).collect() + sql_string.split(SQL_FELT_DELIMITER).map(|felt| Felt::from_str(felt).unwrap()).collect() } // type used to do calculation on inmemory balances diff --git a/crates/torii/core/src/utils.rs b/crates/torii/core/src/utils.rs index 2556caa8f0..516e739e8a 100644 --- a/crates/torii/core/src/utils.rs +++ b/crates/torii/core/src/utils.rs @@ -7,12 +7,9 @@ use ipfs_api_backend_hyper::{IpfsApi, IpfsClient, TryFromUri}; use tokio_util::bytes::Bytes; use tracing::info; -pub const IPFS_URL: &str = "https://ipfs.io/ipfs/"; -pub const MAX_RETRY: u8 = 3; - -pub const IPFS_CLIENT_URL: &str = "https://ipfs.infura.io:5001"; -pub const IPFS_USERNAME: &str = "2EBrzr7ZASQZKH32sl2xWauXPSA"; -pub const IPFS_PASSWORD: &str = "12290b883db9138a8ae3363b6739d220"; +use crate::constants::{ + IPFS_CLIENT_MAX_RETRY, IPFS_CLIENT_PASSWORD, IPFS_CLIENT_URL, IPFS_CLIENT_USERNAME, +}; pub fn must_utc_datetime_from_timestamp(timestamp: u64) -> DateTime { let naive_dt = DateTime::from_timestamp(timestamp as i64, 0) @@ -25,8 +22,8 @@ pub fn utc_dt_string_from_timestamp(timestamp: u64) -> String { } pub async fn fetch_content_from_ipfs(cid: &str, mut retries: u8) -> Result { - let client = - IpfsClient::from_str(IPFS_CLIENT_URL)?.with_credentials(IPFS_USERNAME, IPFS_PASSWORD); + let client = IpfsClient::from_str(IPFS_CLIENT_URL)? + .with_credentials(IPFS_CLIENT_USERNAME, IPFS_CLIENT_PASSWORD); while retries > 0 { let response = client.cat(cid).map_ok(|chunk| chunk.to_vec()).try_concat().await; match response { @@ -46,7 +43,7 @@ pub async fn fetch_content_from_ipfs(cid: &str, mut retries: u8) -> Result Field { let data: ValueMapping = match model_data_recursive_query( &mut conn, ENTITY_ID_COLUMN, - vec![format!("{namespace}-{name}")], + vec![get_tag(&namespace, &name)], &entity_id, &[], &type_mapping, diff --git a/crates/torii/graphql/src/object/event.rs b/crates/torii/graphql/src/object/event.rs index e171e31227..e553f4be32 100644 --- a/crates/torii/graphql/src/object/event.rs +++ b/crates/torii/graphql/src/object/event.rs @@ -3,8 +3,8 @@ use async_graphql::dynamic::{ }; use async_graphql::{Name, Result, Value}; use tokio_stream::{Stream, StreamExt}; +use torii_core::constants::SQL_FELT_DELIMITER; use torii_core::simple_broker::SimpleBroker; -use torii_core::sql::FELT_DELIMITER; use torii_core::types::Event; use super::inputs::keys_input::{keys_argument, parse_keys_argument}; @@ -102,7 +102,7 @@ impl EventObject { // if all keys match or if a wildcard is present at the respective position. pub fn match_keys(input_keys: &[String], event: &Event) -> bool { let event_keys: Vec<&str> = - event.keys.split(FELT_DELIMITER).filter(|s| !s.is_empty()).collect(); + event.keys.split(SQL_FELT_DELIMITER).filter(|s| !s.is_empty()).collect(); if input_keys.len() > event_keys.len() { return false; diff --git a/crates/torii/graphql/src/object/event_message.rs b/crates/torii/graphql/src/object/event_message.rs index 0e2cec609e..2173140bd2 100644 --- a/crates/torii/graphql/src/object/event_message.rs +++ b/crates/torii/graphql/src/object/event_message.rs @@ -3,6 +3,7 @@ use async_graphql::dynamic::{ Field, FieldFuture, FieldValue, InputValue, SubscriptionField, SubscriptionFieldFuture, TypeRef, }; use async_graphql::{Name, Value}; +use dojo_types::naming::get_tag; use sqlx::{Pool, Sqlite}; use tokio_stream::StreamExt; use torii_core::simple_broker::SimpleBroker; @@ -19,7 +20,6 @@ use crate::mapping::ENTITY_TYPE_MAPPING; use crate::object::{resolve_many, resolve_one}; use crate::query::type_mapping_query; use crate::utils; - #[derive(Debug)] pub struct EventMessageObject; @@ -151,7 +151,7 @@ fn model_union_field() -> Field { let data: ValueMapping = match model_data_recursive_query( &mut conn, EVENT_MESSAGE_ID_COLUMN, - vec![format!("{namespace}-{name}")], + vec![get_tag(&namespace, &name)], &entity_id, &[], &type_mapping, diff --git a/crates/torii/graphql/src/object/model_data.rs b/crates/torii/graphql/src/object/model_data.rs index ca54ee5584..689226b55c 100644 --- a/crates/torii/graphql/src/object/model_data.rs +++ b/crates/torii/graphql/src/object/model_data.rs @@ -1,6 +1,7 @@ use async_graphql::dynamic::{Enum, Field, FieldFuture, InputObject, Object, TypeRef}; use async_graphql::Value; use chrono::{DateTime, Utc}; +use dojo_types::naming::get_tag; use serde::Deserialize; use sqlx::{FromRow, Pool, Sqlite}; @@ -69,7 +70,7 @@ impl BasicObject for ModelDataObject { let mut parts = self.type_name().split('_').collect::>(); let model = parts.pop().unwrap(); let namespace = parts.join("_"); - let type_name = utils::struct_name_from_names(&namespace, model); + let type_name = get_tag(&namespace, model); let mut objects = data_objects_recursion( &TypeData::Nested((TypeRef::named(self.type_name()), self.type_mapping.clone())), &vec![type_name], @@ -106,7 +107,7 @@ impl ResolvableObject for ModelDataObject { let mut parts = type_name.split('_').collect::>(); let model = parts.pop().unwrap(); let namespace = parts.join("_"); - let type_name = utils::struct_name_from_names(&namespace, model); + let table_name = get_tag(&namespace, model); FieldFuture::new(async move { let mut conn = ctx.data::>()?.acquire().await?; @@ -114,10 +115,10 @@ impl ResolvableObject for ModelDataObject { let filters = parse_where_argument(&ctx, &where_mapping)?; let connection = parse_connection_arguments(&ctx)?; - let total_count = count_rows(&mut conn, &type_name, &None, &filters).await?; + let total_count = count_rows(&mut conn, &table_name, &None, &filters).await?; let (data, page_info) = fetch_multiple_rows( &mut conn, - &type_name, + &table_name, EVENT_ID_COLUMN, &None, &order, diff --git a/crates/torii/graphql/src/query/data.rs b/crates/torii/graphql/src/query/data.rs index 0b34b2af15..5cf1fb5c76 100644 --- a/crates/torii/graphql/src/query/data.rs +++ b/crates/torii/graphql/src/query/data.rs @@ -1,7 +1,7 @@ use async_graphql::connection::PageInfo; use sqlx::sqlite::SqliteRow; use sqlx::{Result, Row, SqliteConnection}; -use torii_core::sql::WORLD_CONTRACT_TYPE; +use torii_core::constants::WORLD_CONTRACT_TYPE; use super::filter::{Filter, FilterValue}; use super::order::{CursorDirection, Direction, Order}; diff --git a/crates/torii/graphql/src/query/mod.rs b/crates/torii/graphql/src/query/mod.rs index caa6c5e948..6586b150c9 100644 --- a/crates/torii/graphql/src/query/mod.rs +++ b/crates/torii/graphql/src/query/mod.rs @@ -8,7 +8,7 @@ use dojo_types::primitive::{Primitive, SqlType}; use regex::Regex; use sqlx::sqlite::SqliteRow; use sqlx::{Row, SqliteConnection}; -use torii_core::sql::FELT_DELIMITER; +use torii_core::constants::SQL_FELT_DELIMITER; use crate::constants::{ BOOLEAN_TRUE, ENTITY_ID_COLUMN, EVENT_MESSAGE_ID_COLUMN, INTERNAL_ENTITY_ID_KEY, @@ -185,7 +185,7 @@ pub fn value_mapping_from_row( // handles felt arrays stored as string (ex: keys) if let (TypeRef::List(_), Value::String(s)) = (&type_data.type_ref(), &value) { - let mut felts: Vec<_> = s.split(FELT_DELIMITER).map(Value::from).collect(); + let mut felts: Vec<_> = s.split(SQL_FELT_DELIMITER).map(Value::from).collect(); felts.pop(); // removes empty item value = Value::List(felts); } diff --git a/crates/torii/graphql/src/tests/subscription_test.rs b/crates/torii/graphql/src/tests/subscription_test.rs index bc054f059c..ca332a4cf6 100644 --- a/crates/torii/graphql/src/tests/subscription_test.rs +++ b/crates/torii/graphql/src/tests/subscription_test.rs @@ -5,6 +5,7 @@ mod tests { use std::time::Duration; use async_graphql::value; + use dojo_types::naming::get_tag; use dojo_types::primitive::Primitive; use dojo_types::schema::{Enum, EnumOption, Member, Struct, Ty}; use dojo_world::contracts::abigen::model::Layout; @@ -81,7 +82,7 @@ mod tests { // 1. Open process and sleep.Go to execute subscription tokio::time::sleep(Duration::from_secs(1)).await; let ty = Ty::Struct(Struct { - name: utils::struct_name_from_names(&namespace, &model_name), + name: get_tag(&namespace, &model_name), children: vec![ Member { name: "depth".to_string(), @@ -233,7 +234,7 @@ mod tests { // 1. Open process and sleep.Go to execute subscription tokio::time::sleep(Duration::from_secs(1)).await; let ty = Ty::Struct(Struct { - name: utils::struct_name_from_names(&namespace, &model_name), + name: get_tag(&namespace, &model_name), children: vec![ Member { name: "depth".to_string(), diff --git a/crates/torii/graphql/src/utils.rs b/crates/torii/graphql/src/utils.rs index 949e3b9711..6ca48d8ad9 100644 --- a/crates/torii/graphql/src/utils.rs +++ b/crates/torii/graphql/src/utils.rs @@ -69,7 +69,3 @@ pub fn field_name_from_names(namespace: &str, model_name: &str) -> String { pub fn type_name_from_names(namespace: &str, model_name: &str) -> String { format!("{}_{}", namespace, model_name) } - -pub fn struct_name_from_names(namespace: &str, model_name: &str) -> String { - format!("{}-{}", namespace, model_name) -} diff --git a/crates/torii/grpc/src/server/subscriptions/entity.rs b/crates/torii/grpc/src/server/subscriptions/entity.rs index ec09301def..f8793b5109 100644 --- a/crates/torii/grpc/src/server/subscriptions/entity.rs +++ b/crates/torii/grpc/src/server/subscriptions/entity.rs @@ -13,9 +13,9 @@ use tokio::sync::mpsc::{ channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, }; use tokio::sync::RwLock; +use torii_core::constants::SQL_FELT_DELIMITER; use torii_core::error::{Error, ParseError}; use torii_core::simple_broker::SimpleBroker; -use torii_core::sql::FELT_DELIMITER; use torii_core::types::OptimisticEntity; use tracing::{error, trace}; @@ -116,8 +116,8 @@ impl Service { let hashed = Felt::from_str(&entity.id).map_err(ParseError::FromStr)?; let keys = entity .keys - .trim_end_matches(FELT_DELIMITER) - .split(FELT_DELIMITER) + .trim_end_matches(SQL_FELT_DELIMITER) + .split(SQL_FELT_DELIMITER) .map(Felt::from_str) .collect::, _>>() .map_err(ParseError::FromStr)?; diff --git a/crates/torii/grpc/src/server/subscriptions/event.rs b/crates/torii/grpc/src/server/subscriptions/event.rs index 3d19c1fd1a..65a7cade52 100644 --- a/crates/torii/grpc/src/server/subscriptions/event.rs +++ b/crates/torii/grpc/src/server/subscriptions/event.rs @@ -13,9 +13,9 @@ use tokio::sync::mpsc::{ channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, }; use tokio::sync::RwLock; +use torii_core::constants::SQL_FELT_DELIMITER; use torii_core::error::{Error, ParseError}; use torii_core::simple_broker::SimpleBroker; -use torii_core::sql::FELT_DELIMITER; use torii_core::types::Event; use tracing::{error, trace}; @@ -95,15 +95,15 @@ impl Service { let mut closed_stream = Vec::new(); let keys = event .keys - .trim_end_matches(FELT_DELIMITER) - .split(FELT_DELIMITER) + .trim_end_matches(SQL_FELT_DELIMITER) + .split(SQL_FELT_DELIMITER) .map(Felt::from_str) .collect::, _>>() .map_err(ParseError::from)?; let data = event .data - .trim_end_matches(FELT_DELIMITER) - .split(FELT_DELIMITER) + .trim_end_matches(SQL_FELT_DELIMITER) + .split(SQL_FELT_DELIMITER) .map(Felt::from_str) .collect::, _>>() .map_err(ParseError::from)?; diff --git a/crates/torii/grpc/src/server/subscriptions/event_message.rs b/crates/torii/grpc/src/server/subscriptions/event_message.rs index 1fb578ed47..db1141cee8 100644 --- a/crates/torii/grpc/src/server/subscriptions/event_message.rs +++ b/crates/torii/grpc/src/server/subscriptions/event_message.rs @@ -13,9 +13,9 @@ use tokio::sync::mpsc::{ channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, }; use tokio::sync::RwLock; +use torii_core::constants::SQL_FELT_DELIMITER; use torii_core::error::{Error, ParseError}; use torii_core::simple_broker::SimpleBroker; -use torii_core::sql::FELT_DELIMITER; use torii_core::types::OptimisticEventMessage; use tracing::{error, trace}; @@ -128,8 +128,8 @@ impl Service { let hashed = Felt::from_str(&entity.id).map_err(ParseError::FromStr)?; let keys = entity .keys - .trim_end_matches(FELT_DELIMITER) - .split(FELT_DELIMITER) + .trim_end_matches(SQL_FELT_DELIMITER) + .split(SQL_FELT_DELIMITER) .map(Felt::from_str) .collect::, _>>() .map_err(ParseError::FromStr)?; diff --git a/crates/torii/server/src/artifacts.rs b/crates/torii/server/src/artifacts.rs index f98a3e2108..c0d17687a5 100644 --- a/crates/torii/server/src/artifacts.rs +++ b/crates/torii/server/src/artifacts.rs @@ -15,8 +15,8 @@ use tokio::fs; use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::broadcast::Receiver; -use torii_core::constants::TOKENS_TABLE; -use torii_core::utils::{fetch_content_from_ipfs, MAX_RETRY}; +use torii_core::constants::{IPFS_CLIENT_MAX_RETRY, TOKENS_TABLE}; +use torii_core::utils::fetch_content_from_ipfs; use tracing::{debug, error, trace}; use warp::http::Response; use warp::path::Tail; @@ -204,7 +204,7 @@ async fn fetch_and_process_image( uri if uri.starts_with("ipfs") => { debug!(image_uri = %uri, "Fetching image from IPFS"); let cid = uri.strip_prefix("ipfs://").unwrap(); - let response = fetch_content_from_ipfs(cid, MAX_RETRY) + let response = fetch_content_from_ipfs(cid, IPFS_CLIENT_MAX_RETRY) .await .context("Failed to read image bytes from IPFS response")?;