-
Notifications
You must be signed in to change notification settings - Fork 182
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
opt(torii-grpc): channel based approach for entity updates #2453
Conversation
WalkthroughOhayo, sensei! The changes in this pull request enhance the Changes
Possibly related PRs
TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 9
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (3)
- crates/torii/grpc/src/server/subscriptions/entity.rs (5 hunks)
- crates/torii/grpc/src/server/subscriptions/event.rs (3 hunks)
- crates/torii/grpc/src/server/subscriptions/event_message.rs (4 hunks)
Additional comments not posted (4)
crates/torii/grpc/src/server/subscriptions/event.rs (2)
68-68
: Ohayo sensei!Addition of
update_sender
field is appropriate.The inclusion of
update_sender
in theService
struct correctly facilitates event transmission through the channel.
83-102
: Ohayo sensei!Asynchronous event processing is well-implemented.
The
process_updates
function efficiently batches and processes events at regular intervals, enhancing performance.crates/torii/grpc/src/server/subscriptions/event_message.rs (1)
90-94
: Ohayo, sensei! Ensure proper initialization ofService
fieldsI noticed that in the
new
method, the fieldspool
,subs_manager
, andmodel_cache
are no longer stored within theService
struct after introducingupdate_sender
. Please verify that removing these fields doesn't affect other parts of theService
implementation that might rely on them.crates/torii/grpc/src/server/subscriptions/entity.rs (1)
Line range hint
138-172
: Ohayo sensei! Exposing internal methodpublish_updates
as public API.The
publish_updates
function has been changed from private to public. Making it public exposes internal mechanics and could lead to misuse from other modules.Please verify if
publish_updates
needs to be public. If it's only used internally, consider keeping it private to maintain encapsulation.- pub async fn publish_updates( + async fn publish_updates(
let (update_sender, update_receiver) = channel(100); | ||
let service = | ||
Self { simple_broker: Box::pin(SimpleBroker::<Event>::subscribe()), update_sender }; | ||
|
||
// Spawn a task to process event updates | ||
tokio::spawn(Self::process_updates(Arc::clone(&subs_manager), update_receiver)); | ||
|
||
service | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo sensei!
Consider handling the JoinHandle
from tokio::spawn
.
In the new
function, when spawning the process_updates
task, the returned JoinHandle
is not stored or managed. If you need to control or monitor the spawned task (such as for graceful shutdown or error handling), consider keeping the JoinHandle
.
let sender = pin.update_sender.clone(); | ||
tokio::spawn(async move { | ||
if let Err(e) = Service::publish_updates(subs, &event).await { | ||
error!(target = LOG_TARGET, error = %e, "Publishing events update."); | ||
if let Err(e) = sender.send(event).await { | ||
error!(target = LOG_TARGET, error = %e, "Sending event update to channel."); | ||
} | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo sensei!
Potential issue with unbounded task spawning in poll
method.
In the poll
method, spawning a new task for each event to send it via the channel may lead to excessive task creation if events are frequent. This could impact performance.
Consider refactoring to avoid spawning a task per event. One approach is to use try_send
on the update_sender
without spawning a task. However, be mindful of handling cases where the channel buffer is full. Alternatively, accumulate events and send them in batches if possible.
// Spawn a task to process event message updates | ||
tokio::spawn(Self::process_updates( | ||
Arc::clone(&subs_manager), | ||
Arc::clone(&model_cache), | ||
pool, | ||
update_receiver, | ||
)); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle potential errors when spawning process_updates
task
When spawning the process_updates
asynchronous task, any errors that occur within the task might go unnoticed. Consider adding error handling or logging within the process_updates
function to ensure that any unexpected errors are captured and addressed.
async fn process_updates( | ||
subs: Arc<EventMessageManager>, | ||
cache: Arc<ModelCache>, | ||
pool: Pool<Sqlite>, | ||
mut update_receiver: Receiver<EventMessage>, | ||
) { | ||
let mut interval = interval(Duration::from_millis(100)); | ||
let mut pending_updates = Vec::new(); | ||
|
||
loop { | ||
tokio::select! { | ||
_ = interval.tick() => { | ||
if !pending_updates.is_empty() { | ||
for event_message in pending_updates.drain(..) { | ||
if let Err(e) = Self::publish_updates(Arc::clone(&subs), Arc::clone(&cache), pool.clone(), &event_message).await { | ||
error!(target = LOG_TARGET, error = %e, "Publishing event message update."); | ||
} | ||
} | ||
} | ||
} | ||
Some(event_message) = update_receiver.recv() => { | ||
pending_updates.push(event_message); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential accumulation of pending updates may affect timing
The process_updates
function processes pending_updates
every 100 milliseconds. If publish_updates
takes a significant amount of time, there might be a delay in processing new event messages, leading to a backlog. To improve efficiency, consider processing updates concurrently or optimizing the publishing logic.
if let Err(e) = Self::publish_updates(Arc::clone(&subs), Arc::clone(&cache), pool.clone(), &event_message).await { | ||
error!(target = LOG_TARGET, error = %e, "Publishing event message update."); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Improve error handling in publish_updates
In the process_updates
loop, when calling publish_updates
, errors are logged but not handled further. If an error occurs, it might affect the processing of subsequent updates. Consider implementing retry logic or more comprehensive error handling to ensure reliability.
while let Poll::Ready(Some(event_message)) = pin.simple_broker.poll_next_unpin(cx) { | ||
let sender = pin.update_sender.clone(); | ||
tokio::spawn(async move { | ||
if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await { | ||
error!(target = LOG_TARGET, error = %e, "Publishing entity update."); | ||
if let Err(e) = sender.send(event_message).await { | ||
error!(target = LOG_TARGET, error = %e, "Sending event message update to channel."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid spawning a new task for each event message to enhance performance
Currently, a new asynchronous task is spawned for each event message received in the poll
implementation. This could introduce overhead under high throughput. To improve performance, consider sending the event message directly without spawning a new task.
Apply this diff to refactor the code:
while let Poll::Ready(Some(event_message)) = pin.simple_broker.poll_next_unpin(cx) {
- let sender = pin.update_sender.clone();
- tokio::spawn(async move {
- if let Err(e) = sender.send(event_message).await {
- error!(target = LOG_TARGET, error = %e, "Sending event message update to channel.");
- }
- });
+ if let Err(e) = pin.update_sender.try_send(event_message) {
+ error!(target = LOG_TARGET, error = %e, "Sending event message update to channel.");
+ }
}
This modification replaces the spawned task with a direct try_send
, eliminating unnecessary task creation and reducing overhead.
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
while let Poll::Ready(Some(event_message)) = pin.simple_broker.poll_next_unpin(cx) { | |
let sender = pin.update_sender.clone(); | |
tokio::spawn(async move { | |
if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await { | |
error!(target = LOG_TARGET, error = %e, "Publishing entity update."); | |
if let Err(e) = sender.send(event_message).await { | |
error!(target = LOG_TARGET, error = %e, "Sending event message update to channel."); | |
while let Poll::Ready(Some(event_message)) = pin.simple_broker.poll_next_unpin(cx) { | |
if let Err(e) = pin.update_sender.try_send(event_message) { | |
error!(target = LOG_TARGET, error = %e, "Sending event message update to channel."); | |
} | |
} |
let (update_sender, update_receiver) = channel(100); | ||
let service = | ||
Self { simple_broker: Box::pin(SimpleBroker::<Entity>::subscribe()), update_sender }; | ||
|
||
// Spawn a task to process entity updates | ||
tokio::spawn(Self::process_updates( | ||
Arc::clone(&subs_manager), | ||
Arc::clone(&model_cache), | ||
pool, | ||
subs_manager, | ||
model_cache, | ||
simple_broker: Box::pin(SimpleBroker::<Entity>::subscribe()), | ||
update_receiver, | ||
)); | ||
|
||
service | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo sensei! Potential unbounded task spawning without backpressure control.
In the new
method, the process_updates
task is spawned to handle entity updates. However, since it's consuming from a channel with a buffer size of 100 (channel(100)
), there's a risk of the update_sender
becoming saturated if entities are produced faster than they are processed, which could lead to unbounded memory growth.
Consider implementing backpressure or increasing the channel's capacity to prevent potential bottlenecks. You might also monitor the channel's usage to adjust its size dynamically based on load.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
let pin = self.get_mut(); | ||
|
||
while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) { | ||
let subs = Arc::clone(&pin.subs_manager); | ||
let cache = Arc::clone(&pin.model_cache); | ||
let pool = pin.pool.clone(); | ||
let sender = pin.update_sender.clone(); | ||
tokio::spawn(async move { | ||
if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await { | ||
error!(target = LOG_TARGET, error = %e, "Publishing entity update."); | ||
if let Err(e) = sender.send(entity).await { | ||
error!(target = LOG_TARGET, error = %e, "Sending entity update to channel."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo sensei! Avoid spawning tasks inside a loop in the poll
method.
In the poll
method, you're spawning a new task for each entity polled from simple_broker
. This could lead to a high number of spawned tasks if entities arrive rapidly, impacting performance.
Consider processing the entities directly without spawning new tasks:
while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
- let sender = pin.update_sender.clone();
- tokio::spawn(async move {
- if let Err(e) = sender.send(entity).await {
- error!(target = LOG_TARGET, error = %e, "Sending entity update to channel.");
- }
- });
+ if let Err(e) = pin.update_sender.try_send(entity) {
+ error!(target = LOG_TARGET, error = %e, "Sending entity update to channel.");
+ }
}
Alternatively, if asynchronous sending is required, batch the entities and process them in a controlled manner.
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
let pin = self.get_mut(); | |
while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) { | |
let subs = Arc::clone(&pin.subs_manager); | |
let cache = Arc::clone(&pin.model_cache); | |
let pool = pin.pool.clone(); | |
let sender = pin.update_sender.clone(); | |
tokio::spawn(async move { | |
if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await { | |
error!(target = LOG_TARGET, error = %e, "Publishing entity update."); | |
if let Err(e) = sender.send(entity).await { | |
error!(target = LOG_TARGET, error = %e, "Sending entity update to channel."); | |
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
let pin = self.get_mut(); | |
while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) { | |
if let Err(e) = pin.update_sender.try_send(entity) { | |
error!(target = LOG_TARGET, error = %e, "Sending entity update to channel."); | |
} | |
} |
async fn process_updates( | ||
subs: Arc<EntityManager>, | ||
cache: Arc<ModelCache>, | ||
pool: Pool<Sqlite>, | ||
mut update_receiver: Receiver<Entity>, | ||
) { | ||
let mut interval = interval(Duration::from_millis(100)); | ||
let mut pending_updates = Vec::new(); | ||
|
||
loop { | ||
tokio::select! { | ||
_ = interval.tick() => { | ||
if !pending_updates.is_empty() { | ||
for entity in pending_updates.drain(..) { | ||
if let Err(e) = Self::publish_updates(Arc::clone(&subs), Arc::clone(&cache), pool.clone(), &entity).await { | ||
error!(target = LOG_TARGET, error = %e, "Publishing entity update."); | ||
} | ||
} | ||
} | ||
} | ||
Some(entity) = update_receiver.recv() => { | ||
pending_updates.push(entity); | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo sensei! Inefficient processing in process_updates
function.
In the process_updates
function, you're processing updates every 100 milliseconds using an interval. If the publish_updates
function takes longer than the interval duration, it could cause overlapping executions or delays.
Consider adjusting the design to process updates as they arrive or ensuring that publish_updates
completes before the next interval tick. Alternatively, you could use a mechanism like tokio::sync::Notify
to trigger processing when new entities are added.
Apply this diff to process updates immediately upon receiving them:
- let mut interval = interval(Duration::from_millis(100));
loop {
- tokio::select! {
- _ = interval.tick() => {
- if !pending_updates.is_empty() {
- // Process pending updates
- }
- }
- Some(entity) = update_receiver.recv() => {
- pending_updates.push(entity);
- }
- }
+ if let Some(entity) = update_receiver.recv().await {
+ if let Err(e) = Self::publish_updates(
+ Arc::clone(&subs),
+ Arc::clone(&cache),
+ pool.clone(),
+ &entity
+ ).await {
+ error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
+ }
+ } else {
+ break; // Channel closed
+ }
}
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async fn process_updates( | |
subs: Arc<EntityManager>, | |
cache: Arc<ModelCache>, | |
pool: Pool<Sqlite>, | |
mut update_receiver: Receiver<Entity>, | |
) { | |
let mut interval = interval(Duration::from_millis(100)); | |
let mut pending_updates = Vec::new(); | |
loop { | |
tokio::select! { | |
_ = interval.tick() => { | |
if !pending_updates.is_empty() { | |
for entity in pending_updates.drain(..) { | |
if let Err(e) = Self::publish_updates(Arc::clone(&subs), Arc::clone(&cache), pool.clone(), &entity).await { | |
error!(target = LOG_TARGET, error = %e, "Publishing entity update."); | |
} | |
} | |
} | |
} | |
Some(entity) = update_receiver.recv() => { | |
pending_updates.push(entity); | |
} | |
} | |
} | |
} | |
async fn process_updates( | |
subs: Arc<EntityManager>, | |
cache: Arc<ModelCache>, | |
pool: Pool<Sqlite>, | |
mut update_receiver: Receiver<Entity>, | |
) { | |
loop { | |
if let Some(entity) = update_receiver.recv().await { | |
if let Err(e) = Self::publish_updates( | |
Arc::clone(&subs), | |
Arc::clone(&cache), | |
pool.clone(), | |
&entity | |
).await { | |
error!(target = LOG_TARGET, error = %e, "Publishing entity update."); | |
} | |
} else { | |
break; // Channel closed | |
} | |
} | |
} |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2453 +/- ##
==========================================
+ Coverage 68.37% 68.42% +0.04%
==========================================
Files 365 365
Lines 47973 48023 +50
==========================================
+ Hits 32801 32859 +58
+ Misses 15172 15164 -8 ☔ View full report in Codecov by Sentry. |
Closing this in favor of #2455 |
Summary by CodeRabbit
New Features
Bug Fixes