feat: first pass at parallelization
Larkooo committed Sep 16, 2024
1 parent 2d6e5de commit 13ca286
Showing 3 changed files with 81 additions and 22 deletions.
73 changes: 55 additions & 18 deletions crates/torii/core/src/engine.rs
@@ -1,16 +1,19 @@
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use dojo_world::contracts::world::WorldContractReader;
use futures_util::future::try_join_all;
use hashlink::LinkedHashMap;
use starknet::core::types::{
BlockId, BlockTag, EmittedEvent, Event, EventFilter, Felt, MaybePendingBlockWithReceipts,
MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, ReceiptBlock, TransactionReceipt,
TransactionReceiptWithBlockInfo, TransactionWithReceipt,
};
use starknet::providers::Provider;
use starknet_crypto::poseidon_hash_many;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::Sender as BoundedSender;
use tokio::time::sleep;
@@ -24,7 +27,7 @@ use crate::sql::Sql;
pub struct Processors<P: Provider + Send + Sync + std::fmt::Debug> {
pub block: Vec<Box<dyn BlockProcessor<P>>>,
pub transaction: Vec<Box<dyn TransactionProcessor<P>>>,
pub event: HashMap<Felt, Box<dyn EventProcessor<P>>>,
pub event: HashMap<Felt, Arc<dyn EventProcessor<P>>>,
pub catch_all_event: Box<dyn EventProcessor<P>>,
}

@@ -85,13 +88,14 @@ pub struct FetchPendingResult {

#[allow(missing_debug_implementations)]
pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug> {
world: WorldContractReader<P>,
db: Sql,
world: Arc<WorldContractReader<P>>,
db: Arc<Sql>,
provider: Box<P>,
processors: Processors<P>,
processors: Arc<Processors<P>>,
config: EngineConfig,
shutdown_tx: Sender<()>,
block_tx: Option<BoundedSender<u64>>,
tasks: HashMap<Felt, Vec<(String, Event)>>,
}

struct UnprocessedEvent {
@@ -109,7 +113,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
shutdown_tx: Sender<()>,
block_tx: Option<BoundedSender<u64>>,
) -> Self {
Self { world, db, provider: Box::new(provider), processors, config, shutdown_tx, block_tx }
Self { world: Arc::new(world), db, provider: Box::new(provider), processors: Arc::new(processors), config, shutdown_tx, block_tx, tasks: HashMap::new() }
}

pub async fn start(&mut self) -> Result<()> {
@@ -436,11 +440,30 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
}
}

// We return None for the pending_block_tx because our process_range
// gets only specific events from the world, so some transactions
// might get ignored and won't update the cursor. So once the sync
// range is done, we assume all of the block's transactions have been
// processed.
// Process queued tasks in parallel
let tasks: Vec<_> = self.tasks.drain().map(|(task_id, events)| {
let world = self.world.clone();
let mut local_db = self.db.clone();
let processors = self.processors.clone();
let block_timestamp = data.blocks[&last_block];

tokio::spawn(async move {
for (event_id, event) in events {
if let Some(processor) = processors.event.get(&event.keys[0]) {
if let Err(e) = processor
.process(&world, &mut local_db, last_block, block_timestamp, &event_id, &event)
.await
{
error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing queued event.");
}
}
}
Ok::<_, anyhow::Error>(local_db)
})
}).collect();

// We wait for all tasks to complete processing
let results = try_join_all(tasks).await?;

self.db.set_head(data.latest_block_number, None, None);
self.db.execute().await?;
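
The hunk above is the heart of the change: the per-identifier queues in `self.tasks` are drained, each bucket is moved into its own `tokio::spawn`, and `try_join_all` joins every bucket before the new head is committed. A minimal, self-contained sketch of that fan-out/join pattern — `handle_event`, the `u64` task key, and the string payloads are stand-ins for illustration, not torii APIs:

```rust
use std::collections::HashMap;

use futures_util::future::try_join_all;

// Stand-ins for brevity: torii keys tasks by a Felt hash and queues
// (event_id, Event) pairs; a u64 key and string payloads suffice here.
type TaskId = u64;

// Hypothetical per-event work; the engine calls processor.process() here.
async fn handle_event(task_id: TaskId, event_id: &str) -> anyhow::Result<()> {
    println!("task {task_id}: processing {event_id}");
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut tasks: HashMap<TaskId, Vec<String>> = HashMap::new();
    tasks.entry(1).or_default().push("1_0x1".to_string());
    tasks.entry(1).or_default().push("1_0x2".to_string());
    tasks.entry(2).or_default().push("2_0x1".to_string());

    // One tokio task per bucket: buckets run in parallel, while events
    // inside a bucket keep their insertion order.
    let handles: Vec<_> = tasks
        .drain()
        .map(|(task_id, events)| {
            tokio::spawn(async move {
                for event_id in &events {
                    handle_event(task_id, event_id).await?;
                }
                Ok::<_, anyhow::Error>(())
            })
        })
        .collect();

    // Join every bucket before committing the new head; a join failure
    // or a task error aborts the whole batch, as in the engine.
    for task_result in try_join_all(handles).await? {
        task_result?;
    }
    Ok(())
}
```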
Expand Down Expand Up @@ -589,7 +612,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
event: &Event,
transaction_hash: Felt,
) -> Result<()> {
self.db.store_event(event_id, event, transaction_hash, block_timestamp);
// self.db.store_event(event_id, event, transaction_hash, block_timestamp);
let event_key = event.keys[0];

let Some(processor) = self.processors.event.get(&event_key) else {
Expand Down Expand Up @@ -627,14 +650,28 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
return Ok(());
};

// if processor.validate(event) {
if let Err(e) = processor
.process(&self.world, &mut self.db, block_number, block_timestamp, event_id, event)
.await
{
error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event.");
let task_identifier = match processor.event_key().as_str() {
"StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
poseidon_hash_many(&[event.data[0], event.data[1]])
}
_ => Felt::ZERO,
};

// if we have a task identifier, we queue the event to be parallelized
if task_identifier != Felt::ZERO {
self.tasks
.entry(task_identifier)
.or_insert(vec![])
.push((event_id.to_string(), event.clone()));
} else {
// if we don't have a task identifier, we process the event immediately
if let Err(e) = processor
.process(&self.world, &mut self.db, block_number, block_timestamp, event_id, event)
.await
{
error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event.");
}
}
// }

Ok(())
}
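
The dispatch above gives every `Store*` event a task identifier derived from its first two data felts, so writes that touch the same entity always land in the same bucket and keep their relative order, while unrelated entities can proceed in parallel. A sketch of that grouping property, assuming (as the hunk implies) that `data[0]` and `data[1]` identify the model/entity pair:

```rust
use starknet::core::types::Felt;
use starknet_crypto::poseidon_hash_many;

// Mirrors the dispatch in process_event: Store* events get a
// per-entity key; everything else falls through to Felt::ZERO
// and is processed inline.
fn task_identifier(event_key: &str, data: &[Felt]) -> Felt {
    match event_key {
        "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
            poseidon_hash_many(&[data[0], data[1]])
        }
        _ => Felt::ZERO,
    }
}

fn main() {
    let entity = [Felt::ONE, Felt::TWO];
    // Two writes to the same entity land in the same bucket, so they
    // are processed sequentially relative to each other...
    assert_eq!(
        task_identifier("StoreSetRecord", &entity),
        task_identifier("StoreUpdateRecord", &entity)
    );
    // ...while events without an identifier are not parallelized.
    assert_eq!(task_identifier("WorldSpawned", &entity), Felt::ZERO);
}
```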
6 changes: 3 additions & 3 deletions crates/torii/core/src/processors/mod.rs
@@ -23,7 +23,7 @@ const ENTITY_ID_INDEX: usize = 1;
const NUM_KEYS_INDEX: usize = 2;

#[async_trait]
pub trait EventProcessor<P>
pub trait EventProcessor<P>: Send + Sync
where
P: Provider + Sync,
{
@@ -48,7 +48,7 @@ where
}

#[async_trait]
pub trait BlockProcessor<P: Provider + Sync> {
pub trait BlockProcessor<P: Provider + Sync>: Send + Sync {
fn get_block_number(&self) -> String;
async fn process(
&self,
@@ -60,7 +60,7 @@ pub trait BlockProcessor<P: Provider + Sync> {
}

#[async_trait]
pub trait TransactionProcessor<P: Provider + Sync> {
pub trait TransactionProcessor<P: Provider + Sync>: Send + Sync {
#[allow(clippy::too_many_arguments)]
async fn process(
&self,
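
The new `Send + Sync` supertraits are what let the `Arc`-wrapped processors be captured by the spawned futures: `tokio::spawn` requires the future, and everything it owns, to be `Send`. A minimal illustration of the constraint, with a stand-in trait:

```rust
use std::sync::Arc;

// Without `Send + Sync` here, `Arc<dyn Processor>` would not be Send,
// and the async block below could not be handed to tokio::spawn.
trait Processor: Send + Sync {
    fn run(&self);
}

struct Noop;

impl Processor for Noop {
    fn run(&self) {
        println!("processed");
    }
}

#[tokio::main]
async fn main() {
    let processor: Arc<dyn Processor> = Arc::new(Noop);
    let handle = tokio::spawn(async move {
        processor.run();
    });
    handle.await.unwrap();
}
```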
24 changes: 23 additions & 1 deletion crates/torii/core/src/sql.rs
@@ -13,7 +13,7 @@ use sqlx::pool::PoolConnection;
use sqlx::{Pool, Sqlite};
use starknet::core::types::{Event, Felt, InvokeTransaction, Transaction};
use starknet_crypto::poseidon_hash_many;
use tracing::debug;
use tracing::{debug, warn};

use crate::cache::{Model, ModelCache};
use crate::query_queue::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType};
@@ -40,6 +40,12 @@ pub struct Sql {
model_cache: Arc<ModelCache>,
}

// impl Clone for Sql {
// fn clone(&self) -> Self {
// Self { world_address: self.world_address, pool: self.pool.clone(), query_queue: QueryQueue::new(self.pool.clone()), model_cache: self.model_cache.clone() }
// }
// }

impl Sql {
pub async fn new(pool: Pool<Sqlite>, world_address: Felt) -> Result<Self> {
let mut query_queue = QueryQueue::new(pool.clone());
@@ -65,6 +71,22 @@ impl Sql {
})
}

// pub fn merge(&mut self, other: Sql) -> Result<()> {
// // Merge query queue
// self.query_queue.queue.extend(other.query_queue.queue);
// self.query_queue.publish_queue.extend(other.query_queue.publish_queue);

// // This should never happen
// if self.world_address != other.world_address {
// warn!(
// "Merging Sql instances with different world addresses: {} and {}",
// self.world_address, other.world_address
// );
// }

// Ok(())
// }

pub async fn head(&self) -> Result<(u64, Option<Felt>, Option<Felt>)> {
let mut conn: PoolConnection<Sqlite> = self.pool.acquire().await?;
let indexer_query =
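
The commented-out `Clone` impl and `merge` method sketch where this first pass is headed: each parallel task would write into its own `Sql` clone's query queue, and the queues would be appended back into the main instance before a single `execute`. A reduced model of that buffer-and-merge idea — `LocalQueue` is a stand-in; the real `QueryQueue` buffers SQL statements and broker messages:

```rust
// Reduced stand-in for Sql/QueryQueue: each task buffers its writes
// locally, and the buffers are merged back before one execute pass.
#[derive(Clone, Default)]
struct LocalQueue {
    queue: Vec<String>, // pending SQL statements, in arrival order
}

impl LocalQueue {
    fn push(&mut self, sql: impl Into<String>) {
        self.queue.push(sql.into());
    }

    // Mirrors the commented-out Sql::merge: extend this queue with the
    // other one, preserving each task's internal ordering.
    fn merge(&mut self, other: LocalQueue) {
        self.queue.extend(other.queue);
    }
}

fn main() {
    let main_queue = LocalQueue::default();

    let mut task_a = main_queue.clone();
    task_a.push("UPDATE entities SET ... WHERE id = 1");

    let mut task_b = main_queue.clone();
    task_b.push("DELETE FROM entities WHERE id = 2");

    let mut merged = main_queue;
    merged.merge(task_a);
    merged.merge(task_b);
    assert_eq!(merged.queue.len(), 2);
}
```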
