diff --git a/crates/torii/grpc/src/server/subscriptions/entity.rs b/crates/torii/grpc/src/server/subscriptions/entity.rs index 7c03d096a7..0d19e196df 100644 --- a/crates/torii/grpc/src/server/subscriptions/entity.rs +++ b/crates/torii/grpc/src/server/subscriptions/entity.rs @@ -4,6 +4,7 @@ use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; use std::task::{Context, Poll}; +use std::time::Duration; use futures::Stream; use futures_util::StreamExt; @@ -12,6 +13,7 @@ use sqlx::{Pool, Sqlite}; use starknet::core::types::Felt; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; +use tokio::time::interval; use torii_core::cache::ModelCache; use torii_core::error::{Error, ParseError}; use torii_core::model::build_sql_query; @@ -81,10 +83,8 @@ impl EntityManager { #[must_use = "Service does nothing unless polled"] #[allow(missing_debug_implementations)] pub struct Service { - pool: Pool, - subs_manager: Arc, - model_cache: Arc, simple_broker: Pin + Send>>, + update_sender: Sender, } impl Service { @@ -93,15 +93,49 @@ impl Service { subs_manager: Arc, model_cache: Arc, ) -> Self { - Self { + let (update_sender, update_receiver) = channel(100); + let service = + Self { simple_broker: Box::pin(SimpleBroker::::subscribe()), update_sender }; + + // Spawn a task to process entity updates + tokio::spawn(Self::process_updates( + Arc::clone(&subs_manager), + Arc::clone(&model_cache), pool, - subs_manager, - model_cache, - simple_broker: Box::pin(SimpleBroker::::subscribe()), + update_receiver, + )); + + service + } + + async fn process_updates( + subs: Arc, + cache: Arc, + pool: Pool, + mut update_receiver: Receiver, + ) { + let mut interval = interval(Duration::from_millis(100)); + let mut pending_updates = Vec::new(); + + loop { + tokio::select! { + _ = interval.tick() => { + if !pending_updates.is_empty() { + for entity in pending_updates.drain(..) { + if let Err(e) = Self::publish_updates(Arc::clone(&subs), Arc::clone(&cache), pool.clone(), &entity).await { + error!(target = LOG_TARGET, error = %e, "Publishing entity update."); + } + } + } + } + Some(entity) = update_receiver.recv() => { + pending_updates.push(entity); + } + } } } - async fn publish_updates( + pub async fn publish_updates( subs: Arc, cache: Arc, pool: Pool, @@ -259,16 +293,14 @@ impl Service { impl Future for Service { type Output = (); - fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let pin = self.get_mut(); while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) { - let subs = Arc::clone(&pin.subs_manager); - let cache = Arc::clone(&pin.model_cache); - let pool = pin.pool.clone(); + let sender = pin.update_sender.clone(); tokio::spawn(async move { - if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await { - error!(target = LOG_TARGET, error = %e, "Publishing entity update."); + if let Err(e) = sender.send(entity).await { + error!(target = LOG_TARGET, error = %e, "Sending entity update to channel."); } }); } diff --git a/crates/torii/grpc/src/server/subscriptions/event.rs b/crates/torii/grpc/src/server/subscriptions/event.rs index a00ddbbbdd..c7628f10f9 100644 --- a/crates/torii/grpc/src/server/subscriptions/event.rs +++ b/crates/torii/grpc/src/server/subscriptions/event.rs @@ -4,6 +4,7 @@ use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; use std::task::{Context, Poll}; +use std::time::Duration; use futures::Stream; use futures_util::StreamExt; @@ -11,6 +12,7 @@ use rand::Rng; use starknet::core::types::Felt; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; +use tokio::time::interval; use torii_core::error::{Error, ParseError}; use torii_core::simple_broker::SimpleBroker; use torii_core::sql::FELT_DELIMITER; @@ -62,13 +64,42 @@ impl EventManager { #[must_use = "Service does nothing unless polled"] #[allow(missing_debug_implementations)] pub struct Service { - subs_manager: Arc, simple_broker: Pin + Send>>, + update_sender: Sender, } impl Service { pub fn new(subs_manager: Arc) -> Self { - Self { subs_manager, simple_broker: Box::pin(SimpleBroker::::subscribe()) } + let (update_sender, update_receiver) = channel(100); + let service = + Self { simple_broker: Box::pin(SimpleBroker::::subscribe()), update_sender }; + + // Spawn a task to process event updates + tokio::spawn(Self::process_updates(Arc::clone(&subs_manager), update_receiver)); + + service + } + + async fn process_updates(subs: Arc, mut update_receiver: Receiver) { + let mut interval = interval(Duration::from_millis(100)); + let mut pending_updates = Vec::new(); + + loop { + tokio::select! { + _ = interval.tick() => { + if !pending_updates.is_empty() { + for event in pending_updates.drain(..) { + if let Err(e) = Self::publish_updates(Arc::clone(&subs), &event).await { + error!(target = LOG_TARGET, error = %e, "Publishing events update."); + } + } + } + } + Some(event) = update_receiver.recv() => { + pending_updates.push(event); + } + } + } } async fn publish_updates(subs: Arc, event: &Event) -> Result<(), Error> { @@ -151,10 +182,10 @@ impl Future for Service { let pin = self.get_mut(); while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) { - let subs = Arc::clone(&pin.subs_manager); + let sender = pin.update_sender.clone(); tokio::spawn(async move { - if let Err(e) = Service::publish_updates(subs, &event).await { - error!(target = LOG_TARGET, error = %e, "Publishing events update."); + if let Err(e) = sender.send(event).await { + error!(target = LOG_TARGET, error = %e, "Sending event update to channel."); } }); } diff --git a/crates/torii/grpc/src/server/subscriptions/event_message.rs b/crates/torii/grpc/src/server/subscriptions/event_message.rs index fabd5510aa..2ae93add4b 100644 --- a/crates/torii/grpc/src/server/subscriptions/event_message.rs +++ b/crates/torii/grpc/src/server/subscriptions/event_message.rs @@ -4,14 +4,16 @@ use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; use std::task::{Context, Poll}; +use std::time::Duration; use futures::Stream; use futures_util::StreamExt; use rand::Rng; use sqlx::{Pool, Sqlite}; use starknet::core::types::Felt; -use tokio::sync::mpsc::{channel, Receiver}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; +use tokio::time::interval; use torii_core::cache::ModelCache; use torii_core::error::{Error, ParseError}; use torii_core::model::build_sql_query; @@ -75,10 +77,8 @@ impl EventMessageManager { #[must_use = "Service does nothing unless polled"] #[allow(missing_debug_implementations)] pub struct Service { - pool: Pool, - subs_manager: Arc, - model_cache: Arc, simple_broker: Pin + Send>>, + update_sender: Sender, } impl Service { @@ -87,11 +87,47 @@ impl Service { subs_manager: Arc, model_cache: Arc, ) -> Self { - Self { - pool, - subs_manager, - model_cache, + let (update_sender, update_receiver) = channel(100); + let service = Self { simple_broker: Box::pin(SimpleBroker::::subscribe()), + update_sender, + }; + + // Spawn a task to process event message updates + tokio::spawn(Self::process_updates( + Arc::clone(&subs_manager), + Arc::clone(&model_cache), + pool, + update_receiver, + )); + + service + } + + async fn process_updates( + subs: Arc, + cache: Arc, + pool: Pool, + mut update_receiver: Receiver, + ) { + let mut interval = interval(Duration::from_millis(100)); + let mut pending_updates = Vec::new(); + + loop { + tokio::select! { + _ = interval.tick() => { + if !pending_updates.is_empty() { + for event_message in pending_updates.drain(..) { + if let Err(e) = Self::publish_updates(Arc::clone(&subs), Arc::clone(&cache), pool.clone(), &event_message).await { + error!(target = LOG_TARGET, error = %e, "Publishing event message update."); + } + } + } + } + Some(event_message) = update_receiver.recv() => { + pending_updates.push(event_message); + } + } } } @@ -241,13 +277,11 @@ impl Future for Service { fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll { let pin = self.get_mut(); - while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) { - let subs = Arc::clone(&pin.subs_manager); - let cache = Arc::clone(&pin.model_cache); - let pool = pin.pool.clone(); + while let Poll::Ready(Some(event_message)) = pin.simple_broker.poll_next_unpin(cx) { + let sender = pin.update_sender.clone(); tokio::spawn(async move { - if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await { - error!(target = LOG_TARGET, error = %e, "Publishing entity update."); + if let Err(e) = sender.send(event_message).await { + error!(target = LOG_TARGET, error = %e, "Sending event message update to channel."); } }); }