diff --git a/Cargo.lock b/Cargo.lock index c8d6331ff7..6fe3bb40e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -164,7 +164,7 @@ checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" [[package]] name = "alloy" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-consensus 0.3.6 (git+https://github.com/alloy-rs/alloy)", "alloy-contract 0.3.6 (git+https://github.com/alloy-rs/alloy)", @@ -209,13 +209,14 @@ dependencies = [ [[package]] name = "alloy-consensus" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-eips 0.3.6 (git+https://github.com/alloy-rs/alloy)", "alloy-primitives", "alloy-rlp", "alloy-serde 0.3.6 (git+https://github.com/alloy-rs/alloy)", "c-kzg", + "derive_more 1.0.0", "serde", ] @@ -242,7 +243,7 @@ dependencies = [ [[package]] name = "alloy-contract" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-dyn-abi", "alloy-json-abi", @@ -330,7 +331,7 @@ dependencies = [ [[package]] name = "alloy-eips" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-eip2930", "alloy-eip7702", @@ -347,7 +348,7 @@ dependencies = [ [[package]] name = "alloy-genesis" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-primitives", "alloy-serde 0.3.6 (git+https://github.com/alloy-rs/alloy)", @@ -383,7 +384,7 @@ dependencies = [ [[package]] name = "alloy-json-rpc" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-primitives", "alloy-sol-types", @@ -417,7 +418,7 @@ dependencies = [ [[package]] name = "alloy-network" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-consensus 0.3.6 (git+https://github.com/alloy-rs/alloy)", "alloy-eips 0.3.6 (git+https://github.com/alloy-rs/alloy)", @@ -449,7 +450,7 @@ dependencies = [ [[package]] name = "alloy-network-primitives" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-eips 0.3.6 (git+https://github.com/alloy-rs/alloy)", "alloy-primitives", @@ -460,7 +461,7 @@ dependencies = [ [[package]] name = "alloy-node-bindings" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-genesis", "alloy-primitives", @@ -532,7 +533,7 @@ dependencies = [ [[package]] name = "alloy-provider" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-chains", "alloy-consensus 0.3.6 (git+https://github.com/alloy-rs/alloy)", @@ -611,7 +612,7 @@ dependencies = [ [[package]] name = "alloy-rpc-client" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-json-rpc 0.3.6 (git+https://github.com/alloy-rs/alloy)", "alloy-transport 0.3.6 (git+https://github.com/alloy-rs/alloy)", @@ -631,7 +632,7 @@ dependencies = [ [[package]] name = "alloy-rpc-types-anvil" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-primitives", "alloy-serde 0.3.6 (git+https://github.com/alloy-rs/alloy)", @@ -662,7 +663,7 @@ dependencies = [ [[package]] name = "alloy-rpc-types-eth" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-consensus 0.3.6 (git+https://github.com/alloy-rs/alloy)", "alloy-eips 0.3.6 (git+https://github.com/alloy-rs/alloy)", @@ -693,7 +694,7 @@ dependencies = [ [[package]] name = "alloy-serde" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-primitives", "serde", @@ -717,7 +718,7 @@ dependencies = [ [[package]] name = "alloy-signer" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-primitives", "async-trait", @@ -730,7 +731,7 @@ dependencies = [ [[package]] name = "alloy-signer-local" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-consensus 0.3.6 (git+https://github.com/alloy-rs/alloy)", "alloy-network 0.3.6 (git+https://github.com/alloy-rs/alloy)", @@ -837,7 +838,7 @@ dependencies = [ [[package]] name = "alloy-transport" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-json-rpc 0.3.6 (git+https://github.com/alloy-rs/alloy)", "base64 0.22.1", @@ -870,7 +871,7 @@ dependencies = [ [[package]] name = "alloy-transport-http" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-json-rpc 0.3.6 (git+https://github.com/alloy-rs/alloy)", "alloy-transport 0.3.6 (git+https://github.com/alloy-rs/alloy)", @@ -12856,9 +12857,9 @@ dependencies = [ [[package]] name = "simdutf8" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" +checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" [[package]] name = "similar" diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index 5feff3103a..ac9c510997 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -11,12 +11,14 @@ //! for more info. use std::cmp; +use std::collections::VecDeque; use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use anyhow::Context; use clap::{ArgAction, Parser}; use dojo_metrics::{metrics_process, prometheus_exporter}; use dojo_utils::parse::{parse_socket_address, parse_url}; @@ -32,21 +34,10 @@ use tokio::sync::broadcast; use tokio::sync::broadcast::Sender; use tokio_stream::StreamExt; use torii_core::engine::{Engine, EngineConfig, IndexingFlags, Processors}; -use torii_core::processors::erc20_legacy_transfer::Erc20LegacyTransferProcessor; -use torii_core::processors::erc20_transfer::Erc20TransferProcessor; -use torii_core::processors::erc721_transfer::Erc721TransferProcessor; -use torii_core::processors::event_message::EventMessageProcessor; -use torii_core::processors::generate_event_processors_map; -use torii_core::processors::metadata_update::MetadataUpdateProcessor; -use torii_core::processors::register_model::RegisterModelProcessor; -use torii_core::processors::store_del_record::StoreDelRecordProcessor; -use torii_core::processors::store_set_record::StoreSetRecordProcessor; use torii_core::processors::store_transaction::StoreTransactionProcessor; -use torii_core::processors::store_update_member::StoreUpdateMemberProcessor; -use torii_core::processors::store_update_record::StoreUpdateRecordProcessor; use torii_core::simple_broker::SimpleBroker; use torii_core::sql::Sql; -use torii_core::types::{ErcContract, ErcType, Model, ToriiConfig}; +use torii_core::types::{Contract, ContractType, Model, ToriiConfig}; use torii_server::proxy::Proxy; use tracing::{error, info}; use tracing_subscriber::{fmt, EnvFilter}; @@ -60,7 +51,7 @@ pub(crate) const LOG_TARGET: &str = "torii::cli"; struct Args { /// The world to index #[arg(short, long = "world", env = "DOJO_WORLD_ADDRESS")] - world_address: Felt, + world_address: Option, /// The sequencer rpc endpoint to index. #[arg(long, value_name = "URL", default_value = ":5050", value_parser = parse_url)] @@ -71,10 +62,6 @@ struct Args { #[arg(short, long, default_value = ":memory:")] database: String, - /// Specify a block to start indexing from, ignored if stored head exists - #[arg(short, long, default_value = "0")] - start_block: u64, - /// Address to serve api endpoints at. #[arg(long, value_name = "SOCKET", default_value = "0.0.0.0:8080", value_parser = parse_socket_address)] addr: SocketAddr, @@ -148,7 +135,7 @@ struct Args { /// ERC contract addresses to index #[arg(long, value_parser = parse_erc_contracts)] #[arg(conflicts_with = "config")] - erc_contracts: Option>, + contracts: Option>, /// Configuration file #[arg(long)] @@ -159,23 +146,20 @@ struct Args { async fn main() -> anyhow::Result<()> { let args = Args::parse(); - let mut start_block = args.start_block; - let mut config = if let Some(path) = args.config { ToriiConfig::load_from_path(&path)? } else { - ToriiConfig::default() + let mut config = ToriiConfig::default(); + + if let Some(contracts) = args.contracts { + config.contracts = VecDeque::from(contracts); + } + + config }; - if let Some(erc_contracts) = args.erc_contracts { - config.erc_contracts = erc_contracts; - } + let world_address = verify_single_world_address(args.world_address, &mut config)?; - for address in &config.erc_contracts { - if address.start_block < start_block { - start_block = address.start_block; - } - } let filter_layer = EnvFilter::try_from_default_env() .unwrap_or_else(|_| EnvFilter::new("info,hyper_reverse_proxy=off")); @@ -214,29 +198,14 @@ async fn main() -> anyhow::Result<()> { let provider: Arc<_> = JsonRpcClient::new(HttpTransport::new(args.rpc)).into(); // Get world address - let world = WorldContractReader::new(args.world_address, provider.clone()); + let world = WorldContractReader::new(world_address, provider.clone()); - let erc_contracts = config - .erc_contracts - .iter() - .map(|contract| (contract.contract_address, contract.clone())) - .collect(); + let contracts = + config.contracts.iter().map(|contract| (contract.address, contract.r#type)).collect(); - let db = Sql::new(pool.clone(), args.world_address, &erc_contracts).await?; + let db = Sql::new(pool.clone(), world_address, &contracts).await?; let processors = Processors { - event: generate_event_processors_map(vec![ - Box::new(RegisterModelProcessor), - Box::new(StoreSetRecordProcessor), - Box::new(MetadataUpdateProcessor), - Box::new(StoreDelRecordProcessor), - Box::new(EventMessageProcessor), - Box::new(StoreUpdateRecordProcessor), - Box::new(StoreUpdateMemberProcessor), - Box::new(Erc20LegacyTransferProcessor), - Box::new(Erc20TransferProcessor), - Box::new(Erc721TransferProcessor), - ])?, transaction: vec![Box::new(StoreTransactionProcessor)], ..Processors::default() }; @@ -258,7 +227,7 @@ async fn main() -> anyhow::Result<()> { processors, EngineConfig { max_concurrent_tasks: args.max_concurrent_tasks, - start_block: args.start_block, + start_block: 0, events_chunk_size: args.events_chunk_size, index_pending: args.index_pending, polling_interval: Duration::from_millis(args.polling_interval), @@ -266,18 +235,13 @@ async fn main() -> anyhow::Result<()> { }, shutdown_tx.clone(), Some(block_tx), - erc_contracts, + Arc::new(contracts), ); let shutdown_rx = shutdown_tx.subscribe(); - let (grpc_addr, grpc_server) = torii_grpc::server::new( - shutdown_rx, - &pool, - block_rx, - args.world_address, - Arc::clone(&provider), - ) - .await?; + let (grpc_addr, grpc_server) = + torii_grpc::server::new(shutdown_rx, &pool, block_rx, world_address, Arc::clone(&provider)) + .await?; let mut libp2p_relay_server = torii_relay::server::Relay::new( db, @@ -340,6 +304,26 @@ async fn main() -> anyhow::Result<()> { Ok(()) } +// Verifies that the world address is defined at most once +// and returns the world address +fn verify_single_world_address( + world_address: Option, + config: &mut ToriiConfig, +) -> anyhow::Result { + let world_from_config = + config.contracts.iter().find(|c| c.r#type == ContractType::WORLD).map(|c| c.address); + + match (world_address, world_from_config) { + (Some(_), Some(_)) => Err(anyhow::anyhow!("World address specified multiple times")), + (Some(addr), _) => { + config.contracts.push_front(Contract { address: addr, r#type: ContractType::WORLD }); + Ok(addr) + } + (_, Some(addr)) => Ok(addr), + (None, None) => Err(anyhow::anyhow!("World address not specified")), + } +} + async fn spawn_rebuilding_graphql_server( shutdown_tx: Sender<()>, pool: Arc, @@ -367,24 +351,24 @@ async fn spawn_rebuilding_graphql_server( // Parses clap cli argument which is expected to be in the format: // - erc_type:address:start_block // - address:start_block (erc_type defaults to ERC20) -fn parse_erc_contracts(s: &str) -> anyhow::Result> { +fn parse_erc_contracts(s: &str) -> anyhow::Result> { let parts: Vec<&str> = s.split(',').collect(); let mut contracts = Vec::new(); for part in parts { match part.split(':').collect::>().as_slice() { - [r#type, address, start_block] => { - let contract_address = Felt::from_str(address).unwrap(); - let start_block = start_block.parse::()?; - let r#type = r#type.parse::()?; - contracts.push(ErcContract { contract_address, start_block, r#type }); + [r#type, address] => { + let r#type = r#type.parse::()?; + let address = Felt::from_str(address) + .with_context(|| format!("Expected address, found {}", address))?; + contracts.push(Contract { address, r#type }); } - [address, start_block] => { - let contract_address = Felt::from_str(address)?; - let start_block = start_block.parse::()?; - let r#type = ErcType::default(); - contracts.push(ErcContract { contract_address, start_block, r#type }); + [address] => { + let r#type = ContractType::WORLD; + let address = Felt::from_str(address) + .with_context(|| format!("Expected address, found {}", address))?; + contracts.push(Contract { address, r#type }); } - _ => return Err(anyhow::anyhow!("Invalid ERC contract format")), + _ => return Err(anyhow::anyhow!("Invalid contract format")), } } Ok(contracts) diff --git a/bin/torii/torii.toml b/bin/torii/torii.toml index 45305c0301..a45ecfe10e 100644 --- a/bin/torii/torii.toml +++ b/bin/torii/torii.toml @@ -1,8 +1,8 @@ # Example configuration file for Torii -# erc_contracts = [ -# { contract_address = "0x1234567890abcdef1234567890abcdef12345678", start_block = 0, type = "ERC20" }, -# { contract_address = "0xabcdef1234567890abcdef1234567890abcdef12", start_block = 1, type = "ERC721" }, -# ] -# erc_contracts = [ -# { type = "ERC20", contract_address = "0x07fc13cc1f43f0b0519f84df8bf13bea4d9fd5ce2d748c3baf27bf90a565f60a", start_block = 0 }, +# contracts = [ +# { type = "WORLD", address = "" }, +# { type = "ERC20", address = "" }, +# { type = "ERC20Legacy", address = "" }, +# { type = "ERC721", address = "" }, +# { type = "ERC721Legacy", address = "" }, # ] \ No newline at end of file diff --git a/crates/dojo-lang/src/manifest_test_data/compiler_cairo/Scarb.lock b/crates/dojo-lang/src/manifest_test_data/compiler_cairo/Scarb.lock index c6cfeb4783..16e168040a 100644 --- a/crates/dojo-lang/src/manifest_test_data/compiler_cairo/Scarb.lock +++ b/crates/dojo-lang/src/manifest_test_data/compiler_cairo/Scarb.lock @@ -11,11 +11,3 @@ dependencies = [ [[package]] name = "dojo" version = "1.0.0-alpha.4" -dependencies = [ - "dojo_plugin", -] - -[[package]] -name = "dojo_plugin" -version = "1.0.0-alpha.4" -source = "git+https://github.com/dojoengine/dojo?rev=f15def33#f15def330c0d099e79351d11c197f63e8cc1ff36" diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 2c25de0562..0a0476e6cb 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -1,4 +1,4 @@ -use std::collections::{btree_map, BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::fmt::Debug; use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; @@ -10,11 +10,13 @@ use dojo_world::contracts::world::WorldContractReader; use futures_util::future::join_all; use hashlink::LinkedHashMap; use starknet::core::types::{ - BlockId, BlockTag, EmittedEvent, Event, EventFilter, Felt, MaybePendingBlockWithReceipts, - MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, ReceiptBlock, TransactionReceipt, - TransactionReceiptWithBlockInfo, TransactionWithReceipt, + BlockId, BlockTag, EmittedEvent, Event, EventFilter, EventsPage, MaybePendingBlockWithReceipts, + MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, Transaction, TransactionReceipt, + TransactionWithReceipt, }; +use starknet::core::utils::get_selector_from_name; use starknet::providers::Provider; +use starknet_crypto::Felt; use tokio::sync::broadcast::Sender; use tokio::sync::mpsc::Sender as BoundedSender; use tokio::sync::Semaphore; @@ -22,30 +24,94 @@ use tokio::task::JoinSet; use tokio::time::sleep; use tracing::{debug, error, info, trace, warn}; +use crate::processors::erc20_legacy_transfer::Erc20LegacyTransferProcessor; +use crate::processors::erc20_transfer::Erc20TransferProcessor; +use crate::processors::erc721_legacy_transfer::Erc721LegacyTransferProcessor; +use crate::processors::erc721_transfer::Erc721TransferProcessor; use crate::processors::event_message::EventMessageProcessor; +use crate::processors::metadata_update::MetadataUpdateProcessor; +use crate::processors::register_model::RegisterModelProcessor; +use crate::processors::store_del_record::StoreDelRecordProcessor; +use crate::processors::store_set_record::StoreSetRecordProcessor; +use crate::processors::store_update_member::StoreUpdateMemberProcessor; +use crate::processors::store_update_record::StoreUpdateRecordProcessor; use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor}; use crate::sql::{Cursors, Sql}; -use crate::types::ErcContract; +use crate::types::ContractType; #[allow(missing_debug_implementations)] pub struct Processors { pub block: Vec>>, pub transaction: Vec>>, - pub event: HashMap>>>, pub catch_all_event: Box>, + pub event_processors: HashMap>>>, } impl Default for Processors

{ fn default() -> Self { Self { block: vec![], - event: HashMap::new(), transaction: vec![], catch_all_event: Box::new(EventMessageProcessor) as Box>, + event_processors: Self::initialize_event_processors(), } } } +impl Processors

{ + pub fn initialize_event_processors() + -> HashMap>>> { + let mut event_processors_map = + HashMap::>>>::new(); + + let event_processors = vec![ + ( + ContractType::WORLD, + vec![ + Box::new(RegisterModelProcessor) as Box>, + Box::new(StoreSetRecordProcessor), + Box::new(MetadataUpdateProcessor), + Box::new(StoreDelRecordProcessor), + Box::new(StoreUpdateRecordProcessor), + Box::new(StoreUpdateMemberProcessor), + ], + ), + ( + ContractType::ERC20, + vec![Box::new(Erc20TransferProcessor) as Box>], + ), + ( + ContractType::ERC721, + vec![Box::new(Erc721TransferProcessor) as Box>], + ), + ( + ContractType::ERC20Legacy, + vec![Box::new(Erc20LegacyTransferProcessor) as Box>], + ), + ( + ContractType::ERC721Legacy, + vec![Box::new(Erc721LegacyTransferProcessor) as Box>], + ), + ]; + + for (contract_type, processors) in event_processors { + for processor in processors { + let key = get_selector_from_name(processor.event_key().as_str()) + .expect("Event key is ASCII so this should never fail"); + event_processors_map.entry(contract_type).or_default().insert(key, processor); + } + } + + event_processors_map + } + + pub fn get_event_processor( + &self, + contract_type: ContractType, + ) -> &HashMap>> { + self.event_processors.get(&contract_type).unwrap() + } +} pub(crate) const LOG_TARGET: &str = "torii_core::engine"; pub const QUERY_QUEUE_BATCH_SIZE: usize = 1000; @@ -115,14 +181,13 @@ pub struct ParallelizedEvent { pub struct Engine { world: Arc>, db: Sql, - provider: Box

, + provider: Arc

, processors: Arc>, config: EngineConfig, shutdown_tx: Sender<()>, block_tx: Option>, - // ERC tokens to index - tokens: HashMap, - tasks: HashMap>, + tasks: HashMap>, + contracts: Arc>, } struct UnprocessedEvent { @@ -140,17 +205,17 @@ impl Engine

{ config: EngineConfig, shutdown_tx: Sender<()>, block_tx: Option>, - tokens: HashMap, + contracts: Arc>, ) -> Self { Self { world: Arc::new(world), db, - provider: Box::new(provider), + provider: Arc::new(provider), processors: Arc::new(processors), config, shutdown_tx, block_tx, - tokens, + contracts, tasks: HashMap::new(), } } @@ -243,30 +308,23 @@ impl Engine

{ cursor_map: &HashMap, ) -> Result { // Process all blocks from current to latest. - let world_events_filter = EventFilter { - from_block: Some(BlockId::Number(from)), - to_block: Some(BlockId::Number(to)), - address: Some(self.world.address), - keys: None, - }; + let mut fetch_all_events_tasks = VecDeque::new(); - let mut fetch_all_events_tasks = vec![]; - let world_events_pages = - get_all_events(&self.provider, world_events_filter, self.config.events_chunk_size); - - fetch_all_events_tasks.push(world_events_pages); - - for token in self.tokens.iter() { + for contract in self.contracts.iter() { let events_filter = EventFilter { from_block: Some(BlockId::Number(from)), to_block: Some(BlockId::Number(to)), - address: Some(*token.0), + address: Some(*contract.0), keys: None, }; let token_events_pages = get_all_events(&self.provider, events_filter, self.config.events_chunk_size); - fetch_all_events_tasks.push(token_events_pages); + // Prefer processing world events first + match contract.1 { + ContractType::WORLD => fetch_all_events_tasks.push_front(token_events_pages), + _ => fetch_all_events_tasks.push_back(token_events_pages), + } } let task_result = join_all(fetch_all_events_tasks).await; @@ -280,6 +338,7 @@ impl Engine

{ let events_pages = result.1; let last_contract_tx = cursor_map.get(&contract_address).cloned(); let mut last_contract_tx_tmp = last_contract_tx; + debug!(target: LOG_TARGET, "Total events pages fetched for contract ({:#x}): {}", &contract_address, &events_pages.len()); for events_page in events_pages { @@ -314,38 +373,15 @@ impl Engine

{ // Flatten events pages and events according to the pending block cursor // to array of (block_number, transaction_hash) let mut transactions = LinkedHashMap::new(); + + let mut block_set = HashSet::new(); for event in events { let block_number = match event.block_number { Some(block_number) => block_number, - // If the block number is not present, try to fetch it from the transaction - // receipt Should not/rarely happen. Thus the additional - // fetch is acceptable. - None => { - let TransactionReceiptWithBlockInfo { receipt, block } = - self.provider.get_transaction_receipt(event.transaction_hash).await?; - - match receipt { - TransactionReceipt::Invoke(_) | TransactionReceipt::L1Handler(_) => { - if let ReceiptBlock::Block { block_number, .. } = block { - block_number - } else { - // If the block is pending, we assume the block number is the - // latest + 1 - to + 1 - } - } - - _ => to + 1, - } - } + None => unreachable!("In fetch range all events should have block number"), }; - // Keep track of last block number and fetch block timestamp - if let btree_map::Entry::Vacant(v) = blocks.entry(block_number) { - debug!("Fetching block timestamp for block number: {}", block_number); - let block_timestamp = self.get_block_timestamp(block_number).await?; - v.insert(block_timestamp); - } + block_set.insert(block_number); transactions .entry((block_number, event.transaction_hash)) @@ -353,6 +389,25 @@ impl Engine

{ .push(event); } + let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks)); + let mut set: JoinSet> = JoinSet::new(); + + for block_number in block_set { + let semaphore = semaphore.clone(); + let provider = self.provider.clone(); + set.spawn(async move { + let _permit = semaphore.acquire().await.unwrap(); + debug!("Fetching block timestamp for block number: {}", block_number); + let block_timestamp = get_block_timestamp(&provider, block_number).await?; + Ok((block_number, block_timestamp)) + }); + } + + while let Some(result) = set.join_next().await { + let (block_number, block_timestamp) = result??; + blocks.insert(block_number, block_timestamp); + } + debug!("Transactions: {}", &transactions.len()); debug!("Blocks: {}", &blocks.len()); @@ -417,39 +472,16 @@ impl Engine

{ continue; } - match self + if let Err(e) = self .process_transaction_with_receipt(&t, data.block_number, timestamp, &mut cursor_map) .await { - Err(e) => { - match e.to_string().as_str() { - // TODO: remove this we now fetch the pending block with receipts so this - // error is no longer relevant - "TransactionHashNotFound" => { - // We failed to fetch the transaction, which is because - // the transaction might not have been processed fast enough by the - // provider. So we can fail silently and try - // again in the next iteration. - warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Retrieving pending transaction receipt."); - self.db.set_head(self.world.address, data.block_number - 1); - if let Some(tx) = last_pending_block_tx { - self.db.set_last_pending_block_tx(Some(tx)); - } - - self.db.execute().await?; - return Ok(()); - } - _ => { - error!(target: LOG_TARGET, error = %e, transaction_hash = %format!("{:#x}", transaction_hash), "Processing pending transaction."); - return Err(e); - } - } - } - Ok(_) => { - last_pending_block_tx = Some(*transaction_hash); - debug!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending transaction.") - } + error!(target: LOG_TARGET, error = %e, transaction_hash = %format!("{:#x}", transaction_hash), "Processing pending transaction."); + return Err(e); } + + last_pending_block_tx = Some(*transaction_hash); + debug!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending transaction."); } // Process parallelized events @@ -518,23 +550,22 @@ impl Engine

{ for (task_id, events) in self.tasks.drain() { let db = self.db.clone(); let world = self.world.clone(); - let processors = self.processors.clone(); let semaphore = semaphore.clone(); + let processors = self.processors.clone(); set.spawn(async move { let _permit = semaphore.acquire().await.unwrap(); let mut local_db = db.clone(); - for ParallelizedEvent { event_id, event, block_number, block_timestamp } in events { - if let Some(event_processors) = processors.event.get(&event.keys[0]) { - for processor in event_processors.iter() { - debug!(target: LOG_TARGET, event_name = processor.event_key(), task_id = %task_id, "Processing parallelized event."); - - if let Err(e) = processor - .process(&world, &mut local_db, block_number, block_timestamp, &event_id, &event) - .await - { - error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing parallelized event."); - } + for (contract_type, ParallelizedEvent { event_id, event, block_number, block_timestamp }) in events { + let contract_processors = processors.get_event_processor(contract_type); + if let Some(processor) = contract_processors.get(&event.keys[0]) { + debug!(target: LOG_TARGET, event_name = processor.event_key(), task_id = %task_id, "Processing parallelized event."); + + if let Err(e) = processor + .process(&world, &mut local_db, block_number, block_timestamp, &event_id, &event) + .await + { + error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing parallelized event."); } } } @@ -551,13 +582,6 @@ impl Engine

{ Ok(()) } - async fn get_block_timestamp(&self, block_number: u64) -> Result { - match self.provider.get_block_with_tx_hashes(BlockId::Number(block_number)).await? { - MaybePendingBlockWithTxHashes::Block(block) => Ok(block.timestamp), - MaybePendingBlockWithTxHashes::PendingBlock(block) => Ok(block.timestamp), - } - } - async fn process_transaction_with_events( &mut self, transaction_hash: Felt, @@ -576,6 +600,11 @@ impl Engine

{ keys: event.keys.clone(), data: event.data.clone(), }; + + let Some(&contract_type) = self.contracts.get(&event.from_address) else { + continue; + }; + Self::process_event( self, block_number, @@ -583,6 +612,7 @@ impl Engine

{ &event_id, &event, transaction_hash, + contract_type, ) .await?; } @@ -619,11 +649,9 @@ impl Engine

{ if let Some(events) = events { for (event_idx, event) in events.iter().enumerate() { - if event.from_address != self.world.address - && !self.tokens.contains_key(&event.from_address) - { + let Some(&contract_type) = self.contracts.get(&event.from_address) else { continue; - } + }; cursor_map.insert(event.from_address, *transaction_hash); let event_id = @@ -636,11 +664,12 @@ impl Engine

{ &event_id, event, *transaction_hash, + contract_type, ) .await?; } - if world_event && self.config.flags.contains(IndexingFlags::TRANSACTIONS) { + if self.config.flags.contains(IndexingFlags::TRANSACTIONS) { Self::process_transaction( self, block_number, @@ -696,6 +725,7 @@ impl Engine

{ event_id: &str, event: &Event, transaction_hash: Felt, + contract_type: ContractType, ) -> Result<()> { if self.config.flags.contains(IndexingFlags::RAW_EVENTS) { self.db.store_event(event_id, event, transaction_hash, block_timestamp); @@ -703,7 +733,8 @@ impl Engine

{ let event_key = event.keys[0]; - let Some(processors) = self.processors.event.get(&event_key) else { + let processors = self.processors.get_event_processor(contract_type); + let Some(processor) = processors.get(&event_key) else { // if we dont have a processor for this event, we try the catch all processor if self.processors.catch_all_event.validate(event) { if let Err(e) = self @@ -738,36 +769,30 @@ impl Engine

{ return Ok(()); }; - // For now we only have 1 processor for store* events - let task_identifier = if processors.len() == 1 { - match processors[0].event_key().as_str() { - "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => { - let mut hasher = DefaultHasher::new(); - event.data[0].hash(&mut hasher); - event.data[1].hash(&mut hasher); - hasher.finish() - } - _ => 0, + let task_identifier = match processor.event_key().as_str() { + "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => { + let mut hasher = DefaultHasher::new(); + event.data[0].hash(&mut hasher); + event.data[1].hash(&mut hasher); + hasher.finish() } - } else { - 0 + _ => 0, }; // if we have a task identifier, we queue the event to be parallelized if task_identifier != 0 { - self.tasks.entry(task_identifier).or_default().push(ParallelizedEvent { - event_id: event_id.to_string(), - event: event.clone(), - block_number, - block_timestamp, - }); + self.tasks.entry(task_identifier).or_default().push(( + contract_type, + ParallelizedEvent { + event_id: event_id.to_string(), + event: event.clone(), + block_number, + block_timestamp, + }, + )); } else { // if we dont have a task identifier, we process the event immediately - for processor in processors.iter() { - if !processor.validate(event) { - continue; - } - + if processor.validate(event) { if let Err(e) = processor .process( &self.world, @@ -781,6 +806,8 @@ impl Engine

{ { error!(target: LOG_TARGET, event_name = processor.event_key(), error = ?e, "Processing event."); } + } else { + warn!(target: LOG_TARGET, event_name = processor.event_key(), "Event not validated."); } } @@ -800,6 +827,10 @@ where let mut continuation_token = None; loop { + debug!( + "Fetching events page with continuation token: {:?}, for contract: {:?}", + continuation_token, events_filter.address + ); let events_page = provider .get_events(events_filter.clone(), continuation_token.clone(), events_chunk_size) .await?; @@ -814,3 +845,13 @@ where Ok((events_filter.address, events_pages)) } + +async fn get_block_timestamp

(provider: &P, block_number: u64) -> Result +where + P: Provider + Sync, +{ + match provider.get_block_with_tx_hashes(BlockId::Number(block_number)).await? { + MaybePendingBlockWithTxHashes::Block(block) => Ok(block.timestamp), + MaybePendingBlockWithTxHashes::PendingBlock(block) => Ok(block.timestamp), + } +} diff --git a/crates/torii/core/src/processors/erc20_legacy_transfer.rs b/crates/torii/core/src/processors/erc20_legacy_transfer.rs index 41852cd89e..4cef0dc19d 100644 --- a/crates/torii/core/src/processors/erc20_legacy_transfer.rs +++ b/crates/torii/core/src/processors/erc20_legacy_transfer.rs @@ -4,7 +4,7 @@ use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; use dojo_world::contracts::world::WorldContractReader; use starknet::core::types::{Event, U256}; use starknet::providers::Provider; -use tracing::info; +use tracing::debug; use super::EventProcessor; use crate::sql::Sql; @@ -24,6 +24,7 @@ where } fn validate(&self, event: &Event) -> bool { + // ref: https://github.com/OpenZeppelin/cairo-contracts/blob/1f9359219a92cdb1576f953db71ee993b8ef5f70/src/openzeppelin/token/erc20/library.cairo#L19-L21 // key: [hash(Transfer)] // data: [from, to, value.0, value.1] if event.keys.len() == 1 && event.data.len() == 4 { @@ -51,7 +52,7 @@ where db.handle_erc20_transfer(token_address, from, to, value, world.provider(), block_timestamp) .await?; - info!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "Legacy ERC20 Transfer"); + debug!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "Legacy ERC20 Transfer"); Ok(()) } diff --git a/crates/torii/core/src/processors/erc20_transfer.rs b/crates/torii/core/src/processors/erc20_transfer.rs index a98e288780..10022d9eb0 100644 --- a/crates/torii/core/src/processors/erc20_transfer.rs +++ b/crates/torii/core/src/processors/erc20_transfer.rs @@ -4,7 +4,7 @@ use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; use dojo_world::contracts::world::WorldContractReader; use starknet::core::types::{Event, U256}; use starknet::providers::Provider; -use tracing::info; +use tracing::debug; use super::EventProcessor; use crate::sql::Sql; @@ -24,6 +24,7 @@ where } fn validate(&self, event: &Event) -> bool { + // ref: https://github.com/OpenZeppelin/cairo-contracts/blob/ba00ce76a93dcf25c081ab2698da20690b5a1cfb/packages/token/src/erc20/erc20.cairo#L38-L46 // key: [hash(Transfer), from, to] // data: [value.0, value.1] if event.keys.len() == 3 && event.data.len() == 2 { @@ -51,7 +52,7 @@ where db.handle_erc20_transfer(token_address, from, to, value, world.provider(), block_timestamp) .await?; - info!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "ERC20 Transfer"); + debug!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "ERC20 Transfer"); Ok(()) } diff --git a/crates/torii/core/src/processors/erc721_legacy_transfer.rs b/crates/torii/core/src/processors/erc721_legacy_transfer.rs new file mode 100644 index 0000000000..89a88f04a3 --- /dev/null +++ b/crates/torii/core/src/processors/erc721_legacy_transfer.rs @@ -0,0 +1,66 @@ +use anyhow::Error; +use async_trait::async_trait; +use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; +use dojo_world::contracts::world::WorldContractReader; +use starknet::core::types::{Event, U256}; +use starknet::providers::Provider; +use tracing::debug; + +use super::EventProcessor; +use crate::sql::Sql; + +pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc721_legacy_transfer"; + +#[derive(Default, Debug)] +pub struct Erc721LegacyTransferProcessor; + +#[async_trait] +impl

EventProcessor

for Erc721LegacyTransferProcessor +where + P: Provider + Send + Sync + std::fmt::Debug, +{ + fn event_key(&self) -> String { + "Transfer".to_string() + } + + fn validate(&self, event: &Event) -> bool { + // ref: https://github.com/OpenZeppelin/cairo-contracts/blob/1f9359219a92cdb1576f953db71ee993b8ef5f70/src/openzeppelin/token/erc721/library.cairo#L27-L29 + // key: [hash(Transfer)] + // data: [from, to, token_id.0, token_id.1] + if event.keys.len() == 1 && event.data.len() == 4 { + return true; + } + + false + } + + async fn process( + &self, + world: &WorldContractReader

, + db: &mut Sql, + _block_number: u64, + block_timestamp: u64, + _event_id: &str, + event: &Event, + ) -> Result<(), Error> { + let token_address = event.from_address; + let from = event.data[0]; + let to = event.data[1]; + + let token_id = U256Cainome::cairo_deserialize(&event.data, 2)?; + let token_id = U256::from_words(token_id.low, token_id.high); + + db.handle_erc721_transfer( + token_address, + from, + to, + token_id, + world.provider(), + block_timestamp, + ) + .await?; + debug!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "ERC721 Transfer"); + + Ok(()) + } +} diff --git a/crates/torii/core/src/processors/erc721_transfer.rs b/crates/torii/core/src/processors/erc721_transfer.rs index 665de6424a..319ea81833 100644 --- a/crates/torii/core/src/processors/erc721_transfer.rs +++ b/crates/torii/core/src/processors/erc721_transfer.rs @@ -4,7 +4,7 @@ use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; use dojo_world::contracts::world::WorldContractReader; use starknet::core::types::{Event, U256}; use starknet::providers::Provider; -use tracing::info; +use tracing::debug; use super::EventProcessor; use crate::sql::Sql; @@ -24,7 +24,7 @@ where } fn validate(&self, event: &Event) -> bool { - // ref: https://github.com/OpenZeppelin/cairo-contracts/blob/eabfa029b7b681d9e83bf171f723081b07891016/packages/token/src/erc721/erc721.cairo#L44-L53 + // ref: https://github.com/OpenZeppelin/cairo-contracts/blob/ba00ce76a93dcf25c081ab2698da20690b5a1cfb/packages/token/src/erc721/erc721.cairo#L40-L49 // key: [hash(Transfer), from, to, token_id.low, token_id.high] // data: [] if event.keys.len() == 5 && event.data.is_empty() { @@ -59,7 +59,7 @@ where block_timestamp, ) .await?; - info!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "ERC721 Transfer"); + debug!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "ERC721 Transfer"); Ok(()) } diff --git a/crates/torii/core/src/processors/mod.rs b/crates/torii/core/src/processors/mod.rs index f4bfddffd3..cf25f36ca6 100644 --- a/crates/torii/core/src/processors/mod.rs +++ b/crates/torii/core/src/processors/mod.rs @@ -1,19 +1,14 @@ -use std::collections::HashMap; - use anyhow::{Error, Result}; use async_trait::async_trait; use dojo_world::contracts::world::WorldContractReader; use starknet::core::types::{Event, Felt, Transaction}; -use starknet::core::utils::get_selector_from_name; use starknet::providers::Provider; use crate::sql::Sql; -// pub mod erc20_legacy_transfer; -// pub mod erc20_transfer; -// pub mod erc721_transfer; pub mod erc20_legacy_transfer; pub mod erc20_transfer; +pub mod erc721_legacy_transfer; pub mod erc721_transfer; pub mod event_message; pub mod metadata_update; @@ -78,19 +73,3 @@ pub trait TransactionProcessor: Send + Sync { transaction: &Transaction, ) -> Result<(), Error>; } - -type EventProcessors

= Vec>>; - -/// Given a list of event processors, generate a map of event keys to the event processor -pub fn generate_event_processors_map( - event_processor: EventProcessors

, -) -> Result>> { - let mut event_processors = HashMap::new(); - - for processor in event_processor { - let key = get_selector_from_name(processor.event_key().as_str())?; - event_processors.entry(key).or_insert(vec![]).push(processor); - } - - Ok(event_processors) -} diff --git a/crates/torii/core/src/sql/erc.rs b/crates/torii/core/src/sql/erc.rs index 507e629b81..f856c2f21a 100644 --- a/crates/torii/core/src/sql/erc.rs +++ b/crates/torii/core/src/sql/erc.rs @@ -5,8 +5,9 @@ use cainome::cairo_serde::{ByteArray, CairoSerde}; use starknet::core::types::{BlockId, BlockTag, Felt, FunctionCall, U256}; use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string}; use starknet::providers::Provider; +use tracing::debug; -use super::query_queue::{Argument, QueryQueue, QueryType}; +use super::query_queue::{Argument, QueryType}; use super::utils::{sql_string_to_u256, u256_to_sql_string}; use super::{Sql, FELT_DELIMITER}; use crate::sql::utils::{felt_and_u256_to_sql_string, felt_to_sql_string, felts_to_sql_string}; @@ -32,23 +33,16 @@ impl Sql { .await?; if !token_exists { - register_erc20_token_metadata( - contract_address, - &mut self.query_queue, - &token_id, - provider, - ) - .await?; + self.register_erc20_token_metadata(contract_address, &token_id, provider).await?; } - register_erc_transfer_event( + self.register_erc_transfer_event( contract_address, from_address, to_address, amount, &token_id, block_timestamp, - &mut self.query_queue, ); // Update balances in erc20_balance table @@ -149,23 +143,16 @@ impl Sql { .await?; if !token_exists { - register_erc721_token_metadata( - contract_address, - &mut self.query_queue, - &token_id, - provider, - ) - .await?; + self.register_erc721_token_metadata(contract_address, &token_id, provider).await?; } - register_erc_transfer_event( + self.register_erc_transfer_event( contract_address, from_address, to_address, U256::from(1u8), &token_id, block_timestamp, - &mut self.query_queue, ); // Update balances in erc721_balances table @@ -216,173 +203,204 @@ impl Sql { Ok(()) } -} -async fn register_erc20_token_metadata( - contract_address: Felt, - queue: &mut QueryQueue, - token_id: &str, - provider: &P, -) -> Result<()> { - // Fetch token information from the chain - let name = provider - .call( - FunctionCall { - contract_address, - entry_point_selector: get_selector_from_name("name").unwrap(), - calldata: vec![], - }, - BlockId::Tag(BlockTag::Pending), - ) - .await?; - - // len = 1 => return value felt (i.e. legacy erc20 token) - // len > 1 => return value ByteArray (i.e. new erc20 token) - let name = if name.len() == 1 { - parse_cairo_short_string(&name[0]).unwrap() - } else { - ByteArray::cairo_deserialize(&name, 0) - .expect("Return value not ByteArray") - .to_string() - .expect("Return value not String") - }; - - let symbol = provider - .call( - FunctionCall { - contract_address, - entry_point_selector: get_selector_from_name("symbol").unwrap(), - calldata: vec![], - }, - BlockId::Tag(BlockTag::Pending), - ) - .await?; - - let symbol = if symbol.len() == 1 { - parse_cairo_short_string(&symbol[0]).unwrap() - } else { - ByteArray::cairo_deserialize(&symbol, 0) - .expect("Return value not ByteArray") - .to_string() - .expect("Return value not String") - }; - - let decimals = provider - .call( - FunctionCall { - contract_address, - entry_point_selector: get_selector_from_name("decimals").unwrap(), - calldata: vec![], - }, - BlockId::Tag(BlockTag::Pending), - ) - .await?; - let decimals = u8::cairo_deserialize(&decimals, 0).expect("Return value not u8"); - - // Insert the token into the tokens table - queue.enqueue( - "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, ?, ?, ?)", - vec![ - Argument::String(token_id.to_string()), - Argument::FieldElement(contract_address), - Argument::String(name), - Argument::String(symbol), - Argument::Int(decimals.into()), - ], - QueryType::Other, - ); - - Ok(()) -} + async fn register_erc20_token_metadata( + &mut self, + contract_address: Felt, + token_id: &str, + provider: &P, + ) -> Result<()> { + // Fetch token information from the chain + let name = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("name").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; -async fn register_erc721_token_metadata( - contract_address: Felt, - queue: &mut QueryQueue, - token_id: &str, - provider: &P, -) -> Result<()> { - // Fetch token information from the chain - let name = provider - .call( - FunctionCall { - contract_address, - entry_point_selector: get_selector_from_name("name").unwrap(), - calldata: vec![], - }, - BlockId::Tag(BlockTag::Pending), - ) - .await?; - - // len = 1 => return value felt (i.e. legacy erc721 token) - // len > 1 => return value ByteArray (i.e. new erc721 token) - let name = if name.len() == 1 { - parse_cairo_short_string(&name[0]).unwrap() - } else { - ByteArray::cairo_deserialize(&name, 0) - .expect("Return value not ByteArray") - .to_string() - .expect("Return value not String") - }; - - let symbol = provider - .call( - FunctionCall { - contract_address, - entry_point_selector: get_selector_from_name("symbol").unwrap(), - calldata: vec![], - }, - BlockId::Tag(BlockTag::Pending), + // len = 1 => return value felt (i.e. legacy erc20 token) + // len > 1 => return value ByteArray (i.e. new erc20 token) + let name = if name.len() == 1 { + parse_cairo_short_string(&name[0]).unwrap() + } else { + ByteArray::cairo_deserialize(&name, 0) + .expect("Return value not ByteArray") + .to_string() + .expect("Return value not String") + }; + + let symbol = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("symbol").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + + let symbol = if symbol.len() == 1 { + parse_cairo_short_string(&symbol[0]).unwrap() + } else { + ByteArray::cairo_deserialize(&symbol, 0) + .expect("Return value not ByteArray") + .to_string() + .expect("Return value not String") + }; + + let decimals = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("decimals").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + let decimals = u8::cairo_deserialize(&decimals, 0).expect("Return value not u8"); + + // Insert the token into the tokens table + self.query_queue.enqueue( + "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, ?, \ + ?, ?)", + vec![ + Argument::String(token_id.to_string()), + Argument::FieldElement(contract_address), + Argument::String(name), + Argument::String(symbol), + Argument::Int(decimals.into()), + ], + QueryType::Other, + ); + + Ok(()) + } + + async fn register_erc721_token_metadata( + &mut self, + contract_address: Felt, + token_id: &str, + provider: &P, + ) -> Result<()> { + let res = sqlx::query_as::<_, (String, String, u8)>( + "SELECT name, symbol, decimals FROM tokens WHERE contract_address = ?", ) - .await?; - let symbol = if symbol.len() == 1 { - parse_cairo_short_string(&symbol[0]).unwrap() - } else { - ByteArray::cairo_deserialize(&symbol, 0) - .expect("Return value not ByteArray") - .to_string() - .expect("Return value not String") - }; - - let decimals = 0; - - // Insert the token into the tokens table - queue.enqueue( - "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, ?, ?, ?)", - vec![ - Argument::String(token_id.to_string()), - Argument::FieldElement(contract_address), - Argument::String(name), - Argument::String(symbol), - Argument::Int(decimals.into()), - ], - QueryType::Other, - ); - - Ok(()) -} + .bind(felt_to_sql_string(&contract_address)) + .fetch_one(&self.pool) + .await; + + // If we find a token already registered for this contract_address we dont need to refetch + // the data since its same for all ERC721 tokens + if let Ok((name, symbol, decimals)) = res { + debug!( + contract_address = %felt_to_sql_string(&contract_address), + "Token already registered for contract_address, so reusing fetched data", + ); + self.query_queue.enqueue( + "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, \ + ?, ?, ?)", + vec![ + Argument::String(token_id.to_string()), + Argument::FieldElement(contract_address), + Argument::String(name), + Argument::String(symbol), + Argument::Int(decimals.into()), + ], + QueryType::Other, + ); + return Ok(()); + } + + // Fetch token information from the chain + let name = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("name").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + + // len = 1 => return value felt (i.e. legacy erc721 token) + // len > 1 => return value ByteArray (i.e. new erc721 token) + let name = if name.len() == 1 { + parse_cairo_short_string(&name[0]).unwrap() + } else { + ByteArray::cairo_deserialize(&name, 0) + .expect("Return value not ByteArray") + .to_string() + .expect("Return value not String") + }; + + let symbol = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("symbol").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + let symbol = if symbol.len() == 1 { + parse_cairo_short_string(&symbol[0]).unwrap() + } else { + ByteArray::cairo_deserialize(&symbol, 0) + .expect("Return value not ByteArray") + .to_string() + .expect("Return value not String") + }; + + let decimals = 0; + + // Insert the token into the tokens table + self.query_queue.enqueue( + "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, ?, \ + ?, ?)", + vec![ + Argument::String(token_id.to_string()), + Argument::FieldElement(contract_address), + Argument::String(name), + Argument::String(symbol), + Argument::Int(decimals.into()), + ], + QueryType::Other, + ); + + Ok(()) + } -fn register_erc_transfer_event( - contract_address: Felt, - from: Felt, - to: Felt, - amount: U256, - token_id: &str, - block_timestamp: u64, - queue: &mut QueryQueue, -) { - let insert_query = "INSERT INTO erc_transfers (contract_address, from_address, to_address, \ - amount, token_id, executed_at) VALUES (?, ?, ?, ?, ?, ?)"; - - queue.enqueue( - insert_query, - vec![ - Argument::FieldElement(contract_address), - Argument::FieldElement(from), - Argument::FieldElement(to), - Argument::String(u256_to_sql_string(&amount)), - Argument::String(token_id.to_string()), - Argument::String(utc_dt_string_from_timestamp(block_timestamp)), - ], - QueryType::Other, - ); + fn register_erc_transfer_event( + &mut self, + contract_address: Felt, + from: Felt, + to: Felt, + amount: U256, + token_id: &str, + block_timestamp: u64, + ) { + let insert_query = "INSERT INTO erc_transfers (contract_address, from_address, \ + to_address, amount, token_id, executed_at) VALUES (?, ?, ?, ?, ?, ?)"; + + self.query_queue.enqueue( + insert_query, + vec![ + Argument::FieldElement(contract_address), + Argument::FieldElement(from), + Argument::FieldElement(to), + Argument::String(u256_to_sql_string(&amount)), + Argument::String(token_id.to_string()), + Argument::String(utc_dt_string_from_timestamp(block_timestamp)), + ], + QueryType::Other, + ); + } } diff --git a/crates/torii/core/src/sql/mod.rs b/crates/torii/core/src/sql/mod.rs index 4257a3b920..aea8a3ce3e 100644 --- a/crates/torii/core/src/sql/mod.rs +++ b/crates/torii/core/src/sql/mod.rs @@ -3,7 +3,7 @@ use std::convert::TryInto; use std::str::FromStr; use std::sync::Arc; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use chrono::Utc; use dojo_types::primitive::Primitive; use dojo_types::schema::{EnumOption, Member, Struct, Ty}; @@ -20,7 +20,7 @@ use utils::felts_to_sql_string; use crate::cache::{Model, ModelCache}; use crate::types::{ - ErcContract, Event as EventEmitted, EventMessage as EventMessageUpdated, + ContractType, Event as EventEmitted, EventMessage as EventMessageUpdated, Model as ModelRegistered, }; use crate::utils::{must_utc_datetime_from_timestamp, utc_dt_string_from_timestamp}; @@ -68,29 +68,18 @@ impl Sql { pub async fn new( pool: Pool, world_address: Felt, - erc_contracts: &HashMap, + contracts: &HashMap, ) -> Result { let mut query_queue = QueryQueue::new(pool.clone()); - query_queue.enqueue( - "INSERT OR IGNORE INTO contracts (id, contract_address, contract_type) VALUES (?, ?, \ - ?)", - vec![ - Argument::FieldElement(world_address), - Argument::FieldElement(world_address), - Argument::String(WORLD_CONTRACT_TYPE.to_string()), - ], - QueryType::Other, - ); - - for contract in erc_contracts.values() { + for contract in contracts { query_queue.enqueue( "INSERT OR IGNORE INTO contracts (id, contract_address, contract_type) VALUES (?, \ ?, ?)", vec![ - Argument::FieldElement(contract.contract_address), - Argument::FieldElement(contract.contract_address), - Argument::String(contract.r#type.to_string()), + Argument::FieldElement(*contract.0), + Argument::FieldElement(*contract.0), + Argument::String(contract.1.to_string()), ], QueryType::Other, ); @@ -131,8 +120,10 @@ impl Sql { ) .bind(format!("{:#x}", contract)); - let indexer: (Option, Option, Option, String) = - indexer_query.fetch_one(&mut *conn).await?; + let indexer: (Option, Option, Option, String) = indexer_query + .fetch_one(&mut *conn) + .await + .with_context(|| format!("Failed to fetch head for contract: {:#x}", contract))?; Ok(( indexer.0.map(|h| h.try_into().expect("doesn't fit in u64")).unwrap_or(0), indexer.1.map(|f| Felt::from_str(&f)).transpose()?, diff --git a/crates/torii/core/src/sql/test.rs b/crates/torii/core/src/sql/test.rs index 857790d157..e4b9d2a458 100644 --- a/crates/torii/core/src/sql/test.rs +++ b/crates/torii/core/src/sql/test.rs @@ -21,13 +21,8 @@ use starknet_crypto::poseidon_hash_many; use tokio::sync::broadcast; use crate::engine::{Engine, EngineConfig, Processors}; -use crate::processors::generate_event_processors_map; -use crate::processors::register_model::RegisterModelProcessor; -use crate::processors::store_del_record::StoreDelRecordProcessor; -use crate::processors::store_set_record::StoreSetRecordProcessor; -use crate::processors::store_update_member::StoreUpdateMemberProcessor; -use crate::processors::store_update_record::StoreUpdateRecordProcessor; use crate::sql::Sql; +use crate::types::ContractType; pub async fn bootstrap_engine

( world: WorldContractReader

, @@ -39,24 +34,16 @@ where { let (shutdown_tx, _) = broadcast::channel(1); let to = provider.block_hash_and_number().await?.block_number; + let world_address = world.address; let mut engine = Engine::new( world, db, provider, - Processors { - event: generate_event_processors_map(vec![ - Box::new(RegisterModelProcessor), - Box::new(StoreSetRecordProcessor), - Box::new(StoreUpdateRecordProcessor), - Box::new(StoreUpdateMemberProcessor), - Box::new(StoreDelRecordProcessor), - ])?, - ..Processors::default() - }, + Processors { ..Processors::default() }, EngineConfig::default(), shutdown_tx, None, - HashMap::new(), + Arc::new(HashMap::from([(world_address, ContractType::WORLD)])), ); let data = engine.fetch_range(0, to, &HashMap::new()).await.unwrap(); @@ -125,7 +112,13 @@ async fn test_load_from_remote(sequencer: &RunnerCtx) { let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); - let mut db = Sql::new(pool.clone(), world_reader.address, &HashMap::new()).await.unwrap(); + let mut db = Sql::new( + pool.clone(), + world_reader.address, + &HashMap::from([(world_reader.address, ContractType::WORLD)]), + ) + .await + .unwrap(); let _ = bootstrap_engine(world_reader, db.clone(), provider).await.unwrap(); @@ -282,7 +275,13 @@ async fn test_load_from_remote_del(sequencer: &RunnerCtx) { let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); - let mut db = Sql::new(pool.clone(), world_reader.address, &HashMap::new()).await.unwrap(); + let mut db = Sql::new( + pool.clone(), + world_reader.address, + &HashMap::from([(world_reader.address, ContractType::WORLD)]), + ) + .await + .unwrap(); let _ = bootstrap_engine(world_reader, db.clone(), provider).await; @@ -369,7 +368,13 @@ async fn test_update_with_set_record(sequencer: &RunnerCtx) { let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); - let mut db = Sql::new(pool.clone(), world_reader.address, &HashMap::new()).await.unwrap(); + let mut db = Sql::new( + pool.clone(), + world_reader.address, + &HashMap::from([(world_reader.address, ContractType::WORLD)]), + ) + .await + .unwrap(); let _ = bootstrap_engine(world_reader, db.clone(), Arc::clone(&provider)).await.unwrap(); diff --git a/crates/torii/core/src/sql/utils.rs b/crates/torii/core/src/sql/utils.rs index 5a6b97b860..26476b0837 100644 --- a/crates/torii/core/src/sql/utils.rs +++ b/crates/torii/core/src/sql/utils.rs @@ -8,19 +8,19 @@ pub fn felts_to_sql_string(felts: &[Felt]) -> String { + FELT_DELIMITER } -pub(crate) fn felt_to_sql_string(felt: &Felt) -> String { +pub fn felt_to_sql_string(felt: &Felt) -> String { format!("{:#x}", felt) } -pub(crate) fn felt_and_u256_to_sql_string(felt: &Felt, u256: &U256) -> String { +pub fn felt_and_u256_to_sql_string(felt: &Felt, u256: &U256) -> String { format!("{}:{}", felt_to_sql_string(felt), u256_to_sql_string(u256)) } -pub(crate) fn u256_to_sql_string(u256: &U256) -> String { +pub fn u256_to_sql_string(u256: &U256) -> String { format!("{:#064x}", u256) } -pub(crate) fn sql_string_to_u256(sql_string: &str) -> U256 { +pub fn sql_string_to_u256(sql_string: &str) -> U256 { let sql_string = sql_string.strip_prefix("0x").unwrap_or(sql_string); U256::from(crypto_bigint::U256::from_be_hex(sql_string)) } diff --git a/crates/torii/core/src/types.rs b/crates/torii/core/src/types.rs index 0e9031ae0b..e3c814b9a2 100644 --- a/crates/torii/core/src/types.rs +++ b/crates/torii/core/src/types.rs @@ -1,4 +1,5 @@ use core::fmt; +use std::collections::VecDeque; use std::path::PathBuf; use std::str::FromStr; @@ -89,8 +90,8 @@ pub struct Event { #[derive(Default, Deserialize, Debug, Clone)] pub struct ToriiConfig { - /// ERC contract addresses to index - pub erc_contracts: Vec, + /// contract addresses to index + pub contracts: VecDeque, } impl ToriiConfig { @@ -101,37 +102,44 @@ impl ToriiConfig { } } -#[derive(Default, Deserialize, Debug, Clone)] -pub struct ErcContract { - pub contract_address: Felt, - pub start_block: u64, - pub r#type: ErcType, +#[derive(Deserialize, Debug, Clone, Copy)] +pub struct Contract { + pub address: Felt, + pub r#type: ContractType, } -#[derive(Default, Deserialize, Debug, Clone)] -pub enum ErcType { - #[default] +#[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum ContractType { + WORLD, ERC20, + ERC20Legacy, ERC721, + ERC721Legacy, } -impl FromStr for ErcType { +impl FromStr for ContractType { type Err = anyhow::Error; fn from_str(input: &str) -> Result { match input.to_lowercase().as_str() { - "erc20" => Ok(ErcType::ERC20), - "erc721" => Ok(ErcType::ERC721), + "world" => Ok(ContractType::WORLD), + "erc20" => Ok(ContractType::ERC20), + "erc721" => Ok(ContractType::ERC721), + "erc20legacy" | "erc20_legacy" => Ok(ContractType::ERC20Legacy), + "erc721legacy" | "erc721_legacy" => Ok(ContractType::ERC721Legacy), _ => Err(anyhow::anyhow!("Invalid ERC type: {}", input)), } } } -impl std::fmt::Display for ErcType { +impl std::fmt::Display for ContractType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - ErcType::ERC20 => write!(f, "ERC20"), - ErcType::ERC721 => write!(f, "ERC721"), + ContractType::WORLD => write!(f, "WORLD"), + ContractType::ERC20 => write!(f, "ERC20"), + ContractType::ERC721 => write!(f, "ERC721"), + ContractType::ERC20Legacy => write!(f, "ERC20Legacy"), + ContractType::ERC721Legacy => write!(f, "ERC721Legacy"), } } } diff --git a/crates/torii/graphql/Cargo.toml b/crates/torii/graphql/Cargo.toml index 464dddefb6..8c239ef9b4 100644 --- a/crates/torii/graphql/Cargo.toml +++ b/crates/torii/graphql/Cargo.toml @@ -26,6 +26,7 @@ serde.workspace = true serde_json.workspace = true sozo-ops.workspace = true sqlx.workspace = true +starknet-crypto.workspace = true strum.workspace = true strum_macros.workspace = true thiserror.workspace = true @@ -46,5 +47,4 @@ dojo-world.workspace = true katana-runner.workspace = true scarb.workspace = true serial_test = "2.0.0" -starknet-crypto.workspace = true starknet.workspace = true diff --git a/crates/torii/graphql/src/constants.rs b/crates/torii/graphql/src/constants.rs index e09c8de6d2..2d851f07b1 100644 --- a/crates/torii/graphql/src/constants.rs +++ b/crates/torii/graphql/src/constants.rs @@ -33,6 +33,9 @@ pub const QUERY_TYPE_NAME: &str = "World__Query"; pub const SUBSCRIPTION_TYPE_NAME: &str = "World__Subscription"; pub const MODEL_ORDER_TYPE_NAME: &str = "World__ModelOrder"; pub const MODEL_ORDER_FIELD_TYPE_NAME: &str = "World__ModelOrderField"; +pub const ERC_BALANCE_TYPE_NAME: &str = "ERC__Balance"; +pub const ERC_TRANSFER_TYPE_NAME: &str = "ERC__Transfer"; +pub const ERC_TOKEN_TYPE_NAME: &str = "ERC__Token"; // objects' single and plural names pub const ENTITY_NAMES: (&str, &str) = ("entity", "entities"); @@ -45,6 +48,10 @@ pub const METADATA_NAMES: (&str, &str) = ("metadata", "metadatas"); pub const TRANSACTION_NAMES: (&str, &str) = ("transaction", "transactions"); pub const PAGE_INFO_NAMES: (&str, &str) = ("pageInfo", ""); +pub const ERC_BALANCE_NAME: (&str, &str) = ("ercBalance", ""); +pub const ERC_TOKEN_NAME: (&str, &str) = ("ercToken", ""); +pub const ERC_TRANSFER_NAME: (&str, &str) = ("ercTransfer", ""); + // misc pub const ORDER_DIR_TYPE_NAME: &str = "OrderDirection"; pub const ORDER_ASC: &str = "ASC"; diff --git a/crates/torii/graphql/src/error.rs b/crates/torii/graphql/src/error.rs index d00969f98b..83834c9f35 100644 --- a/crates/torii/graphql/src/error.rs +++ b/crates/torii/graphql/src/error.rs @@ -9,6 +9,8 @@ pub enum ExtractError { NotList(String), #[error("Not a string: {0}")] NotString(String), + #[error("Not a felt: {0}")] + NotFelt(String), #[error("Not a number: {0}")] NotNumber(String), } diff --git a/crates/torii/graphql/src/mapping.rs b/crates/torii/graphql/src/mapping.rs index 1086373bca..47f7d8e1b1 100644 --- a/crates/torii/graphql/src/mapping.rs +++ b/crates/torii/graphql/src/mapping.rs @@ -4,7 +4,7 @@ use async_graphql::Name; use dojo_types::primitive::Primitive; use lazy_static::lazy_static; -use crate::constants::{CONTENT_TYPE_NAME, SOCIAL_TYPE_NAME}; +use crate::constants::{CONTENT_TYPE_NAME, ERC_TOKEN_TYPE_NAME, SOCIAL_TYPE_NAME}; use crate::types::{GraphqlType, TypeData, TypeMapping}; lazy_static! { @@ -144,4 +144,27 @@ lazy_static! { TypeData::Simple(TypeRef::named(GraphqlType::DateTime.to_string())) ), ]); + + pub static ref ERC_BALANCE_TYPE_MAPPING: TypeMapping = IndexMap::from([ + (Name::new("balance"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("type"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("token_metadata"), TypeData::Simple(TypeRef::named(ERC_TOKEN_TYPE_NAME))), + ]); + + pub static ref ERC_TRANSFER_TYPE_MAPPING: TypeMapping = IndexMap::from([ + (Name::new("from"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("to"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("amount"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("type"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("executed_at"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("token_metadata"), TypeData::Simple(TypeRef::named(ERC_TOKEN_TYPE_NAME))), + ]); + + pub static ref ERC_TOKEN_TYPE_MAPPING: TypeMapping = IndexMap::from([ + (Name::new("name"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("symbol"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("token_id"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("decimals"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("contract_address"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + ]); } diff --git a/crates/torii/graphql/src/object/erc/erc_balance.rs b/crates/torii/graphql/src/object/erc/erc_balance.rs new file mode 100644 index 0000000000..77cc492dc2 --- /dev/null +++ b/crates/torii/graphql/src/object/erc/erc_balance.rs @@ -0,0 +1,143 @@ +use async_graphql::dynamic::{Field, FieldFuture, InputValue, TypeRef}; +use async_graphql::{Name, Value}; +use convert_case::{Case, Casing}; +use serde::Deserialize; +use sqlx::{FromRow, Pool, Sqlite, SqliteConnection}; +use starknet_crypto::Felt; +use torii_core::sql::utils::felt_to_sql_string; +use tracing::warn; + +use crate::constants::{ERC_BALANCE_NAME, ERC_BALANCE_TYPE_NAME}; +use crate::mapping::ERC_BALANCE_TYPE_MAPPING; +use crate::object::{BasicObject, ResolvableObject}; +use crate::types::{TypeMapping, ValueMapping}; +use crate::utils::extract; + +#[derive(Debug)] +pub struct ErcBalanceObject; + +impl BasicObject for ErcBalanceObject { + fn name(&self) -> (&str, &str) { + ERC_BALANCE_NAME + } + + fn type_name(&self) -> &str { + ERC_BALANCE_TYPE_NAME + } + + fn type_mapping(&self) -> &TypeMapping { + &ERC_BALANCE_TYPE_MAPPING + } +} + +impl ResolvableObject for ErcBalanceObject { + fn resolvers(&self) -> Vec { + let account_address = "account_address"; + let argument = InputValue::new( + account_address.to_case(Case::Camel), + TypeRef::named_nn(TypeRef::STRING), + ); + + let field = Field::new(self.name().0, TypeRef::named_list(self.type_name()), move |ctx| { + FieldFuture::new(async move { + let mut conn = ctx.data::>()?.acquire().await?; + let address = extract::( + ctx.args.as_index_map(), + &account_address.to_case(Case::Camel), + )?; + + let erc_balances = fetch_erc_balances(&mut conn, address).await?; + + Ok(Some(Value::List(erc_balances))) + }) + }) + .argument(argument); + vec![field] + } +} + +async fn fetch_erc_balances( + conn: &mut SqliteConnection, + address: Felt, +) -> sqlx::Result> { + let query = "SELECT t.contract_address, t.name, t.symbol, t.decimals, b.balance, b.token_id, \ + c.contract_type + FROM balances b + JOIN tokens t ON b.token_id = t.id + JOIN contracts c ON t.contract_address = c.contract_address + WHERE b.account_address = ?"; + + let rows = sqlx::query(query).bind(felt_to_sql_string(&address)).fetch_all(conn).await?; + + let mut erc_balances = Vec::new(); + + for row in rows { + let row = BalanceQueryResultRaw::from_row(&row)?; + + let balance_value = match row.contract_type.as_str() { + "ERC20" | "Erc20" | "erc20" => { + let token_metadata = Value::Object(ValueMapping::from([ + (Name::new("name"), Value::String(row.name)), + (Name::new("symbol"), Value::String(row.symbol)), + // for erc20 there is no token_id + (Name::new("token_id"), Value::Null), + (Name::new("decimals"), Value::String(row.decimals.to_string())), + (Name::new("contract_address"), Value::String(row.contract_address.clone())), + ])); + + Value::Object(ValueMapping::from([ + (Name::new("balance"), Value::String(row.balance)), + (Name::new("type"), Value::String(row.contract_type)), + (Name::new("token_metadata"), token_metadata), + ])) + } + "ERC721" | "Erc721" | "erc721" => { + // contract_address:token_id + let token_id = row.token_id.split(':').collect::>(); + assert!(token_id.len() == 2); + + let token_metadata = Value::Object(ValueMapping::from([ + (Name::new("contract_address"), Value::String(row.contract_address.clone())), + (Name::new("name"), Value::String(row.name)), + (Name::new("symbol"), Value::String(row.symbol)), + (Name::new("token_id"), Value::String(row.token_id)), + (Name::new("decimals"), Value::String(row.decimals.to_string())), + ])); + + Value::Object(ValueMapping::from([ + (Name::new("balance"), Value::String(row.balance)), + (Name::new("type"), Value::String(row.contract_type)), + (Name::new("token_metadata"), token_metadata), + ])) + } + _ => { + warn!("Unknown contract type: {}", row.contract_type); + continue; + } + }; + + erc_balances.push(balance_value); + } + + Ok(erc_balances) +} + +// TODO: This would be required when subscriptions are needed +// impl ErcBalanceObject { +// pub fn value_mapping(entity: ErcBalance) -> ValueMapping { +// IndexMap::from([ +// ]) +// } +// } + +#[derive(FromRow, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +struct BalanceQueryResultRaw { + pub contract_address: String, + pub name: String, + pub symbol: String, + pub decimals: u8, + pub token_id: String, + pub balance: String, + pub contract_type: String, +} diff --git a/crates/torii/graphql/src/object/erc/erc_token.rs b/crates/torii/graphql/src/object/erc/erc_token.rs new file mode 100644 index 0000000000..14b8de7877 --- /dev/null +++ b/crates/torii/graphql/src/object/erc/erc_token.rs @@ -0,0 +1,21 @@ +use crate::constants::{ERC_TOKEN_NAME, ERC_TOKEN_TYPE_NAME}; +use crate::mapping::ERC_TOKEN_TYPE_MAPPING; +use crate::object::BasicObject; +use crate::types::TypeMapping; + +#[derive(Debug)] +pub struct ErcTokenObject; + +impl BasicObject for ErcTokenObject { + fn name(&self) -> (&str, &str) { + ERC_TOKEN_NAME + } + + fn type_name(&self) -> &str { + ERC_TOKEN_TYPE_NAME + } + + fn type_mapping(&self) -> &TypeMapping { + &ERC_TOKEN_TYPE_MAPPING + } +} diff --git a/crates/torii/graphql/src/object/erc/erc_transfer.rs b/crates/torii/graphql/src/object/erc/erc_transfer.rs new file mode 100644 index 0000000000..056f0c224b --- /dev/null +++ b/crates/torii/graphql/src/object/erc/erc_transfer.rs @@ -0,0 +1,181 @@ +use async_graphql::dynamic::{Field, FieldFuture, InputValue, TypeRef}; +use async_graphql::{Name, Value}; +use convert_case::{Case, Casing}; +use serde::Deserialize; +use sqlx::{FromRow, Pool, Sqlite, SqliteConnection}; +use starknet_crypto::Felt; +use torii_core::sql::utils::felt_to_sql_string; +use tracing::warn; + +use crate::constants::{ERC_TRANSFER_NAME, ERC_TRANSFER_TYPE_NAME}; +use crate::mapping::ERC_TRANSFER_TYPE_MAPPING; +use crate::object::{BasicObject, ResolvableObject}; +use crate::types::{TypeMapping, ValueMapping}; +use crate::utils::extract; + +#[derive(Debug)] +pub struct ErcTransferObject; + +impl BasicObject for ErcTransferObject { + fn name(&self) -> (&str, &str) { + ERC_TRANSFER_NAME + } + + fn type_name(&self) -> &str { + ERC_TRANSFER_TYPE_NAME + } + + fn type_mapping(&self) -> &TypeMapping { + &ERC_TRANSFER_TYPE_MAPPING + } +} + +impl ResolvableObject for ErcTransferObject { + fn resolvers(&self) -> Vec { + let account_address = "account_address"; + let limit = "limit"; + let arg_addr = InputValue::new( + account_address.to_case(Case::Camel), + TypeRef::named_nn(TypeRef::STRING), + ); + let arg_limit = + InputValue::new(limit.to_case(Case::Camel), TypeRef::named_nn(TypeRef::INT)); + + let field = Field::new(self.name().0, TypeRef::named_list(self.type_name()), move |ctx| { + FieldFuture::new(async move { + let mut conn = ctx.data::>()?.acquire().await?; + let address = extract::( + ctx.args.as_index_map(), + &account_address.to_case(Case::Camel), + )?; + let limit = extract::(ctx.args.as_index_map(), &limit.to_case(Case::Camel))?; + let limit: u32 = limit.try_into()?; + + let erc_transfers = fetch_erc_transfers(&mut conn, address, limit).await?; + + Ok(Some(Value::List(erc_transfers))) + }) + }) + .argument(arg_addr) + .argument(arg_limit); + vec![field] + } +} + +async fn fetch_erc_transfers( + conn: &mut SqliteConnection, + address: Felt, + limit: u32, +) -> sqlx::Result> { + let query = format!( + r#" +SELECT + et.contract_address, + et.from_address, + et.to_address, + et.amount, + et.token_id, + et.executed_at, + t.name, + t.symbol, + t.decimals, + c.contract_type +FROM + erc_transfers et +JOIN + tokens t ON et.token_id = t.id +JOIN + contracts c ON t.contract_address = c.contract_address +WHERE + et.from_address = ? OR et.to_address = ? +ORDER BY + et.executed_at DESC +LIMIT {}; +"#, + limit + ); + + let address = felt_to_sql_string(&address); + let rows = sqlx::query(&query).bind(&address).bind(&address).fetch_all(conn).await?; + + let mut erc_balances = Vec::new(); + + for row in rows { + let row = TransferQueryResultRaw::from_row(&row)?; + + let transfer_value = match row.contract_type.as_str() { + "ERC20" | "Erc20" | "erc20" => { + let token_metadata = Value::Object(ValueMapping::from([ + (Name::new("name"), Value::String(row.name)), + (Name::new("symbol"), Value::String(row.symbol)), + // for erc20 there is no token_id + (Name::new("token_id"), Value::Null), + (Name::new("decimals"), Value::String(row.decimals.to_string())), + (Name::new("contract_address"), Value::String(row.contract_address.clone())), + ])); + + Value::Object(ValueMapping::from([ + (Name::new("from"), Value::String(row.from_address)), + (Name::new("to"), Value::String(row.to_address)), + (Name::new("amount"), Value::String(row.amount)), + (Name::new("type"), Value::String(row.contract_type)), + (Name::new("executed_at"), Value::String(row.executed_at)), + (Name::new("token_metadata"), token_metadata), + ])) + } + "ERC721" | "Erc721" | "erc721" => { + // contract_address:token_id + let token_id = row.token_id.split(':').collect::>(); + assert!(token_id.len() == 2); + + let token_metadata = Value::Object(ValueMapping::from([ + (Name::new("name"), Value::String(row.name)), + (Name::new("symbol"), Value::String(row.symbol)), + (Name::new("token_id"), Value::String(token_id[1].to_string())), + (Name::new("decimals"), Value::String(row.decimals.to_string())), + (Name::new("contract_address"), Value::String(row.contract_address.clone())), + ])); + + Value::Object(ValueMapping::from([ + (Name::new("from"), Value::String(row.from_address)), + (Name::new("to"), Value::String(row.to_address)), + (Name::new("amount"), Value::String(row.amount)), + (Name::new("type"), Value::String(row.contract_type)), + (Name::new("executed_at"), Value::String(row.executed_at)), + (Name::new("token_metadata"), token_metadata), + ])) + } + _ => { + warn!("Unknown contract type: {}", row.contract_type); + continue; + } + }; + + erc_balances.push(transfer_value); + } + + Ok(erc_balances) +} + +// TODO: This would be required when subscriptions are needed +// impl ErcTransferObject { +// pub fn value_mapping(entity: ErcBalance) -> ValueMapping { +// IndexMap::from([ +// ]) +// } +// } + +#[derive(FromRow, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +struct TransferQueryResultRaw { + pub contract_address: String, + pub from_address: String, + pub to_address: String, + pub token_id: String, + pub amount: String, + pub executed_at: String, + pub name: String, + pub symbol: String, + pub decimals: u8, + pub contract_type: String, +} diff --git a/crates/torii/graphql/src/object/erc/mod.rs b/crates/torii/graphql/src/object/erc/mod.rs new file mode 100644 index 0000000000..eac2c5510b --- /dev/null +++ b/crates/torii/graphql/src/object/erc/mod.rs @@ -0,0 +1,3 @@ +pub mod erc_balance; +pub mod erc_token; +pub mod erc_transfer; diff --git a/crates/torii/graphql/src/object/mod.rs b/crates/torii/graphql/src/object/mod.rs index c1046ffbe4..8997cdabe3 100644 --- a/crates/torii/graphql/src/object/mod.rs +++ b/crates/torii/graphql/src/object/mod.rs @@ -1,5 +1,6 @@ pub mod connection; pub mod entity; +pub mod erc; pub mod event; pub mod event_message; pub mod inputs; diff --git a/crates/torii/graphql/src/schema.rs b/crates/torii/graphql/src/schema.rs index 48a915345b..5f70c49908 100644 --- a/crates/torii/graphql/src/schema.rs +++ b/crates/torii/graphql/src/schema.rs @@ -10,6 +10,9 @@ use super::object::model_data::ModelDataObject; use super::types::ScalarType; use super::utils; use crate::constants::{QUERY_TYPE_NAME, SUBSCRIPTION_TYPE_NAME}; +use crate::object::erc::erc_balance::ErcBalanceObject; +use crate::object::erc::erc_token::ErcTokenObject; +use crate::object::erc::erc_transfer::ErcTransferObject; use crate::object::event_message::EventMessageObject; use crate::object::metadata::content::ContentObject; use crate::object::metadata::social::SocialObject; @@ -28,6 +31,7 @@ pub async fn build_schema(pool: &SqlitePool) -> Result { let (objects, unions) = build_objects(pool).await?; let mut schema_builder = Schema::build(QUERY_TYPE_NAME, None, Some(SUBSCRIPTION_TYPE_NAME)); + //? why we need to provide QUERY_TYPE_NAME object here when its already passed to Schema? let mut query_root = Object::new(QUERY_TYPE_NAME); let mut subscription_root = Subscription::new(SUBSCRIPTION_TYPE_NAME); @@ -112,9 +116,12 @@ async fn build_objects(pool: &SqlitePool) -> Result<(Vec, Vec Result { .await .unwrap(); - TransactionWaiter::new(transaction_hash, &provider).await?; + TransactionWaiter::new(transaction_hash, &account.provider()).await?; // Execute `delete` and delete Record with id 20 let InvokeTransactionResult { transaction_hash } = account @@ -351,26 +348,25 @@ pub async fn spinup_types_test() -> Result { let world = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); - let db = Sql::new(pool.clone(), strat.world_address, &HashMap::new()).await.unwrap(); + let db = Sql::new( + pool.clone(), + strat.world_address, + &HashMap::from([(strat.world_address, ContractType::WORLD)]), + ) + .await + .unwrap(); + let world_address = strat.world_address; let (shutdown_tx, _) = broadcast::channel(1); let mut engine = Engine::new( world, db, Arc::clone(&provider), - Processors { - event: generate_event_processors_map(vec![ - Box::new(RegisterModelProcessor), - Box::new(StoreSetRecordProcessor), - Box::new(StoreDelRecordProcessor), - ]) - .unwrap(), - ..Processors::default() - }, + Processors { ..Processors::default() }, EngineConfig::default(), shutdown_tx, None, - HashMap::new(), + Arc::new(HashMap::from([(world_address, ContractType::WORLD)])), ); let to = account.provider().block_hash_and_number().await?.block_number; diff --git a/crates/torii/graphql/src/tests/subscription_test.rs b/crates/torii/graphql/src/tests/subscription_test.rs index ccb7dbf114..efe9a7fd47 100644 --- a/crates/torii/graphql/src/tests/subscription_test.rs +++ b/crates/torii/graphql/src/tests/subscription_test.rs @@ -16,6 +16,7 @@ mod tests { use tokio::sync::mpsc; use torii_core::sql::utils::felts_to_sql_string; use torii_core::sql::Sql; + use torii_core::types::ContractType; use crate::tests::{model_fixtures, run_graphql_subscription}; use crate::utils; @@ -23,7 +24,10 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_entity_subscription(pool: SqlitePool) { - let mut db = Sql::new(pool.clone(), Felt::ZERO, &HashMap::new()).await.unwrap(); + let mut db = + Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + .await + .unwrap(); model_fixtures(&mut db).await; // 0. Preprocess expected entity value @@ -158,7 +162,10 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_entity_subscription_with_id(pool: SqlitePool) { - let mut db = Sql::new(pool.clone(), Felt::ZERO, &HashMap::new()).await.unwrap(); + let mut db = + Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + .await + .unwrap(); model_fixtures(&mut db).await; // 0. Preprocess expected entity value @@ -273,7 +280,10 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_model_subscription(pool: SqlitePool) { - let mut db = Sql::new(pool.clone(), Felt::ZERO, &HashMap::new()).await.unwrap(); + let mut db = + Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + .await + .unwrap(); // 0. Preprocess model value let namespace = "types_test".to_string(); let model_name = "Subrecord".to_string(); @@ -338,7 +348,10 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_model_subscription_with_id(pool: SqlitePool) { - let mut db = Sql::new(pool.clone(), Felt::ZERO, &HashMap::new()).await.unwrap(); + let mut db = + Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + .await + .unwrap(); // 0. Preprocess model value let namespace = "types_test".to_string(); let model_name = "Subrecord".to_string(); @@ -404,7 +417,10 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_event_emitted(pool: SqlitePool) { - let mut db = Sql::new(pool.clone(), Felt::ZERO, &HashMap::new()).await.unwrap(); + let mut db = + Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + .await + .unwrap(); let block_timestamp: u64 = 1710754478_u64; let (tx, mut rx) = mpsc::channel(7); tokio::spawn(async move { diff --git a/crates/torii/graphql/src/utils.rs b/crates/torii/graphql/src/utils.rs index 8f49990d4a..949e3b9711 100644 --- a/crates/torii/graphql/src/utils.rs +++ b/crates/torii/graphql/src/utils.rs @@ -1,5 +1,8 @@ +use std::str::FromStr; + use async_graphql::{Result, Value}; use convert_case::{Case, Casing}; +use starknet_crypto::Felt; use crate::error::ExtractError; use crate::types::ValueMapping; @@ -28,6 +31,18 @@ impl ExtractFromIndexMap for String { } } +impl ExtractFromIndexMap for Felt { + fn extract(indexmap: &ValueMapping, input: &str) -> Result { + let value = indexmap.get(input).ok_or_else(|| ExtractError::NotFound(input.to_string()))?; + match value { + Value::String(s) => { + Ok(Felt::from_str(s).map_err(|_| ExtractError::NotFelt(input.to_string()))?) + } + _ => Err(ExtractError::NotString(input.to_string())), + } + } +} + impl ExtractFromIndexMap for Vec { fn extract(indexmap: &ValueMapping, input: &str) -> Result { let value = indexmap.get(input).ok_or_else(|| ExtractError::NotFound(input.to_string()))?; diff --git a/crates/torii/grpc/src/server/tests/entities_test.rs b/crates/torii/grpc/src/server/tests/entities_test.rs index 971bcfd933..c84e23775a 100644 --- a/crates/torii/grpc/src/server/tests/entities_test.rs +++ b/crates/torii/grpc/src/server/tests/entities_test.rs @@ -21,10 +21,8 @@ use starknet::providers::{JsonRpcClient, Provider}; use starknet_crypto::poseidon_hash_many; use tokio::sync::broadcast; use torii_core::engine::{Engine, EngineConfig, Processors}; -use torii_core::processors::generate_event_processors_map; -use torii_core::processors::register_model::RegisterModelProcessor; -use torii_core::processors::store_set_record::StoreSetRecordProcessor; use torii_core::sql::Sql; +use torii_core::types::ContractType; use crate::proto::types::KeysClause; use crate::server::DojoWorld; @@ -93,25 +91,25 @@ async fn test_entities_queries(sequencer: &RunnerCtx) { TransactionWaiter::new(tx.transaction_hash, &provider).await.unwrap(); - let db = Sql::new(pool.clone(), strat.world_address, &HashMap::new()).await.unwrap(); + let db = Sql::new( + pool.clone(), + strat.world_address, + &HashMap::from([(strat.world_address, ContractType::WORLD)]), + ) + .await + .unwrap(); + let world_address = strat.world_address; let (shutdown_tx, _) = broadcast::channel(1); let mut engine = Engine::new( world_reader, db.clone(), Arc::clone(&provider), - Processors { - event: generate_event_processors_map(vec![ - Box::new(RegisterModelProcessor), - Box::new(StoreSetRecordProcessor), - ]) - .unwrap(), - ..Processors::default() - }, + Processors { ..Processors::default() }, EngineConfig::default(), shutdown_tx, None, - HashMap::new(), + Arc::new(HashMap::from([(world_address, ContractType::WORLD)])), ); let to = provider.block_hash_and_number().await.unwrap().block_number; diff --git a/crates/torii/libp2p/src/tests.rs b/crates/torii/libp2p/src/tests.rs index dec12e23c6..6aa1ba9858 100644 --- a/crates/torii/libp2p/src/tests.rs +++ b/crates/torii/libp2p/src/tests.rs @@ -538,6 +538,7 @@ mod test { use tokio::select; use tokio::time::sleep; use torii_core::sql::Sql; + use torii_core::types::ContractType; use crate::server::Relay; use crate::typed_data::{Domain, Field, SimpleField, TypedData}; @@ -560,7 +561,10 @@ mod test { let account = sequencer.account_data(0); - let mut db = Sql::new(pool.clone(), Felt::ZERO, &HashMap::new()).await?; + let mut db = + Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + .await + .unwrap(); // Register the model of our Message db.register_model( diff --git a/crates/torii/types-test/Scarb.lock b/crates/torii/types-test/Scarb.lock index ae6ec350d2..8720cc6b90 100644 --- a/crates/torii/types-test/Scarb.lock +++ b/crates/torii/types-test/Scarb.lock @@ -4,14 +4,6 @@ version = 1 [[package]] name = "dojo" version = "1.0.0-alpha.4" -dependencies = [ - "dojo_plugin", -] - -[[package]] -name = "dojo_plugin" -version = "1.0.0-alpha.4" -source = "git+https://github.com/dojoengine/dojo?rev=f15def33#f15def330c0d099e79351d11c197f63e8cc1ff36" [[package]] name = "types_test" diff --git a/scripts/compare-torii-data.py b/scripts/compare-torii-data.py new file mode 100644 index 0000000000..04798185cd --- /dev/null +++ b/scripts/compare-torii-data.py @@ -0,0 +1,87 @@ +# This script compares data across 'events', 'entities', and 'transactions' tables between two SQLite databases. +# Helpful to make sure any changes made in torii doesn't affect the resulting data. + +import sqlite3 +import argparse + +def fetch_table_data(db_path, table_name, columns): + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + cursor.execute(f"SELECT {', '.join(columns)} FROM {table_name}") + data = cursor.fetchall() + conn.close() + return {row[0]: row[1:] for row in data} + +def get_table_row_count(db_path, table_name): + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + cursor.execute(f"SELECT COUNT(*) FROM {table_name}") + count = cursor.fetchone()[0] + conn.close() + return count + +def compare_data(data1, data2, table_name): + differences_found = False + for id, values in data1.items(): + if id in data2: + if values != data2[id]: + print(f"Mismatch found in {table_name} for ID {id}:") + print(f" Database 1: {values}") + print(f" Database 2: {data2[id]}") + differences_found = True + else: + print(f"ID {id} found in {table_name} of Database 1 but not in Database 2") + differences_found = True + + for id in data2: + if id not in data1: + print(f"ID {id} found in {table_name} of Database 2 but not in Database 1") + differences_found = True + + if not differences_found: + print(f"No differences found in {table_name}") + +def compare_databases(db_path1, db_path2): + # Columns to compare, ignoring time-dependent and event_id columns + events_columns = ["id", "keys", "data", "transaction_hash"] + entities_columns = ["id", "keys"] + transactions_columns = ["id", "transaction_hash", "sender_address", "calldata", "max_fee", "signature", "nonce", "transaction_type"] + + # Fetch data from both databases + events_data_db1 = fetch_table_data(db_path1, "events", events_columns) + events_data_db2 = fetch_table_data(db_path2, "events", events_columns) + entities_data_db1 = fetch_table_data(db_path1, "entities", entities_columns) + entities_data_db2 = fetch_table_data(db_path2, "entities", entities_columns) + transactions_data_db1 = fetch_table_data(db_path1, "transactions", transactions_columns) + transactions_data_db2 = fetch_table_data(db_path2, "transactions", transactions_columns) + + # Get row counts from both databases + events_count_db1 = get_table_row_count(db_path1, "events") + events_count_db2 = get_table_row_count(db_path2, "events") + entities_count_db1 = get_table_row_count(db_path1, "entities") + entities_count_db2 = get_table_row_count(db_path2, "entities") + transactions_count_db1 = get_table_row_count(db_path1, "transactions") + transactions_count_db2 = get_table_row_count(db_path2, "transactions") + + # Print row counts + print(f"Number of rows in events table: Database 1 = {events_count_db1}, Database 2 = {events_count_db2}") + print(f"Number of rows in entities table: Database 1 = {entities_count_db1}, Database 2 = {entities_count_db2}") + print(f"Number of rows in transactions table: Database 1 = {transactions_count_db1}, Database 2 = {transactions_count_db2}") + + # Compare data + print("\nComparing events table:") + compare_data(events_data_db1, events_data_db2, "events") + + print("\nComparing entities table:") + compare_data(entities_data_db1, entities_data_db2, "entities") + + print("\nComparing transactions table:") + compare_data(transactions_data_db1, transactions_data_db2, "transactions") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Compare data in events, entities, and transactions tables between two SQLite databases.") + parser.add_argument("db_path1", help="Path to the first SQLite database") + parser.add_argument("db_path2", help="Path to the second SQLite database") + args = parser.parse_args() + + compare_databases(args.db_path1, args.db_path2) diff --git a/spawn-and-move-db.tar.gz b/spawn-and-move-db.tar.gz index ac6788ec41..2dfed8d7fd 100644 Binary files a/spawn-and-move-db.tar.gz and b/spawn-and-move-db.tar.gz differ diff --git a/types-test-db.tar.gz b/types-test-db.tar.gz index b5a45bd0d7..22468eb3ce 100644 Binary files a/types-test-db.tar.gz and b/types-test-db.tar.gz differ