From 8cf445269dd509db88f50119a5868f6268f36ec8 Mon Sep 17 00:00:00 2001 From: Nasr Date: Wed, 25 Sep 2024 17:59:24 -0400 Subject: [PATCH] non bloking execute engine --- crates/torii/core/src/engine.rs | 3 ++- crates/torii/core/src/executor.rs | 45 ++++++++++++++++++++++--------- crates/torii/core/src/sql.rs | 7 +++-- 3 files changed, 37 insertions(+), 18 deletions(-) diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index c54e641a9b..54f16af359 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -21,6 +21,7 @@ use tokio::sync::Semaphore; use tokio::time::{sleep, Instant}; use tracing::{debug, error, info, trace, warn}; +use crate::executor::QueryMessage; use crate::processors::event_message::EventMessageProcessor; use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor}; use crate::sql::Sql; @@ -179,7 +180,7 @@ impl Engine

{ } match self.process(fetch_result).await { - Ok(()) => self.db.execute().await?, + Ok(()) => self.db.executor.send(QueryMessage::execute())?, Err(e) => { error!(target: LOG_TARGET, error = %e, "Processing fetched data."); erroring_out = true; diff --git a/crates/torii/core/src/executor.rs b/crates/torii/core/src/executor.rs index 03451259b0..3a238cea06 100644 --- a/crates/torii/core/src/executor.rs +++ b/crates/torii/core/src/executor.rs @@ -49,7 +49,7 @@ pub enum QueryType { EventMessage(Ty), RegisterModel, StoreEvent, - Execute(oneshot::Sender>), + Execute, Other, } @@ -67,23 +67,40 @@ pub struct QueryMessage { pub statement: String, pub arguments: Vec, pub query_type: QueryType, + tx: Option>>, } impl QueryMessage { pub fn new(statement: String, arguments: Vec, query_type: QueryType) -> Self { - Self { statement, arguments, query_type } + Self { statement, arguments, query_type, tx: None } + } + + pub fn new_recv(statement: String, arguments: Vec, query_type: QueryType) -> (Self, oneshot::Receiver>) { + let (tx, rx) = oneshot::channel(); + (Self { statement, arguments, query_type, tx: Some(tx) }, rx) } pub fn other(statement: String, arguments: Vec) -> Self { - Self { statement, arguments, query_type: QueryType::Other } + Self { statement, arguments, query_type: QueryType::Other, tx: None } + } + + pub fn other_recv(statement: String, arguments: Vec) -> (Self, oneshot::Receiver>) { + let (tx, rx) = oneshot::channel(); + (Self { statement, arguments, query_type: QueryType::Other, tx: Some(tx) }, rx) } - pub fn execute(sender: oneshot::Sender>) -> Self { - Self { + pub fn execute() -> Self { + Self { statement: "".to_string(), arguments: vec![], query_type: QueryType::Execute, tx: None } + } + + pub fn execute_recv() -> (Self, oneshot::Receiver>) { + let (tx, rx) = oneshot::channel(); + (Self { statement: "".to_string(), arguments: vec![], - query_type: QueryType::Execute(sender), - } + query_type: QueryType::Execute, + tx: Some(tx), + }, rx) } } @@ -108,7 +125,7 @@ impl<'c> Executor<'c> { break Ok(()); } Some(msg) = self.rx.recv() => { - let QueryMessage { statement, arguments, query_type } = msg; + let QueryMessage { statement, arguments, query_type, tx } = msg; let mut query = sqlx::query(&statement); for arg in &arguments { @@ -121,7 +138,7 @@ impl<'c> Executor<'c> { } } - self.handle_query_type(query, query_type, &statement, &arguments).await?; + self.handle_query_type(query, query_type, &statement, &arguments, tx).await?; } } } @@ -133,6 +150,7 @@ impl<'c> Executor<'c> { query_type: QueryType, statement: &str, arguments: &[Argument], + sender: Option>>, ) -> Result<()> { let tx = &mut self.transaction; @@ -210,10 +228,11 @@ impl<'c> Executor<'c> { let event = EventEmitted::from_row(&row)?; self.publish_queue.push_back(BrokerMessage::EventEmitted(event)); } - QueryType::Execute(sender) => { - sender - .send(self.execute().await) - .map_err(|_| anyhow::anyhow!("Failed to send execute result"))?; + QueryType::Execute => { + let res = self.execute().await; + if let Some(sender) = sender { + sender.send(res).map_err(|_| anyhow::anyhow!("Failed to send execute result"))?; + } } QueryType::Other => { query.execute(&mut **tx).await.with_context(|| { diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index de493b4859..7bee0d10ac 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -13,7 +13,6 @@ use sqlx::{Pool, Sqlite}; use starknet::core::types::{Event, Felt, InvokeTransaction, Transaction}; use starknet_crypto::poseidon_hash_many; use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::oneshot; use crate::cache::{Model, ModelCache}; use crate::executor::{Argument, DeleteEntityQuery, QueryMessage, QueryType}; @@ -1128,9 +1127,9 @@ impl Sql { } pub async fn execute(&self) -> Result<()> { - let (sender, receiver) = oneshot::channel(); - self.executor.send(QueryMessage::execute(sender))?; - receiver.await? + let (execute, recv) = QueryMessage::execute_recv(); + self.executor.send(execute)?; + recv.await? } }