Skip to content

Commit

Permalink
refactor(torii-core): enqueue models & events (#2471)
Browse files Browse the repository at this point in the history
* refactor(torii-core): enqueue models & events

* scoped publish queue

* fix: store event

* fix: event message
  • Loading branch information
Larkooo authored Sep 24, 2024
1 parent 87ad4ab commit 8f4bcbb
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 62 deletions.
44 changes: 33 additions & 11 deletions crates/torii/core/src/query_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ pub enum BrokerMessage {
pub struct QueryQueue {
pool: Pool<Sqlite>,
pub queue: VecDeque<(String, Vec<Argument>, QueryType)>,
// publishes that are related to queries in the queue, they should be sent
// after the queries are executed
pub publish_queue: VecDeque<BrokerMessage>,
}

#[derive(Debug, Clone)]
Expand All @@ -49,12 +46,15 @@ pub struct DeleteEntityQuery {
pub enum QueryType {
SetEntity(Ty),
DeleteEntity(DeleteEntityQuery),
EventMessage(Ty),
RegisterModel,
StoreEvent,
Other,
}

impl QueryQueue {
pub fn new(pool: Pool<Sqlite>) -> Self {
QueryQueue { pool, queue: VecDeque::new(), publish_queue: VecDeque::new() }
QueryQueue { pool, queue: VecDeque::new() }
}

pub fn enqueue<S: Into<String>>(
Expand All @@ -66,12 +66,11 @@ impl QueryQueue {
self.queue.push_back((statement.into(), arguments, query_type));
}

pub fn push_publish(&mut self, value: BrokerMessage) {
self.publish_queue.push_back(value);
}

pub async fn execute_all(&mut self) -> Result<()> {
let mut tx = self.pool.begin().await?;
// publishes that are related to queries in the queue, they should be sent
// after the queries are executed
let mut publish_queue = VecDeque::new();

while let Some((statement, arguments, query_type)) = self.queue.pop_front() {
let mut query = sqlx::query(&statement);
Expand All @@ -95,7 +94,7 @@ impl QueryQueue {
entity_updated.updated_model = Some(entity);
entity_updated.deleted = false;
let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.push_publish(broker_message);
publish_queue.push_back(broker_message);
}
QueryType::DeleteEntity(entity) => {
let delete_model = query.execute(&mut *tx).await.with_context(|| {
Expand Down Expand Up @@ -135,7 +134,30 @@ impl QueryQueue {
}

let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.push_publish(broker_message);
publish_queue.push_back(broker_message);
}
QueryType::RegisterModel => {
let row = query.fetch_one(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
})?;
let model_registered = ModelRegistered::from_row(&row)?;
publish_queue.push_back(BrokerMessage::ModelRegistered(model_registered));
}
QueryType::EventMessage(entity) => {
let row = query.fetch_one(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
})?;
let mut event_message = EventMessageUpdated::from_row(&row)?;
event_message.updated_model = Some(entity);
let broker_message = BrokerMessage::EventMessageUpdated(event_message);
publish_queue.push_back(broker_message);
}
QueryType::StoreEvent => {
let row = query.fetch_one(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
})?;
let event = EventEmitted::from_row(&row)?;
publish_queue.push_back(BrokerMessage::EventEmitted(event));
}
QueryType::Other => {
query.execute(&mut *tx).await.with_context(|| {
Expand All @@ -147,7 +169,7 @@ impl QueryQueue {

tx.commit().await?;

while let Some(message) = self.publish_queue.pop_front() {
while let Some(message) = publish_queue.pop_front() {
send_broker_message(message);
}

Expand Down
84 changes: 33 additions & 51 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::str::FromStr;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use chrono::Utc;
use dojo_types::primitive::Primitive;
use dojo_types::schema::{EnumOption, Member, Struct, Ty};
use dojo_world::contracts::abi::model::Layout;
Expand All @@ -16,11 +15,8 @@ use starknet_crypto::poseidon_hash_many;
use tracing::{debug, warn};

use crate::cache::{Model, ModelCache};
use crate::query_queue::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType};
use crate::types::{
Event as EventEmitted, EventMessage as EventMessageUpdated, Model as ModelRegistered,
};
use crate::utils::{must_utc_datetime_from_timestamp, utc_dt_string_from_timestamp};
use crate::query_queue::{Argument, DeleteEntityQuery, QueryQueue, QueryType};
use crate::utils::utc_dt_string_from_timestamp;

type IsEventMessage = bool;
type IsStoreUpdate = bool;
Expand Down Expand Up @@ -79,7 +75,6 @@ impl Sql {
pub fn merge(&mut self, other: Sql) -> Result<()> {
// Merge query queue
self.query_queue.queue.extend(other.query_queue.queue);
self.query_queue.publish_queue.extend(other.query_queue.publish_queue);

// This should never happen
if self.world_address != other.world_address {
Expand Down Expand Up @@ -173,19 +168,20 @@ impl Sql {
class_hash=EXCLUDED.class_hash, layout=EXCLUDED.layout, \
packed_size=EXCLUDED.packed_size, unpacked_size=EXCLUDED.unpacked_size, \
executed_at=EXCLUDED.executed_at RETURNING *";
let model_registered: ModelRegistered = sqlx::query_as(insert_models)
// this is temporary until the model hash is precomputed
.bind(format!("{:#x}", selector))
.bind(namespace)
.bind(model.name())
.bind(format!("{class_hash:#x}"))
.bind(format!("{contract_address:#x}"))
.bind(serde_json::to_string(&layout)?)
.bind(packed_size)
.bind(unpacked_size)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.fetch_one(&self.pool)
.await?;

let arguments = vec![
Argument::String(format!("{:#x}", selector)),
Argument::String(namespace.to_string()),
Argument::String(model.name().to_string()),
Argument::String(format!("{class_hash:#x}")),
Argument::String(format!("{contract_address:#x}")),
Argument::String(serde_json::to_string(&layout)?),
Argument::Int(packed_size as i64),
Argument::Int(unpacked_size as i64),
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
];

self.query_queue.enqueue(insert_models, arguments, QueryType::RegisterModel);

let mut model_idx = 0_i64;
self.build_register_queries_recursive(
Expand Down Expand Up @@ -220,7 +216,6 @@ impl Sql {
},
)
.await;
self.query_queue.push_publish(BrokerMessage::ModelRegistered(model_registered));

Ok(())
}
Expand Down Expand Up @@ -304,28 +299,28 @@ impl Sql {
let entity_id = format!("{:#x}", poseidon_hash_many(&keys));
let model_id = format!("{:#x}", compute_selector_from_names(model_namespace, model_name));

let keys_str = felts_sql_string(&keys);
let insert_entities = "INSERT INTO event_messages (id, keys, event_id, executed_at) \
VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \
updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \
event_id=EXCLUDED.event_id RETURNING *";
self.query_queue.enqueue(
insert_entities,
vec![
Argument::String(entity_id.clone()),
Argument::String(keys_str),
Argument::String(event_id.to_string()),
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
],
QueryType::Other,
);
self.query_queue.enqueue(
"INSERT INTO event_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \
model_id) DO NOTHING",
vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())],
QueryType::Other,
);

let keys_str = felts_sql_string(&keys);
let insert_entities = "INSERT INTO event_messages (id, keys, event_id, executed_at) \
VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \
updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \
event_id=EXCLUDED.event_id RETURNING *";
let mut event_message_updated: EventMessageUpdated = sqlx::query_as(insert_entities)
.bind(&entity_id)
.bind(&keys_str)
.bind(event_id)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.fetch_one(&self.pool)
.await?;

event_message_updated.updated_model = Some(entity.clone());

let path = vec![namespaced_name];
self.build_set_entity_queries_recursive(
path,
Expand All @@ -336,8 +331,6 @@ impl Sql {
&vec![],
);

self.query_queue.push_publish(BrokerMessage::EventMessageUpdated(event_message_updated));

Ok(())
}

Expand Down Expand Up @@ -502,21 +495,10 @@ impl Sql {

self.query_queue.enqueue(
"INSERT OR IGNORE INTO events (id, keys, data, transaction_hash, executed_at) VALUES \
(?, ?, ?, ?, ?)",
(?, ?, ?, ?, ?) RETURNING *",
vec![id, keys, data, hash, executed_at],
QueryType::Other,
QueryType::StoreEvent,
);

let emitted = EventEmitted {
id: event_id.to_string(),
keys: felts_sql_string(&event.keys),
data: felts_sql_string(&event.data),
transaction_hash: format!("{:#x}", transaction_hash),
created_at: Utc::now(),
executed_at: must_utc_datetime_from_timestamp(block_timestamp),
};

self.query_queue.push_publish(BrokerMessage::EventEmitted(emitted));
}

#[allow(clippy::too_many_arguments)]
Expand Down

0 comments on commit 8f4bcbb

Please sign in to comment.