
opt(torii-grpc): channel based approach for entity updates #2453

Closed · wants to merge 3 commits
60 changes: 46 additions & 14 deletions crates/torii/grpc/src/server/subscriptions/entity.rs
@@ -4,6 +4,7 @@
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use futures::Stream;
use futures_util::StreamExt;
@@ -12,6 +13,7 @@
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock;
use tokio::time::interval;
use torii_core::cache::ModelCache;
use torii_core::error::{Error, ParseError};
use torii_core::model::build_sql_query;
@@ -81,10 +83,8 @@
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
 pub struct Service {
-    pool: Pool<Sqlite>,
-    subs_manager: Arc<EntityManager>,
-    model_cache: Arc<ModelCache>,
     simple_broker: Pin<Box<dyn Stream<Item = Entity> + Send>>,
+    update_sender: Sender<Entity>,
 }

impl Service {
@@ -93,15 +93,49 @@
         subs_manager: Arc<EntityManager>,
         model_cache: Arc<ModelCache>,
     ) -> Self {
-        Self {
+        let (update_sender, update_receiver) = channel(100);
+        let service =
+            Self { simple_broker: Box::pin(SimpleBroker::<Entity>::subscribe()), update_sender };
+
+        // Spawn a task to process entity updates
+        tokio::spawn(Self::process_updates(
+            Arc::clone(&subs_manager),
+            Arc::clone(&model_cache),
             pool,
-            subs_manager,
-            model_cache,
-            simple_broker: Box::pin(SimpleBroker::<Entity>::subscribe()),
+            update_receiver,
+        ));
+
+        service
     }
Comment on lines +96 to +109

Ohayo sensei! Potential unbounded task spawning without backpressure control.

In the `new` method, the `process_updates` task is spawned to handle entity updates. Because it consumes from a channel with a buffer of 100 (`channel(100)`), the `update_sender` saturates whenever entities are produced faster than they are processed; since each send runs in its own spawned task, blocked senders can then accumulate without bound.

Consider implementing backpressure or increasing the channel's capacity to prevent potential bottlenecks. You might also monitor the channel's usage to adjust its size dynamically based on load.
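As a concrete illustration, the channel's occupancy could be sampled before each send. A minimal sketch, assuming a recent tokio (which exposes `Sender::capacity` and `Sender::max_capacity`); the threshold and log message are placeholders:

```rust
use tokio::sync::mpsc::Sender;
use tracing::warn;

/// Sketch: warn when a bounded channel is nearly full, so the capacity of
/// `channel(100)` can be tuned from real load data. Not part of this PR.
fn warn_if_saturated<T>(sender: &Sender<T>) {
    let free = sender.capacity(); // slots currently available
    let total = sender.max_capacity(); // the bound passed to `channel()`
    if free * 10 < total {
        warn!(free, total, "entity update channel above 90% occupancy");
    }
}
```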


    async fn process_updates(
        subs: Arc<EntityManager>,
        cache: Arc<ModelCache>,
        pool: Pool<Sqlite>,
        mut update_receiver: Receiver<Entity>,
    ) {
        let mut interval = interval(Duration::from_millis(100));
        let mut pending_updates = Vec::new();

        loop {
            tokio::select! {
                _ = interval.tick() => {
                    if !pending_updates.is_empty() {
                        for entity in pending_updates.drain(..) {
                            if let Err(e) = Self::publish_updates(Arc::clone(&subs), Arc::clone(&cache), pool.clone(), &entity).await {
                                error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
                            }
                        }
                    }
                }
                Some(entity) = update_receiver.recv() => {
                    pending_updates.push(entity);
                }
            }
        }
    }
Comment on lines +111 to 136

Ohayo sensei! Inefficient processing in process_updates function.

In the `process_updates` function, updates are drained on a fixed 100 ms interval. If `publish_updates` takes longer than the interval, the drain loop holds up the `select!`, delaying both the next tick and the receipt of new entities from the channel.

Consider adjusting the design to process updates as they arrive or ensuring that publish_updates completes before the next interval tick. Alternatively, you could use a mechanism like tokio::sync::Notify to trigger processing when new entities are added.

Apply this diff to process updates immediately upon receiving them:

- let mut interval = interval(Duration::from_millis(100));
  loop {
-     tokio::select! {
-         _ = interval.tick() => {
-             if !pending_updates.is_empty() {
-                 // Process pending updates
-             }
-         }
-         Some(entity) = update_receiver.recv() => {
-             pending_updates.push(entity);
-         }
-     }
+     if let Some(entity) = update_receiver.recv().await {
+         if let Err(e) = Self::publish_updates(
+             Arc::clone(&subs),
+             Arc::clone(&cache),
+             pool.clone(),
+             &entity
+         ).await {
+             error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
+         }
+     } else {
+         break; // Channel closed
+     }
  }
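If batching is to be kept, the `tokio::sync::Notify` route mentioned above could wake the consumer only when work arrives instead of ticking unconditionally. A rough sketch under that assumption; the queue type and names are illustrative, not this PR's code:

```rust
use std::sync::Mutex;
use tokio::sync::Notify;

/// Sketch of a notify-driven batch queue; producers are synchronous
/// (e.g. `poll`), the consumer drains whole batches with no fixed interval.
struct UpdateQueue<T> {
    pending: Mutex<Vec<T>>,
    notify: Notify,
}

impl<T> UpdateQueue<T> {
    fn push(&self, item: T) {
        // Enqueue, then wake the consumer only when there is actual work.
        self.pending.lock().unwrap().push(item);
        self.notify.notify_one();
    }

    async fn drain(&self) -> Vec<T> {
        loop {
            // Take the whole batch; if it is empty, park until `push` notifies.
            let batch = std::mem::take(&mut *self.pending.lock().unwrap());
            if !batch.is_empty() {
                return batch;
            }
            self.notify.notified().await;
        }
    }
}
```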


-    async fn publish_updates(
+    pub async fn publish_updates(

         subs: Arc<EntityManager>,
         cache: Arc<ModelCache>,
         pool: Pool<Sqlite>,
@@ -259,16 +293,14 @@
 impl Future for Service {
     type Output = ();

-    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         let pin = self.get_mut();

         while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
-            let subs = Arc::clone(&pin.subs_manager);
-            let cache = Arc::clone(&pin.model_cache);
-            let pool = pin.pool.clone();
+            let sender = pin.update_sender.clone();
             tokio::spawn(async move {
-                if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await {
-                    error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
+                if let Err(e) = sender.send(entity).await {
+                    error!(target = LOG_TARGET, error = %e, "Sending entity update to channel.");
Comment on lines +296 to +303

Ohayo sensei! Avoid spawning tasks inside a loop in the poll method.

In the poll method, you're spawning a new task for each entity polled from simple_broker. This could lead to a high number of spawned tasks if entities arrive rapidly, impacting performance.

Consider processing the entities directly without spawning new tasks:

while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
-     let sender = pin.update_sender.clone();
-     tokio::spawn(async move {
-         if let Err(e) = sender.send(entity).await {
-             error!(target = LOG_TARGET, error = %e, "Sending entity update to channel.");
-         }
-     });
+     if let Err(e) = pin.update_sender.try_send(entity) {
+         error!(target = LOG_TARGET, error = %e, "Sending entity update to channel.");
+     }
}

Alternatively, if asynchronous sending is required, batch the entities and process them in a controlled manner.
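For that batched route, tokio's `Receiver::recv_many` (available since tokio 1.37) drains whatever has accumulated in a single await, avoiding both the per-entity task and a fixed tick. A sketch, with `handle` standing in for the publish call:

```rust
use tokio::sync::mpsc::Receiver;

async fn process_batches<T>(mut rx: Receiver<T>, mut handle: impl FnMut(T)) {
    let mut buf = Vec::with_capacity(100);
    // recv_many waits for at least one item, then drains up to the limit;
    // it returns 0 only once the channel is closed and fully emptied.
    while rx.recv_many(&mut buf, 100).await > 0 {
        for item in buf.drain(..) {
            handle(item);
        }
    }
}
```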

                 }
             });
         }
41 changes: 36 additions & 5 deletions crates/torii/grpc/src/server/subscriptions/event.rs
@@ -4,13 +4,15 @@
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use futures::Stream;
use futures_util::StreamExt;
use rand::Rng;
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock;
use tokio::time::interval;
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::FELT_DELIMITER;
@@ -62,13 +64,42 @@
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
 pub struct Service {
-    subs_manager: Arc<EventManager>,
     simple_broker: Pin<Box<dyn Stream<Item = Event> + Send>>,
+    update_sender: Sender<Event>,
 }

 impl Service {
     pub fn new(subs_manager: Arc<EventManager>) -> Self {
-        Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<Event>::subscribe()) }
+        let (update_sender, update_receiver) = channel(100);
+        let service =
+            Self { simple_broker: Box::pin(SimpleBroker::<Event>::subscribe()), update_sender };
+
+        // Spawn a task to process event updates
+        tokio::spawn(Self::process_updates(Arc::clone(&subs_manager), update_receiver));
+
+        service
     }
Comment on lines +73 to +81

Ohayo sensei!

Consider handling the JoinHandle from tokio::spawn.

In the new function, when spawning the process_updates task, the returned JoinHandle is not stored or managed. If you need to control or monitor the spawned task (such as for graceful shutdown or error handling), consider keeping the JoinHandle.
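A minimal sketch of what owning the handle could look like, assuming a hypothetical `process_handle` field that is not part of this PR:

```rust
use tokio::task::JoinHandle;

/// Sketch: a service that owns its background task's handle.
struct Service {
    process_handle: JoinHandle<()>,
}

impl Drop for Service {
    fn drop(&mut self) {
        // Abort the background task so it cannot outlive the service and
        // keep polling a channel that no one feeds anymore.
        self.process_handle.abort();
    }
}

#[tokio::main]
async fn main() {
    let service = Service {
        process_handle: tokio::spawn(async {
            // Stand-in for process_updates.
            tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
        }),
    };
    drop(service); // the spawned task is aborted here
}
```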


    async fn process_updates(subs: Arc<EventManager>, mut update_receiver: Receiver<Event>) {
        let mut interval = interval(Duration::from_millis(100));
        let mut pending_updates = Vec::new();

        loop {
            tokio::select! {
                _ = interval.tick() => {
                    if !pending_updates.is_empty() {
                        for event in pending_updates.drain(..) {
                            if let Err(e) = Self::publish_updates(Arc::clone(&subs), &event).await {
                                error!(target = LOG_TARGET, error = %e, "Publishing events update.");
                            }
                        }
                    }
                }
                Some(event) = update_receiver.recv() => {
                    pending_updates.push(event);
                }
            }
        }
    }

     async fn publish_updates(subs: Arc<EventManager>, event: &Event) -> Result<(), Error> {
@@ -151,10 +182,10 @@
         let pin = self.get_mut();

         while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) {
-            let subs = Arc::clone(&pin.subs_manager);
+            let sender = pin.update_sender.clone();

             tokio::spawn(async move {
-                if let Err(e) = Service::publish_updates(subs, &event).await {
-                    error!(target = LOG_TARGET, error = %e, "Publishing events update.");
+                if let Err(e) = sender.send(event).await {
+                    error!(target = LOG_TARGET, error = %e, "Sending event update to channel.");

                 }
             });
         }
Comment on lines +185 to 191

Ohayo sensei!

Potential issue with unbounded task spawning in poll method.

In the poll method, spawning a new task for each event to send it via the channel may lead to excessive task creation if events are frequent. This could impact performance.

Consider refactoring to avoid spawning a task per event. One approach is to use try_send on the update_sender without spawning a task. However, be mindful of handling cases where the channel buffer is full. Alternatively, accumulate events and send them in batches if possible.
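To make the full-buffer case explicit: `try_send` hands the rejected value back inside `TrySendError::Full`, so the caller can decide whether to drop, log, or retry it. A self-contained sketch:

```rust
use tokio::sync::mpsc::{channel, error::TrySendError};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = channel::<&'static str>(1);

    tx.try_send("first").unwrap();

    // The buffer is full now, so this surfaces as TrySendError::Full rather
    // than blocking; the event is handed back so it can be logged or retried.
    match tx.try_send("second") {
        Err(TrySendError::Full(event)) => eprintln!("buffer full, dropped {event:?}"),
        Err(TrySendError::Closed(_)) => eprintln!("consumer task has shut down"),
        Ok(()) => {}
    }

    assert_eq!(rx.recv().await, Some("first"));
}
```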

62 changes: 48 additions & 14 deletions crates/torii/grpc/src/server/subscriptions/event_message.rs
@@ -4,14 +4,16 @@
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use futures::Stream;
use futures_util::StreamExt;
use rand::Rng;
use sqlx::{Pool, Sqlite};
use starknet::core::types::Felt;
-use tokio::sync::mpsc::{channel, Receiver};
+use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock;
use tokio::time::interval;
use torii_core::cache::ModelCache;
use torii_core::error::{Error, ParseError};
use torii_core::model::build_sql_query;
@@ -75,10 +77,8 @@
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
 pub struct Service {
-    pool: Pool<Sqlite>,
-    subs_manager: Arc<EventMessageManager>,
-    model_cache: Arc<ModelCache>,
     simple_broker: Pin<Box<dyn Stream<Item = EventMessage> + Send>>,
+    update_sender: Sender<EventMessage>,
 }

impl Service {
@@ -87,11 +87,47 @@
         subs_manager: Arc<EventMessageManager>,
         model_cache: Arc<ModelCache>,
     ) -> Self {
-        Self {
-            pool,
-            subs_manager,
-            model_cache,
+        let (update_sender, update_receiver) = channel(100);
+        let service = Self {
             simple_broker: Box::pin(SimpleBroker::<EventMessage>::subscribe()),
+            update_sender,
+        };
+
+        // Spawn a task to process event message updates
+        tokio::spawn(Self::process_updates(
+            Arc::clone(&subs_manager),
+            Arc::clone(&model_cache),
+            pool,
+            update_receiver,
+        ));
+
Comment on lines +96 to +103

Handle potential errors when spawning process_updates task

When spawning the process_updates asynchronous task, any errors that occur within the task might go unnoticed. Consider adding error handling or logging within the process_updates function to ensure that any unexpected errors are captured and addressed.
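One lightweight option is a small supervisor that awaits the task's `JoinHandle`, since panics and aborts surface there as `JoinError`s. A sketch; the log message is illustrative:

```rust
use tracing::error;

fn spawn_supervised<F>(task: F)
where
    F: std::future::Future<Output = ()> + Send + 'static,
{
    let handle = tokio::spawn(task);
    tokio::spawn(async move {
        // Awaiting a JoinHandle yields Err(JoinError) if the task panicked
        // or was aborted, failures that would otherwise go unnoticed.
        if let Err(e) = handle.await {
            error!(error = %e, "background update task terminated abnormally");
        }
    });
}
```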

+        service
     }

    async fn process_updates(
        subs: Arc<EventMessageManager>,
        cache: Arc<ModelCache>,
        pool: Pool<Sqlite>,
        mut update_receiver: Receiver<EventMessage>,
    ) {
        let mut interval = interval(Duration::from_millis(100));
        let mut pending_updates = Vec::new();

        loop {
            tokio::select! {
                _ = interval.tick() => {
                    if !pending_updates.is_empty() {
                        for event_message in pending_updates.drain(..) {
                            if let Err(e) = Self::publish_updates(Arc::clone(&subs), Arc::clone(&cache), pool.clone(), &event_message).await {
                                error!(target = LOG_TARGET, error = %e, "Publishing event message update.");
                            }
                        }
Comment on lines +121 to +124
Improve error handling in publish_updates

In the process_updates loop, when calling publish_updates, errors are logged but not handled further. If an error occurs, it might affect the processing of subsequent updates. Consider implementing retry logic or more comprehensive error handling to ensure reliability.
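A sketch of what bounded retries with exponential backoff could look like around the publish call; the attempt count and delays are arbitrary, and `publish` stands in for `publish_updates`:

```rust
use std::time::Duration;

async fn publish_with_retry<F, Fut, E>(mut publish: F, max_attempts: u32) -> Result<(), E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<(), E>>,
{
    let mut delay = Duration::from_millis(50);
    let mut attempt = 1;
    loop {
        match publish().await {
            Ok(()) => return Ok(()),
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => {
                // Back off before retrying so a struggling subscriber or a
                // busy SQLite pool gets a chance to recover.
                tokio::time::sleep(delay).await;
                delay *= 2;
                attempt += 1;
            }
        }
    }
}
```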

                    }
                }
                Some(event_message) = update_receiver.recv() => {
                    pending_updates.push(event_message);
                }
            }
Comment on lines +107 to +130

Potential accumulation of pending updates may affect timing

The process_updates function processes pending_updates every 100 milliseconds. If publish_updates takes a significant amount of time, there might be a delay in processing new event messages, leading to a backlog. To improve efficiency, consider processing updates concurrently or optimizing the publishing logic.
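If sequential publishing is the bottleneck, the drained batch could be published concurrently, for example with `futures::future::join_all`, at the cost of per-entity ordering. A sketch under that assumption:

```rust
use futures::future::join_all;

async fn publish_batch<T, F, Fut>(batch: Vec<T>, publish: F)
where
    F: Fn(T) -> Fut,
    Fut: std::future::Future<Output = ()>,
{
    // All publishes for the batch run concurrently on this task; updates for
    // the same entity may complete out of order, which callers must accept.
    join_all(batch.into_iter().map(publish)).await;
}
```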

        }
    }

@@ -241,13 +277,11 @@
     fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
         let pin = self.get_mut();

-        while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
-            let subs = Arc::clone(&pin.subs_manager);
-            let cache = Arc::clone(&pin.model_cache);
-            let pool = pin.pool.clone();
+        while let Poll::Ready(Some(event_message)) = pin.simple_broker.poll_next_unpin(cx) {
+            let sender = pin.update_sender.clone();

             tokio::spawn(async move {
-                if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await {
-                    error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
+                if let Err(e) = sender.send(event_message).await {
+                    error!(target = LOG_TARGET, error = %e, "Sending event message update to channel.");

Comment on lines +280 to +284

Avoid spawning a new task for each event message to enhance performance

Currently, a new asynchronous task is spawned for each event message received in the poll implementation. This could introduce overhead under high throughput. To improve performance, consider sending the event message directly without spawning a new task.

Apply this diff to refactor the code:

 while let Poll::Ready(Some(event_message)) = pin.simple_broker.poll_next_unpin(cx) {
-    let sender = pin.update_sender.clone();
-    tokio::spawn(async move {
-        if let Err(e) = sender.send(event_message).await {
-            error!(target = LOG_TARGET, error = %e, "Sending event message update to channel.");
-        }
-    });
+    if let Err(e) = pin.update_sender.try_send(event_message) {
+        error!(target = LOG_TARGET, error = %e, "Sending event message update to channel.");
+    }
 }

This modification replaces the spawned task with a direct try_send, eliminating unnecessary task creation and reducing overhead.


                 }
             });
         }