Skip to content

Commit

Permalink
refactor(torii-core): enqueue models & events
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Sep 24, 2024
1 parent b598b07 commit d1990c0
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 43 deletions.
26 changes: 26 additions & 0 deletions crates/torii/core/src/query_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ pub struct DeleteEntityQuery {
pub enum QueryType {
SetEntity(Ty),
DeleteEntity(DeleteEntityQuery),
EventMessage(Ty),
RegisterModel,
StoreEvent,
Other,
}

Expand Down Expand Up @@ -137,6 +140,29 @@ impl QueryQueue {
let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.push_publish(broker_message);
}
QueryType::RegisterModel => {
let row = query.fetch_one(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
})?;
let model_registered = ModelRegistered::from_row(&row)?;
self.push_publish(BrokerMessage::ModelRegistered(model_registered));
}
QueryType::EventMessage(entity) => {
let row = query.fetch_one(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
})?;
let mut event_message = EventMessageUpdated::from_row(&row)?;
event_message.updated_model = Some(entity);
let broker_message = BrokerMessage::EventMessageUpdated(event_message);
self.push_publish(broker_message);
}
QueryType::StoreEvent => {
let row = query.fetch_one(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
})?;
let event = EventEmitted::from_row(&row)?;
self.push_publish(BrokerMessage::EventEmitted(event));
}
QueryType::Other => {
query.execute(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
Expand Down
70 changes: 27 additions & 43 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::str::FromStr;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use chrono::Utc;
use dojo_types::primitive::Primitive;
use dojo_types::schema::{EnumOption, Member, Struct, Ty};
use dojo_world::contracts::abi::model::Layout;
Expand All @@ -16,11 +15,8 @@ use starknet_crypto::poseidon_hash_many;
use tracing::{debug, warn};

use crate::cache::{Model, ModelCache};
use crate::query_queue::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType};
use crate::types::{
Event as EventEmitted, EventMessage as EventMessageUpdated, Model as ModelRegistered,
};
use crate::utils::{must_utc_datetime_from_timestamp, utc_dt_string_from_timestamp};
use crate::query_queue::{Argument, DeleteEntityQuery, QueryQueue, QueryType};
use crate::utils::utc_dt_string_from_timestamp;

type IsEventMessage = bool;
type IsStoreUpdate = bool;
Expand Down Expand Up @@ -173,19 +169,20 @@ impl Sql {
class_hash=EXCLUDED.class_hash, layout=EXCLUDED.layout, \
packed_size=EXCLUDED.packed_size, unpacked_size=EXCLUDED.unpacked_size, \
executed_at=EXCLUDED.executed_at RETURNING *";
let model_registered: ModelRegistered = sqlx::query_as(insert_models)
// this is temporary until the model hash is precomputed
.bind(format!("{:#x}", selector))
.bind(namespace)
.bind(model.name())
.bind(format!("{class_hash:#x}"))
.bind(format!("{contract_address:#x}"))
.bind(serde_json::to_string(&layout)?)
.bind(packed_size)
.bind(unpacked_size)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.fetch_one(&self.pool)
.await?;

let arguments = vec![
Argument::String(format!("{:#x}", selector)),
Argument::String(namespace.to_string()),
Argument::String(model.name().to_string()),
Argument::String(format!("{class_hash:#x}")),
Argument::String(format!("{contract_address:#x}")),
Argument::String(serde_json::to_string(&layout)?),
Argument::Int(packed_size as i64),
Argument::Int(unpacked_size as i64),
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
];

self.query_queue.enqueue(insert_models, arguments, QueryType::RegisterModel);

let mut model_idx = 0_i64;
self.build_register_queries_recursive(
Expand Down Expand Up @@ -220,7 +217,6 @@ impl Sql {
},
)
.await;
self.query_queue.push_publish(BrokerMessage::ModelRegistered(model_registered));

Ok(())
}
Expand Down Expand Up @@ -316,15 +312,16 @@ impl Sql {
VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \
updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \
event_id=EXCLUDED.event_id RETURNING *";
let mut event_message_updated: EventMessageUpdated = sqlx::query_as(insert_entities)
.bind(&entity_id)
.bind(&keys_str)
.bind(event_id)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.fetch_one(&self.pool)
.await?;

event_message_updated.updated_model = Some(entity.clone());
self.query_queue.enqueue(
insert_entities,
vec![
Argument::String(entity_id.clone()),
Argument::String(keys_str),
Argument::String(event_id.to_string()),
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
],
QueryType::Other,
);

let path = vec![namespaced_name];
self.build_set_entity_queries_recursive(
Expand All @@ -336,8 +333,6 @@ impl Sql {
&vec![],
);

self.query_queue.push_publish(BrokerMessage::EventMessageUpdated(event_message_updated));

Ok(())
}

Expand Down Expand Up @@ -504,19 +499,8 @@ impl Sql {
"INSERT OR IGNORE INTO events (id, keys, data, transaction_hash, executed_at) VALUES \
(?, ?, ?, ?, ?)",
vec![id, keys, data, hash, executed_at],
QueryType::Other,
QueryType::StoreEvent,
);

let emitted = EventEmitted {
id: event_id.to_string(),
keys: felts_sql_string(&event.keys),
data: felts_sql_string(&event.data),
transaction_hash: format!("{:#x}", transaction_hash),
created_at: Utc::now(),
executed_at: must_utc_datetime_from_timestamp(block_timestamp),
};

self.query_queue.push_publish(BrokerMessage::EventEmitted(emitted));
}

#[allow(clippy::too_many_arguments)]
Expand Down

0 comments on commit d1990c0

Please sign in to comment.