diff --git a/Cargo.lock b/Cargo.lock index 5611c8de0e..6fe3bb40e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -164,7 +164,7 @@ checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" [[package]] name = "alloy" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-consensus 0.3.6 (git+https://github.com/alloy-rs/alloy)", "alloy-contract 0.3.6 (git+https://github.com/alloy-rs/alloy)", @@ -209,13 +209,14 @@ dependencies = [ [[package]] name = "alloy-consensus" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-eips 0.3.6 (git+https://github.com/alloy-rs/alloy)", "alloy-primitives", "alloy-rlp", "alloy-serde 0.3.6 (git+https://github.com/alloy-rs/alloy)", "c-kzg", + "derive_more 1.0.0", "serde", ] @@ -242,7 +243,7 @@ dependencies = [ [[package]] name = "alloy-contract" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-dyn-abi", "alloy-json-abi", @@ -330,7 +331,7 @@ dependencies = [ [[package]] name = "alloy-eips" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-eip2930", "alloy-eip7702", @@ -347,7 +348,7 @@ dependencies = [ [[package]] name = "alloy-genesis" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = 
[ "alloy-primitives", "alloy-serde 0.3.6 (git+https://github.com/alloy-rs/alloy)", @@ -383,7 +384,7 @@ dependencies = [ [[package]] name = "alloy-json-rpc" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-primitives", "alloy-sol-types", @@ -417,7 +418,7 @@ dependencies = [ [[package]] name = "alloy-network" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-consensus 0.3.6 (git+https://github.com/alloy-rs/alloy)", "alloy-eips 0.3.6 (git+https://github.com/alloy-rs/alloy)", @@ -449,7 +450,7 @@ dependencies = [ [[package]] name = "alloy-network-primitives" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-eips 0.3.6 (git+https://github.com/alloy-rs/alloy)", "alloy-primitives", @@ -460,7 +461,7 @@ dependencies = [ [[package]] name = "alloy-node-bindings" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-genesis", "alloy-primitives", @@ -532,7 +533,7 @@ dependencies = [ [[package]] name = "alloy-provider" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-chains", "alloy-consensus 0.3.6 (git+https://github.com/alloy-rs/alloy)", @@ -611,7 +612,7 @@ dependencies = [ [[package]] name = "alloy-rpc-client" version = 
"0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-json-rpc 0.3.6 (git+https://github.com/alloy-rs/alloy)", "alloy-transport 0.3.6 (git+https://github.com/alloy-rs/alloy)", @@ -631,7 +632,7 @@ dependencies = [ [[package]] name = "alloy-rpc-types-anvil" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-primitives", "alloy-serde 0.3.6 (git+https://github.com/alloy-rs/alloy)", @@ -662,7 +663,7 @@ dependencies = [ [[package]] name = "alloy-rpc-types-eth" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-consensus 0.3.6 (git+https://github.com/alloy-rs/alloy)", "alloy-eips 0.3.6 (git+https://github.com/alloy-rs/alloy)", @@ -693,7 +694,7 @@ dependencies = [ [[package]] name = "alloy-serde" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-primitives", "serde", @@ -717,7 +718,7 @@ dependencies = [ [[package]] name = "alloy-signer" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-primitives", "async-trait", @@ -730,7 +731,7 @@ dependencies = [ [[package]] name = "alloy-signer-local" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = 
"git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-consensus 0.3.6 (git+https://github.com/alloy-rs/alloy)", "alloy-network 0.3.6 (git+https://github.com/alloy-rs/alloy)", @@ -837,7 +838,7 @@ dependencies = [ [[package]] name = "alloy-transport" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-json-rpc 0.3.6 (git+https://github.com/alloy-rs/alloy)", "base64 0.22.1", @@ -870,7 +871,7 @@ dependencies = [ [[package]] name = "alloy-transport-http" version = "0.3.6" -source = "git+https://github.com/alloy-rs/alloy#57dd4c538293421c3d1a793cba79ad6f46d6444f" +source = "git+https://github.com/alloy-rs/alloy#04b1e0984b8661ef910b0ae88e1ef218db66b636" dependencies = [ "alloy-json-rpc 0.3.6 (git+https://github.com/alloy-rs/alloy)", "alloy-transport 0.3.6 (git+https://github.com/alloy-rs/alloy)", @@ -12856,9 +12857,9 @@ dependencies = [ [[package]] name = "simdutf8" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" +checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" [[package]] name = "similar" @@ -14872,6 +14873,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", + "toml 0.8.19", "tracing", ] diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index cf568429ce..ac9c510997 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -11,11 +11,14 @@ //! for more info. 
use std::cmp; +use std::collections::VecDeque; use std::net::SocketAddr; +use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use anyhow::Context; use clap::{ArgAction, Parser}; use dojo_metrics::{metrics_process, prometheus_exporter}; use dojo_utils::parse::{parse_socket_address, parse_url}; @@ -31,18 +34,10 @@ use tokio::sync::broadcast; use tokio::sync::broadcast::Sender; use tokio_stream::StreamExt; use torii_core::engine::{Engine, EngineConfig, IndexingFlags, Processors}; -use torii_core::processors::event_message::EventMessageProcessor; -use torii_core::processors::generate_event_processors_map; -use torii_core::processors::metadata_update::MetadataUpdateProcessor; -use torii_core::processors::register_model::RegisterModelProcessor; -use torii_core::processors::store_del_record::StoreDelRecordProcessor; -use torii_core::processors::store_set_record::StoreSetRecordProcessor; use torii_core::processors::store_transaction::StoreTransactionProcessor; -use torii_core::processors::store_update_member::StoreUpdateMemberProcessor; -use torii_core::processors::store_update_record::StoreUpdateRecordProcessor; use torii_core::simple_broker::SimpleBroker; use torii_core::sql::Sql; -use torii_core::types::Model; +use torii_core::types::{Contract, ContractType, Model, ToriiConfig}; use torii_server::proxy::Proxy; use tracing::{error, info}; use tracing_subscriber::{fmt, EnvFilter}; @@ -56,7 +51,7 @@ pub(crate) const LOG_TARGET: &str = "torii::cli"; struct Args { /// The world to index #[arg(short, long = "world", env = "DOJO_WORLD_ADDRESS")] - world_address: Felt, + world_address: Option, /// The sequencer rpc endpoint to index. 
#[arg(long, value_name = "URL", default_value = ":5050", value_parser = parse_url)] @@ -67,10 +62,6 @@ struct Args { #[arg(short, long, default_value = ":memory:")] database: String, - /// Specify a block to start indexing from, ignored if stored head exists - #[arg(short, long, default_value = "0")] - start_block: u64, - /// Address to serve api endpoints at. #[arg(long, value_name = "SOCKET", default_value = "0.0.0.0:8080", value_parser = parse_socket_address)] addr: SocketAddr, @@ -140,11 +131,35 @@ struct Args { /// Whether or not to index raw events #[arg(long, action = ArgAction::Set, default_value_t = true)] index_raw_events: bool, + + /// ERC contract addresses to index + #[arg(long, value_parser = parse_erc_contracts)] + #[arg(conflicts_with = "config")] + contracts: Option>, + + /// Configuration file + #[arg(long)] + config: Option, } #[tokio::main] async fn main() -> anyhow::Result<()> { let args = Args::parse(); + + let mut config = if let Some(path) = args.config { + ToriiConfig::load_from_path(&path)? 
+ } else { + let mut config = ToriiConfig::default(); + + if let Some(contracts) = args.contracts { + config.contracts = VecDeque::from(contracts); + } + + config + }; + + let world_address = verify_single_world_address(args.world_address, &mut config)?; + let filter_layer = EnvFilter::try_from_default_env() .unwrap_or_else(|_| EnvFilter::new("info,hyper_reverse_proxy=off")); @@ -183,20 +198,14 @@ async fn main() -> anyhow::Result<()> { let provider: Arc<_> = JsonRpcClient::new(HttpTransport::new(args.rpc)).into(); // Get world address - let world = WorldContractReader::new(args.world_address, provider.clone()); + let world = WorldContractReader::new(world_address, provider.clone()); - let db = Sql::new(pool.clone(), args.world_address).await?; + let contracts = + config.contracts.iter().map(|contract| (contract.address, contract.r#type)).collect(); + + let db = Sql::new(pool.clone(), world_address, &contracts).await?; let processors = Processors { - event: generate_event_processors_map(vec![ - Arc::new(RegisterModelProcessor), - Arc::new(StoreSetRecordProcessor), - Arc::new(MetadataUpdateProcessor), - Arc::new(StoreDelRecordProcessor), - Arc::new(EventMessageProcessor), - Arc::new(StoreUpdateRecordProcessor), - Arc::new(StoreUpdateMemberProcessor), - ])?, transaction: vec![Box::new(StoreTransactionProcessor)], ..Processors::default() }; @@ -218,7 +227,7 @@ async fn main() -> anyhow::Result<()> { processors, EngineConfig { max_concurrent_tasks: args.max_concurrent_tasks, - start_block: args.start_block, + start_block: 0, events_chunk_size: args.events_chunk_size, index_pending: args.index_pending, polling_interval: Duration::from_millis(args.polling_interval), @@ -226,17 +235,13 @@ async fn main() -> anyhow::Result<()> { }, shutdown_tx.clone(), Some(block_tx), + Arc::new(contracts), ); let shutdown_rx = shutdown_tx.subscribe(); - let (grpc_addr, grpc_server) = torii_grpc::server::new( - shutdown_rx, - &pool, - block_rx, - args.world_address, - 
Arc::clone(&provider), - ) - .await?; + let (grpc_addr, grpc_server) = + torii_grpc::server::new(shutdown_rx, &pool, block_rx, world_address, Arc::clone(&provider)) + .await?; let mut libp2p_relay_server = torii_relay::server::Relay::new( db, @@ -299,6 +304,26 @@ async fn main() -> anyhow::Result<()> { Ok(()) } +// Verifies that the world address is defined at most once +// and returns the world address +fn verify_single_world_address( + world_address: Option, + config: &mut ToriiConfig, +) -> anyhow::Result { + let world_from_config = + config.contracts.iter().find(|c| c.r#type == ContractType::WORLD).map(|c| c.address); + + match (world_address, world_from_config) { + (Some(_), Some(_)) => Err(anyhow::anyhow!("World address specified multiple times")), + (Some(addr), _) => { + config.contracts.push_front(Contract { address: addr, r#type: ContractType::WORLD }); + Ok(addr) + } + (_, Some(addr)) => Ok(addr), + (None, None) => Err(anyhow::anyhow!("World address not specified")), + } +} + async fn spawn_rebuilding_graphql_server( shutdown_tx: Sender<()>, pool: Arc, @@ -322,3 +347,29 @@ async fn spawn_rebuilding_graphql_server( } } } + +// Parses clap cli argument which is expected to be in the format: +// - erc_type:address:start_block +// - address:start_block (erc_type defaults to ERC20) +fn parse_erc_contracts(s: &str) -> anyhow::Result> { + let parts: Vec<&str> = s.split(',').collect(); + let mut contracts = Vec::new(); + for part in parts { + match part.split(':').collect::>().as_slice() { + [r#type, address] => { + let r#type = r#type.parse::()?; + let address = Felt::from_str(address) + .with_context(|| format!("Expected address, found {}", address))?; + contracts.push(Contract { address, r#type }); + } + [address] => { + let r#type = ContractType::WORLD; + let address = Felt::from_str(address) + .with_context(|| format!("Expected address, found {}", address))?; + contracts.push(Contract { address, r#type }); + } + _ => return Err(anyhow::anyhow!("Invalid 
contract format")), + } + } + Ok(contracts) +} diff --git a/bin/torii/torii.toml b/bin/torii/torii.toml new file mode 100644 index 0000000000..a45ecfe10e --- /dev/null +++ b/bin/torii/torii.toml @@ -0,0 +1,8 @@ +# Example configuration file for Torii +# contracts = [ +# { type = "WORLD", address = "" }, +# { type = "ERC20", address = "" }, +# { type = "ERC20Legacy", address = "" }, +# { type = "ERC721", address = "" }, +# { type = "ERC721Legacy", address = "" }, +# ] \ No newline at end of file diff --git a/crates/dojo-lang/src/manifest_test_data/compiler_cairo/Scarb.lock b/crates/dojo-lang/src/manifest_test_data/compiler_cairo/Scarb.lock index c6cfeb4783..16e168040a 100644 --- a/crates/dojo-lang/src/manifest_test_data/compiler_cairo/Scarb.lock +++ b/crates/dojo-lang/src/manifest_test_data/compiler_cairo/Scarb.lock @@ -11,11 +11,3 @@ dependencies = [ [[package]] name = "dojo" version = "1.0.0-alpha.4" -dependencies = [ - "dojo_plugin", -] - -[[package]] -name = "dojo_plugin" -version = "1.0.0-alpha.4" -source = "git+https://github.com/dojoengine/dojo?rev=f15def33#f15def330c0d099e79351d11c197f63e8cc1ff36" diff --git a/crates/torii/core/Cargo.toml b/crates/torii/core/Cargo.toml index d21c4c06b0..94c2c599e2 100644 --- a/crates/torii/core/Cargo.toml +++ b/crates/torii/core/Cargo.toml @@ -38,6 +38,7 @@ thiserror.workspace = true tokio = { version = "1.32.0", features = [ "sync" ], default-features = true } tokio-stream = "0.1.11" tokio-util = "0.7.7" +toml.workspace = true tracing.workspace = true clap.workspace = true bitflags = "2.6.0" diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index f24180ad44..0a0476e6cb 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::fmt::Debug; use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; @@ -7,13 +7,16 @@ use 
std::time::Duration; use anyhow::Result; use bitflags::bitflags; use dojo_world::contracts::world::WorldContractReader; +use futures_util::future::join_all; use hashlink::LinkedHashMap; use starknet::core::types::{ - BlockId, BlockTag, EmittedEvent, Event, EventFilter, Felt, MaybePendingBlockWithReceipts, - MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, ReceiptBlock, Transaction, - TransactionReceipt, TransactionReceiptWithBlockInfo, TransactionWithReceipt, + BlockId, BlockTag, EmittedEvent, Event, EventFilter, EventsPage, MaybePendingBlockWithReceipts, + MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, Transaction, TransactionReceipt, + TransactionWithReceipt, }; +use starknet::core::utils::get_selector_from_name; use starknet::providers::Provider; +use starknet_crypto::Felt; use tokio::sync::broadcast::Sender; use tokio::sync::mpsc::Sender as BoundedSender; use tokio::sync::Semaphore; @@ -21,29 +24,94 @@ use tokio::task::JoinSet; use tokio::time::sleep; use tracing::{debug, error, info, trace, warn}; +use crate::processors::erc20_legacy_transfer::Erc20LegacyTransferProcessor; +use crate::processors::erc20_transfer::Erc20TransferProcessor; +use crate::processors::erc721_legacy_transfer::Erc721LegacyTransferProcessor; +use crate::processors::erc721_transfer::Erc721TransferProcessor; use crate::processors::event_message::EventMessageProcessor; +use crate::processors::metadata_update::MetadataUpdateProcessor; +use crate::processors::register_model::RegisterModelProcessor; +use crate::processors::store_del_record::StoreDelRecordProcessor; +use crate::processors::store_set_record::StoreSetRecordProcessor; +use crate::processors::store_update_member::StoreUpdateMemberProcessor; +use crate::processors::store_update_record::StoreUpdateRecordProcessor; use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor}; -use crate::sql::Sql; +use crate::sql::{Cursors, Sql}; +use crate::types::ContractType; #[allow(missing_debug_implementations)] 
pub struct Processors { pub block: Vec>>, pub transaction: Vec>>, - pub event: HashMap>>, pub catch_all_event: Box>, + pub event_processors: HashMap>>>, } impl Default for Processors

{ fn default() -> Self { Self { block: vec![], - event: HashMap::new(), transaction: vec![], catch_all_event: Box::new(EventMessageProcessor) as Box>, + event_processors: Self::initialize_event_processors(), } } } +impl Processors

{ + pub fn initialize_event_processors() + -> HashMap>>> { + let mut event_processors_map = + HashMap::>>>::new(); + + let event_processors = vec![ + ( + ContractType::WORLD, + vec![ + Box::new(RegisterModelProcessor) as Box>, + Box::new(StoreSetRecordProcessor), + Box::new(MetadataUpdateProcessor), + Box::new(StoreDelRecordProcessor), + Box::new(StoreUpdateRecordProcessor), + Box::new(StoreUpdateMemberProcessor), + ], + ), + ( + ContractType::ERC20, + vec![Box::new(Erc20TransferProcessor) as Box>], + ), + ( + ContractType::ERC721, + vec![Box::new(Erc721TransferProcessor) as Box>], + ), + ( + ContractType::ERC20Legacy, + vec![Box::new(Erc20LegacyTransferProcessor) as Box>], + ), + ( + ContractType::ERC721Legacy, + vec![Box::new(Erc721LegacyTransferProcessor) as Box>], + ), + ]; + + for (contract_type, processors) in event_processors { + for processor in processors { + let key = get_selector_from_name(processor.event_key().as_str()) + .expect("Event key is ASCII so this should never fail"); + event_processors_map.entry(contract_type).or_default().insert(key, processor); + } + } + + event_processors_map + } + + pub fn get_event_processor( + &self, + contract_type: ContractType, + ) -> &HashMap>> { + self.event_processors.get(&contract_type).unwrap() + } +} pub(crate) const LOG_TARGET: &str = "torii_core::engine"; pub const QUERY_QUEUE_BATCH_SIZE: usize = 1000; @@ -88,6 +156,7 @@ pub enum FetchDataResult { #[derive(Debug)] pub struct FetchRangeResult { // (block_number, transaction_hash) -> events + // NOTE: LinkedList might contains blocks in different order pub transactions: LinkedHashMap<(u64, Felt), Vec>, pub blocks: BTreeMap, pub latest_block_number: u64, @@ -112,12 +181,13 @@ pub struct ParallelizedEvent { pub struct Engine { world: Arc>, db: Sql, - provider: Box

, + provider: Arc

, processors: Arc>, config: EngineConfig, shutdown_tx: Sender<()>, block_tx: Option>, - tasks: HashMap>, + tasks: HashMap>, + contracts: Arc>, } struct UnprocessedEvent { @@ -126,6 +196,7 @@ struct UnprocessedEvent { } impl Engine

{ + #[allow(clippy::too_many_arguments)] pub fn new( world: WorldContractReader

, db: Sql, @@ -134,24 +205,26 @@ impl Engine

{ config: EngineConfig, shutdown_tx: Sender<()>, block_tx: Option>, + contracts: Arc>, ) -> Self { Self { world: Arc::new(world), db, - provider: Box::new(provider), + provider: Arc::new(provider), processors: Arc::new(processors), config, shutdown_tx, block_tx, + contracts, tasks: HashMap::new(), } } pub async fn start(&mut self) -> Result<()> { // use the start block provided by user if head is 0 - let (head, _, _) = self.db.head().await?; + let (head, _, _) = self.db.head(self.world.address).await?; if head == 0 { - self.db.set_head(self.config.start_block); + self.db.set_head(self.world.address, self.config.start_block); } else if self.config.start_block != 0 { warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead."); } @@ -163,12 +236,12 @@ impl Engine

{ let mut erroring_out = false; loop { - let (head, last_pending_block_world_tx, last_pending_block_tx) = self.db.head().await?; + let cursors = self.db.cursors().await?; tokio::select! { _ = shutdown_rx.recv() => { break Ok(()); } - res = self.fetch_data(head, last_pending_block_world_tx, last_pending_block_tx) => { + res = self.fetch_data(&cursors) => { match res { Ok(fetch_result) => { if erroring_out { @@ -204,22 +277,18 @@ impl Engine

{ } } - pub async fn fetch_data( - &mut self, - from: u64, - last_pending_block_world_tx: Option, - last_pending_block_tx: Option, - ) -> Result { + pub async fn fetch_data(&mut self, cursors: &Cursors) -> Result { let latest_block_number = self.provider.block_hash_and_number().await?.block_number; + let from = cursors.head.unwrap_or(0); let result = if from < latest_block_number { let from = if from == 0 { from } else { from + 1 }; debug!(target: LOG_TARGET, from = %from, to = %latest_block_number, "Fetching data for range."); - let data = - self.fetch_range(from, latest_block_number, last_pending_block_world_tx).await?; + let data = self.fetch_range(from, latest_block_number, &cursors.cursor_map).await?; FetchDataResult::Range(data) } else if self.config.index_pending { - let data = self.fetch_pending(latest_block_number + 1, last_pending_block_tx).await?; + let data = + self.fetch_pending(latest_block_number + 1, cursors.last_pending_block_tx).await?; if let Some(data) = data { FetchDataResult::Pending(data) } else { @@ -236,98 +305,107 @@ impl Engine

{ &mut self, from: u64, to: u64, - last_pending_block_world_tx: Option, + cursor_map: &HashMap, ) -> Result { // Process all blocks from current to latest. - let get_events = |token: Option| { - self.provider.get_events( - EventFilter { - from_block: Some(BlockId::Number(from)), - to_block: Some(BlockId::Number(to)), - address: Some(self.world.address), - keys: None, - }, - token, - self.config.events_chunk_size, - ) - }; + let mut fetch_all_events_tasks = VecDeque::new(); + + for contract in self.contracts.iter() { + let events_filter = EventFilter { + from_block: Some(BlockId::Number(from)), + to_block: Some(BlockId::Number(to)), + address: Some(*contract.0), + keys: None, + }; + let token_events_pages = + get_all_events(&self.provider, events_filter, self.config.events_chunk_size); + + // Prefer processing world events first + match contract.1 { + ContractType::WORLD => fetch_all_events_tasks.push_front(token_events_pages), + _ => fetch_all_events_tasks.push_back(token_events_pages), + } + } + + let task_result = join_all(fetch_all_events_tasks).await; + + let mut events = vec![]; - // handle next events pages - let mut events_pages = vec![get_events(None).await?]; + for result in task_result { + let result = result?; + let contract_address = + result.0.expect("EventFilters that we use always have an address"); + let events_pages = result.1; + let last_contract_tx = cursor_map.get(&contract_address).cloned(); + let mut last_contract_tx_tmp = last_contract_tx; - while let Some(token) = &events_pages.last().unwrap().continuation_token { - debug!(target: LOG_TARGET, "Fetching events page with continuation token: {}", &token); - events_pages.push(get_events(Some(token.clone())).await?); + debug!(target: LOG_TARGET, "Total events pages fetched for contract ({:#x}): {}", &contract_address, &events_pages.len()); + + for events_page in events_pages { + debug!("Processing events page with events: {}", &events_page.events.len()); + for event in events_page.events { + // 
Then we skip all transactions until we reach the last pending processed + // transaction (if any) + if let Some(last_contract_tx) = last_contract_tx_tmp { + if event.transaction_hash != last_contract_tx { + continue; + } + + last_contract_tx_tmp = None; + } + + // Skip the latest pending block transaction events + // * as we might have multiple events for the same transaction + if let Some(last_contract_tx) = last_contract_tx { + if event.transaction_hash == last_contract_tx { + continue; + } + } + + events.push(event); + } + } } - debug!(target: LOG_TARGET, "Total events pages fetched: {}", &events_pages.len()); // Transactions & blocks to process - let mut last_block = 0_u64; let mut blocks = BTreeMap::new(); // Flatten events pages and events according to the pending block cursor // to array of (block_number, transaction_hash) - let mut last_pending_block_world_tx_cursor = last_pending_block_world_tx; let mut transactions = LinkedHashMap::new(); - for events_page in events_pages { - debug!("Processing events page with events: {}", &events_page.events.len()); - for event in events_page.events { - let block_number = match event.block_number { - Some(block_number) => block_number, - // If the block number is not present, try to fetch it from the transaction - // receipt Should not/rarely happen. Thus the additional - // fetch is acceptable. - None => { - let TransactionReceiptWithBlockInfo { receipt, block } = - self.provider.get_transaction_receipt(event.transaction_hash).await?; - - match receipt { - TransactionReceipt::Invoke(_) | TransactionReceipt::L1Handler(_) => { - if let ReceiptBlock::Block { block_number, .. 
} = block { - block_number - } else { - // If the block is pending, we assume the block number is the - // latest + 1 - to + 1 - } - } - - _ => to + 1, - } - } - }; - // Keep track of last block number and fetch block timestamp - if block_number > last_block { - let block_timestamp = self.get_block_timestamp(block_number).await?; - blocks.insert(block_number, block_timestamp); + let mut block_set = HashSet::new(); + for event in events { + let block_number = match event.block_number { + Some(block_number) => block_number, + None => unreachable!("In fetch range all events should have block number"), + }; - last_block = block_number; - } + block_set.insert(block_number); - // Then we skip all transactions until we reach the last pending processed - // transaction (if any) - if let Some(tx) = last_pending_block_world_tx_cursor { - if event.transaction_hash != tx { - continue; - } + transactions + .entry((block_number, event.transaction_hash)) + .or_insert(vec![]) + .push(event); + } - last_pending_block_world_tx_cursor = None; - } + let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks)); + let mut set: JoinSet> = JoinSet::new(); - // Skip the latest pending block transaction events - // * as we might have multiple events for the same transaction - if let Some(tx) = last_pending_block_world_tx { - if event.transaction_hash == tx { - continue; - } - } + for block_number in block_set { + let semaphore = semaphore.clone(); + let provider = self.provider.clone(); + set.spawn(async move { + let _permit = semaphore.acquire().await.unwrap(); + debug!("Fetching block timestamp for block number: {}", block_number); + let block_timestamp = get_block_timestamp(&provider, block_number).await?; + Ok((block_number, block_timestamp)) + }); + } - transactions - .entry((block_number, event.transaction_hash)) - .or_insert(vec![]) - .push(event); - } + while let Some(result) = set.join_next().await { + let (block_number, block_timestamp) = result??; + 
blocks.insert(block_number, block_timestamp); } debug!("Transactions: {}", &transactions.len()); @@ -379,10 +457,10 @@ impl Engine

{ let mut last_pending_block_tx_cursor = data.last_pending_block_tx; let mut last_pending_block_tx = data.last_pending_block_tx; - let mut last_pending_block_world_tx = None; let timestamp = data.pending_block.timestamp; + let mut cursor_map = HashMap::new(); for t in data.pending_block.transactions { let transaction_hash = t.transaction.transaction_hash(); if let Some(tx) = last_pending_block_tx_cursor { @@ -394,58 +472,23 @@ impl Engine

{ continue; } - match self.process_transaction_with_receipt(&t, data.block_number, timestamp).await { - Err(e) => { - match e.to_string().as_str() { - "TransactionHashNotFound" => { - // We failed to fetch the transaction, which is because - // the transaction might not have been processed fast enough by the - // provider. So we can fail silently and try - // again in the next iteration. - warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Retrieving pending transaction receipt."); - self.db.set_head(data.block_number - 1); - if let Some(tx) = last_pending_block_tx { - self.db.set_last_pending_block_tx(Some(tx)); - } - - if let Some(tx) = last_pending_block_world_tx { - self.db.set_last_pending_block_world_tx(Some(tx)); - } - self.db.execute().await?; - return Ok(()); - } - _ => { - error!(target: LOG_TARGET, error = %e, transaction_hash = %format!("{:#x}", transaction_hash), "Processing pending transaction."); - return Err(e); - } - } - } - Ok(true) => { - last_pending_block_world_tx = Some(*transaction_hash); - last_pending_block_tx = Some(*transaction_hash); - info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending world transaction."); - } - Ok(_) => { - last_pending_block_tx = Some(*transaction_hash); - debug!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending transaction.") - } + if let Err(e) = self + .process_transaction_with_receipt(&t, data.block_number, timestamp, &mut cursor_map) + .await + { + error!(target: LOG_TARGET, error = %e, transaction_hash = %format!("{:#x}", transaction_hash), "Processing pending transaction."); + return Err(e); } + + last_pending_block_tx = Some(*transaction_hash); + debug!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending transaction."); } // Process parallelized events self.process_tasks().await?; - // Set the head to the last processed pending transaction // 
Head block number should still be latest block number - self.db.set_head(data.block_number - 1); - - if let Some(tx) = last_pending_block_tx { - self.db.set_last_pending_block_tx(Some(tx)); - } - - if let Some(tx) = last_pending_block_world_tx { - self.db.set_last_pending_block_world_tx(Some(tx)); - } + self.db.update_cursors(data.block_number - 1, last_pending_block_tx, cursor_map); self.db.execute().await?; @@ -454,7 +497,7 @@ impl Engine

{ pub async fn process_range(&mut self, data: FetchRangeResult) -> Result<()> { // Process all transactions - let mut last_block = 0; + let mut processed_blocks = HashSet::new(); for ((block_number, transaction_hash), events) in data.transactions { debug!("Processing transaction hash: {:#x}", transaction_hash); // Process transaction @@ -474,13 +517,13 @@ impl Engine

{ .await?; // Process block - if block_number > last_block { + if !processed_blocks.contains(&block_number) { if let Some(ref block_tx) = self.block_tx { block_tx.send(block_number).await?; } self.process_block(block_number, data.blocks[&block_number]).await?; - last_block = block_number; + processed_blocks.insert(block_number); } if self.db.query_queue.queue.len() >= QUERY_QUEUE_BATCH_SIZE { @@ -491,9 +534,7 @@ impl Engine

{ // Process parallelized events self.process_tasks().await?; - self.db.set_head(data.latest_block_number); - self.db.set_last_pending_block_world_tx(None); - self.db.set_last_pending_block_tx(None); + self.db.reset_cursors(data.latest_block_number); self.db.execute().await?; @@ -509,14 +550,15 @@ impl Engine

{ for (task_id, events) in self.tasks.drain() { let db = self.db.clone(); let world = self.world.clone(); - let processors = self.processors.clone(); let semaphore = semaphore.clone(); + let processors = self.processors.clone(); set.spawn(async move { let _permit = semaphore.acquire().await.unwrap(); let mut local_db = db.clone(); - for ParallelizedEvent { event_id, event, block_number, block_timestamp } in events { - if let Some(processor) = processors.event.get(&event.keys[0]) { + for (contract_type, ParallelizedEvent { event_id, event, block_number, block_timestamp }) in events { + let contract_processors = processors.get_event_processor(contract_type); + if let Some(processor) = contract_processors.get(&event.keys[0]) { debug!(target: LOG_TARGET, event_name = processor.event_key(), task_id = %task_id, "Processing parallelized event."); if let Err(e) = processor @@ -540,13 +582,6 @@ impl Engine

{ Ok(()) } - async fn get_block_timestamp(&self, block_number: u64) -> Result { - match self.provider.get_block_with_tx_hashes(BlockId::Number(block_number)).await? { - MaybePendingBlockWithTxHashes::Block(block) => Ok(block.timestamp), - MaybePendingBlockWithTxHashes::PendingBlock(block) => Ok(block.timestamp), - } - } - async fn process_transaction_with_events( &mut self, transaction_hash: Felt, @@ -555,6 +590,7 @@ impl Engine

{ block_timestamp: u64, transaction: Option, ) -> Result<()> { + // Contract -> Cursor for (event_idx, event) in events.iter().enumerate() { let event_id = format!("{:#064x}:{:#x}:{:#04x}", block_number, transaction_hash, event_idx); @@ -564,6 +600,11 @@ impl Engine

{ keys: event.keys.clone(), data: event.data.clone(), }; + + let Some(&contract_type) = self.contracts.get(&event.from_address) else { + continue; + }; + Self::process_event( self, block_number, @@ -571,6 +612,7 @@ impl Engine

{ &event_id, &event, transaction_hash, + contract_type, ) .await?; } @@ -596,7 +638,8 @@ impl Engine

{ transaction_with_receipt: &TransactionWithReceipt, block_number: u64, block_timestamp: u64, - ) -> Result { + cursor_map: &mut HashMap, + ) -> Result<()> { let transaction_hash = transaction_with_receipt.transaction.transaction_hash(); let events = match &transaction_with_receipt.receipt { TransactionReceipt::Invoke(receipt) => Some(&receipt.events), @@ -604,14 +647,13 @@ impl Engine

{ _ => None, }; - let mut world_event = false; if let Some(events) = events { for (event_idx, event) in events.iter().enumerate() { - if event.from_address != self.world.address { + let Some(&contract_type) = self.contracts.get(&event.from_address) else { continue; - } + }; - world_event = true; + cursor_map.insert(event.from_address, *transaction_hash); let event_id = format!("{:#064x}:{:#x}:{:#04x}", block_number, *transaction_hash, event_idx); @@ -622,11 +664,12 @@ impl Engine

{ &event_id, event, *transaction_hash, + contract_type, ) .await?; } - if world_event && self.config.flags.contains(IndexingFlags::TRANSACTIONS) { + if self.config.flags.contains(IndexingFlags::TRANSACTIONS) { Self::process_transaction( self, block_number, @@ -638,7 +681,7 @@ impl Engine

{ } } - Ok(world_event) + Ok(()) } async fn process_block(&mut self, block_number: u64, block_timestamp: u64) -> Result<()> { @@ -682,6 +725,7 @@ impl Engine

{ event_id: &str, event: &Event, transaction_hash: Felt, + contract_type: ContractType, ) -> Result<()> { if self.config.flags.contains(IndexingFlags::RAW_EVENTS) { self.db.store_event(event_id, event, transaction_hash, block_timestamp); @@ -689,7 +733,8 @@ impl Engine

{ let event_key = event.keys[0]; - let Some(processor) = self.processors.event.get(&event_key) else { + let processors = self.processors.get_event_processor(contract_type); + let Some(processor) = processors.get(&event_key) else { // if we dont have a processor for this event, we try the catch all processor if self.processors.catch_all_event.validate(event) { if let Err(e) = self @@ -736,22 +781,77 @@ impl Engine

{ // if we have a task identifier, we queue the event to be parallelized if task_identifier != 0 { - self.tasks.entry(task_identifier).or_default().push(ParallelizedEvent { - event_id: event_id.to_string(), - event: event.clone(), - block_number, - block_timestamp, - }); + self.tasks.entry(task_identifier).or_default().push(( + contract_type, + ParallelizedEvent { + event_id: event_id.to_string(), + event: event.clone(), + block_number, + block_timestamp, + }, + )); } else { // if we dont have a task identifier, we process the event immediately - if let Err(e) = processor - .process(&self.world, &mut self.db, block_number, block_timestamp, event_id, event) - .await - { - error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event."); + if processor.validate(event) { + if let Err(e) = processor + .process( + &self.world, + &mut self.db, + block_number, + block_timestamp, + event_id, + event, + ) + .await + { + error!(target: LOG_TARGET, event_name = processor.event_key(), error = ?e, "Processing event."); + } + } else { + warn!(target: LOG_TARGET, event_name = processor.event_key(), "Event not validated."); } } Ok(()) } } + +async fn get_all_events

( + provider: &P, + events_filter: EventFilter, + events_chunk_size: u64, +) -> Result<(Option, Vec)> +where + P: Provider + Sync, +{ + let mut events_pages = Vec::new(); + let mut continuation_token = None; + + loop { + debug!( + "Fetching events page with continuation token: {:?}, for contract: {:?}", + continuation_token, events_filter.address + ); + let events_page = provider + .get_events(events_filter.clone(), continuation_token.clone(), events_chunk_size) + .await?; + + continuation_token = events_page.continuation_token.clone(); + events_pages.push(events_page); + + if continuation_token.is_none() { + break; + } + } + + Ok((events_filter.address, events_pages)) +} + +async fn get_block_timestamp

(provider: &P, block_number: u64) -> Result +where + P: Provider + Sync, +{ + match provider.get_block_with_tx_hashes(BlockId::Number(block_number)).await? { + MaybePendingBlockWithTxHashes::Block(block) => Ok(block.timestamp), + MaybePendingBlockWithTxHashes::PendingBlock(block) => Ok(block.timestamp), + } +} diff --git a/crates/torii/core/src/lib.rs b/crates/torii/core/src/lib.rs index df6e8b3adc..c415bec0f8 100644 --- a/crates/torii/core/src/lib.rs +++ b/crates/torii/core/src/lib.rs @@ -3,7 +3,6 @@ pub mod engine; pub mod error; pub mod model; pub mod processors; -pub mod query_queue; pub mod simple_broker; pub mod sql; pub mod types; diff --git a/crates/torii/core/src/processors/erc20_legacy_transfer.rs b/crates/torii/core/src/processors/erc20_legacy_transfer.rs new file mode 100644 index 0000000000..4cef0dc19d --- /dev/null +++ b/crates/torii/core/src/processors/erc20_legacy_transfer.rs @@ -0,0 +1,59 @@ +use anyhow::Error; +use async_trait::async_trait; +use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; +use dojo_world::contracts::world::WorldContractReader; +use starknet::core::types::{Event, U256}; +use starknet::providers::Provider; +use tracing::debug; + +use super::EventProcessor; +use crate::sql::Sql; + +pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc20_legacy_transfer"; + +#[derive(Default, Debug)] +pub struct Erc20LegacyTransferProcessor; + +#[async_trait] +impl

EventProcessor

for Erc20LegacyTransferProcessor +where + P: Provider + Send + Sync + std::fmt::Debug, +{ + fn event_key(&self) -> String { + "Transfer".to_string() + } + + fn validate(&self, event: &Event) -> bool { + // ref: https://github.com/OpenZeppelin/cairo-contracts/blob/1f9359219a92cdb1576f953db71ee993b8ef5f70/src/openzeppelin/token/erc20/library.cairo#L19-L21 + // key: [hash(Transfer)] + // data: [from, to, value.0, value.1] + if event.keys.len() == 1 && event.data.len() == 4 { + return true; + } + + false + } + + async fn process( + &self, + world: &WorldContractReader

, + db: &mut Sql, + _block_number: u64, + block_timestamp: u64, + _event_id: &str, + event: &Event, + ) -> Result<(), Error> { + let token_address = event.from_address; + let from = event.data[0]; + let to = event.data[1]; + + let value = U256Cainome::cairo_deserialize(&event.data, 2)?; + let value = U256::from_words(value.low, value.high); + + db.handle_erc20_transfer(token_address, from, to, value, world.provider(), block_timestamp) + .await?; + debug!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "Legacy ERC20 Transfer"); + + Ok(()) + } +} diff --git a/crates/torii/core/src/processors/erc20_transfer.rs b/crates/torii/core/src/processors/erc20_transfer.rs new file mode 100644 index 0000000000..10022d9eb0 --- /dev/null +++ b/crates/torii/core/src/processors/erc20_transfer.rs @@ -0,0 +1,59 @@ +use anyhow::Error; +use async_trait::async_trait; +use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; +use dojo_world::contracts::world::WorldContractReader; +use starknet::core::types::{Event, U256}; +use starknet::providers::Provider; +use tracing::debug; + +use super::EventProcessor; +use crate::sql::Sql; + +pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc20_transfer"; + +#[derive(Default, Debug)] +pub struct Erc20TransferProcessor; + +#[async_trait] +impl

EventProcessor

for Erc20TransferProcessor +where + P: Provider + Send + Sync + std::fmt::Debug, +{ + fn event_key(&self) -> String { + "Transfer".to_string() + } + + fn validate(&self, event: &Event) -> bool { + // ref: https://github.com/OpenZeppelin/cairo-contracts/blob/ba00ce76a93dcf25c081ab2698da20690b5a1cfb/packages/token/src/erc20/erc20.cairo#L38-L46 + // key: [hash(Transfer), from, to] + // data: [value.0, value.1] + if event.keys.len() == 3 && event.data.len() == 2 { + return true; + } + + false + } + + async fn process( + &self, + world: &WorldContractReader

, + db: &mut Sql, + _block_number: u64, + block_timestamp: u64, + _event_id: &str, + event: &Event, + ) -> Result<(), Error> { + let token_address = event.from_address; + let from = event.keys[1]; + let to = event.keys[2]; + + let value = U256Cainome::cairo_deserialize(&event.data, 0)?; + let value = U256::from_words(value.low, value.high); + + db.handle_erc20_transfer(token_address, from, to, value, world.provider(), block_timestamp) + .await?; + debug!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "ERC20 Transfer"); + + Ok(()) + } +} diff --git a/crates/torii/core/src/processors/erc721_legacy_transfer.rs b/crates/torii/core/src/processors/erc721_legacy_transfer.rs new file mode 100644 index 0000000000..89a88f04a3 --- /dev/null +++ b/crates/torii/core/src/processors/erc721_legacy_transfer.rs @@ -0,0 +1,66 @@ +use anyhow::Error; +use async_trait::async_trait; +use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; +use dojo_world::contracts::world::WorldContractReader; +use starknet::core::types::{Event, U256}; +use starknet::providers::Provider; +use tracing::debug; + +use super::EventProcessor; +use crate::sql::Sql; + +pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc721_legacy_transfer"; + +#[derive(Default, Debug)] +pub struct Erc721LegacyTransferProcessor; + +#[async_trait] +impl

EventProcessor

for Erc721LegacyTransferProcessor +where + P: Provider + Send + Sync + std::fmt::Debug, +{ + fn event_key(&self) -> String { + "Transfer".to_string() + } + + fn validate(&self, event: &Event) -> bool { + // ref: https://github.com/OpenZeppelin/cairo-contracts/blob/1f9359219a92cdb1576f953db71ee993b8ef5f70/src/openzeppelin/token/erc721/library.cairo#L27-L29 + // key: [hash(Transfer)] + // data: [from, to, token_id.0, token_id.1] + if event.keys.len() == 1 && event.data.len() == 4 { + return true; + } + + false + } + + async fn process( + &self, + world: &WorldContractReader

, + db: &mut Sql, + _block_number: u64, + block_timestamp: u64, + _event_id: &str, + event: &Event, + ) -> Result<(), Error> { + let token_address = event.from_address; + let from = event.data[0]; + let to = event.data[1]; + + let token_id = U256Cainome::cairo_deserialize(&event.data, 2)?; + let token_id = U256::from_words(token_id.low, token_id.high); + + db.handle_erc721_transfer( + token_address, + from, + to, + token_id, + world.provider(), + block_timestamp, + ) + .await?; + debug!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "ERC721 Transfer"); + + Ok(()) + } +} diff --git a/crates/torii/core/src/processors/erc721_transfer.rs b/crates/torii/core/src/processors/erc721_transfer.rs new file mode 100644 index 0000000000..319ea81833 --- /dev/null +++ b/crates/torii/core/src/processors/erc721_transfer.rs @@ -0,0 +1,66 @@ +use anyhow::Error; +use async_trait::async_trait; +use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; +use dojo_world::contracts::world::WorldContractReader; +use starknet::core::types::{Event, U256}; +use starknet::providers::Provider; +use tracing::debug; + +use super::EventProcessor; +use crate::sql::Sql; + +pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc721_transfer"; + +#[derive(Default, Debug)] +pub struct Erc721TransferProcessor; + +#[async_trait] +impl

EventProcessor

for Erc721TransferProcessor +where + P: Provider + Send + Sync + std::fmt::Debug, +{ + fn event_key(&self) -> String { + "Transfer".to_string() + } + + fn validate(&self, event: &Event) -> bool { + // ref: https://github.com/OpenZeppelin/cairo-contracts/blob/ba00ce76a93dcf25c081ab2698da20690b5a1cfb/packages/token/src/erc721/erc721.cairo#L40-L49 + // key: [hash(Transfer), from, to, token_id.low, token_id.high] + // data: [] + if event.keys.len() == 5 && event.data.is_empty() { + return true; + } + + false + } + + async fn process( + &self, + world: &WorldContractReader

, + db: &mut Sql, + _block_number: u64, + block_timestamp: u64, + _event_id: &str, + event: &Event, + ) -> Result<(), Error> { + let token_address = event.from_address; + let from = event.keys[1]; + let to = event.keys[2]; + + let token_id = U256Cainome::cairo_deserialize(&event.keys, 3)?; + let token_id = U256::from_words(token_id.low, token_id.high); + + db.handle_erc721_transfer( + token_address, + from, + to, + token_id, + world.provider(), + block_timestamp, + ) + .await?; + debug!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "ERC721 Transfer"); + + Ok(()) + } +} diff --git a/crates/torii/core/src/processors/mod.rs b/crates/torii/core/src/processors/mod.rs index c6a8f13af5..cf25f36ca6 100644 --- a/crates/torii/core/src/processors/mod.rs +++ b/crates/torii/core/src/processors/mod.rs @@ -1,15 +1,15 @@ -use std::collections::HashMap; -use std::sync::Arc; - use anyhow::{Error, Result}; use async_trait::async_trait; use dojo_world::contracts::world::WorldContractReader; use starknet::core::types::{Event, Felt, Transaction}; -use starknet::core::utils::get_selector_from_name; use starknet::providers::Provider; use crate::sql::Sql; +pub mod erc20_legacy_transfer; +pub mod erc20_transfer; +pub mod erc721_legacy_transfer; +pub mod erc721_transfer; pub mod event_message; pub mod metadata_update; pub mod register_model; @@ -73,17 +73,3 @@ pub trait TransactionProcessor: Send + Sync { transaction: &Transaction, ) -> Result<(), Error>; } - -/// Given a list of event processors, generate a map of event keys to the event processor -pub fn generate_event_processors_map( - event_processor: Vec>>, -) -> Result>>> { - let mut event_processors = HashMap::new(); - - for processor in event_processor { - let key = get_selector_from_name(processor.event_key().as_str())?; - event_processors.insert(key, processor); - } - - Ok(event_processors) -} diff --git a/crates/torii/core/src/processors/store_set_record.rs b/crates/torii/core/src/processors/store_set_record.rs 
index c5f70a2a54..fa1351b156 100644 --- a/crates/torii/core/src/processors/store_set_record.rs +++ b/crates/torii/core/src/processors/store_set_record.rs @@ -8,7 +8,8 @@ use tracing::info; use super::EventProcessor; use crate::processors::{ENTITY_ID_INDEX, MODEL_INDEX, NUM_KEYS_INDEX}; -use crate::sql::{felts_sql_string, Sql}; +use crate::sql::utils::felts_to_sql_string; +use crate::sql::Sql; pub(crate) const LOG_TARGET: &str = "torii_core::processors::store_set_record"; @@ -60,7 +61,7 @@ where let keys_end: usize = keys_start + event.data[NUM_KEYS_INDEX].to_usize().context("invalid usize")?; let keys = event.data[keys_start..keys_end].to_vec(); - let keys_str = felts_sql_string(&keys); + let keys_str = felts_to_sql_string(&keys); // keys_end is already the length of the values array. diff --git a/crates/torii/core/src/sql/erc.rs b/crates/torii/core/src/sql/erc.rs new file mode 100644 index 0000000000..f856c2f21a --- /dev/null +++ b/crates/torii/core/src/sql/erc.rs @@ -0,0 +1,406 @@ +use std::ops::{Add, Sub}; + +use anyhow::Result; +use cainome::cairo_serde::{ByteArray, CairoSerde}; +use starknet::core::types::{BlockId, BlockTag, Felt, FunctionCall, U256}; +use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string}; +use starknet::providers::Provider; +use tracing::debug; + +use super::query_queue::{Argument, QueryType}; +use super::utils::{sql_string_to_u256, u256_to_sql_string}; +use super::{Sql, FELT_DELIMITER}; +use crate::sql::utils::{felt_and_u256_to_sql_string, felt_to_sql_string, felts_to_sql_string}; +use crate::utils::utc_dt_string_from_timestamp; + +impl Sql { + pub async fn handle_erc20_transfer( + &mut self, + contract_address: Felt, + from_address: Felt, + to_address: Felt, + amount: U256, + provider: &P, + block_timestamp: u64, + ) -> Result<()> { + // unique token identifier in DB + let token_id = felt_to_sql_string(&contract_address); + + let token_exists: bool = + sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM tokens WHERE id = 
?)") + .bind(token_id.clone()) + .fetch_one(&self.pool) + .await?; + + if !token_exists { + self.register_erc20_token_metadata(contract_address, &token_id, provider).await?; + } + + self.register_erc_transfer_event( + contract_address, + from_address, + to_address, + amount, + &token_id, + block_timestamp, + ); + + // Update balances in erc20_balance table + { + // NOTE: formatting here should match the format we use for Argument type in QueryQueue + // TODO: abstract this so they cannot mismatch + + // Since balance are stored as TEXT in db, we cannot directly use INSERT OR UPDATE + // statements. + // Fetch balances for both `from` and `to` addresses, update them and write back to db + let query = sqlx::query_as::<_, (String, String)>( + "SELECT account_address, balance FROM balances WHERE contract_address = ? AND \ + account_address IN (?, ?)", + ) + .bind(felt_to_sql_string(&contract_address)) + .bind(felt_to_sql_string(&from_address)) + .bind(felt_to_sql_string(&to_address)); + + // (address, balance) + let balances: Vec<(String, String)> = query.fetch_all(&self.pool).await?; + // (address, balance) is primary key in DB, and we are fetching for 2 addresses so there + // should be at most 2 rows returned + assert!(balances.len() <= 2); + + let from_balance = balances + .iter() + .find(|(address, _)| address == &felt_to_sql_string(&from_address)) + .map(|(_, balance)| balance.clone()) + .unwrap_or_else(|| u256_to_sql_string(&U256::from(0u8))); + + let to_balance = balances + .iter() + .find(|(address, _)| address == &felt_to_sql_string(&to_address)) + .map(|(_, balance)| balance.clone()) + .unwrap_or_else(|| u256_to_sql_string(&U256::from(0u8))); + + let from_balance = sql_string_to_u256(&from_balance); + let to_balance = sql_string_to_u256(&to_balance); + + let new_from_balance = + if from_address != Felt::ZERO { from_balance.sub(amount) } else { from_balance }; + let new_to_balance = + if to_address != Felt::ZERO { to_balance.add(amount) } else { to_balance }; 
+ + let update_query = " + INSERT INTO balances (id, balance, account_address, contract_address, token_id) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT (id) + DO UPDATE SET balance = excluded.balance"; + + if from_address != Felt::ZERO { + self.query_queue.enqueue( + update_query, + vec![ + Argument::String(felts_to_sql_string(&[from_address, contract_address])), + Argument::String(u256_to_sql_string(&new_from_balance)), + Argument::FieldElement(from_address), + Argument::FieldElement(contract_address), + Argument::String(token_id.clone()), + ], + QueryType::Other, + ); + } + + if to_address != Felt::ZERO { + self.query_queue.enqueue( + update_query, + vec![ + Argument::String(felts_to_sql_string(&[to_address, contract_address])), + Argument::String(u256_to_sql_string(&new_to_balance)), + Argument::FieldElement(to_address), + Argument::FieldElement(contract_address), + Argument::String(token_id.clone()), + ], + QueryType::Other, + ); + } + } + self.query_queue.execute_all().await?; + + Ok(()) + } + + pub async fn handle_erc721_transfer( + &mut self, + contract_address: Felt, + from_address: Felt, + to_address: Felt, + token_id: U256, + provider: &P, + block_timestamp: u64, + ) -> Result<()> { + let token_id = felt_and_u256_to_sql_string(&contract_address, &token_id); + let token_exists: bool = + sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM tokens WHERE id = ?)") + .bind(token_id.clone()) + .fetch_one(&self.pool) + .await?; + + if !token_exists { + self.register_erc721_token_metadata(contract_address, &token_id, provider).await?; + } + + self.register_erc_transfer_event( + contract_address, + from_address, + to_address, + U256::from(1u8), + &token_id, + block_timestamp, + ); + + // Update balances in erc721_balances table + { + let update_query = " + INSERT INTO balances (id, balance, account_address, contract_address, token_id) + VALUES (?, ?, ?, ?, ?) 
+ ON CONFLICT (id) + DO UPDATE SET balance = excluded.balance"; + + if from_address != Felt::ZERO { + self.query_queue.enqueue( + update_query, + vec![ + Argument::String(format!( + "{}{FELT_DELIMITER}{}", + felt_to_sql_string(&from_address), + &token_id + )), + Argument::String(u256_to_sql_string(&U256::from(0u8))), + Argument::FieldElement(from_address), + Argument::FieldElement(contract_address), + Argument::String(token_id.clone()), + ], + QueryType::Other, + ); + } + + if to_address != Felt::ZERO { + self.query_queue.enqueue( + update_query, + vec![ + Argument::String(format!( + "{}{FELT_DELIMITER}{}", + felt_to_sql_string(&to_address), + &token_id + )), + Argument::String(u256_to_sql_string(&U256::from(1u8))), + Argument::FieldElement(to_address), + Argument::FieldElement(contract_address), + Argument::String(token_id.clone()), + ], + QueryType::Other, + ); + } + } + self.query_queue.execute_all().await?; + + Ok(()) + } + + async fn register_erc20_token_metadata( + &mut self, + contract_address: Felt, + token_id: &str, + provider: &P, + ) -> Result<()> { + // Fetch token information from the chain + let name = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("name").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + + // len = 1 => return value felt (i.e. legacy erc20 token) + // len > 1 => return value ByteArray (i.e. 
new erc20 token) + let name = if name.len() == 1 { + parse_cairo_short_string(&name[0]).unwrap() + } else { + ByteArray::cairo_deserialize(&name, 0) + .expect("Return value not ByteArray") + .to_string() + .expect("Return value not String") + }; + + let symbol = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("symbol").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + + let symbol = if symbol.len() == 1 { + parse_cairo_short_string(&symbol[0]).unwrap() + } else { + ByteArray::cairo_deserialize(&symbol, 0) + .expect("Return value not ByteArray") + .to_string() + .expect("Return value not String") + }; + + let decimals = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("decimals").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + let decimals = u8::cairo_deserialize(&decimals, 0).expect("Return value not u8"); + + // Insert the token into the tokens table + self.query_queue.enqueue( + "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, ?, \ + ?, ?)", + vec![ + Argument::String(token_id.to_string()), + Argument::FieldElement(contract_address), + Argument::String(name), + Argument::String(symbol), + Argument::Int(decimals.into()), + ], + QueryType::Other, + ); + + Ok(()) + } + + async fn register_erc721_token_metadata( + &mut self, + contract_address: Felt, + token_id: &str, + provider: &P, + ) -> Result<()> { + let res = sqlx::query_as::<_, (String, String, u8)>( + "SELECT name, symbol, decimals FROM tokens WHERE contract_address = ?", + ) + .bind(felt_to_sql_string(&contract_address)) + .fetch_one(&self.pool) + .await; + + // If we find a token already registered for this contract_address we dont need to refetch + // the data since its same for all ERC721 tokens + if let Ok((name, symbol, decimals)) = res { + debug!( + contract_address = 
%felt_to_sql_string(&contract_address), + "Token already registered for contract_address, so reusing fetched data", + ); + self.query_queue.enqueue( + "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, \ + ?, ?, ?)", + vec![ + Argument::String(token_id.to_string()), + Argument::FieldElement(contract_address), + Argument::String(name), + Argument::String(symbol), + Argument::Int(decimals.into()), + ], + QueryType::Other, + ); + return Ok(()); + } + + // Fetch token information from the chain + let name = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("name").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + + // len = 1 => return value felt (i.e. legacy erc721 token) + // len > 1 => return value ByteArray (i.e. new erc721 token) + let name = if name.len() == 1 { + parse_cairo_short_string(&name[0]).unwrap() + } else { + ByteArray::cairo_deserialize(&name, 0) + .expect("Return value not ByteArray") + .to_string() + .expect("Return value not String") + }; + + let symbol = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("symbol").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + let symbol = if symbol.len() == 1 { + parse_cairo_short_string(&symbol[0]).unwrap() + } else { + ByteArray::cairo_deserialize(&symbol, 0) + .expect("Return value not ByteArray") + .to_string() + .expect("Return value not String") + }; + + let decimals = 0; + + // Insert the token into the tokens table + self.query_queue.enqueue( + "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, ?, \ + ?, ?)", + vec![ + Argument::String(token_id.to_string()), + Argument::FieldElement(contract_address), + Argument::String(name), + Argument::String(symbol), + Argument::Int(decimals.into()), + ], + QueryType::Other, + ); + + Ok(()) + } + + fn register_erc_transfer_event( + 
&mut self, + contract_address: Felt, + from: Felt, + to: Felt, + amount: U256, + token_id: &str, + block_timestamp: u64, + ) { + let insert_query = "INSERT INTO erc_transfers (contract_address, from_address, \ + to_address, amount, token_id, executed_at) VALUES (?, ?, ?, ?, ?, ?)"; + + self.query_queue.enqueue( + insert_query, + vec![ + Argument::FieldElement(contract_address), + Argument::FieldElement(from), + Argument::FieldElement(to), + Argument::String(u256_to_sql_string(&amount)), + Argument::String(token_id.to_string()), + Argument::String(utc_dt_string_from_timestamp(block_timestamp)), + ], + QueryType::Other, + ); + } +} diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql/mod.rs similarity index 88% rename from crates/torii/core/src/sql.rs rename to crates/torii/core/src/sql/mod.rs index 249a3c4fef..aea8a3ce3e 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql/mod.rs @@ -1,24 +1,27 @@ +use std::collections::HashMap; use std::convert::TryInto; use std::str::FromStr; use std::sync::Arc; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use chrono::Utc; use dojo_types::primitive::Primitive; use dojo_types::schema::{EnumOption, Member, Struct, Ty}; use dojo_world::contracts::abi::model::Layout; use dojo_world::contracts::naming::compute_selector_from_names; use dojo_world::metadata::WorldMetadata; +use query_queue::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType}; use sqlx::pool::PoolConnection; use sqlx::{Pool, Sqlite}; use starknet::core::types::{Event, Felt, InvokeTransaction, Transaction}; use starknet_crypto::poseidon_hash_many; use tracing::{debug, warn}; +use utils::felts_to_sql_string; use crate::cache::{Model, ModelCache}; -use crate::query_queue::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType}; use crate::types::{ - Event as EventEmitted, EventMessage as EventMessageUpdated, Model as ModelRegistered, + ContractType, Event as EventEmitted, EventMessage 
as EventMessageUpdated, + Model as ModelRegistered, }; use crate::utils::{must_utc_datetime_from_timestamp, utc_dt_string_from_timestamp}; @@ -28,9 +31,12 @@ type IsStoreUpdate = bool; pub const WORLD_CONTRACT_TYPE: &str = "WORLD"; pub const FELT_DELIMITER: &str = "/"; +pub mod erc; +pub mod query_queue; #[cfg(test)] -#[path = "sql_test.rs"] +#[path = "test.rs"] mod test; +pub mod utils; #[derive(Debug)] pub struct Sql { @@ -40,6 +46,13 @@ pub struct Sql { model_cache: Arc, } +#[derive(Debug, Clone)] +pub struct Cursors { + pub cursor_map: HashMap, + pub last_pending_block_tx: Option, + pub head: Option, +} + impl Clone for Sql { fn clone(&self) -> Self { Self { @@ -52,19 +65,25 @@ impl Clone for Sql { } impl Sql { - pub async fn new(pool: Pool, world_address: Felt) -> Result { + pub async fn new( + pool: Pool, + world_address: Felt, + contracts: &HashMap, + ) -> Result { let mut query_queue = QueryQueue::new(pool.clone()); - query_queue.enqueue( - "INSERT OR IGNORE INTO contracts (id, contract_address, contract_type) VALUES (?, ?, \ - ?)", - vec![ - Argument::FieldElement(world_address), - Argument::FieldElement(world_address), - Argument::String(WORLD_CONTRACT_TYPE.to_string()), - ], - QueryType::Other, - ); + for contract in contracts { + query_queue.enqueue( + "INSERT OR IGNORE INTO contracts (id, contract_address, contract_type) VALUES (?, \ + ?, ?)", + vec![ + Argument::FieldElement(*contract.0), + Argument::FieldElement(*contract.0), + Argument::String(contract.1.to_string()), + ], + QueryType::Other, + ); + } query_queue.execute_all().await?; @@ -92,17 +111,19 @@ impl Sql { Ok(()) } - pub async fn head(&self) -> Result<(u64, Option, Option)> { + pub async fn head(&self, contract: Felt) -> Result<(u64, Option, Option)> { let mut conn: PoolConnection = self.pool.acquire().await?; let indexer_query = sqlx::query_as::<_, (Option, Option, Option, String)>( - "SELECT head, last_pending_block_world_tx, last_pending_block_tx, contract_type \ - FROM contracts WHERE 
id = ?", + "SELECT head, last_pending_block_contract_tx, last_pending_block_tx, \ + contract_type FROM contracts WHERE id = ?", ) - .bind(format!("{:#x}", self.world_address)); + .bind(format!("{:#x}", contract)); - let indexer: (Option, Option, Option, String) = - indexer_query.fetch_one(&mut *conn).await?; + let indexer: (Option, Option, Option, String) = indexer_query + .fetch_one(&mut *conn) + .await + .with_context(|| format!("Failed to fetch head for contract: {:#x}", contract))?; Ok(( indexer.0.map(|h| h.try_into().expect("doesn't fit in u64")).unwrap_or(0), indexer.1.map(|f| Felt::from_str(&f)).transpose()?, @@ -110,9 +131,9 @@ impl Sql { )) } - pub fn set_head(&mut self, head: u64) { + pub fn set_head(&mut self, contract: Felt, head: u64) { let head = Argument::Int(head.try_into().expect("doesn't fit in u64")); - let id = Argument::FieldElement(self.world_address); + let id = Argument::FieldElement(contract); self.query_queue.enqueue( "UPDATE contracts SET head = ? WHERE id = ?", vec![head, id], @@ -120,18 +141,22 @@ impl Sql { ); } - pub fn set_last_pending_block_world_tx(&mut self, last_pending_block_world_tx: Option) { - let last_pending_block_world_tx = if let Some(f) = last_pending_block_world_tx { + pub fn set_last_pending_block_contract_tx( + &mut self, + contract: Felt, + last_pending_block_contract_tx: Option, + ) { + let last_pending_block_contract_tx = if let Some(f) = last_pending_block_contract_tx { Argument::String(format!("{:#x}", f)) } else { Argument::Null }; - let id = Argument::FieldElement(self.world_address); + let id = Argument::FieldElement(contract); self.query_queue.enqueue( - "UPDATE contracts SET last_pending_block_world_tx = ? WHERE id = ?", - vec![last_pending_block_world_tx, id], + "UPDATE contracts SET last_pending_block_contract_tx = ? 
WHERE id = ?", + vec![last_pending_block_contract_tx, id], QueryType::Other, ); } @@ -142,11 +167,86 @@ impl Sql { } else { Argument::Null }; - let id = Argument::FieldElement(self.world_address); self.query_queue.enqueue( - "UPDATE contracts SET last_pending_block_tx = ? WHERE id = ?", - vec![last_pending_block_tx, id], + "UPDATE contracts SET last_pending_block_tx = ? WHERE 1=1", + vec![last_pending_block_tx], + QueryType::Other, + ) + } + + pub(crate) async fn cursors(&self) -> Result { + let mut conn: PoolConnection = self.pool.acquire().await?; + let cursors = sqlx::query_as::<_, (String, String)>( + "SELECT contract_address, last_pending_block_contract_tx FROM contracts WHERE \ + last_pending_block_contract_tx IS NOT NULL", + ) + .fetch_all(&mut *conn) + .await?; + + let (head, last_pending_block_tx) = sqlx::query_as::<_, (Option, Option)>( + "SELECT head, last_pending_block_tx FROM contracts WHERE 1=1", + ) + .fetch_one(&mut *conn) + .await?; + + let head = head.map(|h| h.try_into().expect("doesn't fit in u64")); + let last_pending_block_tx = + last_pending_block_tx.map(|t| Felt::from_str(&t).expect("its a valid felt")); + Ok(Cursors { + cursor_map: cursors + .into_iter() + .map(|(c, t)| { + ( + Felt::from_str(&c).expect("its a valid felt"), + Felt::from_str(&t).expect("its a valid felt"), + ) + }) + .collect(), + last_pending_block_tx, + head, + }) + } + + pub fn update_cursors( + &mut self, + head: u64, + last_pending_block_tx: Option, + cursor_map: HashMap, + ) { + let head = Argument::Int(head.try_into().expect("doesn't fit in u64")); + let last_pending_block_tx = if let Some(f) = last_pending_block_tx { + Argument::String(format!("{:#x}", f)) + } else { + Argument::Null + }; + + self.query_queue.enqueue( + "UPDATE contracts SET head = ?, last_pending_block_tx = ? 
WHERE 1=1", + vec![head, last_pending_block_tx], + QueryType::Other, + ); + + for cursor in cursor_map { + let tx = Argument::FieldElement(cursor.1); + let contract = Argument::FieldElement(cursor.0); + + self.query_queue.enqueue( + "UPDATE contracts SET last_pending_block_contract_tx = ? WHERE id = ?", + vec![tx, contract], + QueryType::Other, + ); + } + } + + // For a given contract address, sets head to the passed value and sets + // last_pending_block_contract_tx and last_pending_block_tx to null + pub fn reset_cursors(&mut self, head: u64) { + let head = Argument::Int(head.try_into().expect("doesn't fit in u64")); + self.query_queue.enqueue( + "UPDATE contracts SET head = ?, last_pending_block_contract_tx = ?, \ + last_pending_block_tx = ? WHERE 1=1", + vec![head, Argument::Null, Argument::Null], QueryType::Other, ); } @@ -311,7 +411,7 @@ impl Sql { QueryType::Other, ); - let keys_str = felts_sql_string(&keys); + let keys_str = felts_to_sql_string(&keys); let insert_entities = "INSERT INTO event_messages (id, keys, event_id, executed_at) \ VALUES (?, ?, ?, ?) 
ON CONFLICT(id) DO UPDATE SET \ updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \ @@ -452,15 +552,15 @@ impl Sql { Transaction::Invoke(InvokeTransaction::V1(invoke_v1_transaction)) => ( Argument::FieldElement(invoke_v1_transaction.transaction_hash), Argument::FieldElement(invoke_v1_transaction.sender_address), - Argument::String(felts_sql_string(&invoke_v1_transaction.calldata)), + Argument::String(felts_to_sql_string(&invoke_v1_transaction.calldata)), Argument::FieldElement(invoke_v1_transaction.max_fee), - Argument::String(felts_sql_string(&invoke_v1_transaction.signature)), + Argument::String(felts_to_sql_string(&invoke_v1_transaction.signature)), Argument::FieldElement(invoke_v1_transaction.nonce), ), Transaction::L1Handler(l1_handler_transaction) => ( Argument::FieldElement(l1_handler_transaction.transaction_hash), Argument::FieldElement(l1_handler_transaction.contract_address), - Argument::String(felts_sql_string(&l1_handler_transaction.calldata)), + Argument::String(felts_to_sql_string(&l1_handler_transaction.calldata)), Argument::FieldElement(Felt::ZERO), // has no max_fee Argument::String("".to_string()), // has no signature Argument::FieldElement((l1_handler_transaction.nonce).into()), @@ -495,8 +595,8 @@ impl Sql { block_timestamp: u64, ) { let id = Argument::String(event_id.to_string()); - let keys = Argument::String(felts_sql_string(&event.keys)); - let data = Argument::String(felts_sql_string(&event.data)); + let keys = Argument::String(felts_to_sql_string(&event.keys)); + let data = Argument::String(felts_to_sql_string(&event.data)); let hash = Argument::FieldElement(transaction_hash); let executed_at = Argument::String(utc_dt_string_from_timestamp(block_timestamp)); @@ -509,8 +609,8 @@ impl Sql { let emitted = EventEmitted { id: event_id.to_string(), - keys: felts_sql_string(&event.keys), - data: felts_sql_string(&event.data), + keys: felts_to_sql_string(&event.keys), + data: felts_to_sql_string(&event.data), transaction_hash: 
format!("{:#x}", transaction_hash), created_at: Utc::now(), executed_at: must_utc_datetime_from_timestamp(block_timestamp), @@ -1151,8 +1251,3 @@ impl Sql { Ok(()) } } - -pub fn felts_sql_string(felts: &[Felt]) -> String { - felts.iter().map(|k| format!("{:#x}", k)).collect::>().join(FELT_DELIMITER) - + FELT_DELIMITER -} diff --git a/crates/torii/core/src/query_queue.rs b/crates/torii/core/src/sql/query_queue.rs similarity index 98% rename from crates/torii/core/src/query_queue.rs rename to crates/torii/core/src/sql/query_queue.rs index 5dfad77113..a9625e6259 100644 --- a/crates/torii/core/src/query_queue.rs +++ b/crates/torii/core/src/sql/query_queue.rs @@ -5,6 +5,7 @@ use dojo_types::schema::{Struct, Ty}; use sqlx::{FromRow, Pool, Sqlite}; use starknet::core::types::Felt; +use super::utils::felt_to_sql_string; use crate::simple_broker::SimpleBroker; use crate::types::{ Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, @@ -82,7 +83,7 @@ impl QueryQueue { Argument::Int(integer) => query.bind(integer), Argument::Bool(bool) => query.bind(bool), Argument::String(string) => query.bind(string), - Argument::FieldElement(felt) => query.bind(format!("{:#x}", felt)), + Argument::FieldElement(felt) => query.bind(felt_to_sql_string(felt)), } } diff --git a/crates/torii/core/src/sql_test.rs b/crates/torii/core/src/sql/test.rs similarity index 92% rename from crates/torii/core/src/sql_test.rs rename to crates/torii/core/src/sql/test.rs index f31cacbb66..e4b9d2a458 100644 --- a/crates/torii/core/src/sql_test.rs +++ b/crates/torii/core/src/sql/test.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; @@ -20,13 +21,8 @@ use starknet_crypto::poseidon_hash_many; use tokio::sync::broadcast; use crate::engine::{Engine, EngineConfig, Processors}; -use crate::processors::generate_event_processors_map; -use crate::processors::register_model::RegisterModelProcessor; -use 
crate::processors::store_del_record::StoreDelRecordProcessor; -use crate::processors::store_set_record::StoreSetRecordProcessor; -use crate::processors::store_update_member::StoreUpdateMemberProcessor; -use crate::processors::store_update_record::StoreUpdateRecordProcessor; use crate::sql::Sql; +use crate::types::ContractType; pub async fn bootstrap_engine

( world: WorldContractReader

, @@ -38,26 +34,19 @@ where { let (shutdown_tx, _) = broadcast::channel(1); let to = provider.block_hash_and_number().await?.block_number; + let world_address = world.address; let mut engine = Engine::new( world, db, provider, - Processors { - event: generate_event_processors_map(vec![ - Arc::new(RegisterModelProcessor), - Arc::new(StoreSetRecordProcessor), - Arc::new(StoreUpdateRecordProcessor), - Arc::new(StoreUpdateMemberProcessor), - Arc::new(StoreDelRecordProcessor), - ])?, - ..Processors::default() - }, + Processors { ..Processors::default() }, EngineConfig::default(), shutdown_tx, None, + Arc::new(HashMap::from([(world_address, ContractType::WORLD)])), ); - let data = engine.fetch_range(0, to, None).await.unwrap(); + let data = engine.fetch_range(0, to, &HashMap::new()).await.unwrap(); engine.process_range(data).await.unwrap(); Ok(engine) @@ -123,7 +112,13 @@ async fn test_load_from_remote(sequencer: &RunnerCtx) { let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); - let mut db = Sql::new(pool.clone(), world_reader.address).await.unwrap(); + let mut db = Sql::new( + pool.clone(), + world_reader.address, + &HashMap::from([(world_reader.address, ContractType::WORLD)]), + ) + .await + .unwrap(); let _ = bootstrap_engine(world_reader, db.clone(), provider).await.unwrap(); @@ -280,7 +275,13 @@ async fn test_load_from_remote_del(sequencer: &RunnerCtx) { let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); - let mut db = Sql::new(pool.clone(), world_reader.address).await.unwrap(); + let mut db = Sql::new( + pool.clone(), + world_reader.address, + &HashMap::from([(world_reader.address, ContractType::WORLD)]), + ) + .await + .unwrap(); let _ = bootstrap_engine(world_reader, db.clone(), provider).await; @@ -367,7 +368,13 @@ async fn test_update_with_set_record(sequencer: &RunnerCtx) { let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); - let mut db = 
Sql::new(pool.clone(), world_reader.address).await.unwrap(); + let mut db = Sql::new( + pool.clone(), + world_reader.address, + &HashMap::from([(world_reader.address, ContractType::WORLD)]), + ) + .await + .unwrap(); let _ = bootstrap_engine(world_reader, db.clone(), Arc::clone(&provider)).await.unwrap(); diff --git a/crates/torii/core/src/sql/utils.rs b/crates/torii/core/src/sql/utils.rs new file mode 100644 index 0000000000..26476b0837 --- /dev/null +++ b/crates/torii/core/src/sql/utils.rs @@ -0,0 +1,26 @@ +use starknet::core::types::U256; +use starknet_crypto::Felt; + +use super::FELT_DELIMITER; + +pub fn felts_to_sql_string(felts: &[Felt]) -> String { + felts.iter().map(|k| format!("{:#x}", k)).collect::>().join(FELT_DELIMITER) + + FELT_DELIMITER +} + +pub fn felt_to_sql_string(felt: &Felt) -> String { + format!("{:#x}", felt) +} + +pub fn felt_and_u256_to_sql_string(felt: &Felt, u256: &U256) -> String { + format!("{}:{}", felt_to_sql_string(felt), u256_to_sql_string(u256)) +} + +pub fn u256_to_sql_string(u256: &U256) -> String { + format!("{:#064x}", u256) +} + +pub fn sql_string_to_u256(sql_string: &str) -> U256 { + let sql_string = sql_string.strip_prefix("0x").unwrap_or(sql_string); + U256::from(crypto_bigint::U256::from_be_hex(sql_string)) +} diff --git a/crates/torii/core/src/types.rs b/crates/torii/core/src/types.rs index de75fca94a..e3c814b9a2 100644 --- a/crates/torii/core/src/types.rs +++ b/crates/torii/core/src/types.rs @@ -1,4 +1,7 @@ use core::fmt; +use std::collections::VecDeque; +use std::path::PathBuf; +use std::str::FromStr; use chrono::{DateTime, Utc}; use dojo_types::schema::Ty; @@ -84,3 +87,59 @@ pub struct Event { pub executed_at: DateTime, pub created_at: DateTime, } + +#[derive(Default, Deserialize, Debug, Clone)] +pub struct ToriiConfig { + /// contract addresses to index + pub contracts: VecDeque, +} + +impl ToriiConfig { + pub fn load_from_path(path: &PathBuf) -> Result { + let config = std::fs::read_to_string(path)?; + let config: 
Self = toml::from_str(&config)?; + Ok(config) + } +} + +#[derive(Deserialize, Debug, Clone, Copy)] +pub struct Contract { + pub address: Felt, + pub r#type: ContractType, +} + +#[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum ContractType { + WORLD, + ERC20, + ERC20Legacy, + ERC721, + ERC721Legacy, +} + +impl FromStr for ContractType { + type Err = anyhow::Error; + + fn from_str(input: &str) -> Result { + match input.to_lowercase().as_str() { + "world" => Ok(ContractType::WORLD), + "erc20" => Ok(ContractType::ERC20), + "erc721" => Ok(ContractType::ERC721), + "erc20legacy" | "erc20_legacy" => Ok(ContractType::ERC20Legacy), + "erc721legacy" | "erc721_legacy" => Ok(ContractType::ERC721Legacy), + _ => Err(anyhow::anyhow!("Invalid ERC type: {}", input)), + } + } +} + +impl std::fmt::Display for ContractType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ContractType::WORLD => write!(f, "WORLD"), + ContractType::ERC20 => write!(f, "ERC20"), + ContractType::ERC721 => write!(f, "ERC721"), + ContractType::ERC20Legacy => write!(f, "ERC20Legacy"), + ContractType::ERC721Legacy => write!(f, "ERC721Legacy"), + } + } +} diff --git a/crates/torii/graphql/Cargo.toml b/crates/torii/graphql/Cargo.toml index 464dddefb6..8c239ef9b4 100644 --- a/crates/torii/graphql/Cargo.toml +++ b/crates/torii/graphql/Cargo.toml @@ -26,6 +26,7 @@ serde.workspace = true serde_json.workspace = true sozo-ops.workspace = true sqlx.workspace = true +starknet-crypto.workspace = true strum.workspace = true strum_macros.workspace = true thiserror.workspace = true @@ -46,5 +47,4 @@ dojo-world.workspace = true katana-runner.workspace = true scarb.workspace = true serial_test = "2.0.0" -starknet-crypto.workspace = true starknet.workspace = true diff --git a/crates/torii/graphql/src/constants.rs b/crates/torii/graphql/src/constants.rs index e09c8de6d2..2d851f07b1 100644 --- a/crates/torii/graphql/src/constants.rs +++ 
b/crates/torii/graphql/src/constants.rs @@ -33,6 +33,9 @@ pub const QUERY_TYPE_NAME: &str = "World__Query"; pub const SUBSCRIPTION_TYPE_NAME: &str = "World__Subscription"; pub const MODEL_ORDER_TYPE_NAME: &str = "World__ModelOrder"; pub const MODEL_ORDER_FIELD_TYPE_NAME: &str = "World__ModelOrderField"; +pub const ERC_BALANCE_TYPE_NAME: &str = "ERC__Balance"; +pub const ERC_TRANSFER_TYPE_NAME: &str = "ERC__Transfer"; +pub const ERC_TOKEN_TYPE_NAME: &str = "ERC__Token"; // objects' single and plural names pub const ENTITY_NAMES: (&str, &str) = ("entity", "entities"); @@ -45,6 +48,10 @@ pub const METADATA_NAMES: (&str, &str) = ("metadata", "metadatas"); pub const TRANSACTION_NAMES: (&str, &str) = ("transaction", "transactions"); pub const PAGE_INFO_NAMES: (&str, &str) = ("pageInfo", ""); +pub const ERC_BALANCE_NAME: (&str, &str) = ("ercBalance", ""); +pub const ERC_TOKEN_NAME: (&str, &str) = ("ercToken", ""); +pub const ERC_TRANSFER_NAME: (&str, &str) = ("ercTransfer", ""); + // misc pub const ORDER_DIR_TYPE_NAME: &str = "OrderDirection"; pub const ORDER_ASC: &str = "ASC"; diff --git a/crates/torii/graphql/src/error.rs b/crates/torii/graphql/src/error.rs index d00969f98b..83834c9f35 100644 --- a/crates/torii/graphql/src/error.rs +++ b/crates/torii/graphql/src/error.rs @@ -9,6 +9,8 @@ pub enum ExtractError { NotList(String), #[error("Not a string: {0}")] NotString(String), + #[error("Not a felt: {0}")] + NotFelt(String), #[error("Not a number: {0}")] NotNumber(String), } diff --git a/crates/torii/graphql/src/mapping.rs b/crates/torii/graphql/src/mapping.rs index 1086373bca..47f7d8e1b1 100644 --- a/crates/torii/graphql/src/mapping.rs +++ b/crates/torii/graphql/src/mapping.rs @@ -4,7 +4,7 @@ use async_graphql::Name; use dojo_types::primitive::Primitive; use lazy_static::lazy_static; -use crate::constants::{CONTENT_TYPE_NAME, SOCIAL_TYPE_NAME}; +use crate::constants::{CONTENT_TYPE_NAME, ERC_TOKEN_TYPE_NAME, SOCIAL_TYPE_NAME}; use crate::types::{GraphqlType, TypeData, 
TypeMapping}; lazy_static! { @@ -144,4 +144,27 @@ lazy_static! { TypeData::Simple(TypeRef::named(GraphqlType::DateTime.to_string())) ), ]); + + pub static ref ERC_BALANCE_TYPE_MAPPING: TypeMapping = IndexMap::from([ + (Name::new("balance"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("type"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("token_metadata"), TypeData::Simple(TypeRef::named(ERC_TOKEN_TYPE_NAME))), + ]); + + pub static ref ERC_TRANSFER_TYPE_MAPPING: TypeMapping = IndexMap::from([ + (Name::new("from"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("to"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("amount"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("type"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("executed_at"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("token_metadata"), TypeData::Simple(TypeRef::named(ERC_TOKEN_TYPE_NAME))), + ]); + + pub static ref ERC_TOKEN_TYPE_MAPPING: TypeMapping = IndexMap::from([ + (Name::new("name"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("symbol"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("token_id"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("decimals"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + (Name::new("contract_address"), TypeData::Simple(TypeRef::named(TypeRef::STRING))), + ]); } diff --git a/crates/torii/graphql/src/object/erc/erc_balance.rs b/crates/torii/graphql/src/object/erc/erc_balance.rs new file mode 100644 index 0000000000..77cc492dc2 --- /dev/null +++ b/crates/torii/graphql/src/object/erc/erc_balance.rs @@ -0,0 +1,143 @@ +use async_graphql::dynamic::{Field, FieldFuture, InputValue, TypeRef}; +use async_graphql::{Name, Value}; +use convert_case::{Case, Casing}; +use serde::Deserialize; +use sqlx::{FromRow, Pool, Sqlite, SqliteConnection}; +use starknet_crypto::Felt; +use 
torii_core::sql::utils::felt_to_sql_string; +use tracing::warn; + +use crate::constants::{ERC_BALANCE_NAME, ERC_BALANCE_TYPE_NAME}; +use crate::mapping::ERC_BALANCE_TYPE_MAPPING; +use crate::object::{BasicObject, ResolvableObject}; +use crate::types::{TypeMapping, ValueMapping}; +use crate::utils::extract; + +#[derive(Debug)] +pub struct ErcBalanceObject; + +impl BasicObject for ErcBalanceObject { + fn name(&self) -> (&str, &str) { + ERC_BALANCE_NAME + } + + fn type_name(&self) -> &str { + ERC_BALANCE_TYPE_NAME + } + + fn type_mapping(&self) -> &TypeMapping { + &ERC_BALANCE_TYPE_MAPPING + } +} + +impl ResolvableObject for ErcBalanceObject { + fn resolvers(&self) -> Vec { + let account_address = "account_address"; + let argument = InputValue::new( + account_address.to_case(Case::Camel), + TypeRef::named_nn(TypeRef::STRING), + ); + + let field = Field::new(self.name().0, TypeRef::named_list(self.type_name()), move |ctx| { + FieldFuture::new(async move { + let mut conn = ctx.data::>()?.acquire().await?; + let address = extract::( + ctx.args.as_index_map(), + &account_address.to_case(Case::Camel), + )?; + + let erc_balances = fetch_erc_balances(&mut conn, address).await?; + + Ok(Some(Value::List(erc_balances))) + }) + }) + .argument(argument); + vec![field] + } +} + +async fn fetch_erc_balances( + conn: &mut SqliteConnection, + address: Felt, +) -> sqlx::Result> { + let query = "SELECT t.contract_address, t.name, t.symbol, t.decimals, b.balance, b.token_id, \ + c.contract_type + FROM balances b + JOIN tokens t ON b.token_id = t.id + JOIN contracts c ON t.contract_address = c.contract_address + WHERE b.account_address = ?"; + + let rows = sqlx::query(query).bind(felt_to_sql_string(&address)).fetch_all(conn).await?; + + let mut erc_balances = Vec::new(); + + for row in rows { + let row = BalanceQueryResultRaw::from_row(&row)?; + + let balance_value = match row.contract_type.as_str() { + "ERC20" | "Erc20" | "erc20" => { + let token_metadata = 
Value::Object(ValueMapping::from([ + (Name::new("name"), Value::String(row.name)), + (Name::new("symbol"), Value::String(row.symbol)), + // for erc20 there is no token_id + (Name::new("token_id"), Value::Null), + (Name::new("decimals"), Value::String(row.decimals.to_string())), + (Name::new("contract_address"), Value::String(row.contract_address.clone())), + ])); + + Value::Object(ValueMapping::from([ + (Name::new("balance"), Value::String(row.balance)), + (Name::new("type"), Value::String(row.contract_type)), + (Name::new("token_metadata"), token_metadata), + ])) + } + "ERC721" | "Erc721" | "erc721" => { + // contract_address:token_id + let token_id = row.token_id.split(':').collect::>(); + assert!(token_id.len() == 2); + + let token_metadata = Value::Object(ValueMapping::from([ + (Name::new("contract_address"), Value::String(row.contract_address.clone())), + (Name::new("name"), Value::String(row.name)), + (Name::new("symbol"), Value::String(row.symbol)), + (Name::new("token_id"), Value::String(row.token_id)), + (Name::new("decimals"), Value::String(row.decimals.to_string())), + ])); + + Value::Object(ValueMapping::from([ + (Name::new("balance"), Value::String(row.balance)), + (Name::new("type"), Value::String(row.contract_type)), + (Name::new("token_metadata"), token_metadata), + ])) + } + _ => { + warn!("Unknown contract type: {}", row.contract_type); + continue; + } + }; + + erc_balances.push(balance_value); + } + + Ok(erc_balances) +} + +// TODO: This would be required when subscriptions are needed +// impl ErcBalanceObject { +// pub fn value_mapping(entity: ErcBalance) -> ValueMapping { +// IndexMap::from([ +// ]) +// } +// } + +#[derive(FromRow, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +struct BalanceQueryResultRaw { + pub contract_address: String, + pub name: String, + pub symbol: String, + pub decimals: u8, + pub token_id: String, + pub balance: String, + pub contract_type: String, +} diff --git 
a/crates/torii/graphql/src/object/erc/erc_token.rs b/crates/torii/graphql/src/object/erc/erc_token.rs new file mode 100644 index 0000000000..14b8de7877 --- /dev/null +++ b/crates/torii/graphql/src/object/erc/erc_token.rs @@ -0,0 +1,21 @@ +use crate::constants::{ERC_TOKEN_NAME, ERC_TOKEN_TYPE_NAME}; +use crate::mapping::ERC_TOKEN_TYPE_MAPPING; +use crate::object::BasicObject; +use crate::types::TypeMapping; + +#[derive(Debug)] +pub struct ErcTokenObject; + +impl BasicObject for ErcTokenObject { + fn name(&self) -> (&str, &str) { + ERC_TOKEN_NAME + } + + fn type_name(&self) -> &str { + ERC_TOKEN_TYPE_NAME + } + + fn type_mapping(&self) -> &TypeMapping { + &ERC_TOKEN_TYPE_MAPPING + } +} diff --git a/crates/torii/graphql/src/object/erc/erc_transfer.rs b/crates/torii/graphql/src/object/erc/erc_transfer.rs new file mode 100644 index 0000000000..056f0c224b --- /dev/null +++ b/crates/torii/graphql/src/object/erc/erc_transfer.rs @@ -0,0 +1,181 @@ +use async_graphql::dynamic::{Field, FieldFuture, InputValue, TypeRef}; +use async_graphql::{Name, Value}; +use convert_case::{Case, Casing}; +use serde::Deserialize; +use sqlx::{FromRow, Pool, Sqlite, SqliteConnection}; +use starknet_crypto::Felt; +use torii_core::sql::utils::felt_to_sql_string; +use tracing::warn; + +use crate::constants::{ERC_TRANSFER_NAME, ERC_TRANSFER_TYPE_NAME}; +use crate::mapping::ERC_TRANSFER_TYPE_MAPPING; +use crate::object::{BasicObject, ResolvableObject}; +use crate::types::{TypeMapping, ValueMapping}; +use crate::utils::extract; + +#[derive(Debug)] +pub struct ErcTransferObject; + +impl BasicObject for ErcTransferObject { + fn name(&self) -> (&str, &str) { + ERC_TRANSFER_NAME + } + + fn type_name(&self) -> &str { + ERC_TRANSFER_TYPE_NAME + } + + fn type_mapping(&self) -> &TypeMapping { + &ERC_TRANSFER_TYPE_MAPPING + } +} + +impl ResolvableObject for ErcTransferObject { + fn resolvers(&self) -> Vec { + let account_address = "account_address"; + let limit = "limit"; + let arg_addr = InputValue::new( + 
account_address.to_case(Case::Camel), + TypeRef::named_nn(TypeRef::STRING), + ); + let arg_limit = + InputValue::new(limit.to_case(Case::Camel), TypeRef::named_nn(TypeRef::INT)); + + let field = Field::new(self.name().0, TypeRef::named_list(self.type_name()), move |ctx| { + FieldFuture::new(async move { + let mut conn = ctx.data::>()?.acquire().await?; + let address = extract::( + ctx.args.as_index_map(), + &account_address.to_case(Case::Camel), + )?; + let limit = extract::(ctx.args.as_index_map(), &limit.to_case(Case::Camel))?; + let limit: u32 = limit.try_into()?; + + let erc_transfers = fetch_erc_transfers(&mut conn, address, limit).await?; + + Ok(Some(Value::List(erc_transfers))) + }) + }) + .argument(arg_addr) + .argument(arg_limit); + vec![field] + } +} + +async fn fetch_erc_transfers( + conn: &mut SqliteConnection, + address: Felt, + limit: u32, +) -> sqlx::Result> { + let query = format!( + r#" +SELECT + et.contract_address, + et.from_address, + et.to_address, + et.amount, + et.token_id, + et.executed_at, + t.name, + t.symbol, + t.decimals, + c.contract_type +FROM + erc_transfers et +JOIN + tokens t ON et.token_id = t.id +JOIN + contracts c ON t.contract_address = c.contract_address +WHERE + et.from_address = ? OR et.to_address = ? 
+ORDER BY + et.executed_at DESC +LIMIT {}; +"#, + limit + ); + + let address = felt_to_sql_string(&address); + let rows = sqlx::query(&query).bind(&address).bind(&address).fetch_all(conn).await?; + + let mut erc_balances = Vec::new(); + + for row in rows { + let row = TransferQueryResultRaw::from_row(&row)?; + + let transfer_value = match row.contract_type.as_str() { + "ERC20" | "Erc20" | "erc20" => { + let token_metadata = Value::Object(ValueMapping::from([ + (Name::new("name"), Value::String(row.name)), + (Name::new("symbol"), Value::String(row.symbol)), + // for erc20 there is no token_id + (Name::new("token_id"), Value::Null), + (Name::new("decimals"), Value::String(row.decimals.to_string())), + (Name::new("contract_address"), Value::String(row.contract_address.clone())), + ])); + + Value::Object(ValueMapping::from([ + (Name::new("from"), Value::String(row.from_address)), + (Name::new("to"), Value::String(row.to_address)), + (Name::new("amount"), Value::String(row.amount)), + (Name::new("type"), Value::String(row.contract_type)), + (Name::new("executed_at"), Value::String(row.executed_at)), + (Name::new("token_metadata"), token_metadata), + ])) + } + "ERC721" | "Erc721" | "erc721" => { + // contract_address:token_id + let token_id = row.token_id.split(':').collect::>(); + assert!(token_id.len() == 2); + + let token_metadata = Value::Object(ValueMapping::from([ + (Name::new("name"), Value::String(row.name)), + (Name::new("symbol"), Value::String(row.symbol)), + (Name::new("token_id"), Value::String(token_id[1].to_string())), + (Name::new("decimals"), Value::String(row.decimals.to_string())), + (Name::new("contract_address"), Value::String(row.contract_address.clone())), + ])); + + Value::Object(ValueMapping::from([ + (Name::new("from"), Value::String(row.from_address)), + (Name::new("to"), Value::String(row.to_address)), + (Name::new("amount"), Value::String(row.amount)), + (Name::new("type"), Value::String(row.contract_type)), + (Name::new("executed_at"), 
Value::String(row.executed_at)), + (Name::new("token_metadata"), token_metadata), + ])) + } + _ => { + warn!("Unknown contract type: {}", row.contract_type); + continue; + } + }; + + erc_balances.push(transfer_value); + } + + Ok(erc_balances) +} + +// TODO: This would be required when subscriptions are needed +// impl ErcTransferObject { +// pub fn value_mapping(entity: ErcBalance) -> ValueMapping { +// IndexMap::from([ +// ]) +// } +// } + +#[derive(FromRow, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +struct TransferQueryResultRaw { + pub contract_address: String, + pub from_address: String, + pub to_address: String, + pub token_id: String, + pub amount: String, + pub executed_at: String, + pub name: String, + pub symbol: String, + pub decimals: u8, + pub contract_type: String, +} diff --git a/crates/torii/graphql/src/object/erc/mod.rs b/crates/torii/graphql/src/object/erc/mod.rs new file mode 100644 index 0000000000..eac2c5510b --- /dev/null +++ b/crates/torii/graphql/src/object/erc/mod.rs @@ -0,0 +1,3 @@ +pub mod erc_balance; +pub mod erc_token; +pub mod erc_transfer; diff --git a/crates/torii/graphql/src/object/mod.rs b/crates/torii/graphql/src/object/mod.rs index c1046ffbe4..8997cdabe3 100644 --- a/crates/torii/graphql/src/object/mod.rs +++ b/crates/torii/graphql/src/object/mod.rs @@ -1,5 +1,6 @@ pub mod connection; pub mod entity; +pub mod erc; pub mod event; pub mod event_message; pub mod inputs; diff --git a/crates/torii/graphql/src/schema.rs b/crates/torii/graphql/src/schema.rs index 48a915345b..5f70c49908 100644 --- a/crates/torii/graphql/src/schema.rs +++ b/crates/torii/graphql/src/schema.rs @@ -10,6 +10,9 @@ use super::object::model_data::ModelDataObject; use super::types::ScalarType; use super::utils; use crate::constants::{QUERY_TYPE_NAME, SUBSCRIPTION_TYPE_NAME}; +use crate::object::erc::erc_balance::ErcBalanceObject; +use crate::object::erc::erc_token::ErcTokenObject; +use crate::object::erc::erc_transfer::ErcTransferObject; use 
crate::object::event_message::EventMessageObject; use crate::object::metadata::content::ContentObject; use crate::object::metadata::social::SocialObject; @@ -28,6 +31,7 @@ pub async fn build_schema(pool: &SqlitePool) -> Result { let (objects, unions) = build_objects(pool).await?; let mut schema_builder = Schema::build(QUERY_TYPE_NAME, None, Some(SUBSCRIPTION_TYPE_NAME)); + //? why we need to provide QUERY_TYPE_NAME object here when its already passed to Schema? let mut query_root = Object::new(QUERY_TYPE_NAME); let mut subscription_root = Subscription::new(SUBSCRIPTION_TYPE_NAME); @@ -112,9 +116,12 @@ async fn build_objects(pool: &SqlitePool) -> Result<(Vec, Vec Result { .await .unwrap(); - TransactionWaiter::new(transaction_hash, &provider).await?; + TransactionWaiter::new(transaction_hash, &account.provider()).await?; // Execute `delete` and delete Record with id 20 let InvokeTransactionResult { transaction_hash } = account @@ -350,29 +348,29 @@ pub async fn spinup_types_test() -> Result { let world = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); - let db = Sql::new(pool.clone(), strat.world_address).await.unwrap(); + let db = Sql::new( + pool.clone(), + strat.world_address, + &HashMap::from([(strat.world_address, ContractType::WORLD)]), + ) + .await + .unwrap(); + let world_address = strat.world_address; let (shutdown_tx, _) = broadcast::channel(1); let mut engine = Engine::new( world, db, Arc::clone(&provider), - Processors { - event: generate_event_processors_map(vec![ - Arc::new(RegisterModelProcessor), - Arc::new(StoreSetRecordProcessor), - Arc::new(StoreDelRecordProcessor), - ]) - .unwrap(), - ..Processors::default() - }, + Processors { ..Processors::default() }, EngineConfig::default(), shutdown_tx, None, + Arc::new(HashMap::from([(world_address, ContractType::WORLD)])), ); let to = account.provider().block_hash_and_number().await?.block_number; - let data = engine.fetch_range(0, to, None).await.unwrap(); + let data = 
engine.fetch_range(0, to, &HashMap::new()).await.unwrap(); engine.process_range(data).await.unwrap(); Ok(pool) diff --git a/crates/torii/graphql/src/tests/subscription_test.rs b/crates/torii/graphql/src/tests/subscription_test.rs index 363082878a..efe9a7fd47 100644 --- a/crates/torii/graphql/src/tests/subscription_test.rs +++ b/crates/torii/graphql/src/tests/subscription_test.rs @@ -1,5 +1,6 @@ #[cfg(test)] mod tests { + use std::collections::HashMap; use std::str::FromStr; use std::time::Duration; @@ -13,7 +14,9 @@ mod tests { use starknet::core::types::Event; use starknet_crypto::{poseidon_hash_many, Felt}; use tokio::sync::mpsc; - use torii_core::sql::{felts_sql_string, Sql}; + use torii_core::sql::utils::felts_to_sql_string; + use torii_core::sql::Sql; + use torii_core::types::ContractType; use crate::tests::{model_fixtures, run_graphql_subscription}; use crate::utils; @@ -21,7 +24,10 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_entity_subscription(pool: SqlitePool) { - let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap(); + let mut db = + Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + .await + .unwrap(); model_fixtures(&mut db).await; // 0. Preprocess expected entity value @@ -104,7 +110,7 @@ mod tests { ], }); let keys = keys_from_ty(&ty).unwrap(); - let keys_str = felts_sql_string(&keys); + let keys_str = felts_to_sql_string(&keys); let entity_id = poseidon_hash_many(&keys); let model_id = model_id_from_ty(&ty); @@ -156,7 +162,10 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_entity_subscription_with_id(pool: SqlitePool) { - let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap(); + let mut db = + Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + .await + .unwrap(); model_fixtures(&mut db).await; // 0. 
Preprocess expected entity value @@ -222,7 +231,7 @@ mod tests { }); let keys = keys_from_ty(&ty).unwrap(); - let keys_str = felts_sql_string(&keys); + let keys_str = felts_to_sql_string(&keys); let entity_id = poseidon_hash_many(&keys); let model_id = model_id_from_ty(&ty); @@ -271,7 +280,10 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_model_subscription(pool: SqlitePool) { - let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap(); + let mut db = + Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + .await + .unwrap(); // 0. Preprocess model value let namespace = "types_test".to_string(); let model_name = "Subrecord".to_string(); @@ -336,7 +348,10 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_model_subscription_with_id(pool: SqlitePool) { - let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap(); + let mut db = + Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + .await + .unwrap(); // 0. 
Preprocess model value let namespace = "types_test".to_string(); let model_name = "Subrecord".to_string(); @@ -402,7 +417,10 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_event_emitted(pool: SqlitePool) { - let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap(); + let mut db = + Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + .await + .unwrap(); let block_timestamp: u64 = 1710754478_u64; let (tx, mut rx) = mpsc::channel(7); tokio::spawn(async move { diff --git a/crates/torii/graphql/src/utils.rs b/crates/torii/graphql/src/utils.rs index 8f49990d4a..949e3b9711 100644 --- a/crates/torii/graphql/src/utils.rs +++ b/crates/torii/graphql/src/utils.rs @@ -1,5 +1,8 @@ +use std::str::FromStr; + use async_graphql::{Result, Value}; use convert_case::{Case, Casing}; +use starknet_crypto::Felt; use crate::error::ExtractError; use crate::types::ValueMapping; @@ -28,6 +31,18 @@ impl ExtractFromIndexMap for String { } } +impl ExtractFromIndexMap for Felt { + fn extract(indexmap: &ValueMapping, input: &str) -> Result { + let value = indexmap.get(input).ok_or_else(|| ExtractError::NotFound(input.to_string()))?; + match value { + Value::String(s) => { + Ok(Felt::from_str(s).map_err(|_| ExtractError::NotFelt(input.to_string()))?) 
+ } + _ => Err(ExtractError::NotString(input.to_string())), + } + } +} + impl ExtractFromIndexMap for Vec { fn extract(indexmap: &ValueMapping, input: &str) -> Result { let value = indexmap.get(input).ok_or_else(|| ExtractError::NotFound(input.to_string()))?; diff --git a/crates/torii/grpc/src/server/tests/entities_test.rs b/crates/torii/grpc/src/server/tests/entities_test.rs index 0b04574a03..c84e23775a 100644 --- a/crates/torii/grpc/src/server/tests/entities_test.rs +++ b/crates/torii/grpc/src/server/tests/entities_test.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; @@ -20,10 +21,8 @@ use starknet::providers::{JsonRpcClient, Provider}; use starknet_crypto::poseidon_hash_many; use tokio::sync::broadcast; use torii_core::engine::{Engine, EngineConfig, Processors}; -use torii_core::processors::generate_event_processors_map; -use torii_core::processors::register_model::RegisterModelProcessor; -use torii_core::processors::store_set_record::StoreSetRecordProcessor; use torii_core::sql::Sql; +use torii_core::types::ContractType; use crate::proto::types::KeysClause; use crate::server::DojoWorld; @@ -92,28 +91,29 @@ async fn test_entities_queries(sequencer: &RunnerCtx) { TransactionWaiter::new(tx.transaction_hash, &provider).await.unwrap(); - let db = Sql::new(pool.clone(), strat.world_address).await.unwrap(); + let db = Sql::new( + pool.clone(), + strat.world_address, + &HashMap::from([(strat.world_address, ContractType::WORLD)]), + ) + .await + .unwrap(); + let world_address = strat.world_address; let (shutdown_tx, _) = broadcast::channel(1); let mut engine = Engine::new( world_reader, db.clone(), Arc::clone(&provider), - Processors { - event: generate_event_processors_map(vec![ - Arc::new(RegisterModelProcessor), - Arc::new(StoreSetRecordProcessor), - ]) - .unwrap(), - ..Processors::default() - }, + Processors { ..Processors::default() }, EngineConfig::default(), shutdown_tx, None, + Arc::new(HashMap::from([(world_address, 
ContractType::WORLD)])), ); let to = provider.block_hash_and_number().await.unwrap().block_number; - let data = engine.fetch_range(0, to, None).await.unwrap(); + let data = engine.fetch_range(0, to, &HashMap::new()).await.unwrap(); engine.process_range(data).await.unwrap(); let (_, receiver) = tokio::sync::mpsc::channel(1); diff --git a/crates/torii/libp2p/src/server/mod.rs b/crates/torii/libp2p/src/server/mod.rs index 9bc1e25ce3..aebf341a59 100644 --- a/crates/torii/libp2p/src/server/mod.rs +++ b/crates/torii/libp2p/src/server/mod.rs @@ -25,7 +25,8 @@ use starknet::core::types::{BlockId, BlockTag, Felt, FunctionCall}; use starknet::core::utils::get_selector_from_name; use starknet::providers::Provider; use starknet_crypto::poseidon_hash_many; -use torii_core::sql::{felts_sql_string, Sql}; +use torii_core::sql::utils::felts_to_sql_string; +use torii_core::sql::Sql; use tracing::{info, warn}; use webrtc::tokio::Certificate; @@ -245,7 +246,7 @@ impl Relay

{ continue; } }; - let keys_str = felts_sql_string(&keys); + let keys_str = felts_to_sql_string(&keys); let entity_id = poseidon_hash_many(&keys); let model_id = ty_model_id(&ty).unwrap(); diff --git a/crates/torii/libp2p/src/tests.rs b/crates/torii/libp2p/src/tests.rs index 7ef1472068..6aa1ba9858 100644 --- a/crates/torii/libp2p/src/tests.rs +++ b/crates/torii/libp2p/src/tests.rs @@ -524,6 +524,7 @@ mod test { #[cfg(not(target_arch = "wasm32"))] #[tokio::test] async fn test_client_messaging() -> Result<(), Box> { + use std::collections::HashMap; use std::time::Duration; use dojo_types::schema::{Member, Struct, Ty}; @@ -537,6 +538,7 @@ mod test { use tokio::select; use tokio::time::sleep; use torii_core::sql::Sql; + use torii_core::types::ContractType; use crate::server::Relay; use crate::typed_data::{Domain, Field, SimpleField, TypedData}; @@ -559,7 +561,10 @@ mod test { let account = sequencer.account_data(0); - let mut db = Sql::new(pool.clone(), Felt::ZERO).await?; + let mut db = + Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + .await + .unwrap(); // Register the model of our Message db.register_model( diff --git a/crates/torii/migrations/20240913104418_add_erc.sql b/crates/torii/migrations/20240913104418_add_erc.sql new file mode 100644 index 0000000000..aca9f4d817 --- /dev/null +++ b/crates/torii/migrations/20240913104418_add_erc.sql @@ -0,0 +1,35 @@ +CREATE TABLE balances ( + -- account_address:contract_address:token_id + id TEXT NOT NULL PRIMARY KEY, + balance TEXT NOT NULL, + account_address TEXT NOT NULL, + contract_address TEXT NOT NULL, + -- contract_address:token_id + token_id TEXT NOT NULL, + FOREIGN KEY (token_id) REFERENCES tokens(id) +); + +CREATE INDEX balances_account_address ON balances (account_address); +CREATE INDEX balances_contract_address ON balances (contract_address); + +CREATE TABLE tokens ( + -- contract_address:token_id + id TEXT NOT NULL PRIMARY KEY, + contract_address TEXT NOT NULL, + name 
TEXT NOT NULL, + symbol TEXT NOT NULL, + decimals INTEGER NOT NULL, + FOREIGN KEY (contract_address) REFERENCES contracts(id) +); + +CREATE TABLE erc_transfers ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + contract_address TEXT NOT NULL, + from_address TEXT NOT NULL, + to_address TEXT NOT NULL, + amount TEXT NOT NULL, + -- contract_address:token_id + token_id TEXT NOT NULL, + executed_at DATETIME NOT NULL, + FOREIGN KEY (token_id) REFERENCES tokens(id) +); diff --git a/crates/torii/migrations/20240918200125_rename_column_contracts_table.sql b/crates/torii/migrations/20240918200125_rename_column_contracts_table.sql new file mode 100644 index 0000000000..3213853e8a --- /dev/null +++ b/crates/torii/migrations/20240918200125_rename_column_contracts_table.sql @@ -0,0 +1 @@ +ALTER TABLE contracts RENAME COLUMN last_pending_block_world_tx TO last_pending_block_contract_tx; diff --git a/crates/torii/types-test/Scarb.lock b/crates/torii/types-test/Scarb.lock index ae6ec350d2..8720cc6b90 100644 --- a/crates/torii/types-test/Scarb.lock +++ b/crates/torii/types-test/Scarb.lock @@ -4,14 +4,6 @@ version = 1 [[package]] name = "dojo" version = "1.0.0-alpha.4" -dependencies = [ - "dojo_plugin", -] - -[[package]] -name = "dojo_plugin" -version = "1.0.0-alpha.4" -source = "git+https://github.com/dojoengine/dojo?rev=f15def33#f15def330c0d099e79351d11c197f63e8cc1ff36" [[package]] name = "types_test" diff --git a/scripts/compare-torii-data.py b/scripts/compare-torii-data.py new file mode 100644 index 0000000000..04798185cd --- /dev/null +++ b/scripts/compare-torii-data.py @@ -0,0 +1,87 @@ +# This script compares data across 'events', 'entities', and 'transactions' tables between two SQLite databases. +# Helpful to make sure any changes made in torii don't affect the resulting data. 
+ +import sqlite3 +import argparse + +def fetch_table_data(db_path, table_name, columns): + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + cursor.execute(f"SELECT {', '.join(columns)} FROM {table_name}") + data = cursor.fetchall() + conn.close() + return {row[0]: row[1:] for row in data} + +def get_table_row_count(db_path, table_name): + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + cursor.execute(f"SELECT COUNT(*) FROM {table_name}") + count = cursor.fetchone()[0] + conn.close() + return count + +def compare_data(data1, data2, table_name): + differences_found = False + for id, values in data1.items(): + if id in data2: + if values != data2[id]: + print(f"Mismatch found in {table_name} for ID {id}:") + print(f" Database 1: {values}") + print(f" Database 2: {data2[id]}") + differences_found = True + else: + print(f"ID {id} found in {table_name} of Database 1 but not in Database 2") + differences_found = True + + for id in data2: + if id not in data1: + print(f"ID {id} found in {table_name} of Database 2 but not in Database 1") + differences_found = True + + if not differences_found: + print(f"No differences found in {table_name}") + +def compare_databases(db_path1, db_path2): + # Columns to compare, ignoring time-dependent and event_id columns + events_columns = ["id", "keys", "data", "transaction_hash"] + entities_columns = ["id", "keys"] + transactions_columns = ["id", "transaction_hash", "sender_address", "calldata", "max_fee", "signature", "nonce", "transaction_type"] + + # Fetch data from both databases + events_data_db1 = fetch_table_data(db_path1, "events", events_columns) + events_data_db2 = fetch_table_data(db_path2, "events", events_columns) + entities_data_db1 = fetch_table_data(db_path1, "entities", entities_columns) + entities_data_db2 = fetch_table_data(db_path2, "entities", entities_columns) + transactions_data_db1 = fetch_table_data(db_path1, "transactions", transactions_columns) + transactions_data_db2 = 
fetch_table_data(db_path2, "transactions", transactions_columns) + + # Get row counts from both databases + events_count_db1 = get_table_row_count(db_path1, "events") + events_count_db2 = get_table_row_count(db_path2, "events") + entities_count_db1 = get_table_row_count(db_path1, "entities") + entities_count_db2 = get_table_row_count(db_path2, "entities") + transactions_count_db1 = get_table_row_count(db_path1, "transactions") + transactions_count_db2 = get_table_row_count(db_path2, "transactions") + + # Print row counts + print(f"Number of rows in events table: Database 1 = {events_count_db1}, Database 2 = {events_count_db2}") + print(f"Number of rows in entities table: Database 1 = {entities_count_db1}, Database 2 = {entities_count_db2}") + print(f"Number of rows in transactions table: Database 1 = {transactions_count_db1}, Database 2 = {transactions_count_db2}") + + # Compare data + print("\nComparing events table:") + compare_data(events_data_db1, events_data_db2, "events") + + print("\nComparing entities table:") + compare_data(entities_data_db1, entities_data_db2, "entities") + + print("\nComparing transactions table:") + compare_data(transactions_data_db1, transactions_data_db2, "transactions") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Compare data in events, entities, and transactions tables between two SQLite databases.") + parser.add_argument("db_path1", help="Path to the first SQLite database") + parser.add_argument("db_path2", help="Path to the second SQLite database") + args = parser.parse_args() + + compare_databases(args.db_path1, args.db_path2) diff --git a/scripts/deploy_erc20_katana.sh b/scripts/deploy_erc20_katana.sh new file mode 100755 index 0000000000..3ad8d87937 --- /dev/null +++ b/scripts/deploy_erc20_katana.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +starkli deploy --account ../account.json --keystore ../signer.json --keystore-password "" 0x02a8846878b6ad1f54f6ba46f5f40e11cee755c677f130b2c4b60566c9003f1f 
0x626c6f62 0x424c42 0x8 u256:10000000000 0xb3ff441a68610b30fd5e2abbf3a1548eb6ba6f3559f2862bf2dc757e5828ca --rpc http://localhost:5050 diff --git a/scripts/send_erc20_transfer.sh b/scripts/send_erc20_transfer.sh new file mode 100755 index 0000000000..b321d2fa19 --- /dev/null +++ b/scripts/send_erc20_transfer.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +if [ $# -eq 0 ]; then + echo "Error: Contract address argument is required." + echo "Usage: $0 <contract_address>" + exit 1 +fi + +contract_address=$1 +rpc="http://localhost:5050" + +starkli invoke $contract_address transfer 0x1234 u256:1 --account ../account.json --keystore ../signer.json --keystore-password "" --rpc $rpc diff --git a/spawn-and-move-db.tar.gz b/spawn-and-move-db.tar.gz index ac6788ec41..2dfed8d7fd 100644 Binary files a/spawn-and-move-db.tar.gz and b/spawn-and-move-db.tar.gz differ diff --git a/types-test-db.tar.gz b/types-test-db.tar.gz index b5a45bd0d7..22468eb3ce 100644 Binary files a/types-test-db.tar.gz and b/types-test-db.tar.gz differ