Skip to content

Commit

Permalink
fix: traits
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Sep 16, 2024
1 parent 13ca286 commit 4d04d47
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 38 deletions.
18 changes: 9 additions & 9 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,19 +170,19 @@ async fn main() -> anyhow::Result<()> {
let provider: Arc<_> = JsonRpcClient::new(HttpTransport::new(args.rpc)).into();

// Get world address
let world = WorldContractReader::new(args.world_address, &provider);
let world = WorldContractReader::new(args.world_address, provider.clone());

let db = Sql::new(pool.clone(), args.world_address).await?;

let processors = Processors {
event: generate_event_processors_map(vec![
Box::new(RegisterModelProcessor),
Box::new(StoreSetRecordProcessor),
Box::new(MetadataUpdateProcessor),
Box::new(StoreDelRecordProcessor),
Box::new(EventMessageProcessor),
Box::new(StoreUpdateRecordProcessor),
Box::new(StoreUpdateMemberProcessor),
Arc::new(RegisterModelProcessor),
Arc::new(StoreSetRecordProcessor),
Arc::new(MetadataUpdateProcessor),
Arc::new(StoreDelRecordProcessor),
Arc::new(EventMessageProcessor),
Arc::new(StoreUpdateRecordProcessor),
Arc::new(StoreUpdateMemberProcessor),
])?,
transaction: vec![Box::new(StoreTransactionProcessor)],
..Processors::default()
Expand All @@ -193,7 +193,7 @@ async fn main() -> anyhow::Result<()> {
let mut engine = Engine::new(
world,
db.clone(),
&provider,
provider.clone(),
processors,
EngineConfig {
start_block: args.start_block,
Expand Down
17 changes: 11 additions & 6 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor};
use crate::sql::Sql;

#[allow(missing_debug_implementations)]
pub struct Processors<P: Provider + Send + Sync + std::fmt::Debug> {
/// Bundles the processors the indexing engine dispatches fetched chain data to.
///
/// `P` carries a `'static` bound so processor collections can be shared with
/// spawned tasks (see the `Arc`-based `event` map below).
pub struct Processors<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
/// Block-level processors. NOTE(review): presumably invoked once per fetched block — confirm against the engine loop.
pub block: Vec<Box<dyn BlockProcessor<P>>>,
/// Transaction-level processors. NOTE(review): presumably invoked per transaction — confirm against the engine loop.
pub transaction: Vec<Box<dyn TransactionProcessor<P>>>,
/// Event processors keyed by event selector (`Felt`); `Arc` (not `Box`) so
/// the same handler can be shared across parallel processing tasks.
pub event: HashMap<Felt, Arc<dyn EventProcessor<P>>>,
/// Fallback processor for events whose key has no entry in `event`.
pub catch_all_event: Box<dyn EventProcessor<P>>,
}

impl<P: Provider + Send + Sync + std::fmt::Debug> Default for Processors<P> {
impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Default for Processors<P> {
fn default() -> Self {
Self {
block: vec![],
Expand Down Expand Up @@ -87,9 +87,9 @@ pub struct FetchPendingResult {
}

#[allow(missing_debug_implementations)]
pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug> {
pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
world: Arc<WorldContractReader<P>>,
db: Arc<Sql>,
db: Sql,
provider: Box<P>,
processors: Arc<Processors<P>>,
config: EngineConfig,
Expand All @@ -103,7 +103,7 @@ struct UnprocessedEvent {
data: Vec<String>,
}

impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
pub fn new(
world: WorldContractReader<P>,
db: Sql,
Expand Down Expand Up @@ -442,12 +442,13 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {

// Process queued tasks in parallel
let tasks: Vec<_> = self.tasks.drain().map(|(task_id, events)| {
let world = self.world.clone();
let db = self.db.clone();
let world = self.world.clone();
let processors = self.processors.clone();
let block_timestamp = data.blocks[&last_block];

tokio::spawn(async move {
let mut local_db = db.clone();
for (event_id, event) in events {
if let Some(processor) = processors.event.get(&event.keys[0]) {
if let Err(e) = processor
Expand All @@ -464,6 +465,10 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {

// We wait for all tasks to complete processing
let results = try_join_all(tasks).await?;
for local_db in results {
// We merge the query queues of each task into the main db
self.db.merge(local_db?)?;
}

self.db.set_head(data.latest_block_number, None, None);
self.db.execute().await?;
Expand Down
5 changes: 3 additions & 2 deletions crates/torii/core/src/processors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::{Error, Result};
use async_trait::async_trait;
Expand Down Expand Up @@ -75,8 +76,8 @@ pub trait TransactionProcessor<P: Provider + Sync>: Send + Sync {

/// Given a list of event processors, generate a map of event keys to the event processor
pub fn generate_event_processors_map<P: Provider + Sync + Send>(
event_processor: Vec<Box<dyn EventProcessor<P>>>,
) -> Result<HashMap<Felt, Box<dyn EventProcessor<P>>>> {
event_processor: Vec<Arc<dyn EventProcessor<P>>>,
) -> Result<HashMap<Felt, Arc<dyn EventProcessor<P>>>> {
let mut event_processors = HashMap::new();

for processor in event_processor {
Expand Down
42 changes: 21 additions & 21 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,19 @@ pub const FELT_DELIMITER: &str = "/";
#[path = "sql_test.rs"]
mod test;

#[derive(Debug, Clone)]
/// SQLite-backed store used by the indexer.
///
/// `Clone` is implemented manually (not derived): a clone shares the pool,
/// world address and model cache, but starts with a fresh, empty query queue.
#[derive(Debug)]
pub struct Sql {
/// Address of the world contract this database indexes.
world_address: Felt,
/// Shared SQLite connection pool.
pub pool: Pool<Sqlite>,
/// Buffered queries awaiting execution; per-clone state (see `Clone` impl).
pub query_queue: QueryQueue,
/// Shared model metadata cache.
model_cache: Arc<ModelCache>,
}

// impl Clone for Sql {
// fn clone(&self) -> Self {
// Self { world_address: self.world_address, pool: self.pool.clone(), query_queue: QueryQueue::new(self.pool.clone()), model_cache: self.model_cache.clone() }
// }
// }
impl Clone for Sql {
fn clone(&self) -> Self {
Self { world_address: self.world_address, pool: self.pool.clone(), query_queue: QueryQueue::new(self.pool.clone()), model_cache: self.model_cache.clone() }
}
}

impl Sql {
pub async fn new(pool: Pool<Sqlite>, world_address: Felt) -> Result<Self> {
Expand All @@ -71,21 +71,21 @@ impl Sql {
})
}

// pub fn merge(&mut self, other: Sql) -> Result<()> {
// // Merge query queue
// self.query_queue.queue.extend(other.query_queue.queue);
// self.query_queue.publish_queue.extend(other.query_queue.publish_queue);

// // This should never happen
// if self.world_address != other.world_address {
// warn!(
// "Merging Sql instances with different world addresses: {} and {}",
// self.world_address, other.world_address
// );
// }

// Ok(())
// }
pub fn merge(&mut self, other: Sql) -> Result<()> {
// Merge query queue
self.query_queue.queue.extend(other.query_queue.queue);
self.query_queue.publish_queue.extend(other.query_queue.publish_queue);

// This should never happen
if self.world_address != other.world_address {
warn!(
"Merging Sql instances with different world addresses: {} and {}",
self.world_address, other.world_address
);
}

Ok(())
}

pub async fn head(&self) -> Result<(u64, Option<Felt>, Option<Felt>)> {
let mut conn: PoolConnection<Sqlite> = self.pool.acquire().await?;
Expand Down

0 comments on commit 4d04d47

Please sign in to comment.