Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(torii-core): cleanups #2702

Merged
merged 10 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@
if args.events.raw {
flags.insert(IndexingFlags::RAW_EVENTS);
}
if args.indexing.pending {
flags.insert(IndexingFlags::PENDING_BLOCKS);
}

Check warning on line 143 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L141-L143

Added lines #L141 - L143 were not covered by tests

let mut engine: Engine<Arc<JsonRpcClient<HttpTransport>>> = Engine::new(
world,
Expand All @@ -146,10 +149,8 @@
processors,
EngineConfig {
max_concurrent_tasks: args.indexing.max_concurrent_tasks,
start_block: 0,
blocks_chunk_size: args.indexing.blocks_chunk_size,
events_chunk_size: args.indexing.events_chunk_size,
index_pending: args.indexing.pending,
polling_interval: Duration::from_millis(args.indexing.polling_interval),
flags,
event_processor_config: EventProcessorConfig {
Expand Down
4 changes: 2 additions & 2 deletions crates/torii/cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ mod test {
db_dir = "/tmp/torii-test"

[events]
raw = false
raw = true
historical = [
"ns-E",
"ns-EH"
Expand Down Expand Up @@ -291,7 +291,7 @@ mod test {
assert_eq!(torii_args.world_address, Some(Felt::from_str("0x1234").unwrap()));
assert_eq!(torii_args.rpc, Url::parse("http://0.0.0.0:2222").unwrap());
assert_eq!(torii_args.db_dir, Some(PathBuf::from("/tmp/torii-test")));
assert!(!torii_args.events.raw);
assert!(torii_args.events.raw);
assert_eq!(torii_args.events.historical, vec!["ns-E".to_string(), "ns-EH".to_string()]);
assert_eq!(torii_args.indexing.events_chunk_size, 9999);
assert_eq!(torii_args.indexing.blocks_chunk_size, 10240);
Expand Down
10 changes: 2 additions & 8 deletions crates/torii/cli/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,11 @@ impl IndexingOptions {
}
}

#[derive(Debug, clap::Args, Clone, Serialize, Deserialize, PartialEq)]
#[derive(Debug, clap::Args, Clone, Serialize, Deserialize, PartialEq, Default)]
#[command(next_help_heading = "Events indexing options")]
pub struct EventsOptions {
/// Whether or not to index raw events
#[arg(long = "events.raw", action = ArgAction::Set, default_value_t = true, help = "Whether or not to index raw events.")]
#[arg(long = "events.raw", action = ArgAction::Set, default_value_t = false, help = "Whether or not to index raw events.")]
#[serde(default)]
pub raw: bool,

Expand All @@ -229,12 +229,6 @@ pub struct EventsOptions {
pub historical: Vec<String>,
}

impl Default for EventsOptions {
fn default() -> Self {
Self { raw: true, historical: vec![] }
}
}

#[derive(Debug, clap::Args, Clone, Serialize, Deserialize, PartialEq)]
#[command(next_help_heading = "HTTP server options")]
pub struct ServerOptions {
Expand Down
13 changes: 13 additions & 0 deletions crates/torii/core/src/constants.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
pub const TOKEN_BALANCE_TABLE: &str = "token_balances";
pub const TOKEN_TRANSFER_TABLE: &str = "token_transfers";
pub const TOKENS_TABLE: &str = "tokens";

pub(crate) const LOG_TARGET: &str = "torii_core::engine";
pub const QUERY_QUEUE_BATCH_SIZE: usize = 1000;

pub const IPFS_URL: &str = "https://ipfs.io/ipfs/";
pub const IPFS_CLIENT_MAX_RETRY: u8 = 3;

pub const IPFS_CLIENT_URL: &str = "https://ipfs.infura.io:5001";
pub const IPFS_CLIENT_USERNAME: &str = "2EBrzr7ZASQZKH32sl2xWauXPSA";
pub const IPFS_CLIENT_PASSWORD: &str = "12290b883db9138a8ae3363b6739d220";
Comment on lines +8 to +13
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In sozo we moved this to be read from env variables, and default to cartridge's if nothing is found. Maybe something we want here in a future, to enable usage of custom IPFS nodes.


pub const WORLD_CONTRACT_TYPE: &str = "WORLD";
pub const SQL_FELT_DELIMITER: &str = "/";
18 changes: 3 additions & 15 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
use tokio::time::{sleep, Instant};
use tracing::{debug, error, info, trace, warn};

use crate::constants::LOG_TARGET;
use crate::processors::erc20_legacy_transfer::Erc20LegacyTransferProcessor;
use crate::processors::erc20_transfer::Erc20TransferProcessor;
use crate::processors::erc721_legacy_transfer::Erc721LegacyTransferProcessor;
Expand Down Expand Up @@ -128,24 +129,21 @@
self.event_processors.get(&contract_type).unwrap()
}
}
pub(crate) const LOG_TARGET: &str = "torii_core::engine";
pub const QUERY_QUEUE_BATCH_SIZE: usize = 1000;

bitflags! {
#[derive(Debug, Clone)]
pub struct IndexingFlags: u32 {
const TRANSACTIONS = 0b00000001;
const RAW_EVENTS = 0b00000010;
const PENDING_BLOCKS = 0b00000100;
}
}

#[derive(Debug)]
pub struct EngineConfig {
pub polling_interval: Duration,
pub start_block: u64,
pub blocks_chunk_size: u64,
pub events_chunk_size: u64,
pub index_pending: bool,
pub max_concurrent_tasks: usize,
pub flags: IndexingFlags,
pub event_processor_config: EventProcessorConfig,
Expand All @@ -155,10 +153,8 @@
fn default() -> Self {
Self {
polling_interval: Duration::from_millis(500),
start_block: 0,
blocks_chunk_size: 10240,
events_chunk_size: 1024,
index_pending: true,
max_concurrent_tasks: 100,
flags: IndexingFlags::empty(),
event_processor_config: EventProcessorConfig::default(),
Expand Down Expand Up @@ -245,14 +241,6 @@
}

pub async fn start(&mut self) -> Result<()> {
// use the start block provided by user if head is 0
let (head, _, _) = self.db.head(self.world.address).await?;
if head == 0 {
self.db.set_head(self.config.start_block, 0, 0, self.world.address).await?;
} else if self.config.start_block != 0 {
warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead.");
}

let mut backoff_delay = Duration::from_secs(1);
let max_backoff_delay = Duration::from_secs(60);

Expand Down Expand Up @@ -324,7 +312,7 @@
let data = self.fetch_range(from, to, &cursors.cursor_map).await?;
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), from = %from, to = %to, "Fetched data for range.");
FetchDataResult::Range(data)
} else if self.config.index_pending {
} else if self.config.flags.contains(IndexingFlags::PENDING_BLOCKS) {

Check warning on line 315 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L315

Added line #L315 was not covered by tests
let data =
self.fetch_pending(latest_block.clone(), cursors.last_pending_block_tx).await?;
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), latest_block_number = %latest_block.block_number, "Fetched pending data.");
Expand Down
9 changes: 4 additions & 5 deletions crates/torii/core/src/executor/erc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
use tracing::{debug, trace};

use super::{ApplyBalanceDiffQuery, Executor};
use crate::constants::TOKEN_BALANCE_TABLE;
use crate::constants::{IPFS_CLIENT_MAX_RETRY, SQL_FELT_DELIMITER, TOKEN_BALANCE_TABLE};
use crate::sql::utils::{felt_to_sql_string, sql_string_to_u256, u256_to_sql_string, I256};
use crate::sql::FELT_DELIMITER;
use crate::types::ContractType;
use crate::utils::{fetch_content_from_ipfs, MAX_RETRY};
use crate::utils::fetch_content_from_ipfs;

#[derive(Debug, Clone)]
pub struct RegisterErc721TokenQuery {
Expand Down Expand Up @@ -50,7 +49,7 @@
) -> Result<()> {
let erc_cache = apply_balance_diff.erc_cache;
for ((contract_type, id_str), balance) in erc_cache.iter() {
let id = id_str.split(FELT_DELIMITER).collect::<Vec<&str>>();
let id = id_str.split(SQL_FELT_DELIMITER).collect::<Vec<&str>>();

Check warning on line 52 in crates/torii/core/src/executor/erc.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor/erc.rs#L52

Added line #L52 was not covered by tests
match contract_type {
ContractType::WORLD => unreachable!(),
ContractType::ERC721 => {
Expand Down Expand Up @@ -228,7 +227,7 @@
uri if uri.starts_with("ipfs") => {
let cid = uri.strip_prefix("ipfs://").unwrap();
debug!(cid = %cid, "Fetching metadata from IPFS");
let response = fetch_content_from_ipfs(cid, MAX_RETRY)
let response = fetch_content_from_ipfs(cid, IPFS_CLIENT_MAX_RETRY)

Check warning on line 230 in crates/torii/core/src/executor/erc.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor/erc.rs#L230

Added line #L230 was not covered by tests
.await
.context("Failed to fetch metadata from IPFS")?;

Expand Down
3 changes: 1 addition & 2 deletions crates/torii/core/src/processors/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ where
entity.deserialize(&mut keys_and_unpacked)?;

// TODO: this must come from some torii's configuration.
Larkooo marked this conversation as resolved.
Show resolved Hide resolved
let historical =
config.historical_events.contains(&format!("{}-{}", model.namespace, model.name));
let historical = config.is_historical(&format!("{}-{}", model.namespace, model.name));
Larkooo marked this conversation as resolved.
Show resolved Hide resolved
db.set_event_message(entity, event_id, block_timestamp, historical).await?;
Ok(())
}
Expand Down
7 changes: 4 additions & 3 deletions crates/torii/core/src/processors/metadata_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
use tracing::{error, info};

use super::{EventProcessor, EventProcessorConfig};
use crate::constants::IPFS_CLIENT_MAX_RETRY;
use crate::sql::Sql;
use crate::utils::{fetch_content_from_ipfs, MAX_RETRY};
use crate::utils::fetch_content_from_ipfs;

pub(crate) const LOG_TARGET: &str = "torii_core::processors::metadata_update";

Expand Down Expand Up @@ -106,7 +107,7 @@
let uri = Uri::Ipfs(uri_str);
let cid = uri.cid().ok_or("Uri is malformed").map_err(Error::msg)?;

let bytes = fetch_content_from_ipfs(cid, MAX_RETRY).await?;
let bytes = fetch_content_from_ipfs(cid, IPFS_CLIENT_MAX_RETRY).await?;

Check warning on line 110 in crates/torii/core/src/processors/metadata_update.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/metadata_update.rs#L110

Added line #L110 was not covered by tests
let metadata: WorldMetadata = serde_json::from_str(std::str::from_utf8(&bytes)?)?;

let icon_img = fetch_image(&metadata.icon_uri).await;
Expand All @@ -117,7 +118,7 @@

async fn fetch_image(image_uri: &Option<Uri>) -> Option<String> {
if let Some(uri) = image_uri {
let data = fetch_content_from_ipfs(uri.cid()?, MAX_RETRY).await.ok()?;
let data = fetch_content_from_ipfs(uri.cid()?, IPFS_CLIENT_MAX_RETRY).await.ok()?;

Check warning on line 121 in crates/torii/core/src/processors/metadata_update.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/metadata_update.rs#L121

Added line #L121 was not covered by tests
let encoded = general_purpose::STANDARD.encode(data);
return Some(encoded);
}
Expand Down
10 changes: 10 additions & 0 deletions crates/torii/core/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ pub struct EventProcessorConfig {
pub namespaces: HashSet<String>,
}

impl EventProcessorConfig {
pub fn is_historical(&self, tag: &str) -> bool {
self.historical_events.contains(tag)
}

pub fn should_index(&self, namespace: &str) -> bool {
self.namespaces.is_empty() || self.namespaces.contains(namespace)
}
}

#[async_trait]
pub trait EventProcessor<P>: Send + Sync
where
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/core/src/processors/register_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ where

// If the namespace is not in the list of namespaces to index, silently ignore it.
// If our config is empty, we index all namespaces.
if !config.namespaces.is_empty() && !config.namespaces.contains(&namespace) {
if !config.should_index(&namespace) {
return Ok(());
}

Expand Down
2 changes: 1 addition & 1 deletion crates/torii/core/src/processors/register_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ where

// If the namespace is not in the list of namespaces to index, silently ignore it.
// If our config is empty, we index all namespaces.
if !config.namespaces.is_empty() && !config.namespaces.contains(&namespace) {
if !config.should_index(&namespace) {
return Ok(());
}

Expand Down
6 changes: 3 additions & 3 deletions crates/torii/core/src/sql/erc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use starknet::providers::Provider;

use super::utils::{u256_to_sql_string, I256};
use super::{Sql, FELT_DELIMITER};
use super::{Sql, SQL_FELT_DELIMITER};
use crate::constants::TOKEN_TRANSFER_TABLE;
use crate::executor::{
ApplyBalanceDiffQuery, Argument, QueryMessage, QueryType, RegisterErc20TokenQuery,
Expand Down Expand Up @@ -108,7 +108,7 @@
// from_address/contract_address:id
if from_address != Felt::ZERO {
let from_balance_id =
format!("{}{FELT_DELIMITER}{}", felt_to_sql_string(&from_address), &token_id);
format!("{}{SQL_FELT_DELIMITER}{}", felt_to_sql_string(&from_address), &token_id);

Check warning on line 111 in crates/torii/core/src/sql/erc.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql/erc.rs#L111

Added line #L111 was not covered by tests
let from_balance = self
.local_cache
.erc_cache
Expand All @@ -119,7 +119,7 @@

if to_address != Felt::ZERO {
let to_balance_id =
format!("{}{FELT_DELIMITER}{}", felt_to_sql_string(&to_address), &token_id);
format!("{}{SQL_FELT_DELIMITER}{}", felt_to_sql_string(&to_address), &token_id);

Check warning on line 122 in crates/torii/core/src/sql/erc.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql/erc.rs#L122

Added line #L122 was not covered by tests
let to_balance = self
.local_cache
.erc_cache
Expand Down
7 changes: 2 additions & 5 deletions crates/torii/core/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use tokio::sync::mpsc::UnboundedSender;
use utils::felts_to_sql_string;

use crate::constants::SQL_FELT_DELIMITER;
use crate::executor::{
Argument, DeleteEntityQuery, EventMessageQuery, QueryMessage, QueryType, ResetCursorsQuery,
SetHeadQuery, UpdateCursorsQuery,
Expand All @@ -26,12 +27,8 @@
type IsEventMessage = bool;
type IsStoreUpdate = bool;

pub const WORLD_CONTRACT_TYPE: &str = "WORLD";
pub const FELT_DELIMITER: &str = "/";

pub mod cache;
pub mod erc;
pub mod query_queue;
#[cfg(test)]
#[path = "test.rs"]
mod test;
Expand Down Expand Up @@ -751,7 +748,7 @@
std::iter::once(entity_id.to_string())
.chain(indexes.iter().map(|i| i.to_string()))
.collect::<Vec<String>>()
.join(FELT_DELIMITER),
.join(SQL_FELT_DELIMITER),

Check warning on line 751 in crates/torii/core/src/sql/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql/mod.rs#L751

Added line #L751 was not covered by tests
));
}

Expand Down
Loading
Loading