feat(torii-core): parallelization #2423

Merged · 21 commits · Sep 17, 2024
Changes from 17 commits
23 changes: 14 additions & 9 deletions bin/torii/src/main.rs
@@ -125,6 +125,10 @@
/// Polling interval in ms
#[arg(long, default_value = "500")]
polling_interval: u64,

/// Max concurrent tasks
#[arg(long, default_value = "100")]
max_concurrent_tasks: usize,

}

#[tokio::main]
@@ -170,19 +174,19 @@
let provider: Arc<_> = JsonRpcClient::new(HttpTransport::new(args.rpc)).into();

// Get world address
let world = WorldContractReader::new(args.world_address, &provider);
let world = WorldContractReader::new(args.world_address, provider.clone());


let db = Sql::new(pool.clone(), args.world_address).await?;

let processors = Processors {
event: generate_event_processors_map(vec![
Box::new(RegisterModelProcessor),
Box::new(StoreSetRecordProcessor),
Box::new(MetadataUpdateProcessor),
Box::new(StoreDelRecordProcessor),
Box::new(EventMessageProcessor),
Box::new(StoreUpdateRecordProcessor),
Box::new(StoreUpdateMemberProcessor),
Arc::new(RegisterModelProcessor),
Arc::new(StoreSetRecordProcessor),
Arc::new(MetadataUpdateProcessor),
Arc::new(StoreDelRecordProcessor),
Arc::new(EventMessageProcessor),
Arc::new(StoreUpdateRecordProcessor),
Arc::new(StoreUpdateMemberProcessor),

])?,
transaction: vec![Box::new(StoreTransactionProcessor)],
..Processors::default()
@@ -193,9 +197,10 @@
let mut engine = Engine::new(
world,
db.clone(),
&provider,
provider.clone(),

processors,
EngineConfig {
max_concurrent_tasks: args.max_concurrent_tasks,

start_block: args.start_block,
events_chunk_size: args.events_chunk_size,
index_pending: args.index_pending,
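For reference, a minimal standalone sketch of how the new flag surfaces through clap; only the `max_concurrent_tasks` field mirrors the diff above, while the struct name and the rest of the CLI are illustrative assumptions:

```rust
// Sketch only: the `Args` struct name and `main` are assumptions; the
// `max_concurrent_tasks` field is copied from the diff above.
use clap::Parser;

#[derive(Parser, Debug)]
struct Args {
    /// Max concurrent tasks
    #[arg(long, default_value = "100")]
    max_concurrent_tasks: usize,
}

fn main() {
    // With clap's default kebab-case naming this is set via
    // `--max-concurrent-tasks <N>` and falls back to 100.
    let args = Args::parse();
    println!("max_concurrent_tasks = {}", args.max_concurrent_tasks);
}
```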
5 changes: 5 additions & 0 deletions crates/torii/core/src/cache.rs
@@ -113,6 +113,11 @@ impl ModelCache {
Ok(model)
}

pub async fn set(&self, selector: Felt, model: Model) {
let mut cache = self.cache.write().await;
cache.insert(selector, model);
}

pub async fn clear(&self) {
self.cache.write().await.clear();
}
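The new `set` entry point follows the same tokio `RwLock` pattern as the existing `clear`. A self-contained sketch of that pattern, using stand-in key and `Model` types rather than the real torii ones:

```rust
use std::collections::HashMap;

use tokio::sync::RwLock;

#[derive(Clone, Debug)]
struct Model; // stand-in for torii's Model type

#[derive(Default)]
struct ModelCacheSketch {
    // keyed by a model selector; u64 stands in for Felt here
    cache: RwLock<HashMap<u64, Model>>,
}

impl ModelCacheSketch {
    async fn set(&self, selector: u64, model: Model) {
        // hold the write lock only for the insert
        let mut cache = self.cache.write().await;
        cache.insert(selector, model);
    }

    async fn clear(&self) {
        self.cache.write().await.clear();
    }
}

#[tokio::main]
async fn main() {
    let cache = ModelCacheSketch::default();
    cache.set(1, Model).await;
    cache.clear().await;
}
```

In the parallelized engine this presumably lets a processor that registers a model publish it into the shared cache without another database round-trip.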
130 changes: 105 additions & 25 deletions crates/torii/core/src/engine.rs
@@ -1,5 +1,6 @@
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
@@ -11,8 +12,11 @@
TransactionReceiptWithBlockInfo, TransactionWithReceipt,
};
use starknet::providers::Provider;
use starknet_crypto::poseidon_hash_many;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::Sender as BoundedSender;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tokio::time::sleep;
use tracing::{debug, error, info, trace, warn};

@@ -21,14 +25,14 @@
use crate::sql::Sql;

#[allow(missing_debug_implementations)]
pub struct Processors<P: Provider + Send + Sync + std::fmt::Debug> {
pub struct Processors<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
pub block: Vec<Box<dyn BlockProcessor<P>>>,
pub transaction: Vec<Box<dyn TransactionProcessor<P>>>,
pub event: HashMap<Felt, Box<dyn EventProcessor<P>>>,
pub event: HashMap<Felt, Arc<dyn EventProcessor<P>>>,
pub catch_all_event: Box<dyn EventProcessor<P>>,
}

impl<P: Provider + Send + Sync + std::fmt::Debug> Default for Processors<P> {
impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Default for Processors<P> {
fn default() -> Self {
Self {
block: vec![],
@@ -48,6 +52,7 @@
pub start_block: u64,
pub events_chunk_size: u64,
pub index_pending: bool,
pub max_concurrent_tasks: usize,
}

impl Default for EngineConfig {
@@ -57,6 +62,7 @@
start_block: 0,
events_chunk_size: 1024,
index_pending: true,
max_concurrent_tasks: 100,
}
}
}
@@ -83,23 +89,32 @@
pub block_number: u64,
}

#[derive(Debug)]
pub struct ParallelizedEvent {
pub block_number: u64,
pub block_timestamp: u64,
pub event_id: String,
pub event: Event,
}

#[allow(missing_debug_implementations)]
pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug> {
world: WorldContractReader<P>,
pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
world: Arc<WorldContractReader<P>>,
db: Sql,
provider: Box<P>,
processors: Processors<P>,
processors: Arc<Processors<P>>,
config: EngineConfig,
shutdown_tx: Sender<()>,
block_tx: Option<BoundedSender<u64>>,
tasks: HashMap<Felt, Vec<ParallelizedEvent>>,
}

struct UnprocessedEvent {
keys: Vec<String>,
data: Vec<String>,
}

impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
pub fn new(
world: WorldContractReader<P>,
db: Sql,
@@ -109,7 +124,16 @@
shutdown_tx: Sender<()>,
block_tx: Option<BoundedSender<u64>>,
) -> Self {
Self { world, db, provider: Box::new(provider), processors, config, shutdown_tx, block_tx }
Self {
world: Arc::new(world),
db,
provider: Box::new(provider),
processors: Arc::new(processors),
config,
shutdown_tx,
block_tx,
tasks: HashMap::new(),
}
}

pub async fn start(&mut self) -> Result<()> {
@@ -397,11 +421,14 @@
}
}

// Process parallelized events
self.process_tasks().await?;


// Set the head to the last processed pending transaction
// Head block number should still be latest block number
self.db.set_head(data.block_number - 1, last_pending_block_world_tx, last_pending_block_tx);

self.db.execute().await?;

Ok(())
}

@@ -436,18 +463,55 @@
}
}

// We return None for the pending_block_tx because process_range only
// fetches specific events from the world, so some transactions may be
// skipped and will not update the cursor. Once the sync range is done,
// we assume all transactions of the block have been processed.
// Process parallelized events
self.process_tasks().await?;

self.db.set_head(data.latest_block_number, None, None);
self.db.execute().await?;

Ok(())
}

async fn process_tasks(&mut self) -> Result<()> {
// We use a semaphore to limit the number of concurrent tasks
let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks));

// Run all tasks concurrently
let mut set = JoinSet::new();
for (task_id, events) in self.tasks.drain() {
let db = self.db.clone();
let world = self.world.clone();
let processors = self.processors.clone();
let semaphore = semaphore.clone();

set.spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
let mut local_db = db.clone();
for ParallelizedEvent { event_id, event, block_number, block_timestamp } in events {
if let Some(processor) = processors.event.get(&event.keys[0]) {
debug!(target: LOG_TARGET, event_name = processor.event_key(), task_id = %task_id, "Processing parallelized event.");

if let Err(e) = processor
.process(&world, &mut local_db, block_number, block_timestamp, &event_id, &event)
.await

{
error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing parallelized event.");

}
}

}
Ok::<_, anyhow::Error>(local_db)
});
}

// Join all tasks
while let Some(result) = set.join_next().await {
let local_db = result??;
self.db.merge(local_db)?;
}

Ok(())
}

async fn get_block_timestamp(&self, block_number: u64) -> Result<u64> {
match self.provider.get_block_with_tx_hashes(BlockId::Number(block_number)).await? {
MaybePendingBlockWithTxHashes::Block(block) => Ok(block.timestamp),
@@ -477,7 +541,7 @@
block_timestamp,
&event_id,
&event,
transaction_hash,
// transaction_hash,
)
.await?;
}
@@ -527,7 +591,7 @@
block_timestamp,
&event_id,
event,
*transaction_hash,
// *transaction_hash,

)
.await?;
}
@@ -587,9 +651,9 @@
block_timestamp: u64,
event_id: &str,
event: &Event,
transaction_hash: Felt,
// transaction_hash: Felt,
) -> Result<()> {
self.db.store_event(event_id, event, transaction_hash, block_timestamp);
// self.db.store_event(event_id, event, transaction_hash, block_timestamp);
let event_key = event.keys[0];

let Some(processor) = self.processors.event.get(&event_key) else {
@@ -627,14 +691,30 @@
return Ok(());
};

// if processor.validate(event) {
if let Err(e) = processor
.process(&self.world, &mut self.db, block_number, block_timestamp, event_id, event)
.await
{
error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event.");
let task_identifier = match processor.event_key().as_str() {
"StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
poseidon_hash_many(&[event.data[0], event.data[1]])
}
_ => Felt::ZERO,
};

// if we have a task identifier, we queue the event to be parallelized
if task_identifier != Felt::ZERO {
self.tasks.entry(task_identifier).or_default().push(ParallelizedEvent {
event_id: event_id.to_string(),
event: event.clone(),
block_number,
block_timestamp,
});
} else {
// If we don't have a task identifier, we process the event immediately
if let Err(e) = processor
.process(&self.world, &mut self.db, block_number, block_timestamp, event_id, event)
.await
{
error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event.");

}
}
// }

Ok(())
}
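Taken together, `process_event` now keys the Store* events by `poseidon_hash_many(&[event.data[0], event.data[1]])`, which appears to pin every update for a given model/entity pair to one task so their relative order is preserved, while `process_tasks` drains the task map and runs distinct tasks concurrently under a `Semaphore` capped at `max_concurrent_tasks`. Events without an entity-based task id (`Felt::ZERO`) keep the old sequential path. A self-contained sketch of that JoinSet-plus-Semaphore pattern, with plain strings standing in for torii events and the per-task `Sql` handles:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::Semaphore;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Work grouped by a task id, mirroring `tasks: HashMap<Felt, Vec<ParallelizedEvent>>`.
    let mut tasks: HashMap<u64, Vec<String>> = HashMap::new();
    tasks.entry(1).or_default().push("event-a".into());
    tasks.entry(1).or_default().push("event-b".into()); // same id: processed in order
    tasks.entry(2).or_default().push("event-c".into()); // different id: may run in parallel

    // Cap concurrency, like `max_concurrent_tasks`.
    let semaphore = Arc::new(Semaphore::new(100));
    let mut set = JoinSet::new();

    for (task_id, events) in tasks.drain() {
        let semaphore = semaphore.clone();
        set.spawn(async move {
            let _permit = semaphore.acquire().await?;
            for event in events {
                // Events within one task run sequentially, preserving order.
                println!("task {task_id}: processing {event}");
            }
            Ok::<_, anyhow::Error>(task_id)
        });
    }

    // Join everything; in the real engine each task returns its local `Sql`
    // handle, which is merged back into the main one.
    while let Some(result) = set.join_next().await {
        let task_id = result??;
        println!("task {task_id} done");
    }

    Ok(())
}
```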
11 changes: 6 additions & 5 deletions crates/torii/core/src/processors/mod.rs
@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::{Error, Result};
use async_trait::async_trait;
@@ -23,7 +24,7 @@ const ENTITY_ID_INDEX: usize = 1;
const NUM_KEYS_INDEX: usize = 2;

#[async_trait]
pub trait EventProcessor<P>
pub trait EventProcessor<P>: Send + Sync
where
P: Provider + Sync,
{
@@ -48,7 +49,7 @@ where
}

#[async_trait]
pub trait BlockProcessor<P: Provider + Sync> {
pub trait BlockProcessor<P: Provider + Sync>: Send + Sync {
fn get_block_number(&self) -> String;
async fn process(
&self,
@@ -60,7 +61,7 @@ pub trait BlockProcessor<P: Provider + Sync> {
}

#[async_trait]
pub trait TransactionProcessor<P: Provider + Sync> {
pub trait TransactionProcessor<P: Provider + Sync>: Send + Sync {
#[allow(clippy::too_many_arguments)]
async fn process(
&self,
@@ -75,8 +76,8 @@ pub trait TransactionProcessor<P: Provider + Sync> {

/// Given a list of event processors, generate a map of event keys to the event processor
pub fn generate_event_processors_map<P: Provider + Sync + Send>(
event_processor: Vec<Box<dyn EventProcessor<P>>>,
) -> Result<HashMap<Felt, Box<dyn EventProcessor<P>>>> {
event_processor: Vec<Arc<dyn EventProcessor<P>>>,
) -> Result<HashMap<Felt, Arc<dyn EventProcessor<P>>>> {
let mut event_processors = HashMap::new();

for processor in event_processor {
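The `Box` to `Arc` switch and the added `Send + Sync` bounds are what let a single processor map be shared by every spawned task. A minimal sketch of why the bounds matter, with a dummy trait and key type rather than the real torii ones:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// `Send + Sync` on the trait object is required for it to cross task boundaries.
trait EventProcessorSketch: Send + Sync {
    fn event_key(&self) -> String;
}

struct RegisterModel;
impl EventProcessorSketch for RegisterModel {
    fn event_key(&self) -> String {
        "RegisterModel".to_string()
    }
}

#[tokio::main]
async fn main() {
    // An Arc'd trait object can be cheaply cloned into each spawned task;
    // a Box'd one would have to be moved out of the map.
    let mut map: HashMap<u64, Arc<dyn EventProcessorSketch>> = HashMap::new();
    map.insert(0, Arc::new(RegisterModel));
    let map = Arc::new(map);

    let mut handles = Vec::new();
    for _ in 0..4 {
        let map = map.clone();
        handles.push(tokio::spawn(async move {
            if let Some(processor) = map.get(&0) {
                println!("{}", processor.event_key());
            }
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
}
```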