Skip to content

Commit

Permalink
refactor(torii): update query logic for them to not block
Browse files Browse the repository at this point in the history
commit-id:1b6c03a5
  • Loading branch information
lambda-0x committed Sep 13, 2024
1 parent ebb3b70 commit 3871fa8
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 73 deletions.
71 changes: 54 additions & 17 deletions crates/torii/core/src/query_queue.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::VecDeque;

use sqlx::{Executor, Pool, Sqlite};
use anyhow::{Context, Result};
use dojo_types::schema::Ty;
use sqlx::{FromRow, Pool, Sqlite};
use starknet::core::types::Felt;

use crate::simple_broker::SimpleBroker;
Expand Down Expand Up @@ -29,34 +31,49 @@ pub enum BrokerMessage {
#[derive(Debug, Clone)]
pub struct QueryQueue {
pool: Pool<Sqlite>,
pub queue: VecDeque<(String, Vec<Argument>)>,
pub queue: VecDeque<(String, Vec<Argument>, QueryType)>,
// publishes that are related to queries in the queue, they should be sent
// after the queries are executed
pub publish_queue: VecDeque<BrokerMessage>,
}

#[derive(Debug, Clone)]
pub enum QueryType {
SetEntity(Ty),
Other,
}

impl QueryQueue {
pub fn new(pool: Pool<Sqlite>) -> Self {
QueryQueue { pool, queue: VecDeque::new(), publish_queue: VecDeque::new() }
}

pub fn enqueue<S: Into<String>>(&mut self, statement: S, arguments: Vec<Argument>) {
self.queue.push_back((statement.into(), arguments));
pub fn enqueue<S: Into<String>>(
&mut self,
statement: S,
arguments: Vec<Argument>,
query_type: QueryType,
) {
self.queue.push_back((statement.into(), arguments, query_type));
}

pub fn push_front<S: Into<String>>(&mut self, statement: S, arguments: Vec<Argument>) {
self.queue.push_front((statement.into(), arguments));
pub fn push_front<S: Into<String>>(
&mut self,
statement: S,
arguments: Vec<Argument>,
query_type: QueryType,
) {
self.queue.push_front((statement.into(), arguments, query_type));
}

pub fn push_publish(&mut self, value: BrokerMessage) {
self.publish_queue.push_back(value);
}

pub async fn execute_all(&mut self) -> sqlx::Result<u64> {
let mut total_affected = 0_u64;
pub async fn execute_all(&mut self) -> Result<()> {
let mut tx = self.pool.begin().await?;

while let Some((statement, arguments)) = self.queue.pop_front() {
while let Some((statement, arguments, query_type)) = self.queue.pop_front() {
let mut query = sqlx::query(&statement);

for arg in &arguments {
Expand All @@ -69,20 +86,40 @@ impl QueryQueue {
}
}

total_affected += tx.execute(query).await?.rows_affected();
match query_type {
QueryType::SetEntity(entity) => {
let row = query.fetch_one(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)

Check warning on line 92 in crates/torii/core/src/query_queue.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/query_queue.rs#L92

Added line #L92 was not covered by tests
})?;
let mut entity_updated = EntityUpdated::from_row(&row)?;
entity_updated.updated_model = Some(entity);
entity_updated.deleted = false;
let broker_message = BrokerMessage::EntityUpdated(entity_updated);
send_broker_message(broker_message);
}
QueryType::Other => {
query.execute(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)

Check warning on line 102 in crates/torii/core/src/query_queue.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/query_queue.rs#L102

Added line #L102 was not covered by tests
})?;
}
}
}

tx.commit().await?;

while let Some(message) = self.publish_queue.pop_front() {
match message {
BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model),
BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity),
BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event),
BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event),
}
send_broker_message(message);
}

Ok(total_affected)
Ok(())
}
}

fn send_broker_message(message: BrokerMessage) {
match message {
BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model),
BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity),
BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event),
BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event),
}
}
114 changes: 70 additions & 44 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use starknet_crypto::poseidon_hash_many;
use tracing::debug;

use crate::cache::{Model, ModelCache};
use crate::query_queue::{Argument, BrokerMessage, QueryQueue};
use crate::query_queue::{Argument, BrokerMessage, QueryQueue, QueryType};
use crate::types::{
Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated,
Model as ModelRegistered,
Expand Down Expand Up @@ -53,6 +53,7 @@ impl Sql {
Argument::FieldElement(world_address),
Argument::String(WORLD_CONTRACT_TYPE.to_string()),
],
QueryType::Other,
);

query_queue.execute_all().await?;
Expand Down Expand Up @@ -106,6 +107,7 @@ impl Sql {
"UPDATE contracts SET head = ?, last_pending_block_world_tx = ?, \
last_pending_block_tx = ? WHERE id = ?",
vec![head, last_pending_block_world_tx, last_pending_block_tx, id],
QueryType::Other,
);
}

Expand Down Expand Up @@ -174,26 +176,29 @@ impl Sql {
let entity_id = format!("{:#x}", entity_id);
let model_id = format!("{:#x}", model_id);

let insert_entities = "INSERT INTO entities (id, keys, event_id, executed_at) VALUES (?, \
?, ?, ?) ON CONFLICT(id) DO UPDATE SET \
updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \
event_id=EXCLUDED.event_id";

self.query_queue.enqueue(
insert_entities,
vec![
Argument::String(entity_id.clone()),
Argument::String(keys_str.to_string()),
Argument::String(event_id.to_string()),
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
],
QueryType::Other,
);

self.query_queue.enqueue(
"INSERT INTO entity_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \
model_id) DO NOTHING",
vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())],
QueryType::Other,
);

let insert_entities = "INSERT INTO entities (id, keys, event_id, executed_at) VALUES (?, \
?, ?, ?) ON CONFLICT(id) DO UPDATE SET \
updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \
event_id=EXCLUDED.event_id RETURNING *";
let mut entity_updated: EntityUpdated = sqlx::query_as(insert_entities)
.bind(&entity_id)
.bind(keys_str)
.bind(event_id)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.fetch_one(&self.pool)
.await?;

entity_updated.updated_model = Some(entity.clone());

let path = vec![namespaced_name];
self.build_set_entity_queries_recursive(
path,
Expand All @@ -204,7 +209,12 @@ impl Sql {
&vec![],
);

self.query_queue.push_publish(BrokerMessage::EntityUpdated(entity_updated));
let query_entities_for_publish = "SELECT * FROM entities WHERE id = ?";
self.query_queue.enqueue(
query_entities_for_publish.to_string(),
vec![Argument::String(entity_id.clone())],
QueryType::SetEntity(entity.clone()),
);

Ok(())
}
Expand Down Expand Up @@ -235,6 +245,7 @@ impl Sql {
"INSERT INTO event_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \
model_id) DO NOTHING",
vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())],
QueryType::Other,
);

let keys_str = felts_sql_string(&keys);
Expand Down Expand Up @@ -293,18 +304,18 @@ impl Sql {
);
self.execute().await?;

let mut update_entity = sqlx::query_as::<_, EntityUpdated>(
"UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, event_id=? WHERE id \
= ? RETURNING *",
)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.bind(event_id)
.bind(entity_id)
.fetch_one(&self.pool)
.await?;
let update_query = "UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, \
event_id=? WHERE id = ? RETURNING *";

Check warning on line 308 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L307-L308

Added lines #L307 - L308 were not covered by tests

update_entity.updated_model = Some(wrapped_ty);
self.query_queue.push_publish(BrokerMessage::EntityUpdated(update_entity));
self.query_queue.enqueue(
update_query.to_string(),
vec![
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
Argument::String(event_id.to_string()),
Argument::String(entity_id.clone()),
],
QueryType::SetEntity(wrapped_ty),
);

Check warning on line 318 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L310-L318

Added lines #L310 - L318 were not covered by tests

Ok(())
}
Expand Down Expand Up @@ -376,6 +387,7 @@ impl Sql {
UPDATE SET id=excluded.id, executed_at=excluded.executed_at, \
updated_at=CURRENT_TIMESTAMP",
vec![resource, uri, executed_at],
QueryType::Other,
);
}

Expand Down Expand Up @@ -405,7 +417,7 @@ impl Sql {
let statement = format!("UPDATE metadata SET {} WHERE id = ?", update.join(","));
arguments.push(Argument::FieldElement(*resource));

self.query_queue.enqueue(statement, arguments);
self.query_queue.enqueue(statement, arguments, QueryType::Other);

Ok(())
}
Expand Down Expand Up @@ -520,6 +532,7 @@ impl Sql {
Argument::String(transaction_type.to_string()),
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
],
QueryType::Other,

Check warning on line 535 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L535

Added line #L535 was not covered by tests
);
}

Expand All @@ -540,6 +553,7 @@ impl Sql {
"INSERT OR IGNORE INTO events (id, keys, data, transaction_hash, executed_at) VALUES \
(?, ?, ?, ?, ?)",
vec![id, keys, data, hash, executed_at],
QueryType::Other,
);

let emitted = EventEmitted {
Expand Down Expand Up @@ -728,7 +742,7 @@ impl Sql {
)
};

query_queue.enqueue(statement, arguments);
query_queue.enqueue(statement, arguments, QueryType::Other);
};

match entity {
Expand Down Expand Up @@ -826,7 +840,7 @@ impl Sql {
let mut arguments = vec![Argument::String(entity_id.to_string())];
arguments.extend(indexes.iter().map(|idx| Argument::Int(*idx)));

self.query_queue.enqueue(query, arguments);
self.query_queue.enqueue(query, arguments, QueryType::Other);

// insert the new array elements
for (idx, member) in array.iter().enumerate() {
Expand Down Expand Up @@ -865,8 +879,11 @@ impl Sql {
Ty::Struct(s) => {
let table_id = path.join("$");
let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?");
self.query_queue
.push_front(statement, vec![Argument::String(entity_id.to_string())]);
self.query_queue.push_front(
statement,
vec![Argument::String(entity_id.to_string())],
QueryType::Other,
);
for member in s.children.iter() {
let mut path_clone = path.clone();
path_clone.push(member.name.clone());
Expand All @@ -883,8 +900,11 @@ impl Sql {

let table_id = path.join("$");
let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?");
self.query_queue
.push_front(statement, vec![Argument::String(entity_id.to_string())]);
self.query_queue.push_front(
statement,
vec![Argument::String(entity_id.to_string())],
QueryType::Other,
);

for child in e.options.iter() {
if let Ty::Tuple(t) = &child.ty {
Expand All @@ -901,8 +921,11 @@ impl Sql {
Ty::Array(array) => {
let table_id = path.join("$");
let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?");
self.query_queue
.push_front(statement, vec![Argument::String(entity_id.to_string())]);
self.query_queue.push_front(
statement,
vec![Argument::String(entity_id.to_string())],
QueryType::Other,
);

for member in array.iter() {
let mut path_clone = path.clone();
Expand All @@ -913,8 +936,11 @@ impl Sql {
Ty::Tuple(t) => {
let table_id = path.join("$");
let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?");
self.query_queue
.push_front(statement, vec![Argument::String(entity_id.to_string())]);
self.query_queue.push_front(
statement,
vec![Argument::String(entity_id.to_string())],
QueryType::Other,
);

Check warning on line 943 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L939-L943

Added lines #L939 - L943 were not covered by tests

for (idx, member) in t.iter().enumerate() {
let mut path_clone = path.clone();
Expand Down Expand Up @@ -1028,7 +1054,7 @@ impl Sql {
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
];

self.query_queue.enqueue(statement, arguments);
self.query_queue.enqueue(statement, arguments, QueryType::Other);
}
}
Ty::Tuple(tuple) => {
Expand Down Expand Up @@ -1056,7 +1082,7 @@ impl Sql {
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
];

self.query_queue.enqueue(statement, arguments);
self.query_queue.enqueue(statement, arguments, QueryType::Other);

Check warning on line 1085 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L1085

Added line #L1085 was not covered by tests
}
}
Ty::Array(array) => {
Expand All @@ -1081,7 +1107,7 @@ impl Sql {
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
];

self.query_queue.enqueue(statement, arguments);
self.query_queue.enqueue(statement, arguments, QueryType::Other);
}
Ty::Enum(e) => {
for (idx, child) in e
Expand Down Expand Up @@ -1120,7 +1146,7 @@ impl Sql {
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
];

self.query_queue.enqueue(statement, arguments);
self.query_queue.enqueue(statement, arguments, QueryType::Other);
}
}
_ => {}
Expand Down Expand Up @@ -1159,10 +1185,10 @@ impl Sql {
create_table_query
.push_str("FOREIGN KEY (event_message_id) REFERENCES event_messages(id));");

self.query_queue.enqueue(create_table_query, vec![]);
self.query_queue.enqueue(create_table_query, vec![], QueryType::Other);

indices.iter().for_each(|s| {
self.query_queue.enqueue(s, vec![]);
self.query_queue.enqueue(s, vec![], QueryType::Other);
});
}

Expand Down
Loading

0 comments on commit 3871fa8

Please sign in to comment.