Skip to content

Commit

Permalink
refactor(torii): queries for them to not block
Browse files Browse the repository at this point in the history
commit-id:1b6c03a5
  • Loading branch information
lambda-0x committed Sep 8, 2024
1 parent 6b9da83 commit adae7e8
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 35 deletions.
72 changes: 64 additions & 8 deletions crates/torii/core/src/query_queue.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::VecDeque;

use anyhow::{Context, Result};
use dojo_types::schema::Ty;
use sqlx::{Executor, Pool, Sqlite};
use starknet::core::types::Felt;

Expand Down Expand Up @@ -33,11 +35,22 @@ pub struct QueryQueue {
// publishes that are related to queries in the queue, they should be sent
// after the queries are executed
pub publish_queue: VecDeque<BrokerMessage>,
pub publish_queries: VecDeque<(String, Vec<Argument>, QueryType)>,
}

#[derive(Debug, Clone)]
pub enum QueryType {
SetEntity(Ty),
}

impl QueryQueue {
pub fn new(pool: Pool<Sqlite>) -> Self {
QueryQueue { pool, queue: VecDeque::new(), publish_queue: VecDeque::new() }
QueryQueue {
pool,
queue: VecDeque::new(),
publish_queue: VecDeque::new(),
publish_queries: VecDeque::new(),
}
}

pub fn enqueue<S: Into<String>>(&mut self, statement: S, arguments: Vec<Argument>) {
Expand All @@ -52,7 +65,16 @@ impl QueryQueue {
self.publish_queue.push_back(value);
}

pub async fn execute_all(&mut self) -> sqlx::Result<u64> {
pub fn push_publish_query(
&mut self,
statement: String,
arguments: Vec<Argument>,
query_type: QueryType,
) {
self.publish_queries.push_back((statement, arguments, query_type));
}

pub async fn execute_all(&mut self) -> Result<u64> {
let mut total_affected = 0_u64;
let mut tx = self.pool.begin().await?;

Expand All @@ -69,20 +91,54 @@ impl QueryQueue {
}
}

total_affected += tx.execute(query).await?.rows_affected();
total_affected += tx
.execute(query)
.await
.with_context(|| format!("Failed to execute query: {}", statement))?
.rows_affected();
}

tx.commit().await?;

while let Some(message) = self.publish_queue.pop_front() {
match message {
BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model),
BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity),
BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event),
BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event),
send_broker_message(message);
}

while let Some((statement, arguments, query_type)) = self.publish_queries.pop_front() {
let mut query = sqlx::query_as(&statement);
for arg in &arguments {
query = match arg {
Argument::Null => query.bind(None::<String>),
Argument::Int(integer) => query.bind(integer),
Argument::Bool(bool) => query.bind(bool),
Argument::String(string) => query.bind(string),
Argument::FieldElement(felt) => query.bind(format!("{:#x}", felt)),
}
}

let broker_message = match query_type {
QueryType::SetEntity(entity) => {
let mut result: EntityUpdated = query
.fetch_one(&self.pool)
.await
.with_context(|| format!("Failed to fetch entity: {}", statement))?;
result.updated_model = Some(entity);
result.deleted = false;
BrokerMessage::EntityUpdated(result)
}
};
send_broker_message(broker_message);
}

Ok(total_affected)
}
}

fn send_broker_message(message: BrokerMessage) {
match message {
BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model),
BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity),
BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event),
BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event),
}
}
60 changes: 33 additions & 27 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use starknet_crypto::poseidon_hash_many;
use tracing::debug;

use crate::cache::{Model, ModelCache};
use crate::query_queue::{Argument, BrokerMessage, QueryQueue};
use crate::query_queue::{Argument, BrokerMessage, QueryQueue, QueryType};
use crate::types::{
Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated,
Model as ModelRegistered,
Expand Down Expand Up @@ -174,26 +174,27 @@ impl Sql {
let entity_id = format!("{:#x}", entity_id);
let model_id = format!("{:#x}", model_id);

let insert_entities = "INSERT INTO entities (id, keys, event_id, executed_at) VALUES (?, \
?, ?, ?) ON CONFLICT(id) DO UPDATE SET \
updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \
event_id=EXCLUDED.event_id";

self.query_queue.enqueue(
insert_entities,
vec![
Argument::String(entity_id.clone()),
Argument::String(keys_str.to_string()),
Argument::String(event_id.to_string()),
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
],
);

self.query_queue.enqueue(
"INSERT INTO entity_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \
model_id) DO NOTHING",
vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())],
);

let insert_entities = "INSERT INTO entities (id, keys, event_id, executed_at) VALUES (?, \
?, ?, ?) ON CONFLICT(id) DO UPDATE SET \
updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \
event_id=EXCLUDED.event_id RETURNING *";
let mut entity_updated: EntityUpdated = sqlx::query_as(insert_entities)
.bind(&entity_id)
.bind(keys_str)
.bind(event_id)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.fetch_one(&self.pool)
.await?;

entity_updated.updated_model = Some(entity.clone());

let path = vec![namespaced_name];
self.build_set_entity_queries_recursive(
path,
Expand All @@ -204,7 +205,12 @@ impl Sql {
&vec![],
);

self.query_queue.push_publish(BrokerMessage::EntityUpdated(entity_updated));
let query_entities_for_publish = "SELECT * FROM entities WHERE id = ?";
self.query_queue.push_publish_query(
query_entities_for_publish.to_string(),
vec![Argument::String(entity_id.clone())],
QueryType::SetEntity(entity.clone()),
);

Ok(())
}
Expand Down Expand Up @@ -293,18 +299,18 @@ impl Sql {
);
self.execute().await?;

let mut update_entity = sqlx::query_as::<_, EntityUpdated>(
"UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, event_id=? WHERE id \
= ? RETURNING *",
)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.bind(event_id)
.bind(entity_id)
.fetch_one(&self.pool)
.await?;
let update_query = "UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, \
event_id=? WHERE id = ? RETURNING *";

update_entity.updated_model = Some(wrapped_ty);
self.query_queue.push_publish(BrokerMessage::EntityUpdated(update_entity));
self.query_queue.push_publish_query(
update_query.to_string(),
vec![
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
Argument::String(event_id.to_string()),
Argument::String(entity_id.clone()),
],
QueryType::SetEntity(wrapped_ty),
);

Ok(())
}
Expand Down

0 comments on commit adae7e8

Please sign in to comment.