Skip to content

Commit

Permalink
non bloking execute engine
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Sep 25, 2024
1 parent a7e4f1f commit 8cf4452
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 18 deletions.
3 changes: 2 additions & 1 deletion crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use tokio::sync::Semaphore;
use tokio::time::{sleep, Instant};
use tracing::{debug, error, info, trace, warn};

use crate::executor::QueryMessage;
use crate::processors::event_message::EventMessageProcessor;
use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor};
use crate::sql::Sql;
Expand Down Expand Up @@ -179,7 +180,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
}

match self.process(fetch_result).await {
Ok(()) => self.db.execute().await?,
Ok(()) => self.db.executor.send(QueryMessage::execute())?,
Err(e) => {
error!(target: LOG_TARGET, error = %e, "Processing fetched data.");
erroring_out = true;
Expand Down
45 changes: 32 additions & 13 deletions crates/torii/core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub enum QueryType {
EventMessage(Ty),
RegisterModel,
StoreEvent,
Execute(oneshot::Sender<Result<()>>),
Execute,
Other,
}

Expand All @@ -67,23 +67,40 @@ pub struct QueryMessage {
pub statement: String,
pub arguments: Vec<Argument>,
pub query_type: QueryType,
tx: Option<oneshot::Sender<Result<()>>>,
}

impl QueryMessage {
pub fn new(statement: String, arguments: Vec<Argument>, query_type: QueryType) -> Self {
Self { statement, arguments, query_type }
Self { statement, arguments, query_type, tx: None }
}

pub fn new_recv(statement: String, arguments: Vec<Argument>, query_type: QueryType) -> (Self, oneshot::Receiver<Result<()>>) {
let (tx, rx) = oneshot::channel();
(Self { statement, arguments, query_type, tx: Some(tx) }, rx)
}

pub fn other(statement: String, arguments: Vec<Argument>) -> Self {
Self { statement, arguments, query_type: QueryType::Other }
Self { statement, arguments, query_type: QueryType::Other, tx: None }
}

pub fn other_recv(statement: String, arguments: Vec<Argument>) -> (Self, oneshot::Receiver<Result<()>>) {
let (tx, rx) = oneshot::channel();
(Self { statement, arguments, query_type: QueryType::Other, tx: Some(tx) }, rx)
}

pub fn execute(sender: oneshot::Sender<Result<()>>) -> Self {
Self {
pub fn execute() -> Self {
Self { statement: "".to_string(), arguments: vec![], query_type: QueryType::Execute, tx: None }
}

pub fn execute_recv() -> (Self, oneshot::Receiver<Result<()>>) {
let (tx, rx) = oneshot::channel();
(Self {
statement: "".to_string(),
arguments: vec![],
query_type: QueryType::Execute(sender),
}
query_type: QueryType::Execute,
tx: Some(tx),
}, rx)
}
}

Expand All @@ -108,7 +125,7 @@ impl<'c> Executor<'c> {
break Ok(());
}
Some(msg) = self.rx.recv() => {
let QueryMessage { statement, arguments, query_type } = msg;
let QueryMessage { statement, arguments, query_type, tx } = msg;
let mut query = sqlx::query(&statement);

for arg in &arguments {
Expand All @@ -121,7 +138,7 @@ impl<'c> Executor<'c> {
}
}

self.handle_query_type(query, query_type, &statement, &arguments).await?;
self.handle_query_type(query, query_type, &statement, &arguments, tx).await?;
}
}
}
Expand All @@ -133,6 +150,7 @@ impl<'c> Executor<'c> {
query_type: QueryType,
statement: &str,
arguments: &[Argument],
sender: Option<oneshot::Sender<Result<()>>>,
) -> Result<()> {
let tx = &mut self.transaction;

Expand Down Expand Up @@ -210,10 +228,11 @@ impl<'c> Executor<'c> {
let event = EventEmitted::from_row(&row)?;
self.publish_queue.push_back(BrokerMessage::EventEmitted(event));
}
QueryType::Execute(sender) => {
sender
.send(self.execute().await)
.map_err(|_| anyhow::anyhow!("Failed to send execute result"))?;
QueryType::Execute => {
let res = self.execute().await;
if let Some(sender) = sender {
sender.send(res).map_err(|_| anyhow::anyhow!("Failed to send execute result"))?;
}
}
QueryType::Other => {
query.execute(&mut **tx).await.with_context(|| {
Expand Down
7 changes: 3 additions & 4 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use sqlx::{Pool, Sqlite};
use starknet::core::types::{Event, Felt, InvokeTransaction, Transaction};
use starknet_crypto::poseidon_hash_many;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;

use crate::cache::{Model, ModelCache};
use crate::executor::{Argument, DeleteEntityQuery, QueryMessage, QueryType};
Expand Down Expand Up @@ -1128,9 +1127,9 @@ impl Sql {
}

pub async fn execute(&self) -> Result<()> {
let (sender, receiver) = oneshot::channel();
self.executor.send(QueryMessage::execute(sender))?;
receiver.await?
let (execute, recv) = QueryMessage::execute_recv();
self.executor.send(execute)?;
recv.await?
}
}

Expand Down

0 comments on commit 8cf4452

Please sign in to comment.