Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(torii-core): cleanups #2702

Merged
merged 10 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ async fn main() -> anyhow::Result<()> {
if args.events.raw {
flags.insert(IndexingFlags::RAW_EVENTS);
}
if args.indexing.pending {
flags.insert(IndexingFlags::PENDING_BLOCKS);
}

let mut engine: Engine<Arc<JsonRpcClient<HttpTransport>>> = Engine::new(
world,
Expand All @@ -145,10 +148,8 @@ async fn main() -> anyhow::Result<()> {
processors,
EngineConfig {
max_concurrent_tasks: args.indexing.max_concurrent_tasks,
start_block: 0,
blocks_chunk_size: args.indexing.blocks_chunk_size,
events_chunk_size: args.indexing.events_chunk_size,
index_pending: args.indexing.pending,
polling_interval: Duration::from_millis(args.indexing.polling_interval),
flags,
event_processor_config: EventProcessorConfig {
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/cli/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl IndexingOptions {
#[command(next_help_heading = "Events indexing options")]
pub struct EventsOptions {
/// Whether or not to index raw events
#[arg(long = "events.raw", action = ArgAction::Set, default_value_t = true, help = "Whether or not to index raw events.")]
#[arg(long = "events.raw", action = ArgAction::Set, default_value_t = false, help = "Whether or not to index raw events.")]
#[serde(default)]
pub raw: bool,

Expand Down
15 changes: 2 additions & 13 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,15 @@ bitflags! {
pub struct IndexingFlags: u32 {
const TRANSACTIONS = 0b00000001;
const RAW_EVENTS = 0b00000010;
const PENDING_BLOCKS = 0b00000100;
}
}

#[derive(Debug)]
pub struct EngineConfig {
pub polling_interval: Duration,
pub start_block: u64,
pub blocks_chunk_size: u64,
pub events_chunk_size: u64,
pub index_pending: bool,
pub max_concurrent_tasks: usize,
pub flags: IndexingFlags,
pub event_processor_config: EventProcessorConfig,
Expand All @@ -155,10 +154,8 @@ impl Default for EngineConfig {
fn default() -> Self {
Self {
polling_interval: Duration::from_millis(500),
start_block: 0,
blocks_chunk_size: 10240,
events_chunk_size: 1024,
index_pending: true,
max_concurrent_tasks: 100,
flags: IndexingFlags::empty(),
event_processor_config: EventProcessorConfig::default(),
Expand Down Expand Up @@ -245,14 +242,6 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
}

pub async fn start(&mut self) -> Result<()> {
// use the start block provided by user if head is 0
let (head, _, _) = self.db.head(self.world.address).await?;
if head == 0 {
self.db.set_head(self.config.start_block, 0, 0, self.world.address).await?;
} else if self.config.start_block != 0 {
warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead.");
}

let mut backoff_delay = Duration::from_secs(1);
let max_backoff_delay = Duration::from_secs(60);

Expand Down Expand Up @@ -324,7 +313,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
let data = self.fetch_range(from, to, &cursors.cursor_map).await?;
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), from = %from, to = %to, "Fetched data for range.");
FetchDataResult::Range(data)
} else if self.config.index_pending {
} else if self.config.flags.contains(IndexingFlags::PENDING_BLOCKS) {
let data =
self.fetch_pending(latest_block.clone(), cursors.last_pending_block_tx).await?;
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), latest_block_number = %latest_block.block_number, "Fetched pending data.");
Expand Down
3 changes: 1 addition & 2 deletions crates/torii/core/src/processors/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ where
entity.deserialize(&mut keys_and_unpacked)?;

// TODO: this must come from some torii's configuration.
Larkooo marked this conversation as resolved.
Show resolved Hide resolved
let historical =
config.historical_events.contains(&format!("{}-{}", model.namespace, model.name));
let historical = config.is_historical(&format!("{}-{}", model.namespace, model.name));
Larkooo marked this conversation as resolved.
Show resolved Hide resolved
db.set_event_message(entity, event_id, block_timestamp, historical).await?;
Ok(())
}
Expand Down
10 changes: 10 additions & 0 deletions crates/torii/core/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ pub struct EventProcessorConfig {
pub namespaces: HashSet<String>,
}

impl EventProcessorConfig {
pub fn is_historical(&self, tag: &str) -> bool {
self.historical_events.contains(tag)
}

pub fn should_index(&self, namespace: &str) -> bool {
self.namespaces.is_empty() || self.namespaces.contains(namespace)
}
}

#[async_trait]
pub trait EventProcessor<P>: Send + Sync
where
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/core/src/processors/register_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ where

// If the namespace is not in the list of namespaces to index, silently ignore it.
// If our config is empty, we index all namespaces.
if !config.namespaces.is_empty() && !config.namespaces.contains(&namespace) {
if !config.should_index(&namespace) {
return Ok(());
}

Expand Down
2 changes: 1 addition & 1 deletion crates/torii/core/src/processors/register_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ where

// If the namespace is not in the list of namespaces to index, silently ignore it.
// If our config is empty, we index all namespaces.
if !config.namespaces.is_empty() && !config.namespaces.contains(&namespace) {
if !config.should_index(&namespace) {
return Ok(());
}

Expand Down
7 changes: 5 additions & 2 deletions crates/torii/core/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ pub const FELT_DELIMITER: &str = "/";

pub mod cache;
pub mod erc;
pub mod query_queue;
#[cfg(test)]
#[path = "test.rs"]
mod test;
Expand Down Expand Up @@ -830,7 +829,11 @@ impl Sql {
Ty::Enum(e) => {
if e.options.iter().all(
|o| {
if let Ty::Tuple(t) = &o.ty { t.is_empty() } else { false }
if let Ty::Tuple(t) = &o.ty {
t.is_empty()
} else {
false
}
},
) {
return Ok(());
Expand Down
188 changes: 0 additions & 188 deletions crates/torii/core/src/sql/query_queue.rs

This file was deleted.

Loading