feat(torii): configuration file for all torii cli options #2646

Merged
merged 8 commits on Nov 9, 2024
Changes from 7 commits
28 changes: 27 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions bin/torii/Cargo.toml
@@ -40,13 +40,15 @@ torii-grpc = { workspace = true, features = [ "server" ] }
torii-relay.workspace = true
torii-server.workspace = true
tower.workspace = true
toml.workspace = true

tower-http.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
url.workspace = true
webbrowser = "0.8"
tempfile.workspace = true
clap_config = "0.1.1"

💡 Codebase verification

🛠️ Refactor suggestion

Consider using clap directly instead of clap_config, sensei!

The verification reveals:

  1. clap_config is only used in one place (bin/torii/src/main.rs) for a simple skip attribute
  2. The package is relatively new (last updated July 2024) with low adoption (~1.8K total downloads)
  3. The codebase already extensively uses clap directly, as seen in multiple files

The functionality can be replaced with clap's built-in attributes, making the dependency unnecessary:

  • Replace #[clap_config(skip)] with #[arg(skip)]
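
If the maintainers go that route, a minimal sketch of the built-in attribute could look like the following, assuming clap 4's derive API (the struct and field names here are illustrative, not the actual Args definition):

use clap::Parser;
use std::path::PathBuf;

#[derive(Parser, Debug)]
#[command(name = "torii-sketch")]
struct ArgsSketch {
    /// Configuration file (a regular CLI flag).
    #[arg(long)]
    config: Option<PathBuf>,

    /// Skipped by clap entirely: never exposed on the CLI,
    /// filled with Default::default() after parsing.
    #[arg(skip)]
    resolved_contracts: Vec<String>,
}

fn main() {
    let args = ArgsSketch::parse();
    println!("config file: {:?}", args.config);
}

One caveat: #[arg(skip)] removes the field from CLI parsing altogether, whereas #[clap_config(skip)] appears to only exclude the field from the generated config struct, so the two may not be drop-in equivalents and this is worth verifying before dropping the dependency.
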
🔗 Analysis chain

Consider workspace version management and package stability, sensei!

The clap_config dependency has a few concerns:

  1. It's not using workspace version management (.workspace = true) like other dependencies
  2. Version 0.1.1 is quite early and might be unstable
  3. Being a small community package, we should verify its maintenance status

Let's check the package's health:

Consider:

  1. Moving the version to workspace-level management
  2. Evaluating more mature alternatives or waiting for a stable release
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify clap_config package health
# Get package info from crates.io API
curl -s https://crates.io/api/v1/crates/clap_config | jq -r '.crate | {max_version, updated_at, downloads, recent_downloads}'

Length of output: 257


Script:

#!/bin/bash
# Check if clap_config is used in workspace dependencies
rg -U "clap_config.*workspace\s*=\s*true" Cargo.toml

# Check workspace root Cargo.toml for any clap_config version specification
rg "clap_config.*version" Cargo.toml

# Find all usages of clap_config in the codebase
rg "clap_config" -A 2 -B 2

# Check for alternative crates that might be used
rg "clap::" -A 2 -B 2

Length of output: 15413


[dev-dependencies]
camino.workspace = true
116 changes: 51 additions & 65 deletions bin/torii/src/main.rs
@@ -11,15 +11,15 @@
//! for more info.

use std::cmp;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use clap::{ArgAction, Parser};
use clap::{ArgAction, CommandFactory, FromArgMatches, Parser};
use clap_config::ClapConfig;
use dojo_metrics::exporters::prometheus::PrometheusRecorder;
use dojo_utils::parse::{parse_socket_address, parse_url};
use dojo_world::contracts::world::WorldContractReader;
@@ -37,9 +37,10 @@ use tokio_stream::StreamExt;
use torii_core::engine::{Engine, EngineConfig, IndexingFlags, Processors};
use torii_core::executor::Executor;
use torii_core::processors::store_transaction::StoreTransactionProcessor;
use torii_core::processors::EventProcessorConfig;
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::Sql;
use torii_core::types::{Contract, ContractType, Model, ToriiConfig};
use torii_core::types::{Contract, ContractType, Model};
use torii_server::proxy::Proxy;
use tracing::{error, info};
use tracing_subscriber::{fmt, EnvFilter};
@@ -48,7 +49,7 @@ use url::{form_urlencoded, Url};
pub(crate) const LOG_TARGET: &str = "torii::cli";

/// Dojo World Indexer
#[derive(Parser, Debug)]
#[derive(ClapConfig, Parser, Debug)]
#[command(name = "torii", author, version, about, long_about = None)]
struct Args {
/// The world to index
@@ -91,9 +92,8 @@ struct Args {

/// Specify allowed origins for api endpoints (comma-separated list of allowed origins, or "*"
/// for all)
#[arg(long)]
#[arg(value_delimiter = ',')]
allowed_origins: Option<Vec<String>>,
#[arg(long, value_delimiter = ',')]
allowed_origins: Vec<String>,

/// The external url of the server, used for configuring the GraphQL Playground in a hosted
/// environment
@@ -139,32 +139,38 @@ struct Args {
index_raw_events: bool,

/// ERC contract addresses to index
#[arg(long, value_parser = parse_erc_contracts)]
#[arg(conflicts_with = "config")]
contracts: Option<std::vec::Vec<Contract>>,
#[arg(long, value_delimiter = ',', value_parser = parse_erc_contract)]
contracts: Vec<Contract>,

/// Event messages that are going to be treated as historical
/// A list of the model tags (namespace-name)
#[arg(long, value_delimiter = ',')]
historical_events: Vec<String>,

/// Configuration file
#[arg(long)]
#[clap_config(skip)]
config: Option<PathBuf>,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();

let mut config = if let Some(path) = args.config {
ToriiConfig::load_from_path(&path)?
let matches = <Args as CommandFactory>::command().get_matches();
let mut args = if let Some(path) = matches.get_one::<PathBuf>("config") {
let config: ArgsConfig = toml::from_str(&std::fs::read_to_string(path)?)?;
Args::from_merged(matches, Some(config))
} else {
let mut config = ToriiConfig::default();

if let Some(contracts) = args.contracts {
config.contracts = VecDeque::from(contracts);
}
Args::from_arg_matches(&matches)?
};
Comment on lines +158 to +164

🛠️ Refactor suggestion

Consider enhancing error handling for config file loading.

The current implementation could benefit from more robust error handling and validation.

Consider this improvement:

let matches = <Args as CommandFactory>::command().get_matches();
let mut args = if let Some(path) = matches.get_one::<PathBuf>("config") {
-    let config: ArgsConfig = toml::from_str(&std::fs::read_to_string(path)?)?;
+    let content = std::fs::read_to_string(path)
+        .with_context(|| format!("Failed to read config file: {}", path.display()))?;
+    let config: ArgsConfig = toml::from_str(&content)
+        .with_context(|| format!("Failed to parse TOML config at: {}", path.display()))?;
    Args::from_merged(matches, Some(config))
} else {
    Args::from_arg_matches(&matches)?
};
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let matches = <Args as CommandFactory>::command().get_matches();
let mut args = if let Some(path) = matches.get_one::<PathBuf>("config") {
let config: ArgsConfig = toml::from_str(&std::fs::read_to_string(path)?)?;
Args::from_merged(matches, Some(config))
} else {
let mut config = ToriiConfig::default();
if let Some(contracts) = args.contracts {
config.contracts = VecDeque::from(contracts);
}
Args::from_arg_matches(&matches)?
};
let matches = <Args as CommandFactory>::command().get_matches();
let mut args = if let Some(path) = matches.get_one::<PathBuf>("config") {
let content = std::fs::read_to_string(path)
.with_context(|| format!("Failed to read config file: {}", path.display()))?;
let config: ArgsConfig = toml::from_str(&content)
.with_context(|| format!("Failed to parse TOML config at: {}", path.display()))?;
Args::from_merged(matches, Some(config))
} else {
Args::from_arg_matches(&matches)?
};
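
For context, a rough sketch of the round trip this enables. ArgsConfig is generated by #[derive(ClapConfig)], so the stand-in struct and TOML keys below are illustrative only:

use serde::Deserialize;

// Hand-written stand-in mirroring a few Args fields; the real ArgsConfig is
// derived and uses the actual field names and types (e.g. Felt for addresses).
#[derive(Debug, Default, Deserialize)]
struct ArgsConfigSketch {
    world_address: Option<String>,
    allowed_origins: Option<Vec<String>>,
    historical_events: Option<Vec<String>>,
}

fn main() -> anyhow::Result<()> {
    // Example of what a torii config file might contain (keys are illustrative).
    let raw = r#"
        world_address = "0x1234"
        allowed_origins = ["*"]
        historical_events = ["ns-Position"]
    "#;
    let cfg: ArgsConfigSketch = toml::from_str(raw)?;
    println!("{cfg:?}");
    Ok(())
}

As for merge precedence, values supplied on the command line should win over config-file values in from_merged, but that is worth confirming against the clap_config docs.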


config
let world_address = if let Some(world_address) = args.world_address {
world_address
} else {
return Err(anyhow::anyhow!("Please specify a world address."));
};

let world_address = verify_single_world_address(args.world_address, &mut config)?;
// let mut contracts = parse_erc_contracts(&args.contracts)?;
args.contracts.push(Contract { address: world_address, r#type: ContractType::WORLD });

let filter_layer = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("info,hyper_reverse_proxy=off"));
@@ -210,15 +216,12 @@ async fn main() -> anyhow::Result<()> {
// Get world address
let world = WorldContractReader::new(world_address, provider.clone());

let contracts =
config.contracts.iter().map(|contract| (contract.address, contract.r#type)).collect();

let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await?;
tokio::spawn(async move {
executor.run().await.unwrap();
});

let db = Sql::new(pool.clone(), sender.clone(), &contracts).await?;
let db = Sql::new(pool.clone(), sender.clone(), &args.contracts).await?;

let processors = Processors {
transaction: vec![Box::new(StoreTransactionProcessor)],
@@ -248,10 +251,13 @@ async fn main() -> anyhow::Result<()> {
index_pending: args.index_pending,
polling_interval: Duration::from_millis(args.polling_interval),
flags,
event_processor_config: EventProcessorConfig {
historical_events: args.historical_events,
},
},
shutdown_tx.clone(),
Some(block_tx),
Arc::new(contracts),
&args.contracts,
);

let shutdown_rx = shutdown_tx.subscribe();
@@ -270,7 +276,12 @@ async fn main() -> anyhow::Result<()> {
)
.expect("Failed to start libp2p relay server");

let proxy_server = Arc::new(Proxy::new(args.addr, args.allowed_origins, Some(grpc_addr), None));
let proxy_server = Arc::new(Proxy::new(
args.addr,
if args.allowed_origins.is_empty() { None } else { Some(args.allowed_origins) },
Some(grpc_addr),
None,
));

let graphql_server = spawn_rebuilding_graphql_server(
shutdown_tx.clone(),
@@ -321,26 +332,6 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}

// Verifies that the world address is defined at most once
// and returns the world address
fn verify_single_world_address(
world_address: Option<Felt>,
config: &mut ToriiConfig,
) -> anyhow::Result<Felt> {
let world_from_config =
config.contracts.iter().find(|c| c.r#type == ContractType::WORLD).map(|c| c.address);

match (world_address, world_from_config) {
(Some(_), Some(_)) => Err(anyhow::anyhow!("World address specified multiple times")),
(Some(addr), _) => {
config.contracts.push_front(Contract { address: addr, r#type: ContractType::WORLD });
Ok(addr)
}
(_, Some(addr)) => Ok(addr),
(None, None) => Err(anyhow::anyhow!("World address not specified")),
}
}

async fn spawn_rebuilding_graphql_server(
shutdown_tx: Sender<()>,
pool: Arc<SqlitePool>,
@@ -368,25 +359,20 @@ async fn spawn_rebuilding_graphql_server(
// Parses clap cli argument which is expected to be in the format:
// - erc_type:address:start_block
// - address:start_block (erc_type defaults to ERC20)
fn parse_erc_contracts(s: &str) -> anyhow::Result<Vec<Contract>> {
let parts: Vec<&str> = s.split(',').collect();
let mut contracts = Vec::new();
for part in parts {
match part.split(':').collect::<Vec<&str>>().as_slice() {
[r#type, address] => {
let r#type = r#type.parse::<ContractType>()?;
let address = Felt::from_str(address)
.with_context(|| format!("Expected address, found {}", address))?;
contracts.push(Contract { address, r#type });
}
[address] => {
let r#type = ContractType::WORLD;
let address = Felt::from_str(address)
.with_context(|| format!("Expected address, found {}", address))?;
contracts.push(Contract { address, r#type });
fn parse_erc_contract(part: &str) -> anyhow::Result<Contract> {
match part.split(':').collect::<Vec<&str>>().as_slice() {
[r#type, address] => {
let r#type = r#type.parse::<ContractType>()?;
if r#type == ContractType::WORLD {
return Err(anyhow::anyhow!(
"World address cannot be specified as an ERC contract"
));
}
_ => return Err(anyhow::anyhow!("Invalid contract format")),

let address = Felt::from_str(address)
.with_context(|| format!("Expected address, found {}", address))?;
Ok(Contract { address, r#type })
}
_ => Err(anyhow::anyhow!("Invalid contract format")),
}
Ok(contracts)
}
1 change: 0 additions & 1 deletion crates/torii/core/Cargo.toml
@@ -34,7 +34,6 @@ thiserror.workspace = true
tokio = { version = "1.32.0", features = [ "sync", "macros" ], default-features = true }
# tokio-stream = "0.1.11"
tokio-util.workspace = true
toml.workspace = true
tracing.workspace = true

[dev-dependencies]
15 changes: 11 additions & 4 deletions crates/torii/core/src/engine.rs
@@ -37,9 +37,9 @@ use crate::processors::store_del_record::StoreDelRecordProcessor;
use crate::processors::store_set_record::StoreSetRecordProcessor;
use crate::processors::store_update_member::StoreUpdateMemberProcessor;
use crate::processors::store_update_record::StoreUpdateRecordProcessor;
use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor};
use crate::processors::{BlockProcessor, EventProcessor, EventProcessorConfig, TransactionProcessor};
use crate::sql::{Cursors, Sql};
use crate::types::ContractType;
use crate::types::{Contract, ContractType};

type EventProcessorMap<P> = HashMap<Felt, Vec<Box<dyn EventProcessor<P>>>>;

@@ -142,6 +142,7 @@ pub struct EngineConfig {
pub index_pending: bool,
pub max_concurrent_tasks: usize,
pub flags: IndexingFlags,
pub event_processor_config: EventProcessorConfig,
}

impl Default for EngineConfig {
@@ -154,6 +155,7 @@ impl Default for EngineConfig {
index_pending: true,
max_concurrent_tasks: 100,
flags: IndexingFlags::empty(),
event_processor_config: EventProcessorConfig::default(),
}
}
}
@@ -217,8 +219,10 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
config: EngineConfig,
shutdown_tx: Sender<()>,
block_tx: Option<BoundedSender<u64>>,
contracts: Arc<HashMap<Felt, ContractType>>,
contracts: &Vec<Contract>,
) -> Self {
let contracts = Arc::new(contracts.iter().map(|contract| (contract.address, contract.r#type)).collect());


⚠️ Potential issue

Consider adding contract validation.

Ohayo, sensei! The contract initialization lacks validation for duplicate addresses.

Add validation to ensure no duplicate contract addresses exist in the input vector. Otherwise, when the vector is collected into the HashMap, a later entry silently overwrites an earlier one, which can be surprising if two contracts share the same address but have different types.

+let mut seen_addresses = HashSet::new();
+for contract in contracts {
+    if !seen_addresses.insert(contract.address) {
+        return Err(anyhow::anyhow!("Duplicate contract address: {:#x}", contract.address));
+    }
+}
 let contracts = Arc::new(contracts
     .iter()
     .map(|contract| (contract.address, contract.r#type))
     .collect::<HashMap<_, _>>());

Also applies to: 32-38
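
A self-contained sketch of that duplicate check, using a simplified stand-in for the Contract type (the real one lives in torii_core::types and uses Felt addresses). Note that Engine::new as shown returns Self, so in practice the check would either live in main or require new to return a Result:

use std::collections::HashSet;

// Simplified stand-in; the real Contract carries a Felt address and a ContractType.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct Address(u64);

struct ContractSketch {
    address: Address,
}

fn check_unique_addresses(contracts: &[ContractSketch]) -> anyhow::Result<()> {
    let mut seen = HashSet::new();
    for contract in contracts {
        if !seen.insert(contract.address) {
            anyhow::bail!("Duplicate contract address: {:?}", contract.address);
        }
    }
    Ok(())
}

fn main() {
    let contracts =
        vec![ContractSketch { address: Address(1) }, ContractSketch { address: Address(1) }];
    assert!(check_unique_addresses(&contracts).is_err());
}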

Self {
world: Arc::new(world),
db,
@@ -574,6 +578,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
let semaphore = semaphore.clone();
let processors = self.processors.clone();

let event_processor_config = self.config.event_processor_config.clone();
handles.push(tokio::spawn(async move {
let _permit = semaphore.acquire().await?;
let mut local_db = db.clone();
@@ -586,7 +591,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
debug!(target: LOG_TARGET, event_name = processor.event_key(), task_id = %task_id, "Processing parallelized event.");

if let Err(e) = processor
.process(&world, &mut local_db, block_number, block_timestamp, &event_id, &event)
.process(&world, &mut local_db, block_number, block_timestamp, &event_id, &event, &event_processor_config)
.await
{
error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing parallelized event.");
@@ -796,6 +801,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
block_timestamp,
event_id,
event,
&self.config.event_processor_config,
)
.await
{
@@ -855,6 +861,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
block_timestamp,
event_id,
event,
&self.config.event_processor_config,
)
.await
{
3 changes: 2 additions & 1 deletion crates/torii/core/src/processors/erc20_legacy_transfer.rs
@@ -6,7 +6,7 @@ use starknet::core::types::{Event, U256};
use starknet::providers::Provider;
use tracing::debug;

use super::EventProcessor;
use super::{EventProcessor, EventProcessorConfig};
use crate::sql::Sql;

pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc20_legacy_transfer";
@@ -42,6 +42,7 @@ where
block_timestamp: u64,
event_id: &str,
event: &Event,
_config: &EventProcessorConfig,
) -> Result<(), Error> {
let token_address = event.from_address;
let from = event.data[0];