Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(torii): update query logic for them to not block #2397

Merged
merged 1 commit into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 54 additions & 17 deletions crates/torii/core/src/query_queue.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::VecDeque;

use sqlx::{Executor, Pool, Sqlite};
use anyhow::{Context, Result};
use dojo_types::schema::Ty;
use sqlx::{FromRow, Pool, Sqlite};
use starknet::core::types::Felt;

use crate::simple_broker::SimpleBroker;
Expand Down Expand Up @@ -29,34 +31,49 @@
#[derive(Debug, Clone)]
pub struct QueryQueue {
pool: Pool<Sqlite>,
pub queue: VecDeque<(String, Vec<Argument>)>,
pub queue: VecDeque<(String, Vec<Argument>, QueryType)>,
// publishes that are related to queries in the queue, they should be sent
// after the queries are executed
pub publish_queue: VecDeque<BrokerMessage>,
}

#[derive(Debug, Clone)]
pub enum QueryType {
SetEntity(Ty),
Other,
}

impl QueryQueue {
pub fn new(pool: Pool<Sqlite>) -> Self {
QueryQueue { pool, queue: VecDeque::new(), publish_queue: VecDeque::new() }
}

pub fn enqueue<S: Into<String>>(&mut self, statement: S, arguments: Vec<Argument>) {
self.queue.push_back((statement.into(), arguments));
pub fn enqueue<S: Into<String>>(
&mut self,
statement: S,
arguments: Vec<Argument>,
query_type: QueryType,
) {
self.queue.push_back((statement.into(), arguments, query_type));
}

pub fn push_front<S: Into<String>>(&mut self, statement: S, arguments: Vec<Argument>) {
self.queue.push_front((statement.into(), arguments));
pub fn push_front<S: Into<String>>(
&mut self,
statement: S,
arguments: Vec<Argument>,
query_type: QueryType,
) {
self.queue.push_front((statement.into(), arguments, query_type));
}

pub fn push_publish(&mut self, value: BrokerMessage) {
self.publish_queue.push_back(value);
}

pub async fn execute_all(&mut self) -> sqlx::Result<u64> {
let mut total_affected = 0_u64;
pub async fn execute_all(&mut self) -> Result<()> {
let mut tx = self.pool.begin().await?;

while let Some((statement, arguments)) = self.queue.pop_front() {
while let Some((statement, arguments, query_type)) = self.queue.pop_front() {
let mut query = sqlx::query(&statement);

for arg in &arguments {
Expand All @@ -69,20 +86,40 @@
}
}

total_affected += tx.execute(query).await?.rows_affected();
match query_type {
QueryType::SetEntity(entity) => {
let row = query.fetch_one(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)

Check warning on line 92 in crates/torii/core/src/query_queue.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/query_queue.rs#L92

Added line #L92 was not covered by tests
})?;
let mut entity_updated = EntityUpdated::from_row(&row)?;
entity_updated.updated_model = Some(entity);
entity_updated.deleted = false;
let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.push_publish(broker_message);
}
QueryType::Other => {
query.execute(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)

Check warning on line 102 in crates/torii/core/src/query_queue.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/query_queue.rs#L102

Added line #L102 was not covered by tests
})?;
}
}
}

tx.commit().await?;

while let Some(message) = self.publish_queue.pop_front() {
match message {
BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model),
BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity),
BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event),
BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event),
}
send_broker_message(message);
}

Ok(total_affected)
Ok(())
}
}

fn send_broker_message(message: BrokerMessage) {
match message {
BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model),
BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity),
BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event),
BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event),
}
}
109 changes: 64 additions & 45 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use tracing::debug;

use crate::cache::{Model, ModelCache};
use crate::query_queue::{Argument, BrokerMessage, QueryQueue};
use crate::query_queue::{Argument, BrokerMessage, QueryQueue, QueryType};
use crate::types::{
Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated,
Model as ModelRegistered,
Expand Down Expand Up @@ -53,6 +53,7 @@
Argument::FieldElement(world_address),
Argument::String(WORLD_CONTRACT_TYPE.to_string()),
],
QueryType::Other,
);

query_queue.execute_all().await?;
Expand Down Expand Up @@ -106,6 +107,7 @@
"UPDATE contracts SET head = ?, last_pending_block_world_tx = ?, \
last_pending_block_tx = ? WHERE id = ?",
vec![head, last_pending_block_world_tx, last_pending_block_tx, id],
QueryType::Other,
);
}

Expand Down Expand Up @@ -174,25 +176,28 @@
let entity_id = format!("{:#x}", entity_id);
let model_id = format!("{:#x}", model_id);

self.query_queue.enqueue(
"INSERT INTO entity_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \
model_id) DO NOTHING",
vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())],
);

let insert_entities = "INSERT INTO entities (id, keys, event_id, executed_at) VALUES (?, \
?, ?, ?) ON CONFLICT(id) DO UPDATE SET \
updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \
event_id=EXCLUDED.event_id RETURNING *";
let mut entity_updated: EntityUpdated = sqlx::query_as(insert_entities)
.bind(&entity_id)
.bind(keys_str)
.bind(event_id)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.fetch_one(&self.pool)
.await?;

entity_updated.updated_model = Some(entity.clone());
self.query_queue.enqueue(
insert_entities,
vec![
Argument::String(entity_id.clone()),
Argument::String(keys_str.to_string()),
Argument::String(event_id.to_string()),
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
],
QueryType::SetEntity(entity.clone()),
);

self.query_queue.enqueue(
"INSERT INTO entity_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \
model_id) DO NOTHING",
vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())],
QueryType::Other,
);

let path = vec![namespaced_name];
self.build_set_entity_queries_recursive(
Expand All @@ -204,8 +209,6 @@
&vec![],
);

self.query_queue.push_publish(BrokerMessage::EntityUpdated(entity_updated));

Ok(())
}

Expand Down Expand Up @@ -235,6 +238,7 @@
"INSERT INTO event_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \
model_id) DO NOTHING",
vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())],
QueryType::Other,
);

let keys_str = felts_sql_string(&keys);
Expand Down Expand Up @@ -293,18 +297,18 @@
);
self.execute().await?;

let mut update_entity = sqlx::query_as::<_, EntityUpdated>(
"UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, event_id=? WHERE id \
= ? RETURNING *",
)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.bind(event_id)
.bind(entity_id)
.fetch_one(&self.pool)
.await?;
let update_query = "UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, \
event_id=? WHERE id = ? RETURNING *";

Check warning on line 301 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L300-L301

Added lines #L300 - L301 were not covered by tests

update_entity.updated_model = Some(wrapped_ty);
self.query_queue.push_publish(BrokerMessage::EntityUpdated(update_entity));
self.query_queue.enqueue(
update_query.to_string(),
vec![
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
Argument::String(event_id.to_string()),
Argument::String(entity_id.clone()),
],
QueryType::SetEntity(wrapped_ty),
);

Check warning on line 311 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L303-L311

Added lines #L303 - L311 were not covered by tests

Ok(())
}
Expand Down Expand Up @@ -376,6 +380,7 @@
UPDATE SET id=excluded.id, executed_at=excluded.executed_at, \
updated_at=CURRENT_TIMESTAMP",
vec![resource, uri, executed_at],
QueryType::Other,
);
}

Expand Down Expand Up @@ -405,7 +410,7 @@
let statement = format!("UPDATE metadata SET {} WHERE id = ?", update.join(","));
arguments.push(Argument::FieldElement(*resource));

self.query_queue.enqueue(statement, arguments);
self.query_queue.enqueue(statement, arguments, QueryType::Other);

Ok(())
}
Expand Down Expand Up @@ -520,6 +525,7 @@
Argument::String(transaction_type.to_string()),
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
],
QueryType::Other,

Check warning on line 528 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L528

Added line #L528 was not covered by tests
);
}

Expand All @@ -540,6 +546,7 @@
"INSERT OR IGNORE INTO events (id, keys, data, transaction_hash, executed_at) VALUES \
(?, ?, ?, ?, ?)",
vec![id, keys, data, hash, executed_at],
QueryType::Other,
);

let emitted = EventEmitted {
Expand Down Expand Up @@ -728,7 +735,7 @@
)
};

query_queue.enqueue(statement, arguments);
query_queue.enqueue(statement, arguments, QueryType::Other);
};

match entity {
Expand Down Expand Up @@ -826,7 +833,7 @@
let mut arguments = vec![Argument::String(entity_id.to_string())];
arguments.extend(indexes.iter().map(|idx| Argument::Int(*idx)));

self.query_queue.enqueue(query, arguments);
self.query_queue.enqueue(query, arguments, QueryType::Other);

// insert the new array elements
for (idx, member) in array.iter().enumerate() {
Expand Down Expand Up @@ -865,8 +872,11 @@
Ty::Struct(s) => {
let table_id = path.join("$");
let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?");
self.query_queue
.push_front(statement, vec![Argument::String(entity_id.to_string())]);
self.query_queue.push_front(
statement,
vec![Argument::String(entity_id.to_string())],
QueryType::Other,
);
for member in s.children.iter() {
let mut path_clone = path.clone();
path_clone.push(member.name.clone());
Expand All @@ -883,8 +893,11 @@

let table_id = path.join("$");
let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?");
self.query_queue
.push_front(statement, vec![Argument::String(entity_id.to_string())]);
self.query_queue.push_front(
statement,
vec![Argument::String(entity_id.to_string())],
QueryType::Other,
);

for child in e.options.iter() {
if let Ty::Tuple(t) = &child.ty {
Expand All @@ -901,8 +914,11 @@
Ty::Array(array) => {
let table_id = path.join("$");
let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?");
self.query_queue
.push_front(statement, vec![Argument::String(entity_id.to_string())]);
self.query_queue.push_front(
statement,
vec![Argument::String(entity_id.to_string())],
QueryType::Other,
);

for member in array.iter() {
let mut path_clone = path.clone();
Expand All @@ -913,8 +929,11 @@
Ty::Tuple(t) => {
let table_id = path.join("$");
let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?");
self.query_queue
.push_front(statement, vec![Argument::String(entity_id.to_string())]);
self.query_queue.push_front(
statement,
vec![Argument::String(entity_id.to_string())],
QueryType::Other,
);

Check warning on line 936 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L932-L936

Added lines #L932 - L936 were not covered by tests

for (idx, member) in t.iter().enumerate() {
let mut path_clone = path.clone();
Expand Down Expand Up @@ -1028,7 +1047,7 @@
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
];

self.query_queue.enqueue(statement, arguments);
self.query_queue.enqueue(statement, arguments, QueryType::Other);
}
}
Ty::Tuple(tuple) => {
Expand Down Expand Up @@ -1056,7 +1075,7 @@
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
];

self.query_queue.enqueue(statement, arguments);
self.query_queue.enqueue(statement, arguments, QueryType::Other);

Check warning on line 1078 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L1078

Added line #L1078 was not covered by tests
}
}
Ty::Array(array) => {
Expand All @@ -1081,7 +1100,7 @@
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
];

self.query_queue.enqueue(statement, arguments);
self.query_queue.enqueue(statement, arguments, QueryType::Other);
}
Ty::Enum(e) => {
for (idx, child) in e
Expand Down Expand Up @@ -1120,7 +1139,7 @@
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
];

self.query_queue.enqueue(statement, arguments);
self.query_queue.enqueue(statement, arguments, QueryType::Other);
}
}
_ => {}
Expand Down Expand Up @@ -1159,10 +1178,10 @@
create_table_query
.push_str("FOREIGN KEY (event_message_id) REFERENCES event_messages(id));");

self.query_queue.enqueue(create_table_query, vec![]);
self.query_queue.enqueue(create_table_query, vec![], QueryType::Other);

indices.iter().for_each(|s| {
self.query_queue.enqueue(s, vec![]);
self.query_queue.enqueue(s, vec![], QueryType::Other);
});
}

Expand Down
Loading
Loading