Skip to content

Commit

Permalink
refactor(torii): event messages share logic entities
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Jul 18, 2024
1 parent 5d8b5f9 commit f1c2c9b
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 103 deletions.
2 changes: 1 addition & 1 deletion crates/torii/core/src/processors/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ where
let mut entity = model.schema().await?;
entity.deserialize(&mut keys_and_unpacked)?;

db.set_event_message(entity, event_id, block_timestamp).await?;
db.set_entity(entity, true, event_id, block_timestamp).await?;
Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/torii/core/src/processors/store_set_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ where
let mut entity = model.schema().await?;
entity.deserialize(&mut keys_and_unpacked)?;

db.set_entity(entity, event_id, block_timestamp).await?;
db.set_entity(entity, false, event_id, block_timestamp).await?;
Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/torii/core/src/processors/store_update_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ where
let mut entity = model.schema().await?;
entity.deserialize(&mut keys_and_unpacked)?;

db.set_entity(entity, event_id, block_timestamp).await?;
db.set_entity(entity, false, event_id, block_timestamp).await?;
Ok(())
}
}
106 changes: 21 additions & 85 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ use super::World;
use crate::model::ModelSQLReader;
use crate::query_queue::{Argument, QueryQueue};
use crate::simple_broker::SimpleBroker;
use crate::types::{
Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated,
Model as ModelRegistered,
};
use crate::types::{Entity as EntityUpdated, Event as EventEmitted, Model as ModelRegistered};
use crate::utils::{must_utc_datetime_from_timestamp, utc_dt_string_from_timestamp};

pub const FELT_DELIMITER: &str = "/";
Expand Down Expand Up @@ -156,6 +153,7 @@ impl Sql {
pub async fn set_entity(
&mut self,
entity: Ty,
is_event_message: bool,
event_id: &str,
block_timestamp: u64,
) -> Result<()> {
Expand All @@ -172,7 +170,11 @@ impl Sql {
let namespaced_name = entity.name();
let (model_namespace, model_name) = namespaced_name.split_once('-').unwrap();

let entity_id = format!("{:#x}", poseidon_hash_many(&keys));
let entity_id = format!(
"{}{:#x}",
if is_event_message { "event:" } else { "" },
poseidon_hash_many(&keys)
);
let model_id = format!("{:#x}", compute_selector_from_names(model_namespace, model_name));

self.query_queue.enqueue(
Expand All @@ -186,7 +188,7 @@ impl Sql {
?, ?, ?) ON CONFLICT(id) DO UPDATE SET \
updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \
event_id=EXCLUDED.event_id RETURNING *";
let mut entity_updated: EntityUpdated = sqlx::query_as(insert_entities)
let mut entity_updated: EntityUpdated = sqlx::query_as(&insert_entities)
.bind(&entity_id)
.bind(&keys_str)
.bind(event_id)
Expand All @@ -200,74 +202,14 @@ impl Sql {
self.build_set_entity_queries_recursive(
path,
event_id,
(&entity_id, false),
&entity_id,
&entity,
block_timestamp,
&vec![],
);
self.query_queue.execute_all().await?;

SimpleBroker::publish(entity_updated);

Ok(())
}

pub async fn set_event_message(
&mut self,
entity: Ty,
event_id: &str,
block_timestamp: u64,
) -> Result<()> {
let keys = if let Ty::Struct(s) = &entity {
let mut keys = Vec::new();
for m in s.keys() {
keys.extend(m.serialize()?);
}
keys
} else {
return Err(anyhow!("Entity is not a struct"));
};

let namespaced_name = entity.name();
let (model_namespace, model_name) = namespaced_name.split_once('-').unwrap();

let entity_id = format!("{:#x}", poseidon_hash_many(&keys));
let model_id = format!("{:#x}", compute_selector_from_names(model_namespace, model_name));

self.query_queue.enqueue(
"INSERT INTO event_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \
model_id) DO NOTHING",
vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())],
);

let keys_str = felts_sql_string(&keys);
let insert_entities = "INSERT INTO event_messages (id, keys, event_id, executed_at) \
VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \
updated_at=CURRENT_TIMESTAMP, event_id=EXCLUDED.event_id RETURNING \
*";
let mut event_message_updated: EventMessageUpdated = sqlx::query_as(insert_entities)
.bind(&entity_id)
.bind(&keys_str)
.bind(event_id)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.fetch_one(&self.pool)
.await?;

event_message_updated.updated_model = Some(entity.clone());

let path = vec![namespaced_name];
self.build_set_entity_queries_recursive(
path,
event_id,
(&entity_id, true),
&entity,
block_timestamp,
&vec![],
);
self.query_queue.execute_all().await?;

SimpleBroker::publish(event_message_updated);

Ok(())
}

Expand Down Expand Up @@ -561,13 +503,11 @@ impl Sql {
path: Vec<String>,
event_id: &str,
// The id of the entity and if the entity is an event message
entity_id: (&str, bool),
entity_id: &str,
entity: &Ty,
block_timestamp: u64,
indexes: &Vec<i64>,
) {
let (entity_id, is_event_message) = entity_id;

let update_members =
|members: &[Member], query_queue: &mut QueryQueue, indexes: &Vec<i64>| {
let table_id = path.join("$");
Expand All @@ -576,19 +516,11 @@ impl Sql {
"event_id".to_string(),
"executed_at".to_string(),
"updated_at".to_string(),
if is_event_message {
"event_message_id".to_string()
} else {
"entity_id".to_string()
},
"entity_id".to_string(),
];

let mut arguments = vec![
Argument::String(if is_event_message {
"event:".to_string() + entity_id
} else {
entity_id.to_string()
}),
Argument::String(entity_id.to_string()),
Argument::String(event_id.to_string()),
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
Argument::String(chrono::Utc::now().to_rfc3339()),
Expand Down Expand Up @@ -648,7 +580,7 @@ impl Sql {
self.build_set_entity_queries_recursive(
path_clone,
event_id,
(entity_id, is_event_message),
entity_id,
&member.ty,
block_timestamp,
indexes,
Expand All @@ -658,7 +590,11 @@ impl Sql {
Ty::Enum(e) => {
if e.options.iter().all(
|o| {
if let Ty::Tuple(t) = &o.ty { t.is_empty() } else { false }
if let Ty::Tuple(t) = &o.ty {
t.is_empty()
} else {
false
}
},
) {
return;
Expand All @@ -684,7 +620,7 @@ impl Sql {
self.build_set_entity_queries_recursive(
path_clone,
event_id,
(entity_id, is_event_message),
entity_id,
&option.ty,
block_timestamp,
indexes,
Expand Down Expand Up @@ -713,7 +649,7 @@ impl Sql {
self.build_set_entity_queries_recursive(
path_clone,
event_id,
(entity_id, is_event_message),
entity_id,
member,
block_timestamp,
indexes,
Expand Down Expand Up @@ -751,7 +687,7 @@ impl Sql {
self.build_set_entity_queries_recursive(
path_clone,
event_id,
(entity_id, is_event_message),
entity_id,
member,
block_timestamp,
&indexes,
Expand Down
15 changes: 0 additions & 15 deletions crates/torii/core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,6 @@ pub struct Entity {
pub updated_model: Option<Ty>,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct EventMessage {
pub id: String,
pub keys: String,
pub event_id: String,
pub executed_at: DateTime<Utc>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,

// this should never be None. as a EventMessage cannot be deleted
#[sqlx(skip)]
pub updated_model: Option<Ty>,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Model {
Expand Down

0 comments on commit f1c2c9b

Please sign in to comment.