-
Notifications
You must be signed in to change notification settings - Fork 182
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: subscription updates ordered #2507
Conversation
WalkthroughOhayo, sensei! This pull request introduces modifications across several files related to the Changes
Possibly related PRs
Suggested reviewers
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
🧹 Outside diff range and nitpick comments (4)
crates/torii/grpc/src/server/subscriptions/indexer.rs (3)
12-12
: Ohayo sensei! Be cautious with unbounded channelsUsing unbounded channels can lead to unbounded memory growth if the producer outpaces the consumer. It's important to ensure that the system can handle this or implement backpressure mechanisms if necessary.
103-110
: Sensei, consider enhancing error handling inpublish_updates
While logging the error is helpful, consider whether additional error handling or recovery mechanisms are needed if
process_update
fails repeatedly. This could improve the robustness of your update processing.
149-155
: Sensei, handleupdate_sender
send errors more robustlyWhen
this.update_sender.send(update)
returns an error, it indicates that the receiver has been dropped. Consider implementing additional actions in this case, such as restarting thepublish_updates
task or cleaning up resources to ensure the system remains stable.crates/torii/grpc/src/server/subscriptions/entity.rs (1)
84-92
: Ohayo, sensei! Spawningpublish_updates
withinnew
method may lead to unexpected behavior.While spawning
publish_updates
here initializes the background task, starting asynchronous tasks within constructors can sometimes cause issues with error handling and lifecycle management. It's often better to have an explicit method to start background tasks after construction.Consider refactoring the initialization to start the task outside of the
new
method for clearer control over the service's lifecycle.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (4)
- crates/torii/grpc/src/server/subscriptions/entity.rs (3 hunks)
- crates/torii/grpc/src/server/subscriptions/event.rs (3 hunks)
- crates/torii/grpc/src/server/subscriptions/event_message.rs (3 hunks)
- crates/torii/grpc/src/server/subscriptions/indexer.rs (3 hunks)
🧰 Additional context used
🔇 Additional comments (5)
crates/torii/grpc/src/server/subscriptions/indexer.rs (2)
85-85
: Addition ofupdate_sender
looks good, senseiAdding the
update_sender
to theService
struct appropriately facilitates communication via the unbounded channel.
90-98
: Ohayo sensei! Initialization and task spawning are well doneThe unbounded channel is correctly initialized, and the
publish_updates
task is appropriately spawned. This setup enhances asynchronous handling of updates.crates/torii/grpc/src/server/subscriptions/event_message.rs (2)
75-75
: Ohayo, sensei! Addition ofevent_sender
improves communicationAdding the
event_sender
field to theService
struct allows for efficient communication between the event producer and the event processing task, enhancing the overall design.
80-88
: Ohayo, sensei! Refactorednew
method enhances asynchronous handlingThe introduction of the unbounded channel and the spawning of the
publish_updates
task in thenew
method improve the asynchronous processing of event updates, resulting in better separation of concerns and scalability.crates/torii/grpc/src/server/subscriptions/entity.rs (1)
12-12
: Ohayo, sensei! New imports added for unbounded channels.The inclusion of
unbounded_channel
,UnboundedReceiver
, andUnboundedSender
is appropriate for implementing unbounded channels.
let (event_sender, event_receiver) = unbounded_channel(); | ||
let service = Self { | ||
simple_broker: Box::pin(SimpleBroker::<Event>::subscribe()), | ||
event_sender, | ||
}; | ||
|
||
tokio::spawn(Self::publish_updates(subs_manager, event_receiver)); | ||
|
||
service | ||
} | ||
|
||
async fn publish_updates( | ||
subs: Arc<EventManager>, | ||
mut event_receiver: UnboundedReceiver<Event>, | ||
) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider potential issues with unbounded_channel
leading to unbounded memory usage.
Ohayo, sensei! The use of unbounded_channel
may result in unbounded memory growth if the event_sender
sends events faster than publish_updates
can process them. This could lead to memory exhaustion over time. Consider using a bounded channel with an appropriate capacity or implementing backpressure mechanisms to prevent potential memory issues.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
let this = self.get_mut(); | ||
|
||
while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) { | ||
let subs = Arc::clone(&pin.subs_manager); | ||
tokio::spawn(async move { | ||
if let Err(e) = Service::publish_updates(subs, &entity).await { | ||
error!(target = LOG_TARGET, error = %e, "Publishing entity update."); | ||
} | ||
}); | ||
while let Poll::Ready(Some(event)) = this.simple_broker.poll_next_unpin(cx) { | ||
if let Err(e) = this.event_sender.send(event) { | ||
error!(target = LOG_TARGET, error = %e, "Sending event update to processor."); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo, sensei! Be cautious with unbounded channels to avoid memory issues
The use of UnboundedSender
and UnboundedReceiver
may lead to unbounded memory growth if events are produced faster than they are consumed. Consider using a bounded channel (tokio::sync::mpsc::channel
) with an appropriate capacity to implement backpressure and prevent potential memory exhaustion.
simple_broker: Pin<Box<dyn Stream<Item = OptimisticEntity> + Send>>, | ||
entity_sender: UnboundedSender<OptimisticEntity>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo, sensei! Potential memory growth due to unbounded channels in Service
struct.
Using an unbounded channel (UnboundedSender<OptimisticEntity>
) means that if the producer sends messages faster than the consumer can process them, it could lead to increased memory usage. Consider using a bounded channel to prevent potential memory exhaustion.
Would you like assistance in refactoring this to use a bounded channel with appropriate capacity?
mut entity_receiver: UnboundedReceiver<OptimisticEntity>, | ||
) { | ||
while let Some(entity) = entity_receiver.recv().await { | ||
if let Err(e) = Self::process_entity_update(&subs, &entity).await { | ||
error!(target = LOG_TARGET, error = %e, "Processing entity update."); | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Ohayo, sensei! Error handling in publish_updates
may need improvement.
Currently, errors from process_entity_update
are logged but not acted upon. If process_entity_update
fails repeatedly, it might indicate a critical issue. You might want to implement a retry mechanism or take corrective action when such errors occur.
Would you like assistance in implementing enhanced error handling or a retry strategy?
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
let this = self.get_mut(); | ||
|
||
while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) { | ||
let subs = Arc::clone(&pin.subs_manager); | ||
tokio::spawn(async move { | ||
if let Err(e) = Service::publish_updates(subs, &entity).await { | ||
error!(target = LOG_TARGET, error = %e, "Publishing entity update."); | ||
} | ||
}); | ||
while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) { | ||
if let Err(e) = this.entity_sender.send(entity) { | ||
error!(target = LOG_TARGET, error = %e, "Sending entity update to processor."); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Ohayo, sensei! Potential issue if entity_sender
is closed in poll
method.
If entity_sender.send(entity)
fails, it could mean that the receiver has been dropped, possibly causing this loop to continue indefinitely without effective processing. Consider handling this case by breaking out of the loop or shutting down the service gracefully.
Here's a suggested change to handle the closed sender:
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) {
- if let Err(e) = this.entity_sender.send(entity) {
+ if this.entity_sender.send(entity).is_err() {
error!(target = LOG_TARGET, "Sending entity update to processor failed. Shutting down service.");
+ return Poll::Ready(());
}
}
Poll::Pending
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
let this = self.get_mut(); | |
while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) { | |
let subs = Arc::clone(&pin.subs_manager); | |
tokio::spawn(async move { | |
if let Err(e) = Service::publish_updates(subs, &entity).await { | |
error!(target = LOG_TARGET, error = %e, "Publishing entity update."); | |
} | |
}); | |
while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) { | |
if let Err(e) = this.entity_sender.send(entity) { | |
error!(target = LOG_TARGET, error = %e, "Sending entity update to processor."); | |
} | |
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
let this = self.get_mut(); | |
while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) { | |
if this.entity_sender.send(entity).is_err() { | |
error!(target = LOG_TARGET, "Sending entity update to processor failed. Shutting down service."); | |
return Poll::Ready(()); | |
} | |
} | |
Poll::Pending | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (3)
crates/torii/grpc/src/server/subscriptions/indexer.rs (1)
92-100
: Ohayo, sensei! Great refactoring of thenew
method!The changes here are really solid:
- Returning
Self
is more idiomatic for a constructor-like method.- Creating an unbounded channel for updates is a smart move for handling high-throughput scenarios.
- Spawning an async task for update processing allows for better concurrency.
One small suggestion to consider:
Consider adding a comment explaining the purpose of the spawned task. It might help future maintainers understand the design choice quickly. For example:
// Spawn a task to process updates asynchronously, improving responsiveness tokio::spawn(Self::publish_updates(subs_manager, update_receiver));Overall, excellent work on improving the service initialization!
crates/torii/grpc/src/server/subscriptions/event.rs (2)
73-80
: LGTM! Improved event handling with concurrent processing.Ohayo, sensei! The changes in
Service::new
are looking sharp. Creating an unbounded channel and spawning a new task forpublish_updates
allows for concurrent event processing. Nice touch!A small suggestion: Consider using
Self
instead ofservice
when returning:Self { simple_broker: Box::pin(SimpleBroker::<Event>::subscribe()), event_sender }This maintains consistency with Rust's idiomatic style.
82-90
: LGTM! Effective separation of event reception and processing.Ohayo, sensei! The new
publish_updates
method is a great addition. It nicely separates event reception from processing and includes error handling. Excellent work!A small suggestion to improve error handling:
if let Err(e) = Self::process_event(&subs, &event).await { error!(target = LOG_TARGET, error = %e, event = ?event, "Failed to process event update."); }This change adds the event details to the error log, which could be helpful for debugging.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (4)
- crates/torii/grpc/src/server/subscriptions/entity.rs (3 hunks)
- crates/torii/grpc/src/server/subscriptions/event.rs (3 hunks)
- crates/torii/grpc/src/server/subscriptions/event_message.rs (3 hunks)
- crates/torii/grpc/src/server/subscriptions/indexer.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- crates/torii/grpc/src/server/subscriptions/entity.rs
- crates/torii/grpc/src/server/subscriptions/event_message.rs
🧰 Additional context used
🔇 Additional comments (5)
crates/torii/grpc/src/server/subscriptions/indexer.rs (2)
12-14
: Ohayo, sensei! LGTM on the import additions!The new imports for
UnboundedReceiver
andUnboundedSender
are spot-on for the unbounded channel implementation. Nice work on keeping the imports tidy and relevant!
87-87
: Excellent addition, sensei!The new
update_sender
field is a great improvement. It allows for asynchronous and non-blocking update handling, which should enhance the overall performance of the service. Well done on implementing this unbounded channel approach!crates/torii/grpc/src/server/subscriptions/event.rs (3)
12-14
: LGTM! New imports for unbounded channels.Ohayo, sensei! The new imports from
tokio::sync::mpsc
are spot on for implementing the unbounded channel. They're essential for the refactoring we're about to see. Keep up the good work!
68-68
: Consider potential memory issues with unbounded channels.Ohayo again, sensei! The addition of
event_sender: UnboundedSender<Event>
is a smart move for improving event handling. However, I must reiterate a concern: unbounded channels could lead to unbounded memory growth if events are sent faster than they're processed. Consider implementing backpressure mechanisms or using a bounded channel with an appropriate capacity to prevent potential memory exhaustion.Let's check if there are any safeguards in place:
#!/bin/bash # Description: Check for potential safeguards against unbounded memory growth # Test: Search for any backpressure mechanisms or bounded channel usage rg --type rust 'bounded_channel|Semaphore|RateLimiter' crates/torii/grpc/src/
173-175
: LGTM! Enhanced separation of concerns in event handling.Ohayo once more, sensei! The changes in the
Future
implementation forService
are spot on. Sending events to theevent_sender
instead of directly invokingpublish_updates
nicely decouples event reception from processing. The error handling for thesend
operation is also a great touch. Keep up the excellent work!
async fn publish_updates( | ||
subs: Arc<IndexerManager>, | ||
mut update_receiver: UnboundedReceiver<ContractUpdated>, | ||
) { | ||
while let Some(update) = update_receiver.recv().await { | ||
if let Err(e) = Self::process_update(&subs, &update).await { | ||
error!(target = LOG_TARGET, error = %e, "Processing indexer update."); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Dojo-level implementation, sensei!
The publish_updates
method is a great addition:
- It effectively separates the concern of update processing.
- The use of
while let
for consuming from the channel is idiomatic and efficient. - Error handling is in place, which is crucial for long-running tasks.
A small suggestion to enhance error handling:
Consider adding more context to the error log. This could help with debugging in the future. For example:
error!(
target = LOG_TARGET,
error = %e,
contract_address = %update.contract_address,
"Failed to process indexer update"
);
This addition provides more information about which update caused the error.
Overall, excellent work on implementing this asynchronous update processing!
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
let this = self.get_mut(); | ||
|
||
while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) { | ||
let subs = Arc::clone(&pin.subs_manager); | ||
tokio::spawn(async move { | ||
if let Err(e) = Service::publish_updates(subs, &event).await { | ||
error!(target = LOG_TARGET, error = %e, "Publishing indexer update."); | ||
} | ||
}); | ||
while let Poll::Ready(Some(update)) = this.simple_broker.poll_next_unpin(cx) { | ||
if let Err(e) = this.update_sender.send(update) { | ||
error!(target = LOG_TARGET, error = %e, "Sending indexer update to processor."); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Ohayo, sensei! Excellent refinement of the poll
method!
The changes here are well thought out:
- Simplifying
std::pin::Pin<&mut Self>
toPin<&mut Self>
improves readability. - Sending updates through the
update_sender
aligns perfectly with the new asynchronous processing design. - Error handling for the send operation is a good practice.
A small suggestion to improve error handling:
Consider breaking the loop when a send error occurs. This could prevent unnecessary CPU usage if the receiver has been dropped. For example:
while let Poll::Ready(Some(update)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.update_sender.send(update) {
error!(target = LOG_TARGET, error = %e, "Sending indexer update to processor failed. Stopping poll.");
return Poll::Ready(());
}
}
This change ensures that the service stops polling if it can't send updates anymore.
Overall, great job on refining this method to work with the new asynchronous architecture!
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2507 +/- ##
==========================================
+ Coverage 68.82% 68.84% +0.01%
==========================================
Files 377 377
Lines 48930 48978 +48
==========================================
+ Hits 33677 33717 +40
- Misses 15253 15261 +8 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
crates/torii/grpc/src/server/subscriptions/entity.rs (2)
88-96
: LGTM! Consider adding error handling for the spawned task.Ohayo, sensei! The changes in the
new
method look good. The initialization ofentity_sender
andentity_receiver
, along with spawning a new task forpublish_updates
, aligns well with the refactoring objectives.Consider adding error handling for the spawned task to gracefully handle any panics or errors:
tokio::spawn(async move { if let Err(e) = Self::publish_updates(subs_manager, entity_receiver).await { error!(target = LOG_TARGET, error = %e, "Entity update processing task failed"); } });
238-244
: Improve error handling for closedentity_sender
.Ohayo, sensei! The changes in the
poll
method look good overall. However, the error handling forentity_sender.send()
could be improved. If the send operation fails, it likely means that the receiver has been dropped, which could cause this loop to continue indefinitely without effective processing.Consider handling this case by breaking out of the loop or shutting down the service gracefully. Here's a suggested change:
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let this = self.get_mut(); while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) { - if let Err(e) = this.entity_sender.send(entity) { - error!(target = LOG_TARGET, error = %e, "Sending entity update to processor."); + if this.entity_sender.send(entity).is_err() { + error!(target = LOG_TARGET, "Sending entity update to processor failed. Shutting down service."); + return Poll::Ready(()); } } Poll::Pending }This change will cause the service to shut down if the
entity_sender
is closed, preventing potential infinite loops.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (1)
- crates/torii/grpc/src/server/subscriptions/entity.rs (3 hunks)
🧰 Additional context used
🔇 Additional comments (3)
crates/torii/grpc/src/server/subscriptions/entity.rs (3)
12-14
: LGTM! New imports for unbounded channels.Ohayo, sensei! The new imports from
tokio::sync::mpsc
are correctly added to support the introduction of unbounded channels in the code. This change aligns well with the refactoring objectives.
83-83
: Consider using a bounded channel forentity_sender
.Ohayo, sensei! The use of an unbounded channel (
UnboundedSender<OptimisticEntity>
) in theService
struct could potentially lead to memory growth issues if the producer sends messages faster than the consumer can process them. To mitigate this risk, consider using a bounded channel with an appropriate capacity.Would you like assistance in refactoring this to use a bounded channel with a suitable capacity?
101-109
: Improve error handling inpublish_updates
.Ohayo, sensei! While the changes to use
UnboundedReceiver
look good, the error handling inpublish_updates
could be improved. Currently, errors fromprocess_entity_update
are logged but not acted upon. Ifprocess_entity_update
fails repeatedly, it might indicate a critical issue.Consider implementing a retry mechanism or taking corrective action when such errors occur. Would you like assistance in implementing enhanced error handling or a retry strategy?
Summary by CodeRabbit
New Features
Bug Fixes
Documentation