diff --git a/Cargo.lock b/Cargo.lock index bb7394efc4..ac6c55fbc0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14758,6 +14758,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", + "toml 0.8.15", "tracing", ] diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index b1447970de..8949f950d8 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -12,6 +12,7 @@ use std::cmp; use std::net::SocketAddr; +use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; @@ -31,6 +32,9 @@ use tokio::sync::broadcast; use tokio::sync::broadcast::Sender; use tokio_stream::StreamExt; use torii_core::engine::{Engine, EngineConfig, Processors}; +use torii_core::processors::erc20_legacy_transfer::Erc20LegacyTransferProcessor; +use torii_core::processors::erc20_transfer::Erc20TransferProcessor; +use torii_core::processors::erc721_transfer::Erc721TransferProcessor; use torii_core::processors::event_message::EventMessageProcessor; use torii_core::processors::generate_event_processors_map; use torii_core::processors::metadata_update::MetadataUpdateProcessor; @@ -42,7 +46,7 @@ use torii_core::processors::store_update_member::StoreUpdateMemberProcessor; use torii_core::processors::store_update_record::StoreUpdateRecordProcessor; use torii_core::simple_broker::SimpleBroker; use torii_core::sql::Sql; -use torii_core::types::Model; +use torii_core::types::{ErcContract, ErcType, Model, ToriiConfig}; use torii_server::proxy::Proxy; use tracing::{error, info}; use tracing_subscriber::{fmt, EnvFilter}; @@ -132,11 +136,38 @@ struct Args { /// Max concurrent tasks #[arg(long, default_value = "100")] max_concurrent_tasks: usize, + + /// ERC contract addresses to index + #[arg(long, value_parser = parse_erc_contracts)] + #[arg(conflicts_with = "config")] + erc_contracts: Option>, + + /// Configuration file + #[arg(long)] + config: Option, } #[tokio::main] async fn main() -> anyhow::Result<()> { let args = Args::parse(); + + let mut start_block = args.start_block; + + let mut config = if let Some(path) = args.config { + ToriiConfig::load_from_path(&path)? + } else { + ToriiConfig::default() + }; + + if let Some(erc_contracts) = args.erc_contracts { + config.erc_contracts = erc_contracts; + } + + for address in &config.erc_contracts { + if address.start_block < start_block { + start_block = address.start_block; + } + } let filter_layer = EnvFilter::try_from_default_env() .unwrap_or_else(|_| EnvFilter::new("info,hyper_reverse_proxy=off")); @@ -177,17 +208,26 @@ async fn main() -> anyhow::Result<()> { // Get world address let world = WorldContractReader::new(args.world_address, provider.clone()); - let db = Sql::new(pool.clone(), args.world_address).await?; + let erc_contracts = config + .erc_contracts + .iter() + .map(|contract| (contract.contract_address, contract.clone())) + .collect(); + + let db = Sql::new(pool.clone(), args.world_address, &erc_contracts).await?; let processors = Processors { event: generate_event_processors_map(vec![ - Arc::new(RegisterModelProcessor), - Arc::new(StoreSetRecordProcessor), - Arc::new(MetadataUpdateProcessor), - Arc::new(StoreDelRecordProcessor), - Arc::new(EventMessageProcessor), - Arc::new(StoreUpdateRecordProcessor), - Arc::new(StoreUpdateMemberProcessor), + Box::new(RegisterModelProcessor), + Box::new(StoreSetRecordProcessor), + Box::new(MetadataUpdateProcessor), + Box::new(StoreDelRecordProcessor), + Box::new(EventMessageProcessor), + Box::new(StoreUpdateRecordProcessor), + Box::new(StoreUpdateMemberProcessor), + Box::new(Erc20LegacyTransferProcessor), + Box::new(Erc20TransferProcessor), + Box::new(Erc721TransferProcessor), ])?, transaction: vec![Box::new(StoreTransactionProcessor)], ..Processors::default() @@ -209,6 +249,7 @@ async fn main() -> anyhow::Result<()> { }, shutdown_tx.clone(), Some(block_tx), + erc_contracts, ); let shutdown_rx = shutdown_tx.subscribe(); @@ -305,3 +346,29 @@ async fn spawn_rebuilding_graphql_server( } } } + +// Parses clap cli argument which is expected to be in the format: +// - erc_type:address:start_block +// - address:start_block (erc_type defaults to ERC20) +fn parse_erc_contracts(s: &str) -> anyhow::Result> { + let parts: Vec<&str> = s.split(',').collect(); + let mut contracts = Vec::new(); + for part in parts { + match part.split(':').collect::>().as_slice() { + [r#type, address, start_block] => { + let contract_address = Felt::from_str(address).unwrap(); + let start_block = start_block.parse::()?; + let r#type = r#type.parse::()?; + contracts.push(ErcContract { contract_address, start_block, r#type }); + } + [address, start_block] => { + let contract_address = Felt::from_str(address)?; + let start_block = start_block.parse::()?; + let r#type = ErcType::default(); + contracts.push(ErcContract { contract_address, start_block, r#type }); + } + _ => return Err(anyhow::anyhow!("Invalid ERC contract format")), + } + } + Ok(contracts) +} diff --git a/bin/torii/torii.toml b/bin/torii/torii.toml new file mode 100644 index 0000000000..45305c0301 --- /dev/null +++ b/bin/torii/torii.toml @@ -0,0 +1,8 @@ +# Example configuration file for Torii +# erc_contracts = [ +# { contract_address = "0x1234567890abcdef1234567890abcdef12345678", start_block = 0, type = "ERC20" }, +# { contract_address = "0xabcdef1234567890abcdef1234567890abcdef12", start_block = 1, type = "ERC721" }, +# ] +# erc_contracts = [ +# { type = "ERC20", contract_address = "0x07fc13cc1f43f0b0519f84df8bf13bea4d9fd5ce2d748c3baf27bf90a565f60a", start_block = 0 }, +# ] \ No newline at end of file diff --git a/crates/torii/core/Cargo.toml b/crates/torii/core/Cargo.toml index a22ccfcc9c..a0d8df3647 100644 --- a/crates/torii/core/Cargo.toml +++ b/crates/torii/core/Cargo.toml @@ -38,6 +38,7 @@ thiserror.workspace = true tokio = { version = "1.32.0", features = [ "sync" ], default-features = true } tokio-stream = "0.1.11" tokio-util = "0.7.7" +toml.workspace = true tracing.workspace = true [dev-dependencies] diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 2330091a4d..75c0103abc 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, HashMap}; +use std::collections::{btree_map, BTreeMap, HashMap, HashSet}; use std::fmt::Debug; use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; @@ -6,11 +6,12 @@ use std::time::Duration; use anyhow::Result; use dojo_world::contracts::world::WorldContractReader; +use futures_util::future::join_all; use hashlink::LinkedHashMap; use starknet::core::types::{ - BlockId, BlockTag, EmittedEvent, Event, EventFilter, Felt, MaybePendingBlockWithReceipts, - MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, ReceiptBlock, TransactionReceipt, - TransactionReceiptWithBlockInfo, TransactionWithReceipt, + BlockId, BlockTag, EmittedEvent, Event, EventFilter, EventsPage, Felt, + MaybePendingBlockWithReceipts, MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, + ReceiptBlock, TransactionReceipt, TransactionReceiptWithBlockInfo, TransactionWithReceipt, }; use starknet::providers::Provider; use tokio::sync::broadcast::Sender; @@ -22,13 +23,14 @@ use tracing::{debug, error, info, trace, warn}; use crate::processors::event_message::EventMessageProcessor; use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor}; -use crate::sql::Sql; +use crate::sql::{Cursors, Sql}; +use crate::types::ErcContract; #[allow(missing_debug_implementations)] pub struct Processors { pub block: Vec>>, pub transaction: Vec>>, - pub event: HashMap>>, + pub event: HashMap>>>, pub catch_all_event: Box>, } @@ -77,6 +79,7 @@ pub enum FetchDataResult { #[derive(Debug)] pub struct FetchRangeResult { // (block_number, transaction_hash) -> events + // NOTE: LinkedList might contains blocks in different order pub transactions: LinkedHashMap<(u64, Felt), Vec>, pub blocks: BTreeMap, pub latest_block_number: u64, @@ -106,6 +109,8 @@ pub struct Engine { config: EngineConfig, shutdown_tx: Sender<()>, block_tx: Option>, + // ERC tokens to index + tokens: HashMap, tasks: HashMap>, } @@ -115,6 +120,7 @@ struct UnprocessedEvent { } impl Engine

{ + #[allow(clippy::too_many_arguments)] pub fn new( world: WorldContractReader

, db: Sql, @@ -123,6 +129,7 @@ impl Engine

{ config: EngineConfig, shutdown_tx: Sender<()>, block_tx: Option>, + tokens: HashMap, ) -> Self { Self { world: Arc::new(world), @@ -132,15 +139,16 @@ impl Engine

{ config, shutdown_tx, block_tx, + tokens, tasks: HashMap::new(), } } pub async fn start(&mut self) -> Result<()> { // use the start block provided by user if head is 0 - let (head, _, _) = self.db.head().await?; + let (head, _, _) = self.db.head(self.world.address).await?; if head == 0 { - self.db.set_head(self.config.start_block); + self.db.set_head(self.world.address, self.config.start_block); } else if self.config.start_block != 0 { warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead."); } @@ -152,12 +160,12 @@ impl Engine

{ let mut erroring_out = false; loop { - let (head, last_pending_block_world_tx, last_pending_block_tx) = self.db.head().await?; + let cursors = self.db.cursors().await?; tokio::select! { _ = shutdown_rx.recv() => { break Ok(()); } - res = self.fetch_data(head, last_pending_block_world_tx, last_pending_block_tx) => { + res = self.fetch_data(&cursors) => { match res { Ok(fetch_result) => { if erroring_out { @@ -193,22 +201,18 @@ impl Engine

{ } } - pub async fn fetch_data( - &mut self, - from: u64, - last_pending_block_world_tx: Option, - last_pending_block_tx: Option, - ) -> Result { + pub async fn fetch_data(&mut self, cursors: &Cursors) -> Result { let latest_block_number = self.provider.block_hash_and_number().await?.block_number; + let from = cursors.head.unwrap_or(0); let result = if from < latest_block_number { let from = if from == 0 { from } else { from + 1 }; debug!(target: LOG_TARGET, from = %from, to = %latest_block_number, "Fetching data for range."); - let data = - self.fetch_range(from, latest_block_number, last_pending_block_world_tx).await?; + let data = self.fetch_range(from, latest_block_number, &cursors.cursor_map).await?; FetchDataResult::Range(data) } else if self.config.index_pending { - let data = self.fetch_pending(latest_block_number + 1, last_pending_block_tx).await?; + let data = + self.fetch_pending(latest_block_number + 1, cursors.last_pending_block_tx).await?; if let Some(data) = data { FetchDataResult::Pending(data) } else { @@ -225,98 +229,117 @@ impl Engine

{ &mut self, from: u64, to: u64, - last_pending_block_world_tx: Option, + cursor_map: &HashMap, ) -> Result { // Process all blocks from current to latest. - let get_events = |token: Option| { - self.provider.get_events( - EventFilter { - from_block: Some(BlockId::Number(from)), - to_block: Some(BlockId::Number(to)), - address: Some(self.world.address), - keys: None, - }, - token, - self.config.events_chunk_size, - ) + let world_events_filter = EventFilter { + from_block: Some(BlockId::Number(from)), + to_block: Some(BlockId::Number(to)), + address: Some(self.world.address), + keys: None, }; - // handle next events pages - let mut events_pages = vec![get_events(None).await?]; + let mut fetch_all_events_tasks = vec![]; + let world_events_pages = + get_all_events(&self.provider, world_events_filter, self.config.events_chunk_size); - while let Some(token) = &events_pages.last().unwrap().continuation_token { - debug!(target: LOG_TARGET, "Fetching events page with continuation token: {}", &token); - events_pages.push(get_events(Some(token.clone())).await?); - } + fetch_all_events_tasks.push(world_events_pages); - debug!(target: LOG_TARGET, "Total events pages fetched: {}", &events_pages.len()); - // Transactions & blocks to process - let mut last_block = 0_u64; - let mut blocks = BTreeMap::new(); + for token in self.tokens.iter() { + let events_filter = EventFilter { + from_block: Some(BlockId::Number(from)), + to_block: Some(BlockId::Number(to)), + address: Some(*token.0), + keys: None, + }; + let token_events_pages = + get_all_events(&self.provider, events_filter, self.config.events_chunk_size); - // Flatten events pages and events according to the pending block cursor - // to array of (block_number, transaction_hash) - let mut last_pending_block_world_tx_cursor = last_pending_block_world_tx; - let mut transactions = LinkedHashMap::new(); - for events_page in events_pages { - debug!("Processing events page with events: {}", &events_page.events.len()); - for event in events_page.events { - let block_number = match event.block_number { - Some(block_number) => block_number, - // If the block number is not present, try to fetch it from the transaction - // receipt Should not/rarely happen. Thus the additional - // fetch is acceptable. - None => { - let TransactionReceiptWithBlockInfo { receipt, block } = - self.provider.get_transaction_receipt(event.transaction_hash).await?; - - match receipt { - TransactionReceipt::Invoke(_) | TransactionReceipt::L1Handler(_) => { - if let ReceiptBlock::Block { block_number, .. } = block { - block_number - } else { - // If the block is pending, we assume the block number is the - // latest + 1 - to + 1 - } - } + fetch_all_events_tasks.push(token_events_pages); + } - _ => to + 1, + let task_result = join_all(fetch_all_events_tasks).await; + + let mut events = vec![]; + + for result in task_result { + let result = result?; + let contract_address = + result.0.expect("EventFilters that we use always have an address"); + let events_pages = result.1; + let last_contract_tx = cursor_map.get(&contract_address).cloned(); + let mut last_contract_tx_tmp = last_contract_tx; + debug!(target: LOG_TARGET, "Total events pages fetched for contract ({:#x}): {}", &contract_address, &events_pages.len()); + + for events_page in events_pages { + debug!("Processing events page with events: {}", &events_page.events.len()); + for event in events_page.events { + // Then we skip all transactions until we reach the last pending processed + // transaction (if any) + if let Some(last_contract_tx) = last_contract_tx_tmp { + if event.transaction_hash != last_contract_tx { + continue; } + + last_contract_tx_tmp = None; } - }; - // Keep track of last block number and fetch block timestamp - if block_number > last_block { - let block_timestamp = self.get_block_timestamp(block_number).await?; - blocks.insert(block_number, block_timestamp); + // Skip the latest pending block transaction events + // * as we might have multiple events for the same transaction + if let Some(last_contract_tx) = last_contract_tx { + if event.transaction_hash == last_contract_tx { + continue; + } + } - last_block = block_number; + events.push(event); } + } + } - // Then we skip all transactions until we reach the last pending processed - // transaction (if any) - if let Some(tx) = last_pending_block_world_tx_cursor { - if event.transaction_hash != tx { - continue; - } + // Transactions & blocks to process + let mut blocks = BTreeMap::new(); - last_pending_block_world_tx_cursor = None; - } + // Flatten events pages and events according to the pending block cursor + // to array of (block_number, transaction_hash) + let mut transactions = LinkedHashMap::new(); + for event in events { + let block_number = match event.block_number { + Some(block_number) => block_number, + // If the block number is not present, try to fetch it from the transaction + // receipt Should not/rarely happen. Thus the additional + // fetch is acceptable. + None => { + let TransactionReceiptWithBlockInfo { receipt, block } = + self.provider.get_transaction_receipt(event.transaction_hash).await?; + + match receipt { + TransactionReceipt::Invoke(_) | TransactionReceipt::L1Handler(_) => { + if let ReceiptBlock::Block { block_number, .. } = block { + block_number + } else { + // If the block is pending, we assume the block number is the + // latest + 1 + to + 1 + } + } - // Skip the latest pending block transaction events - // * as we might have multiple events for the same transaction - if let Some(tx) = last_pending_block_world_tx { - if event.transaction_hash == tx { - continue; + _ => to + 1, } } + }; - transactions - .entry((block_number, event.transaction_hash)) - .or_insert(vec![]) - .push(event); + // Keep track of last block number and fetch block timestamp + if let btree_map::Entry::Vacant(v) = blocks.entry(block_number) { + debug!("Fetching block timestamp for block number: {}", block_number); + let block_timestamp = self.get_block_timestamp(block_number).await?; + v.insert(block_timestamp); } + + transactions + .entry((block_number, event.transaction_hash)) + .or_insert(vec![]) + .push(event); } debug!("Transactions: {}", &transactions.len()); @@ -368,10 +391,10 @@ impl Engine

{ let mut last_pending_block_tx_cursor = data.last_pending_block_tx; let mut last_pending_block_tx = data.last_pending_block_tx; - let mut last_pending_block_world_tx = None; let timestamp = data.pending_block.timestamp; + let mut cursor_map = HashMap::new(); for t in data.pending_block.transactions { let transaction_hash = t.transaction.transaction_hash(); if let Some(tx) = last_pending_block_tx_cursor { @@ -383,23 +406,25 @@ impl Engine

{ continue; } - match self.process_transaction_with_receipt(&t, data.block_number, timestamp).await { + match self + .process_transaction_with_receipt(&t, data.block_number, timestamp, &mut cursor_map) + .await + { Err(e) => { match e.to_string().as_str() { + // TODO: remove this we now fetch the pending block with receipts so this + // error is no longer relevant "TransactionHashNotFound" => { // We failed to fetch the transaction, which is because // the transaction might not have been processed fast enough by the // provider. So we can fail silently and try // again in the next iteration. warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Retrieving pending transaction receipt."); - self.db.set_head(data.block_number - 1); + self.db.set_head(self.world.address, data.block_number - 1); if let Some(tx) = last_pending_block_tx { self.db.set_last_pending_block_tx(Some(tx)); } - if let Some(tx) = last_pending_block_world_tx { - self.db.set_last_pending_block_world_tx(Some(tx)); - } self.db.execute().await?; return Ok(()); } @@ -409,11 +434,6 @@ impl Engine

{ } } } - Ok(true) => { - last_pending_block_world_tx = Some(*transaction_hash); - last_pending_block_tx = Some(*transaction_hash); - info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending world transaction."); - } Ok(_) => { last_pending_block_tx = Some(*transaction_hash); debug!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending transaction.") @@ -424,17 +444,8 @@ impl Engine

{ // Process parallelized events self.process_tasks().await?; - // Set the head to the last processed pending transaction // Head block number should still be latest block number - self.db.set_head(data.block_number - 1); - - if let Some(tx) = last_pending_block_tx { - self.db.set_last_pending_block_tx(Some(tx)); - } - - if let Some(tx) = last_pending_block_world_tx { - self.db.set_last_pending_block_world_tx(Some(tx)); - } + self.db.update_cursors(data.block_number - 1, last_pending_block_tx, cursor_map); self.db.execute().await?; @@ -443,7 +454,7 @@ impl Engine

{ pub async fn process_range(&mut self, data: FetchRangeResult) -> Result<()> { // Process all transactions - let mut last_block = 0; + let mut processed_blocks = HashSet::new(); for ((block_number, transaction_hash), events) in data.transactions { debug!("Processing transaction hash: {:#x}", transaction_hash); // Process transaction @@ -458,13 +469,13 @@ impl Engine

{ .await?; // Process block - if block_number > last_block { + if !processed_blocks.contains(&block_number) { if let Some(ref block_tx) = self.block_tx { block_tx.send(block_number).await?; } self.process_block(block_number, data.blocks[&block_number]).await?; - last_block = block_number; + processed_blocks.insert(block_number); } if self.db.query_queue.queue.len() >= QUERY_QUEUE_BATCH_SIZE { @@ -475,9 +486,7 @@ impl Engine

{ // Process parallelized events self.process_tasks().await?; - self.db.set_head(data.latest_block_number); - self.db.set_last_pending_block_world_tx(None); - self.db.set_last_pending_block_tx(None); + self.db.reset_cursors(data.latest_block_number); self.db.execute().await?; @@ -500,14 +509,16 @@ impl Engine

{ let _permit = semaphore.acquire().await.unwrap(); let mut local_db = db.clone(); for ParallelizedEvent { event_id, event, block_number, block_timestamp } in events { - if let Some(processor) = processors.event.get(&event.keys[0]) { - debug!(target: LOG_TARGET, event_name = processor.event_key(), task_id = %task_id, "Processing parallelized event."); - - if let Err(e) = processor - .process(&world, &mut local_db, block_number, block_timestamp, &event_id, &event) - .await - { - error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing parallelized event."); + if let Some(event_processors) = processors.event.get(&event.keys[0]) { + for processor in event_processors.iter() { + debug!(target: LOG_TARGET, event_name = processor.event_key(), task_id = %task_id, "Processing parallelized event."); + + if let Err(e) = processor + .process(&world, &mut local_db, block_number, block_timestamp, &event_id, &event) + .await + { + error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing parallelized event."); + } } } } @@ -538,6 +549,7 @@ impl Engine

{ block_number: u64, block_timestamp: u64, ) -> Result<()> { + // Contract -> Cursor for (event_idx, event) in events.iter().enumerate() { let event_id = format!("{:#064x}:{:#x}:{:#04x}", block_number, transaction_hash, event_idx); @@ -547,15 +559,7 @@ impl Engine

{ keys: event.keys.clone(), data: event.data.clone(), }; - Self::process_event( - self, - block_number, - block_timestamp, - &event_id, - &event, - // transaction_hash, - ) - .await?; + Self::process_event(self, block_number, block_timestamp, &event_id, &event).await?; } // Commented out this transaction processor because it requires an RPC call for each @@ -578,7 +582,8 @@ impl Engine

{ transaction_with_receipt: &TransactionWithReceipt, block_number: u64, block_timestamp: u64, - ) -> Result { + cursor_map: &mut HashMap, + ) -> Result<()> { let transaction_hash = transaction_with_receipt.transaction.transaction_hash(); let events = match &transaction_with_receipt.receipt { TransactionReceipt::Invoke(receipt) => Some(&receipt.events), @@ -586,26 +591,19 @@ impl Engine

{ _ => None, }; - let mut world_event = false; if let Some(events) = events { for (event_idx, event) in events.iter().enumerate() { - if event.from_address != self.world.address { + if event.from_address != self.world.address + && !self.tokens.contains_key(&event.from_address) + { continue; } - world_event = true; + cursor_map.insert(event.from_address, *transaction_hash); let event_id = format!("{:#064x}:{:#x}:{:#04x}", block_number, *transaction_hash, event_idx); - Self::process_event( - self, - block_number, - block_timestamp, - &event_id, - event, - // *transaction_hash, - ) - .await?; + Self::process_event(self, block_number, block_timestamp, &event_id, event).await?; } // if world_event { @@ -620,7 +618,7 @@ impl Engine

{ // } } - Ok(world_event) + Ok(()) } async fn process_block(&mut self, block_number: u64, block_timestamp: u64) -> Result<()> { @@ -663,12 +661,10 @@ impl Engine

{ block_timestamp: u64, event_id: &str, event: &Event, - // transaction_hash: Felt, ) -> Result<()> { - // self.db.store_event(event_id, event, transaction_hash, block_timestamp); let event_key = event.keys[0]; - let Some(processor) = self.processors.event.get(&event_key) else { + let Some(processors) = self.processors.event.get(&event_key) else { // if we dont have a processor for this event, we try the catch all processor if self.processors.catch_all_event.validate(event) { if let Err(e) = self @@ -703,14 +699,19 @@ impl Engine

{ return Ok(()); }; - let task_identifier = match processor.event_key().as_str() { - "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => { - let mut hasher = DefaultHasher::new(); - event.data[0].hash(&mut hasher); - event.data[1].hash(&mut hasher); - hasher.finish() + // For now we only have 1 processor for store* events + let task_identifier = if processors.len() == 1 { + match processors[0].event_key().as_str() { + "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => { + let mut hasher = DefaultHasher::new(); + event.data[0].hash(&mut hasher); + event.data[1].hash(&mut hasher); + hasher.finish() + } + _ => 0, } - _ => 0, + } else { + 0 }; // if we have a task identifier, we queue the event to be parallelized @@ -723,14 +724,54 @@ impl Engine

{ }); } else { // if we dont have a task identifier, we process the event immediately - if let Err(e) = processor - .process(&self.world, &mut self.db, block_number, block_timestamp, event_id, event) - .await - { - error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event."); + for processor in processors.iter() { + if !processor.validate(event) { + continue; + } + + if let Err(e) = processor + .process( + &self.world, + &mut self.db, + block_number, + block_timestamp, + event_id, + event, + ) + .await + { + error!(target: LOG_TARGET, event_name = processor.event_key(), error = ?e, "Processing event."); + } } } Ok(()) } } + +async fn get_all_events

( + provider: &P, + events_filter: EventFilter, + events_chunk_size: u64, +) -> Result<(Option, Vec)> +where + P: Provider + Sync, +{ + let mut events_pages = Vec::new(); + let mut continuation_token = None; + + loop { + let events_page = provider + .get_events(events_filter.clone(), continuation_token.clone(), events_chunk_size) + .await?; + + continuation_token = events_page.continuation_token.clone(); + events_pages.push(events_page); + + if continuation_token.is_none() { + break; + } + } + + Ok((events_filter.address, events_pages)) +} diff --git a/crates/torii/core/src/lib.rs b/crates/torii/core/src/lib.rs index df6e8b3adc..c415bec0f8 100644 --- a/crates/torii/core/src/lib.rs +++ b/crates/torii/core/src/lib.rs @@ -3,7 +3,6 @@ pub mod engine; pub mod error; pub mod model; pub mod processors; -pub mod query_queue; pub mod simple_broker; pub mod sql; pub mod types; diff --git a/crates/torii/core/src/processors/erc20_legacy_transfer.rs b/crates/torii/core/src/processors/erc20_legacy_transfer.rs new file mode 100644 index 0000000000..41852cd89e --- /dev/null +++ b/crates/torii/core/src/processors/erc20_legacy_transfer.rs @@ -0,0 +1,58 @@ +use anyhow::Error; +use async_trait::async_trait; +use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; +use dojo_world::contracts::world::WorldContractReader; +use starknet::core::types::{Event, U256}; +use starknet::providers::Provider; +use tracing::info; + +use super::EventProcessor; +use crate::sql::Sql; + +pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc20_legacy_transfer"; + +#[derive(Default, Debug)] +pub struct Erc20LegacyTransferProcessor; + +#[async_trait] +impl

EventProcessor

for Erc20LegacyTransferProcessor +where + P: Provider + Send + Sync + std::fmt::Debug, +{ + fn event_key(&self) -> String { + "Transfer".to_string() + } + + fn validate(&self, event: &Event) -> bool { + // key: [hash(Transfer)] + // data: [from, to, value.0, value.1] + if event.keys.len() == 1 && event.data.len() == 4 { + return true; + } + + false + } + + async fn process( + &self, + world: &WorldContractReader

, + db: &mut Sql, + _block_number: u64, + block_timestamp: u64, + _event_id: &str, + event: &Event, + ) -> Result<(), Error> { + let token_address = event.from_address; + let from = event.data[0]; + let to = event.data[1]; + + let value = U256Cainome::cairo_deserialize(&event.data, 2)?; + let value = U256::from_words(value.low, value.high); + + db.handle_erc20_transfer(token_address, from, to, value, world.provider(), block_timestamp) + .await?; + info!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "Legacy ERC20 Transfer"); + + Ok(()) + } +} diff --git a/crates/torii/core/src/processors/erc20_transfer.rs b/crates/torii/core/src/processors/erc20_transfer.rs new file mode 100644 index 0000000000..a98e288780 --- /dev/null +++ b/crates/torii/core/src/processors/erc20_transfer.rs @@ -0,0 +1,58 @@ +use anyhow::Error; +use async_trait::async_trait; +use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; +use dojo_world::contracts::world::WorldContractReader; +use starknet::core::types::{Event, U256}; +use starknet::providers::Provider; +use tracing::info; + +use super::EventProcessor; +use crate::sql::Sql; + +pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc20_transfer"; + +#[derive(Default, Debug)] +pub struct Erc20TransferProcessor; + +#[async_trait] +impl

EventProcessor

for Erc20TransferProcessor +where + P: Provider + Send + Sync + std::fmt::Debug, +{ + fn event_key(&self) -> String { + "Transfer".to_string() + } + + fn validate(&self, event: &Event) -> bool { + // key: [hash(Transfer), from, to] + // data: [value.0, value.1] + if event.keys.len() == 3 && event.data.len() == 2 { + return true; + } + + false + } + + async fn process( + &self, + world: &WorldContractReader

, + db: &mut Sql, + _block_number: u64, + block_timestamp: u64, + _event_id: &str, + event: &Event, + ) -> Result<(), Error> { + let token_address = event.from_address; + let from = event.keys[1]; + let to = event.keys[2]; + + let value = U256Cainome::cairo_deserialize(&event.data, 0)?; + let value = U256::from_words(value.low, value.high); + + db.handle_erc20_transfer(token_address, from, to, value, world.provider(), block_timestamp) + .await?; + info!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "ERC20 Transfer"); + + Ok(()) + } +} diff --git a/crates/torii/core/src/processors/erc721_transfer.rs b/crates/torii/core/src/processors/erc721_transfer.rs new file mode 100644 index 0000000000..665de6424a --- /dev/null +++ b/crates/torii/core/src/processors/erc721_transfer.rs @@ -0,0 +1,66 @@ +use anyhow::Error; +use async_trait::async_trait; +use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; +use dojo_world::contracts::world::WorldContractReader; +use starknet::core::types::{Event, U256}; +use starknet::providers::Provider; +use tracing::info; + +use super::EventProcessor; +use crate::sql::Sql; + +pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc721_transfer"; + +#[derive(Default, Debug)] +pub struct Erc721TransferProcessor; + +#[async_trait] +impl

EventProcessor

for Erc721TransferProcessor +where + P: Provider + Send + Sync + std::fmt::Debug, +{ + fn event_key(&self) -> String { + "Transfer".to_string() + } + + fn validate(&self, event: &Event) -> bool { + // ref: https://github.com/OpenZeppelin/cairo-contracts/blob/eabfa029b7b681d9e83bf171f723081b07891016/packages/token/src/erc721/erc721.cairo#L44-L53 + // key: [hash(Transfer), from, to, token_id.low, token_id.high] + // data: [] + if event.keys.len() == 5 && event.data.is_empty() { + return true; + } + + false + } + + async fn process( + &self, + world: &WorldContractReader

, + db: &mut Sql, + _block_number: u64, + block_timestamp: u64, + _event_id: &str, + event: &Event, + ) -> Result<(), Error> { + let token_address = event.from_address; + let from = event.keys[1]; + let to = event.keys[2]; + + let token_id = U256Cainome::cairo_deserialize(&event.keys, 3)?; + let token_id = U256::from_words(token_id.low, token_id.high); + + db.handle_erc721_transfer( + token_address, + from, + to, + token_id, + world.provider(), + block_timestamp, + ) + .await?; + info!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "ERC721 Transfer"); + + Ok(()) + } +} diff --git a/crates/torii/core/src/processors/mod.rs b/crates/torii/core/src/processors/mod.rs index c6a8f13af5..f4bfddffd3 100644 --- a/crates/torii/core/src/processors/mod.rs +++ b/crates/torii/core/src/processors/mod.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::sync::Arc; use anyhow::{Error, Result}; use async_trait::async_trait; @@ -10,6 +9,12 @@ use starknet::providers::Provider; use crate::sql::Sql; +// pub mod erc20_legacy_transfer; +// pub mod erc20_transfer; +// pub mod erc721_transfer; +pub mod erc20_legacy_transfer; +pub mod erc20_transfer; +pub mod erc721_transfer; pub mod event_message; pub mod metadata_update; pub mod register_model; @@ -74,15 +79,17 @@ pub trait TransactionProcessor: Send + Sync { ) -> Result<(), Error>; } +type EventProcessors

= Vec>>; + /// Given a list of event processors, generate a map of event keys to the event processor pub fn generate_event_processors_map( - event_processor: Vec>>, -) -> Result>>> { + event_processor: EventProcessors

, +) -> Result>> { let mut event_processors = HashMap::new(); for processor in event_processor { let key = get_selector_from_name(processor.event_key().as_str())?; - event_processors.insert(key, processor); + event_processors.entry(key).or_insert(vec![]).push(processor); } Ok(event_processors) diff --git a/crates/torii/core/src/processors/store_set_record.rs b/crates/torii/core/src/processors/store_set_record.rs index c5f70a2a54..38e8e67415 100644 --- a/crates/torii/core/src/processors/store_set_record.rs +++ b/crates/torii/core/src/processors/store_set_record.rs @@ -8,7 +8,8 @@ use tracing::info; use super::EventProcessor; use crate::processors::{ENTITY_ID_INDEX, MODEL_INDEX, NUM_KEYS_INDEX}; -use crate::sql::{felts_sql_string, Sql}; +use crate::sql::utils::felts_sql_string; +use crate::sql::Sql; pub(crate) const LOG_TARGET: &str = "torii_core::processors::store_set_record"; diff --git a/crates/torii/core/src/sql/erc.rs b/crates/torii/core/src/sql/erc.rs new file mode 100644 index 0000000000..de26b2300d --- /dev/null +++ b/crates/torii/core/src/sql/erc.rs @@ -0,0 +1,387 @@ +use anyhow::Result; +use cainome::cairo_serde::ByteArray; +use cainome::cairo_serde::CairoSerde; +use starknet::core::types::{BlockId, BlockTag, FunctionCall, U256}; +use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string}; +use starknet::{core::types::Felt, providers::Provider}; +use std::ops::{Add, Sub}; + +use super::query_queue::{Argument, QueryQueue, QueryType}; +use super::utils::{sql_string_to_u256, u256_to_sql_string}; +use crate::utils::utc_dt_string_from_timestamp; + +use super::Sql; + +impl Sql { + pub async fn handle_erc20_transfer( + &mut self, + contract_address: Felt, + from: Felt, + to: Felt, + amount: U256, + provider: &P, + block_timestamp: u64, + ) -> Result<()> { + // unique token identifier in DB + let token_id = format!("{:#x}", contract_address); + + let token_exists: bool = + sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM tokens WHERE id = ?)") + .bind(token_id.clone()) + .fetch_one(&self.pool) + .await?; + + if !token_exists { + register_erc20_token_metadata( + contract_address, + &mut self.query_queue, + &token_id, + provider, + ) + .await?; + } + + register_erc_transfer_event( + contract_address, + from, + to, + amount, + &token_id, + block_timestamp, + &mut self.query_queue, + ); + + // Update balances in erc20_balance table + { + // NOTE: formatting here should match the format we use for Argument type in QueryQueue + // TODO: abstract this so they cannot mismatch + + // Since balance are stored as TEXT in db, we cannot directly use INSERT OR UPDATE + // statements. + // Fetch balances for both `from` and `to` addresses, update them and write back to db + let query = sqlx::query_as::<_, (String, String)>( + "SELECT account_address, balance FROM balances WHERE contract_address = ? AND \ + account_address IN (?, ?)", + ) + .bind(format!("{:#x}", contract_address)) + .bind(format!("{:#x}", from)) + .bind(format!("{:#x}", to)); + + // (address, balance) + let balances: Vec<(String, String)> = query.fetch_all(&self.pool).await?; + // (address, balance) is primary key in DB, and we are fetching for 2 addresses so there + // should be at most 2 rows returned + assert!(balances.len() <= 2); + + let from_balance = balances + .iter() + .find(|(address, _)| address == &format!("{:#x}", from)) + .map(|(_, balance)| balance.clone()) + .unwrap_or_else(|| format!("{:#64x}", crypto_bigint::U256::ZERO)); + + let to_balance = balances + .iter() + .find(|(address, _)| address == &format!("{:#x}", to)) + .map(|(_, balance)| balance.clone()) + .unwrap_or_else(|| format!("{:#64x}", crypto_bigint::U256::ZERO)); + + let from_balance = sql_string_to_u256(&from_balance); + let to_balance = sql_string_to_u256(&to_balance); + + let new_from_balance = + if from != Felt::ZERO { from_balance.sub(amount) } else { from_balance }; + let new_to_balance = if to != Felt::ZERO { to_balance.add(amount) } else { to_balance }; + + let update_query = " + INSERT INTO balances (id, balance, account_address, contract_address, token_id) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT (id) + DO UPDATE SET balance = excluded.balance"; + + if from != Felt::ZERO { + self.query_queue.enqueue( + update_query, + vec![ + Argument::String(format!("{:#x}:{:#x}", from, contract_address)), + Argument::String(u256_to_sql_string(&new_from_balance)), + Argument::FieldElement(from), + Argument::FieldElement(contract_address), + Argument::String(token_id.clone()), + ], + QueryType::Other, + ); + } + + if to != Felt::ZERO { + self.query_queue.enqueue( + update_query, + vec![ + Argument::String(format!("{:#x}:{:#x}", to, contract_address)), + Argument::String(u256_to_sql_string(&new_to_balance)), + Argument::FieldElement(to), + Argument::FieldElement(contract_address), + Argument::String(token_id.clone()), + ], + QueryType::Other, + ); + } + } + self.query_queue.execute_all().await?; + + Ok(()) + } + + pub async fn handle_erc721_transfer( + &mut self, + contract_address: Felt, + from: Felt, + to: Felt, + token_id: U256, + provider: &P, + block_timestamp: u64, + ) -> Result<()> { + let token_id = format!("{:#x}:{}", contract_address, u256_to_sql_string(&token_id)); + let token_exists: bool = + sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM tokens WHERE id = ?)") + .bind(token_id.clone()) + .fetch_one(&self.pool) + .await?; + + if !token_exists { + register_erc721_token_metadata( + contract_address, + &mut self.query_queue, + &token_id, + provider, + ) + .await?; + } + + register_erc_transfer_event( + contract_address, + from, + to, + U256::from(1u8), + &token_id, + block_timestamp, + &mut self.query_queue, + ); + + // Update balances in erc721_balances table + { + let update_query = " + INSERT INTO balances (id, balance, account_address, contract_address, token_id) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT (id) + DO UPDATE SET balance = excluded.balance"; + + if from != Felt::ZERO { + self.query_queue.enqueue( + update_query, + vec![ + Argument::String(format!( + "{}{FELT_DELIMITER}{}", + felt_to_sql_string(&from_address), + &token_id + )), + Argument::String(u256_to_sql_string(&U256::from(0u8))), + Argument::FieldElement(from_address), + Argument::FieldElement(contract_address), + Argument::String(token_id.clone()), + ], + QueryType::Other, + ); + } + + if to != Felt::ZERO { + self.query_queue.enqueue( + update_query, + vec![ + Argument::String(format!( + "{}{FELT_DELIMITER}{}", + felt_to_sql_string(&to_address), + &token_id + )), + Argument::String(u256_to_sql_string(&U256::from(1u8))), + Argument::FieldElement(to_address), + Argument::FieldElement(contract_address), + Argument::String(token_id.clone()), + ], + QueryType::Other, + ); + } + } + self.query_queue.execute_all().await?; + + Ok(()) + } +} + +async fn register_erc20_token_metadata( + contract_address: Felt, + queue: &mut QueryQueue, + token_id: &str, + provider: &P, +) -> Result<()> { + // Fetch token information from the chain + let name = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("name").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + + // len = 1 => return value felt (i.e. legacy erc20 token) + // len > 1 => return value ByteArray (i.e. new erc20 token) + let name = if name.len() == 1 { + parse_cairo_short_string(&name[0]).unwrap() + } else { + ByteArray::cairo_deserialize(&name, 0) + .expect("Return value not ByteArray") + .to_string() + .expect("Return value not String") + }; + + let symbol = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("symbol").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + + let symbol = if symbol.len() == 1 { + parse_cairo_short_string(&symbol[0]).unwrap() + } else { + ByteArray::cairo_deserialize(&symbol, 0) + .expect("Return value not ByteArray") + .to_string() + .expect("Return value not String") + }; + + let decimals = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("decimals").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + let decimals = u8::cairo_deserialize(&decimals, 0).expect("Return value not u8"); + + // Insert the token into the tokens table + queue.enqueue( + "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, ?, ?, ?)", + vec![ + Argument::String(token_id.to_string()), + Argument::FieldElement(contract_address), + Argument::String(name), + Argument::String(symbol), + Argument::Int(decimals.into()), + ], + QueryType::Other, + ); + + Ok(()) +} + +async fn register_erc721_token_metadata( + contract_address: Felt, + queue: &mut QueryQueue, + token_id: &str, + provider: &P, +) -> Result<()> { + // Fetch token information from the chain + let name = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("name").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + + // len = 1 => return value felt (i.e. legacy erc721 token) + // len > 1 => return value ByteArray (i.e. new erc721 token) + let name = if name.len() == 1 { + parse_cairo_short_string(&name[0]).unwrap() + } else { + ByteArray::cairo_deserialize(&name, 0) + .expect("Return value not ByteArray") + .to_string() + .expect("Return value not String") + }; + + let symbol = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("symbol").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + let symbol = if symbol.len() == 1 { + parse_cairo_short_string(&symbol[0]).unwrap() + } else { + ByteArray::cairo_deserialize(&symbol, 0) + .expect("Return value not ByteArray") + .to_string() + .expect("Return value not String") + }; + + let decimals = 0; + + // Insert the token into the tokens table + queue.enqueue( + "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, ?, ?, ?)", + vec![ + Argument::String(token_id.to_string()), + Argument::FieldElement(contract_address), + Argument::String(name), + Argument::String(symbol), + Argument::Int(decimals.into()), + ], + QueryType::Other, + ); + + Ok(()) +} + +fn register_erc_transfer_event( + contract_address: Felt, + from: Felt, + to: Felt, + amount: U256, + token_id: &str, + block_timestamp: u64, + queue: &mut QueryQueue, +) { + let insert_query = "INSERT INTO erc_transfers (contract_address, from_address, to_address, \ + amount, token_id, executed_at) VALUES (?, ?, ?, ?, ?, ?)"; + + queue.enqueue( + insert_query, + vec![ + Argument::FieldElement(contract_address), + Argument::FieldElement(from), + Argument::FieldElement(to), + Argument::String(u256_to_sql_string(&amount)), + Argument::String(token_id.to_string()), + Argument::String(utc_dt_string_from_timestamp(block_timestamp)), + ], + QueryType::Other, + ); +} diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql/mod.rs similarity index 90% rename from crates/torii/core/src/sql.rs rename to crates/torii/core/src/sql/mod.rs index ccca4f4c7d..15ea37bc39 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql/mod.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::convert::TryInto; use std::str::FromStr; use std::sync::Arc; @@ -9,16 +10,18 @@ use dojo_types::schema::{EnumOption, Member, Struct, Ty}; use dojo_world::contracts::abi::model::Layout; use dojo_world::contracts::naming::compute_selector_from_names; use dojo_world::metadata::WorldMetadata; +use query_queue::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType}; use sqlx::pool::PoolConnection; use sqlx::{Pool, Sqlite}; use starknet::core::types::{Event, Felt, InvokeTransaction, Transaction}; use starknet_crypto::poseidon_hash_many; use tracing::{debug, warn}; +use utils::felts_sql_string; use crate::cache::{Model, ModelCache}; -use crate::query_queue::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType}; use crate::types::{ - Event as EventEmitted, EventMessage as EventMessageUpdated, Model as ModelRegistered, + ErcContract, Event as EventEmitted, EventMessage as EventMessageUpdated, + Model as ModelRegistered, }; use crate::utils::{must_utc_datetime_from_timestamp, utc_dt_string_from_timestamp}; @@ -28,9 +31,12 @@ type IsStoreUpdate = bool; pub const WORLD_CONTRACT_TYPE: &str = "WORLD"; pub const FELT_DELIMITER: &str = "/"; +pub mod erc; +pub mod query_queue; #[cfg(test)] -#[path = "sql_test.rs"] +#[path = "test.rs"] mod test; +pub mod utils; #[derive(Debug)] pub struct Sql { @@ -40,6 +46,13 @@ pub struct Sql { model_cache: Arc, } +#[derive(Debug, Clone)] +pub struct Cursors { + pub cursor_map: HashMap, + pub last_pending_block_tx: Option, + pub head: Option, +} + impl Clone for Sql { fn clone(&self) -> Self { Self { @@ -52,7 +65,11 @@ impl Clone for Sql { } impl Sql { - pub async fn new(pool: Pool, world_address: Felt) -> Result { + pub async fn new( + pool: Pool, + world_address: Felt, + erc_contracts: &HashMap, + ) -> Result { let mut query_queue = QueryQueue::new(pool.clone()); query_queue.enqueue( @@ -66,6 +83,19 @@ impl Sql { QueryType::Other, ); + for contract in erc_contracts.values() { + query_queue.enqueue( + "INSERT OR IGNORE INTO contracts (id, contract_address, contract_type) VALUES (?, \ + ?, ?)", + vec![ + Argument::FieldElement(contract.contract_address), + Argument::FieldElement(contract.contract_address), + Argument::String(contract.r#type.to_string()), + ], + QueryType::Other, + ); + } + query_queue.execute_all().await?; Ok(Self { @@ -92,14 +122,14 @@ impl Sql { Ok(()) } - pub async fn head(&self) -> Result<(u64, Option, Option)> { + pub async fn head(&self, contract: Felt) -> Result<(u64, Option, Option)> { let mut conn: PoolConnection = self.pool.acquire().await?; let indexer_query = sqlx::query_as::<_, (Option, Option, Option, String)>( - "SELECT head, last_pending_block_world_tx, last_pending_block_tx, contract_type \ - FROM contracts WHERE id = ?", + "SELECT head, last_pending_block_contract_tx, last_pending_block_tx, \ + contract_type FROM contracts WHERE id = ?", ) - .bind(format!("{:#x}", self.world_address)); + .bind(format!("{:#x}", contract)); let indexer: (Option, Option, Option, String) = indexer_query.fetch_one(&mut *conn).await?; @@ -110,9 +140,9 @@ impl Sql { )) } - pub fn set_head(&mut self, head: u64) { + pub fn set_head(&mut self, contract: Felt, head: u64) { let head = Argument::Int(head.try_into().expect("doesn't fit in u64")); - let id = Argument::FieldElement(self.world_address); + let id = Argument::FieldElement(contract); self.query_queue.enqueue( "UPDATE contracts SET head = ? WHERE id = ?", vec![head, id], @@ -120,18 +150,22 @@ impl Sql { ); } - pub fn set_last_pending_block_world_tx(&mut self, last_pending_block_world_tx: Option) { - let last_pending_block_world_tx = if let Some(f) = last_pending_block_world_tx { + pub fn set_last_pending_block_contract_tx( + &mut self, + contract: Felt, + last_pending_block_contract_tx: Option, + ) { + let last_pending_block_contract_tx = if let Some(f) = last_pending_block_contract_tx { Argument::String(format!("{:#x}", f)) } else { Argument::Null }; - let id = Argument::FieldElement(self.world_address); + let id = Argument::FieldElement(contract); self.query_queue.enqueue( - "UPDATE contracts SET last_pending_block_world_tx = ? WHERE id = ?", - vec![last_pending_block_world_tx, id], + "UPDATE contracts SET last_pending_block_contract_tx = ? WHERE id = ?", + vec![last_pending_block_contract_tx, id], QueryType::Other, ); } @@ -142,11 +176,86 @@ impl Sql { } else { Argument::Null }; - let id = Argument::FieldElement(self.world_address); self.query_queue.enqueue( - "UPDATE contracts SET last_pending_block_tx = ? WHERE id = ?", - vec![last_pending_block_tx, id], + "UPDATE contracts SET last_pending_block_tx = ? WHERE 1=1", + vec![last_pending_block_tx], + QueryType::Other, + ) + } + + pub(crate) async fn cursors(&self) -> Result { + let mut conn: PoolConnection = self.pool.acquire().await?; + let cursors = sqlx::query_as::<_, (String, String)>( + "SELECT contract_address, last_pending_block_contract_tx FROM contracts WHERE \ + last_pending_block_contract_tx IS NOT NULL", + ) + .fetch_all(&mut *conn) + .await?; + + let (head, last_pending_block_tx) = sqlx::query_as::<_, (Option, Option)>( + "SELECT head, last_pending_block_tx FROM contracts WHERE 1=1", + ) + .fetch_one(&mut *conn) + .await?; + + let head = head.map(|h| h.try_into().expect("doesn't fit in u64")); + let last_pending_block_tx = + last_pending_block_tx.map(|t| Felt::from_str(&t).expect("its a valid felt")); + Ok(Cursors { + cursor_map: cursors + .into_iter() + .map(|(c, t)| { + ( + Felt::from_str(&c).expect("its a valid felt"), + Felt::from_str(&t).expect("its a valid felt"), + ) + }) + .collect(), + last_pending_block_tx, + head, + }) + } + + pub fn update_cursors( + &mut self, + head: u64, + last_pending_block_tx: Option, + cursor_map: HashMap, + ) { + let head = Argument::Int(head.try_into().expect("doesn't fit in u64")); + let last_pending_block_tx = if let Some(f) = last_pending_block_tx { + Argument::String(format!("{:#x}", f)) + } else { + Argument::Null + }; + + self.query_queue.enqueue( + "UPDATE contracts SET head = ?, last_pending_block_tx = ? WHERE 1=1", + vec![head, last_pending_block_tx], + QueryType::Other, + ); + + for cursor in cursor_map { + let tx = Argument::FieldElement(cursor.1); + let contract = Argument::FieldElement(cursor.0); + + self.query_queue.enqueue( + "UPDATE contracts SET last_pending_block_contract_tx = ? WHERE id = ?", + vec![tx, contract], + QueryType::Other, + ); + } + } + + // For a given contract address, sets head to the passed value and sets + // last_pending_block_contract_tx and last_pending_block_tx to null + pub fn reset_cursors(&mut self, head: u64) { + let head = Argument::Int(head.try_into().expect("doesn't fit in u64")); + self.query_queue.enqueue( + "UPDATE contracts SET head = ?, last_pending_block_contract_tx = ?, \ + last_pending_block_tx = ? WHERE 1=1", + vec![head, Argument::Null, Argument::Null], QueryType::Other, ); } @@ -1151,8 +1260,3 @@ impl Sql { Ok(()) } } - -pub fn felts_sql_string(felts: &[Felt]) -> String { - felts.iter().map(|k| format!("{:#x}", k)).collect::>().join(FELT_DELIMITER) - + FELT_DELIMITER -} diff --git a/crates/torii/core/src/query_queue.rs b/crates/torii/core/src/sql/query_queue.rs similarity index 99% rename from crates/torii/core/src/query_queue.rs rename to crates/torii/core/src/sql/query_queue.rs index 589035ca4e..ba3af4ba22 100644 --- a/crates/torii/core/src/query_queue.rs +++ b/crates/torii/core/src/sql/query_queue.rs @@ -5,6 +5,7 @@ use dojo_types::schema::Ty; use sqlx::{FromRow, Pool, Sqlite}; use starknet::core::types::Felt; +use super::utils::felt_to_sql_string; use crate::simple_broker::SimpleBroker; use crate::types::{ Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, diff --git a/crates/torii/core/src/sql_test.rs b/crates/torii/core/src/sql/test.rs similarity index 95% rename from crates/torii/core/src/sql_test.rs rename to crates/torii/core/src/sql/test.rs index b60ea3de36..c2a2bd97da 100644 --- a/crates/torii/core/src/sql_test.rs +++ b/crates/torii/core/src/sql/test.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; @@ -44,20 +45,21 @@ where provider, Processors { event: generate_event_processors_map(vec![ - Arc::new(RegisterModelProcessor), - Arc::new(StoreSetRecordProcessor), - Arc::new(StoreUpdateRecordProcessor), - Arc::new(StoreUpdateMemberProcessor), - Arc::new(StoreDelRecordProcessor), + Box::new(RegisterModelProcessor), + Box::new(StoreSetRecordProcessor), + Box::new(StoreUpdateRecordProcessor), + Box::new(StoreUpdateMemberProcessor), + Box::new(StoreDelRecordProcessor), ])?, ..Processors::default() }, EngineConfig::default(), shutdown_tx, None, + HashMap::new(), ); - let data = engine.fetch_range(0, to, None).await.unwrap(); + let data = engine.fetch_range(0, to, &HashMap::new()).await.unwrap(); engine.process_range(data).await.unwrap(); Ok(engine) @@ -126,7 +128,7 @@ async fn test_load_from_remote() { let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); - let mut db = Sql::new(pool.clone(), world_reader.address).await.unwrap(); + let mut db = Sql::new(pool.clone(), world_reader.address, &HashMap::new()).await.unwrap(); let _ = bootstrap_engine(world_reader, db.clone(), provider).await.unwrap(); @@ -286,7 +288,7 @@ async fn test_load_from_remote_del() { let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); - let mut db = Sql::new(pool.clone(), world_reader.address).await.unwrap(); + let mut db = Sql::new(pool.clone(), world_reader.address, &HashMap::new()).await.unwrap(); let _ = bootstrap_engine(world_reader, db.clone(), provider).await; @@ -376,7 +378,7 @@ async fn test_update_with_set_record() { let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); - let mut db = Sql::new(pool.clone(), world_reader.address).await.unwrap(); + let mut db = Sql::new(pool.clone(), world_reader.address, &HashMap::new()).await.unwrap(); let _ = bootstrap_engine(world_reader, db.clone(), Arc::clone(&provider)).await.unwrap(); diff --git a/crates/torii/core/src/sql/utils.rs b/crates/torii/core/src/sql/utils.rs new file mode 100644 index 0000000000..d9cbf1fab9 --- /dev/null +++ b/crates/torii/core/src/sql/utils.rs @@ -0,0 +1,18 @@ +use starknet::core::types::U256; +use starknet_crypto::Felt; + +use super::FELT_DELIMITER; + +pub fn felts_sql_string(felts: &[Felt]) -> String { + felts.iter().map(|k| format!("{:#x}", k)).collect::>().join(FELT_DELIMITER) + + FELT_DELIMITER +} + +pub(crate) fn u256_to_sql_string(u256: &U256) -> String { + format!("{:#064x}", u256) +} + +pub(crate) fn sql_string_to_u256(sql_string: &str) -> U256 { + let sql_string = sql_string.strip_prefix("0x").unwrap_or(sql_string); + U256::from(crypto_bigint::U256::from_be_hex(sql_string)) +} diff --git a/crates/torii/core/src/types.rs b/crates/torii/core/src/types.rs index de75fca94a..0e9031ae0b 100644 --- a/crates/torii/core/src/types.rs +++ b/crates/torii/core/src/types.rs @@ -1,4 +1,6 @@ use core::fmt; +use std::path::PathBuf; +use std::str::FromStr; use chrono::{DateTime, Utc}; use dojo_types::schema::Ty; @@ -84,3 +86,52 @@ pub struct Event { pub executed_at: DateTime, pub created_at: DateTime, } + +#[derive(Default, Deserialize, Debug, Clone)] +pub struct ToriiConfig { + /// ERC contract addresses to index + pub erc_contracts: Vec, +} + +impl ToriiConfig { + pub fn load_from_path(path: &PathBuf) -> Result { + let config = std::fs::read_to_string(path)?; + let config: Self = toml::from_str(&config)?; + Ok(config) + } +} + +#[derive(Default, Deserialize, Debug, Clone)] +pub struct ErcContract { + pub contract_address: Felt, + pub start_block: u64, + pub r#type: ErcType, +} + +#[derive(Default, Deserialize, Debug, Clone)] +pub enum ErcType { + #[default] + ERC20, + ERC721, +} + +impl FromStr for ErcType { + type Err = anyhow::Error; + + fn from_str(input: &str) -> Result { + match input.to_lowercase().as_str() { + "erc20" => Ok(ErcType::ERC20), + "erc721" => Ok(ErcType::ERC721), + _ => Err(anyhow::anyhow!("Invalid ERC type: {}", input)), + } + } +} + +impl std::fmt::Display for ErcType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ErcType::ERC20 => write!(f, "ERC20"), + ErcType::ERC721 => write!(f, "ERC721"), + } + } +} diff --git a/crates/torii/graphql/src/tests/metadata_test.rs b/crates/torii/graphql/src/tests/metadata_test.rs index 53ff0367ff..86205be342 100644 --- a/crates/torii/graphql/src/tests/metadata_test.rs +++ b/crates/torii/graphql/src/tests/metadata_test.rs @@ -1,5 +1,7 @@ #[cfg(test)] mod tests { + use std::collections::HashMap; + use dojo_world::config::ProfileConfig; use dojo_world::metadata::WorldMetadata; use sqlx::SqlitePool; @@ -48,7 +50,7 @@ mod tests { #[sqlx::test(migrations = "../migrations")] async fn test_metadata(pool: SqlitePool) { - let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap(); + let mut db = Sql::new(pool.clone(), Felt::ZERO, &HashMap::new()).await.unwrap(); let schema = build_schema(&pool).await.unwrap(); let cover_img = "QWxsIHlvdXIgYmFzZSBiZWxvbmcgdG8gdXM="; @@ -101,7 +103,7 @@ mod tests { #[sqlx::test(migrations = "../migrations")] async fn test_empty_content(pool: SqlitePool) { - let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap(); + let mut db = Sql::new(pool.clone(), Felt::ZERO, &HashMap::new()).await.unwrap(); let schema = build_schema(&pool).await.unwrap(); db.set_metadata(&RESOURCE, URI, BLOCK_TIMESTAMP); diff --git a/crates/torii/graphql/src/tests/mod.rs b/crates/torii/graphql/src/tests/mod.rs index 133b46075e..d5064d5704 100644 --- a/crates/torii/graphql/src/tests/mod.rs +++ b/crates/torii/graphql/src/tests/mod.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; @@ -350,7 +351,7 @@ pub async fn spinup_types_test() -> Result { let world = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); - let db = Sql::new(pool.clone(), strat.world_address).await.unwrap(); + let db = Sql::new(pool.clone(), strat.world_address, &HashMap::new()).await.unwrap(); let (shutdown_tx, _) = broadcast::channel(1); let mut engine = Engine::new( @@ -359,9 +360,9 @@ pub async fn spinup_types_test() -> Result { Arc::clone(&provider), Processors { event: generate_event_processors_map(vec![ - Arc::new(RegisterModelProcessor), - Arc::new(StoreSetRecordProcessor), - Arc::new(StoreDelRecordProcessor), + Box::new(RegisterModelProcessor), + Box::new(StoreSetRecordProcessor), + Box::new(StoreDelRecordProcessor), ]) .unwrap(), ..Processors::default() @@ -369,10 +370,11 @@ pub async fn spinup_types_test() -> Result { EngineConfig::default(), shutdown_tx, None, + HashMap::new(), ); let to = account.provider().block_hash_and_number().await?.block_number; - let data = engine.fetch_range(0, to, None).await.unwrap(); + let data = engine.fetch_range(0, to, &HashMap::new()).await.unwrap(); engine.process_range(data).await.unwrap(); Ok(pool) diff --git a/crates/torii/graphql/src/tests/subscription_test.rs b/crates/torii/graphql/src/tests/subscription_test.rs index 363082878a..3f753572b5 100644 --- a/crates/torii/graphql/src/tests/subscription_test.rs +++ b/crates/torii/graphql/src/tests/subscription_test.rs @@ -1,5 +1,6 @@ #[cfg(test)] mod tests { + use std::collections::HashMap; use std::str::FromStr; use std::time::Duration; @@ -21,7 +22,7 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_entity_subscription(pool: SqlitePool) { - let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap(); + let mut db = Sql::new(pool.clone(), Felt::ZERO, &HashMap::new()).await.unwrap(); model_fixtures(&mut db).await; // 0. Preprocess expected entity value @@ -156,7 +157,7 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_entity_subscription_with_id(pool: SqlitePool) { - let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap(); + let mut db = Sql::new(pool.clone(), Felt::ZERO, &HashMap::new()).await.unwrap(); model_fixtures(&mut db).await; // 0. Preprocess expected entity value @@ -271,7 +272,7 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_model_subscription(pool: SqlitePool) { - let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap(); + let mut db = Sql::new(pool.clone(), Felt::ZERO, &HashMap::new()).await.unwrap(); // 0. Preprocess model value let namespace = "types_test".to_string(); let model_name = "Subrecord".to_string(); @@ -336,7 +337,7 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_model_subscription_with_id(pool: SqlitePool) { - let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap(); + let mut db = Sql::new(pool.clone(), Felt::ZERO, &HashMap::new()).await.unwrap(); // 0. Preprocess model value let namespace = "types_test".to_string(); let model_name = "Subrecord".to_string(); @@ -402,7 +403,7 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_event_emitted(pool: SqlitePool) { - let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap(); + let mut db = Sql::new(pool.clone(), Felt::ZERO, &HashMap::new()).await.unwrap(); let block_timestamp: u64 = 1710754478_u64; let (tx, mut rx) = mpsc::channel(7); tokio::spawn(async move { diff --git a/crates/torii/grpc/src/server/tests/entities_test.rs b/crates/torii/grpc/src/server/tests/entities_test.rs index ba35e24b8a..f60a081fcb 100644 --- a/crates/torii/grpc/src/server/tests/entities_test.rs +++ b/crates/torii/grpc/src/server/tests/entities_test.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; @@ -95,7 +96,7 @@ async fn test_entities_queries() { TransactionWaiter::new(tx.transaction_hash, &provider).await.unwrap(); - let db = Sql::new(pool.clone(), strat.world_address).await.unwrap(); + let db = Sql::new(pool.clone(), strat.world_address, &HashMap::new()).await.unwrap(); let (shutdown_tx, _) = broadcast::channel(1); let mut engine = Engine::new( @@ -104,8 +105,8 @@ async fn test_entities_queries() { Arc::clone(&provider), Processors { event: generate_event_processors_map(vec![ - Arc::new(RegisterModelProcessor), - Arc::new(StoreSetRecordProcessor), + Box::new(RegisterModelProcessor), + Box::new(StoreSetRecordProcessor), ]) .unwrap(), ..Processors::default() @@ -113,10 +114,11 @@ async fn test_entities_queries() { EngineConfig::default(), shutdown_tx, None, + HashMap::new(), ); let to = provider.block_hash_and_number().await.unwrap().block_number; - let data = engine.fetch_range(0, to, None).await.unwrap(); + let data = engine.fetch_range(0, to, &HashMap::new()).await.unwrap(); engine.process_range(data).await.unwrap(); let (_, receiver) = tokio::sync::mpsc::channel(1); diff --git a/crates/torii/libp2p/src/server/mod.rs b/crates/torii/libp2p/src/server/mod.rs index 9bc1e25ce3..3046a2758f 100644 --- a/crates/torii/libp2p/src/server/mod.rs +++ b/crates/torii/libp2p/src/server/mod.rs @@ -25,7 +25,8 @@ use starknet::core::types::{BlockId, BlockTag, Felt, FunctionCall}; use starknet::core::utils::get_selector_from_name; use starknet::providers::Provider; use starknet_crypto::poseidon_hash_many; -use torii_core::sql::{felts_sql_string, Sql}; +use torii_core::sql::utils::felts_to_sql_string; +use torii_core::sql::Sql; use tracing::{info, warn}; use webrtc::tokio::Certificate; diff --git a/crates/torii/libp2p/src/tests.rs b/crates/torii/libp2p/src/tests.rs index 7ef1472068..dec12e23c6 100644 --- a/crates/torii/libp2p/src/tests.rs +++ b/crates/torii/libp2p/src/tests.rs @@ -524,6 +524,7 @@ mod test { #[cfg(not(target_arch = "wasm32"))] #[tokio::test] async fn test_client_messaging() -> Result<(), Box> { + use std::collections::HashMap; use std::time::Duration; use dojo_types::schema::{Member, Struct, Ty}; @@ -559,7 +560,7 @@ mod test { let account = sequencer.account_data(0); - let mut db = Sql::new(pool.clone(), Felt::ZERO).await?; + let mut db = Sql::new(pool.clone(), Felt::ZERO, &HashMap::new()).await?; // Register the model of our Message db.register_model( diff --git a/crates/torii/migrations/20240913104418_add_erc.sql b/crates/torii/migrations/20240913104418_add_erc.sql new file mode 100644 index 0000000000..aca9f4d817 --- /dev/null +++ b/crates/torii/migrations/20240913104418_add_erc.sql @@ -0,0 +1,35 @@ +CREATE TABLE balances ( + -- account_address:contract_address:token_id + id TEXT NOT NULL PRIMARY KEY, + balance TEXT NOT NULL, + account_address TEXT NOT NULL, + contract_address TEXT NOT NULL, + -- contract_address:token_id + token_id TEXT NOT NULL, + FOREIGN KEY (token_id) REFERENCES tokens(id) +); + +CREATE INDEX balances_account_address ON balances (account_address); +CREATE INDEX balances_contract_address ON balances (contract_address); + +CREATE TABLE tokens ( + -- contract_address:token_id + id TEXT NOT NULL PRIMARY KEY, + contract_address TEXT NOT NULL, + name TEXT NOT NULL, + symbol TEXT NOT NULL, + decimals INTEGER NOT NULL, + FOREIGN KEY (contract_address) REFERENCES contracts(id) +); + +CREATE TABLE erc_transfers ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + contract_address TEXT NOT NULL, + from_address TEXT NOT NULL, + to_address TEXT NOT NULL, + amount TEXT NOT NULL, + -- contract_address:token_id + token_id TEXT NOT NULL, + executed_at DATETIME NOT NULL, + FOREIGN KEY (token_id) REFERENCES tokens(id) +); diff --git a/crates/torii/migrations/20240918200125_rename_column_contracts_table.sql b/crates/torii/migrations/20240918200125_rename_column_contracts_table.sql new file mode 100644 index 0000000000..3213853e8a --- /dev/null +++ b/crates/torii/migrations/20240918200125_rename_column_contracts_table.sql @@ -0,0 +1 @@ +ALTER TABLE contracts RENAME COLUMN last_pending_block_world_tx TO last_pending_block_contract_tx; diff --git a/scripts/deploy_erc20_katana.sh b/scripts/deploy_erc20_katana.sh new file mode 100755 index 0000000000..3ad8d87937 --- /dev/null +++ b/scripts/deploy_erc20_katana.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +starkli deploy --account ../account.json --keystore ../signer.json --keystore-password "" 0x02a8846878b6ad1f54f6ba46f5f40e11cee755c677f130b2c4b60566c9003f1f 0x626c6f62 0x424c42 0x8 u256:10000000000 0xb3ff441a68610b30fd5e2abbf3a1548eb6ba6f3559f2862bf2dc757e5828ca --rpc http://localhost:5050 diff --git a/scripts/send_erc20_transfer.sh b/scripts/send_erc20_transfer.sh new file mode 100755 index 0000000000..b321d2fa19 --- /dev/null +++ b/scripts/send_erc20_transfer.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +if [ $# -eq 0 ]; then + echo "Error: Contract address argument is required." + echo "Usage: $0 " + exit 1 +fi + +contract_address=$1 +rpc="http://localhost:5050" + +starkli invoke $contract_address transfer 0x1234 u256:1 --account ../account.json --keystore ../signer.json --keystore-password "" --rpc $rpc