refactor(torii-core): cleanups (#2702)
* chore(torii-core): cleanups

* helper functions for event processor config

* disable raw events by default

* remove start block entirely

* cleanup constants

* fmt

* fix: clippy/fmt

* fix: ensure the default for EventsOptions matches the clap values

* tag cleanups

---------

Co-authored-by: glihm <[email protected]>
Larkooo and glihm authored Nov 20, 2024
1 parent 0342464 commit d005475
Showing 29 changed files with 96 additions and 287 deletions.
5 changes: 3 additions & 2 deletions bin/torii/src/main.rs
@@ -138,6 +138,9 @@ async fn main() -> anyhow::Result<()> {
if args.events.raw {
flags.insert(IndexingFlags::RAW_EVENTS);
}
if args.indexing.pending {
flags.insert(IndexingFlags::PENDING_BLOCKS);
}

let mut engine: Engine<Arc<JsonRpcClient<HttpTransport>>> = Engine::new(
world,
@@ -146,10 +149,8 @@
processors,
EngineConfig {
max_concurrent_tasks: args.indexing.max_concurrent_tasks,
start_block: 0,
blocks_chunk_size: args.indexing.blocks_chunk_size,
events_chunk_size: args.indexing.events_chunk_size,
index_pending: args.indexing.pending,
polling_interval: Duration::from_millis(args.indexing.polling_interval),
flags,
event_processor_config: EventProcessorConfig {
4 changes: 2 additions & 2 deletions crates/torii/cli/src/args.rs
@@ -257,7 +257,7 @@ mod test {
db_dir = "/tmp/torii-test"
[events]
raw = false
raw = true
historical = [
"ns-E",
"ns-EH"
@@ -291,7 +291,7 @@
assert_eq!(torii_args.world_address, Some(Felt::from_str("0x1234").unwrap()));
assert_eq!(torii_args.rpc, Url::parse("http://0.0.0.0:2222").unwrap());
assert_eq!(torii_args.db_dir, Some(PathBuf::from("/tmp/torii-test")));
assert!(!torii_args.events.raw);
assert!(torii_args.events.raw);
assert_eq!(torii_args.events.historical, vec!["ns-E".to_string(), "ns-EH".to_string()]);
assert_eq!(torii_args.indexing.events_chunk_size, 9999);
assert_eq!(torii_args.indexing.blocks_chunk_size, 10240);
10 changes: 2 additions & 8 deletions crates/torii/cli/src/options.rs
@@ -210,11 +210,11 @@ impl IndexingOptions {
}
}

#[derive(Debug, clap::Args, Clone, Serialize, Deserialize, PartialEq)]
#[derive(Debug, clap::Args, Clone, Serialize, Deserialize, PartialEq, Default)]
#[command(next_help_heading = "Events indexing options")]
pub struct EventsOptions {
/// Whether or not to index raw events
#[arg(long = "events.raw", action = ArgAction::Set, default_value_t = true, help = "Whether or not to index raw events.")]
#[arg(long = "events.raw", action = ArgAction::Set, default_value_t = false, help = "Whether or not to index raw events.")]
#[serde(default)]
pub raw: bool,

@@ -229,12 +229,6 @@ pub struct EventsOptions {
pub historical: Vec<String>,
}

impl Default for EventsOptions {
fn default() -> Self {
Self { raw: true, historical: vec![] }
}
}

#[derive(Debug, clap::Args, Clone, Serialize, Deserialize, PartialEq)]
#[command(next_help_heading = "HTTP server options")]
pub struct ServerOptions {
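Deriving Default here is what keeps the struct-level default in sync with the clap default above (raw = false, historical empty). A minimal guard-test sketch, assuming EventsOptions is in scope; this test is not part of the diff:

#[test]
fn events_options_default_matches_clap() {
    // With the derived Default, `raw` falls back to bool::default() (false),
    // matching `default_value_t = false` on the clap attribute above.
    let opts = EventsOptions::default();
    assert!(!opts.raw);
    assert!(opts.historical.is_empty());
}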
13 changes: 13 additions & 0 deletions crates/torii/core/src/constants.rs
@@ -1,3 +1,16 @@
pub const TOKEN_BALANCE_TABLE: &str = "token_balances";
pub const TOKEN_TRANSFER_TABLE: &str = "token_transfers";
pub const TOKENS_TABLE: &str = "tokens";

pub(crate) const LOG_TARGET: &str = "torii_core::engine";
pub const QUERY_QUEUE_BATCH_SIZE: usize = 1000;

pub const IPFS_URL: &str = "https://ipfs.io/ipfs/";
pub const IPFS_CLIENT_MAX_RETRY: u8 = 3;

pub const IPFS_CLIENT_URL: &str = "https://ipfs.infura.io:5001";
pub const IPFS_CLIENT_USERNAME: &str = "2EBrzr7ZASQZKH32sl2xWauXPSA";
pub const IPFS_CLIENT_PASSWORD: &str = "12290b883db9138a8ae3363b6739d220";

pub const WORLD_CONTRACT_TYPE: &str = "WORLD";
pub const SQL_FELT_DELIMITER: &str = "/";
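These constants were previously scattered across modules (for example, LOG_TARGET lived in engine.rs and the retry count in utils). A small usage sketch, with a hypothetical helper that is not part of the diff, showing how crate code now pulls them from one place:

use tracing::debug;

use crate::constants::LOG_TARGET;

// Illustrative only: engine logging now points at the shared LOG_TARGET constant.
fn log_fetched_range(from: u64, to: u64) {
    debug!(target: LOG_TARGET, from = %from, to = %to, "Fetched data for range.");
}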
18 changes: 3 additions & 15 deletions crates/torii/core/src/engine.rs
@@ -24,6 +24,7 @@ use tokio::task::JoinSet;
use tokio::time::{sleep, Instant};
use tracing::{debug, error, info, trace, warn};

use crate::constants::LOG_TARGET;
use crate::processors::erc20_legacy_transfer::Erc20LegacyTransferProcessor;
use crate::processors::erc20_transfer::Erc20TransferProcessor;
use crate::processors::erc721_legacy_transfer::Erc721LegacyTransferProcessor;
@@ -128,24 +129,21 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Processors<P> {
self.event_processors.get(&contract_type).unwrap()
}
}
pub(crate) const LOG_TARGET: &str = "torii_core::engine";
pub const QUERY_QUEUE_BATCH_SIZE: usize = 1000;

bitflags! {
#[derive(Debug, Clone)]
pub struct IndexingFlags: u32 {
const TRANSACTIONS = 0b00000001;
const RAW_EVENTS = 0b00000010;
const PENDING_BLOCKS = 0b00000100;
}
}

#[derive(Debug)]
pub struct EngineConfig {
pub polling_interval: Duration,
pub start_block: u64,
pub blocks_chunk_size: u64,
pub events_chunk_size: u64,
pub index_pending: bool,
pub max_concurrent_tasks: usize,
pub flags: IndexingFlags,
pub event_processor_config: EventProcessorConfig,
@@ -155,10 +153,8 @@ impl Default for EngineConfig {
fn default() -> Self {
Self {
polling_interval: Duration::from_millis(500),
start_block: 0,
blocks_chunk_size: 10240,
events_chunk_size: 1024,
index_pending: true,
max_concurrent_tasks: 100,
flags: IndexingFlags::empty(),
event_processor_config: EventProcessorConfig::default(),
@@ -245,14 +241,6 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
}

pub async fn start(&mut self) -> Result<()> {
// use the start block provided by user if head is 0
let (head, _, _) = self.db.head(self.world.address).await?;
if head == 0 {
self.db.set_head(self.config.start_block, 0, 0, self.world.address).await?;
} else if self.config.start_block != 0 {
warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead.");
}

let mut backoff_delay = Duration::from_secs(1);
let max_backoff_delay = Duration::from_secs(60);

@@ -324,7 +312,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
let data = self.fetch_range(from, to, &cursors.cursor_map).await?;
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), from = %from, to = %to, "Fetched data for range.");
FetchDataResult::Range(data)
} else if self.config.index_pending {
} else if self.config.flags.contains(IndexingFlags::PENDING_BLOCKS) {
let data =
self.fetch_pending(latest_block.clone(), cursors.last_pending_block_tx).await?;
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), latest_block_number = %latest_block.block_number, "Fetched pending data.");
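With start_block and index_pending removed, pending-block indexing is now opt-in via IndexingFlags::PENDING_BLOCKS and the engine always resumes from the stored head. A hedged wiring sketch (the helper name and parameters are hypothetical, not part of this change):

fn engine_config_from_cli(index_pending: bool, index_raw_events: bool) -> EngineConfig {
    let mut flags = IndexingFlags::empty();
    if index_pending {
        flags.insert(IndexingFlags::PENDING_BLOCKS);
    }
    if index_raw_events {
        flags.insert(IndexingFlags::RAW_EVENTS);
    }
    // Everything else keeps the defaults shown above (polling interval,
    // chunk sizes, max concurrent tasks, ...).
    EngineConfig { flags, ..Default::default() }
}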
9 changes: 4 additions & 5 deletions crates/torii/core/src/executor/erc.rs
@@ -13,11 +13,10 @@ use starknet_crypto::Felt;
use tracing::{debug, trace};

use super::{ApplyBalanceDiffQuery, Executor};
use crate::constants::TOKEN_BALANCE_TABLE;
use crate::constants::{IPFS_CLIENT_MAX_RETRY, SQL_FELT_DELIMITER, TOKEN_BALANCE_TABLE};
use crate::sql::utils::{felt_to_sql_string, sql_string_to_u256, u256_to_sql_string, I256};
use crate::sql::FELT_DELIMITER;
use crate::types::ContractType;
use crate::utils::{fetch_content_from_ipfs, MAX_RETRY};
use crate::utils::fetch_content_from_ipfs;

#[derive(Debug, Clone)]
pub struct RegisterErc721TokenQuery {
@@ -50,7 +49,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
) -> Result<()> {
let erc_cache = apply_balance_diff.erc_cache;
for ((contract_type, id_str), balance) in erc_cache.iter() {
let id = id_str.split(FELT_DELIMITER).collect::<Vec<&str>>();
let id = id_str.split(SQL_FELT_DELIMITER).collect::<Vec<&str>>();
match contract_type {
ContractType::WORLD => unreachable!(),
ContractType::ERC721 => {
@@ -228,7 +227,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
uri if uri.starts_with("ipfs") => {
let cid = uri.strip_prefix("ipfs://").unwrap();
debug!(cid = %cid, "Fetching metadata from IPFS");
let response = fetch_content_from_ipfs(cid, MAX_RETRY)
let response = fetch_content_from_ipfs(cid, IPFS_CLIENT_MAX_RETRY)
.await
.context("Failed to fetch metadata from IPFS")?;

5 changes: 3 additions & 2 deletions crates/torii/core/src/model.rs
@@ -3,6 +3,7 @@ use std::str::FromStr;

use async_trait::async_trait;
use crypto_bigint::U256;
use dojo_types::naming::get_tag;
use dojo_types::primitive::Primitive;
use dojo_types::schema::{Enum, EnumOption, Member, Struct, Ty};
use dojo_world::contracts::abigen::model::Layout;
@@ -210,10 +211,10 @@ pub fn parse_sql_model_members(
}

Ty::Struct(Struct {
name: format!("{}-{}", namespace, model),
name: get_tag(namespace, model),
children: model_members_all
.iter()
.filter(|m| m.id == format!("{}-{}", namespace, model))
.filter(|m| m.id == get_tag(namespace, model))
.map(|m| Member {
key: m.key,
name: m.name.to_owned(),
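get_tag replaces the hand-rolled format!("{}-{}", namespace, model), so the produced tag keeps the same namespace-name shape. A minimal sketch with illustrative names:

use dojo_types::naming::get_tag;

fn example_tag() -> String {
    // Per the substitution above, this yields the same "namespace-name" string
    // the removed format! call built by hand, e.g. "ns-Position".
    get_tag("ns", "Position")
}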
7 changes: 2 additions & 5 deletions crates/torii/core/src/processors/event_message.rs
@@ -1,6 +1,7 @@
use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_world::contracts::abigen::world::Event as WorldEvent;
use dojo_world::contracts::naming::get_tag;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{Event, Felt};
use starknet::providers::Provider;
@@ -65,16 +66,12 @@
"Store event message."
);

// TODO: check historical and keep the internal counter.

let mut keys_and_unpacked = [event.keys, event.values].concat();

let mut entity = model.schema.clone();
entity.deserialize(&mut keys_and_unpacked)?;

// TODO: this must come from some torii's configuration.
let historical =
config.historical_events.contains(&format!("{}-{}", model.namespace, model.name));
let historical = config.is_historical(&get_tag(&model.namespace, &model.name));
db.set_event_message(entity, event_id, block_timestamp, historical).await?;
Ok(())
}
7 changes: 4 additions & 3 deletions crates/torii/core/src/processors/metadata_update.rs
@@ -12,8 +12,9 @@ use starknet::providers::Provider;
use tracing::{error, info};

use super::{EventProcessor, EventProcessorConfig};
use crate::constants::IPFS_CLIENT_MAX_RETRY;
use crate::sql::Sql;
use crate::utils::{fetch_content_from_ipfs, MAX_RETRY};
use crate::utils::fetch_content_from_ipfs;

pub(crate) const LOG_TARGET: &str = "torii_core::processors::metadata_update";

@@ -106,7 +107,7 @@ async fn metadata(uri_str: String) -> Result<(WorldMetadata, Option<String>, Opt
let uri = Uri::Ipfs(uri_str);
let cid = uri.cid().ok_or("Uri is malformed").map_err(Error::msg)?;

let bytes = fetch_content_from_ipfs(cid, MAX_RETRY).await?;
let bytes = fetch_content_from_ipfs(cid, IPFS_CLIENT_MAX_RETRY).await?;
let metadata: WorldMetadata = serde_json::from_str(std::str::from_utf8(&bytes)?)?;

let icon_img = fetch_image(&metadata.icon_uri).await;
@@ -117,7 +118,7 @@

async fn fetch_image(image_uri: &Option<Uri>) -> Option<String> {
if let Some(uri) = image_uri {
let data = fetch_content_from_ipfs(uri.cid()?, MAX_RETRY).await.ok()?;
let data = fetch_content_from_ipfs(uri.cid()?, IPFS_CLIENT_MAX_RETRY).await.ok()?;
let encoded = general_purpose::STANDARD.encode(data);
return Some(encoded);
}
10 changes: 10 additions & 0 deletions crates/torii/core/src/processors/mod.rs
@@ -34,6 +34,16 @@ pub struct EventProcessorConfig {
pub namespaces: HashSet<String>,
}

impl EventProcessorConfig {
pub fn is_historical(&self, tag: &str) -> bool {
self.historical_events.contains(tag)
}

pub fn should_index(&self, namespace: &str) -> bool {
self.namespaces.is_empty() || self.namespaces.contains(namespace)
}
}

#[async_trait]
pub trait EventProcessor<P>: Send + Sync
where
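The two helpers centralize checks that were previously inlined in each processor (see event_message.rs above and register_event.rs below). A usage sketch, assuming historical_events is a HashSet<String> like namespaces (only namespaces is visible in this hunk):

fn example_config_checks() {
    let mut config = EventProcessorConfig::default();
    config.namespaces.insert("ns".to_string());
    config.historical_events.insert("ns-EventHistory".to_string());

    assert!(config.should_index("ns"));     // explicitly whitelisted namespace
    assert!(!config.should_index("other")); // filtered once the whitelist is non-empty
    assert!(config.is_historical("ns-EventHistory"));
}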
2 changes: 1 addition & 1 deletion crates/torii/core/src/processors/register_event.rs
@@ -60,7 +60,7 @@

// If the namespace is not in the list of namespaces to index, silently ignore it.
// If our config is empty, we index all namespaces.
if !config.namespaces.is_empty() && !config.namespaces.contains(&namespace) {
if !config.should_index(&namespace) {
return Ok(());
}

2 changes: 1 addition & 1 deletion crates/torii/core/src/processors/register_model.rs
@@ -60,7 +60,7 @@

// If the namespace is not in the list of namespaces to index, silently ignore it.
// If our config is empty, we index all namespaces.
if !config.namespaces.is_empty() && !config.namespaces.contains(&namespace) {
if !config.should_index(&namespace) {
return Ok(());
}

6 changes: 3 additions & 3 deletions crates/torii/core/src/sql/erc.rs
@@ -8,7 +8,7 @@ use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string};
use starknet::providers::Provider;

use super::utils::{u256_to_sql_string, I256};
use super::{Sql, FELT_DELIMITER};
use super::{Sql, SQL_FELT_DELIMITER};
use crate::constants::TOKEN_TRANSFER_TABLE;
use crate::executor::{
ApplyBalanceDiffQuery, Argument, QueryMessage, QueryType, RegisterErc20TokenQuery,
@@ -108,7 +108,7 @@ impl Sql {
// from_address/contract_address:id
if from_address != Felt::ZERO {
let from_balance_id =
format!("{}{FELT_DELIMITER}{}", felt_to_sql_string(&from_address), &token_id);
format!("{}{SQL_FELT_DELIMITER}{}", felt_to_sql_string(&from_address), &token_id);
let from_balance = self
.local_cache
.erc_cache
@@ -119,7 +119,7 @@

if to_address != Felt::ZERO {
let to_balance_id =
format!("{}{FELT_DELIMITER}{}", felt_to_sql_string(&to_address), &token_id);
format!("{}{SQL_FELT_DELIMITER}{}", felt_to_sql_string(&to_address), &token_id);
let to_balance = self
.local_cache
.erc_cache
10 changes: 4 additions & 6 deletions crates/torii/core/src/sql/mod.rs
@@ -4,6 +4,7 @@ use std::str::FromStr;
use std::sync::Arc;

use anyhow::{anyhow, Context, Result};
use dojo_types::naming::get_tag;
use dojo_types::primitive::Primitive;
use dojo_types::schema::{EnumOption, Member, Struct, Ty};
use dojo_world::config::WorldMetadata;
@@ -16,6 +17,7 @@
use tokio::sync::mpsc::UnboundedSender;
use utils::felts_to_sql_string;

use crate::constants::SQL_FELT_DELIMITER;
use crate::executor::{
Argument, DeleteEntityQuery, EventMessageQuery, QueryMessage, QueryType, ResetCursorsQuery,
SetHeadQuery, UpdateCursorsQuery,
@@ -26,12 +28,8 @@ use crate::utils::utc_dt_string_from_timestamp;
type IsEventMessage = bool;
type IsStoreUpdate = bool;

pub const WORLD_CONTRACT_TYPE: &str = "WORLD";
pub const FELT_DELIMITER: &str = "/";

pub mod cache;
pub mod erc;
pub mod query_queue;
#[cfg(test)]
#[path = "test.rs"]
mod test;
@@ -260,7 +258,7 @@ impl Sql {
upgrade_diff: Option<&Ty>,
) -> Result<()> {
let selector = compute_selector_from_names(namespace, &model.name());
let namespaced_name = format!("{}-{}", namespace, model.name());
let namespaced_name = get_tag(namespace, &model.name());

let insert_models =
"INSERT INTO models (id, namespace, name, class_hash, contract_address, layout, \
@@ -751,7 +749,7 @@ impl Sql {
std::iter::once(entity_id.to_string())
.chain(indexes.iter().map(|i| i.to_string()))
.collect::<Vec<String>>()
.join(FELT_DELIMITER),
.join(SQL_FELT_DELIMITER),
));
}

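The id built just above joins the entity id and the array indexes with the shared SQL_FELT_DELIMITER. A standalone sketch of that shape (types and values are illustrative):

use crate::constants::SQL_FELT_DELIMITER;

fn array_entry_id(entity_id: &str, indexes: &[usize]) -> String {
    // e.g. ("0x1", &[0, 2]) -> "0x1/0/2"
    std::iter::once(entity_id.to_string())
        .chain(indexes.iter().map(|i| i.to_string()))
        .collect::<Vec<String>>()
        .join(SQL_FELT_DELIMITER)
}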