Skip to content

Commit

Permalink
fix: event message subscription subscription broker (#1885)
Browse files Browse the repository at this point in the history
* fix: updates bug. use own eventmessage type for broker

* fmt
  • Loading branch information
Larkooo authored Apr 26, 2024
1 parent c8815a9 commit 5f421e1
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 16 deletions.
9 changes: 6 additions & 3 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use super::World;
use crate::model::ModelSQLReader;
use crate::query_queue::{Argument, QueryQueue};
use crate::simple_broker::SimpleBroker;
use crate::types::{Entity as EntityUpdated, Event as EventEmitted, Model as ModelRegistered};
use crate::types::{
Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated,
Model as ModelRegistered,
};
use crate::utils::{must_utc_datetime_from_timestamp, utc_dt_string_from_timestamp};

pub const FELT_DELIMITER: &str = "/";
Expand Down Expand Up @@ -212,7 +215,7 @@ impl Sql {
VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \
updated_at=CURRENT_TIMESTAMP, event_id=EXCLUDED.event_id RETURNING \
*";
let entity_updated: EntityUpdated = sqlx::query_as(insert_entities)
let event_message_updated: EventMessageUpdated = sqlx::query_as(insert_entities)
.bind(&entity_id)
.bind(&keys_str)
.bind(event_id)
Expand All @@ -231,7 +234,7 @@ impl Sql {
);
self.query_queue.execute_all().await?;

SimpleBroker::publish(entity_updated);
SimpleBroker::publish(event_message_updated);

Ok(())
}
Expand Down
11 changes: 11 additions & 0 deletions crates/torii/core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ pub struct Entity {
pub updated_at: DateTime<Utc>,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct EventMessage {
pub id: String,
pub keys: String,
pub event_id: String,
pub executed_at: DateTime<Utc>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Model {
Expand Down
24 changes: 14 additions & 10 deletions crates/torii/graphql/src/object/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use sqlx::pool::PoolConnection;
use sqlx::{Pool, Sqlite};
use tokio_stream::StreamExt;
use torii_core::simple_broker::SimpleBroker;
use torii_core::types::Entity;
use torii_core::types::EventMessage;

use super::inputs::keys_input::keys_argument;
use super::{BasicObject, ResolvableObject, TypeMapping, ValueMapping};
Expand Down Expand Up @@ -75,14 +75,18 @@ impl ResolvableObject for EventMessageObject {
};
// if id is None, then subscribe to all entities
// if id is Some, then subscribe to only the entity with that id
Ok(SimpleBroker::<Entity>::subscribe().filter_map(move |entity: Entity| {
if id.is_none() || id == Some(entity.id.clone()) {
Some(Ok(Value::Object(EventMessageObject::value_mapping(entity))))
} else {
// id != entity.id , then don't send anything, still listening
None
}
}))
Ok(SimpleBroker::<EventMessage>::subscribe().filter_map(
move |entity: EventMessage| {
if id.is_none() || id == Some(entity.id.clone()) {
Some(Ok(Value::Object(EventMessageObject::value_mapping(
entity,
))))
} else {
// id != entity.id , then don't send anything, still listening
None
}
},
))
})
},
)
Expand All @@ -92,7 +96,7 @@ impl ResolvableObject for EventMessageObject {
}

impl EventMessageObject {
pub fn value_mapping(entity: Entity) -> ValueMapping {
pub fn value_mapping(entity: EventMessage) -> ValueMapping {
let keys: Vec<&str> = entity.keys.split('/').filter(|&k| !k.is_empty()).collect();
IndexMap::from([
(Name::new("id"), Value::from(entity.id)),
Expand Down
6 changes: 3 additions & 3 deletions crates/torii/grpc/src/server/subscriptions/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use torii_core::cache::ModelCache;
use torii_core::error::{Error, ParseError};
use torii_core::model::{build_sql_query, map_row_to_ty};
use torii_core::simple_broker::SimpleBroker;
use torii_core::types::Entity;
use torii_core::types::EventMessage;
use tracing::{error, trace};

use crate::proto;
Expand Down Expand Up @@ -60,7 +60,7 @@ pub struct Service {
pool: Pool<Sqlite>,
subs_manager: Arc<EventMessageManager>,
model_cache: Arc<ModelCache>,
simple_broker: Pin<Box<dyn Stream<Item = Entity> + Send>>,
simple_broker: Pin<Box<dyn Stream<Item = EventMessage> + Send>>,
}

impl Service {
Expand All @@ -73,7 +73,7 @@ impl Service {
pool,
subs_manager,
model_cache,
simple_broker: Box::pin(SimpleBroker::<Entity>::subscribe()),
simple_broker: Box::pin(SimpleBroker::<EventMessage>::subscribe()),
}
}

Expand Down

0 comments on commit 5f421e1

Please sign in to comment.