-
Notifications
You must be signed in to change notification settings - Fork 182
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
opt(torii-grpc): channel based approach for entity updates #2453
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -4,6 +4,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use std::str::FromStr; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use std::sync::Arc; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use std::task::{Context, Poll}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use std::time::Duration; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use futures::Stream; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use futures_util::StreamExt; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -12,6 +13,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use starknet::core::types::Felt; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use tokio::sync::mpsc::{channel, Receiver, Sender}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use tokio::sync::RwLock; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use tokio::time::interval; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use torii_core::cache::ModelCache; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use torii_core::error::{Error, ParseError}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use torii_core::model::build_sql_query; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -81,10 +83,8 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
#[must_use = "Service does nothing unless polled"] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
#[allow(missing_debug_implementations)] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pub struct Service { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pool: Pool<Sqlite>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
subs_manager: Arc<EntityManager>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
model_cache: Arc<ModelCache>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
simple_broker: Pin<Box<dyn Stream<Item = Entity> + Send>>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
update_sender: Sender<Entity>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
impl Service { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -93,15 +93,49 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
subs_manager: Arc<EntityManager>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
model_cache: Arc<ModelCache>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) -> Self { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Self { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let (update_sender, update_receiver) = channel(100); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let service = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Self { simple_broker: Box::pin(SimpleBroker::<Entity>::subscribe()), update_sender }; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Spawn a task to process entity updates | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tokio::spawn(Self::process_updates( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Arc::clone(&subs_manager), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Arc::clone(&model_cache), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pool, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
subs_manager, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
model_cache, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
simple_broker: Box::pin(SimpleBroker::<Entity>::subscribe()), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
update_receiver, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
service | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async fn process_updates( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
subs: Arc<EntityManager>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cache: Arc<ModelCache>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pool: Pool<Sqlite>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
mut update_receiver: Receiver<Entity>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let mut interval = interval(Duration::from_millis(100)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let mut pending_updates = Vec::new(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
loop { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tokio::select! { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
_ = interval.tick() => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if !pending_updates.is_empty() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for entity in pending_updates.drain(..) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if let Err(e) = Self::publish_updates(Arc::clone(&subs), Arc::clone(&cache), pool.clone(), &entity).await { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
error!(target = LOG_TARGET, error = %e, "Publishing entity update."); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Some(entity) = update_receiver.recv() => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pending_updates.push(entity); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+111
to
136
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ohayo sensei! Inefficient processing in In the Consider adjusting the design to process updates as they arrive or ensuring that Apply this diff to process updates immediately upon receiving them: - let mut interval = interval(Duration::from_millis(100));
loop {
- tokio::select! {
- _ = interval.tick() => {
- if !pending_updates.is_empty() {
- // Process pending updates
- }
- }
- Some(entity) = update_receiver.recv() => {
- pending_updates.push(entity);
- }
- }
+ if let Some(entity) = update_receiver.recv().await {
+ if let Err(e) = Self::publish_updates(
+ Arc::clone(&subs),
+ Arc::clone(&cache),
+ pool.clone(),
+ &entity
+ ).await {
+ error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
+ }
+ } else {
+ break; // Channel closed
+ }
} Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async fn publish_updates( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pub async fn publish_updates( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
subs: Arc<EntityManager>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cache: Arc<ModelCache>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pool: Pool<Sqlite>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -259,16 +293,14 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
impl Future for Service { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
type Output = (); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let pin = self.get_mut(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let subs = Arc::clone(&pin.subs_manager); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let cache = Arc::clone(&pin.model_cache); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let pool = pin.pool.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let sender = pin.update_sender.clone(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tokio::spawn(async move { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
error!(target = LOG_TARGET, error = %e, "Publishing entity update."); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if let Err(e) = sender.send(entity).await { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
error!(target = LOG_TARGET, error = %e, "Sending entity update to channel."); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+296
to
+303
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ohayo sensei! Avoid spawning tasks inside a loop in the In the Consider processing the entities directly without spawning new tasks: while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
- let sender = pin.update_sender.clone();
- tokio::spawn(async move {
- if let Err(e) = sender.send(entity).await {
- error!(target = LOG_TARGET, error = %e, "Sending entity update to channel.");
- }
- });
+ if let Err(e) = pin.update_sender.try_send(entity) {
+ error!(target = LOG_TARGET, error = %e, "Sending entity update to channel.");
+ }
} Alternatively, if asynchronous sending is required, batch the entities and process them in a controlled manner. Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,13 +4,15 @@ | |
use std::str::FromStr; | ||
use std::sync::Arc; | ||
use std::task::{Context, Poll}; | ||
use std::time::Duration; | ||
|
||
use futures::Stream; | ||
use futures_util::StreamExt; | ||
use rand::Rng; | ||
use starknet::core::types::Felt; | ||
use tokio::sync::mpsc::{channel, Receiver, Sender}; | ||
use tokio::sync::RwLock; | ||
use tokio::time::interval; | ||
use torii_core::error::{Error, ParseError}; | ||
use torii_core::simple_broker::SimpleBroker; | ||
use torii_core::sql::FELT_DELIMITER; | ||
|
@@ -62,13 +64,42 @@ | |
#[must_use = "Service does nothing unless polled"] | ||
#[allow(missing_debug_implementations)] | ||
pub struct Service { | ||
subs_manager: Arc<EventManager>, | ||
simple_broker: Pin<Box<dyn Stream<Item = Event> + Send>>, | ||
update_sender: Sender<Event>, | ||
} | ||
|
||
impl Service { | ||
pub fn new(subs_manager: Arc<EventManager>) -> Self { | ||
Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<Event>::subscribe()) } | ||
let (update_sender, update_receiver) = channel(100); | ||
let service = | ||
Self { simple_broker: Box::pin(SimpleBroker::<Event>::subscribe()), update_sender }; | ||
|
||
// Spawn a task to process event updates | ||
tokio::spawn(Self::process_updates(Arc::clone(&subs_manager), update_receiver)); | ||
|
||
service | ||
} | ||
Comment on lines
+73
to
+81
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ohayo sensei! Consider handling the In the |
||
|
||
async fn process_updates(subs: Arc<EventManager>, mut update_receiver: Receiver<Event>) { | ||
let mut interval = interval(Duration::from_millis(100)); | ||
let mut pending_updates = Vec::new(); | ||
|
||
loop { | ||
tokio::select! { | ||
_ = interval.tick() => { | ||
if !pending_updates.is_empty() { | ||
for event in pending_updates.drain(..) { | ||
if let Err(e) = Self::publish_updates(Arc::clone(&subs), &event).await { | ||
error!(target = LOG_TARGET, error = %e, "Publishing events update."); | ||
} | ||
} | ||
} | ||
} | ||
Some(event) = update_receiver.recv() => { | ||
pending_updates.push(event); | ||
} | ||
} | ||
} | ||
} | ||
|
||
async fn publish_updates(subs: Arc<EventManager>, event: &Event) -> Result<(), Error> { | ||
|
@@ -151,10 +182,10 @@ | |
let pin = self.get_mut(); | ||
|
||
while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) { | ||
let subs = Arc::clone(&pin.subs_manager); | ||
let sender = pin.update_sender.clone(); | ||
tokio::spawn(async move { | ||
if let Err(e) = Service::publish_updates(subs, &event).await { | ||
error!(target = LOG_TARGET, error = %e, "Publishing events update."); | ||
if let Err(e) = sender.send(event).await { | ||
error!(target = LOG_TARGET, error = %e, "Sending event update to channel."); | ||
} | ||
}); | ||
} | ||
Comment on lines
+185
to
191
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ohayo sensei! Potential issue with unbounded task spawning in In the Consider refactoring to avoid spawning a task per event. One approach is to use |
||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -4,14 +4,16 @@ | |||||||||||||||||||||||||
use std::str::FromStr; | ||||||||||||||||||||||||||
use std::sync::Arc; | ||||||||||||||||||||||||||
use std::task::{Context, Poll}; | ||||||||||||||||||||||||||
use std::time::Duration; | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
use futures::Stream; | ||||||||||||||||||||||||||
use futures_util::StreamExt; | ||||||||||||||||||||||||||
use rand::Rng; | ||||||||||||||||||||||||||
use sqlx::{Pool, Sqlite}; | ||||||||||||||||||||||||||
use starknet::core::types::Felt; | ||||||||||||||||||||||||||
use tokio::sync::mpsc::{channel, Receiver}; | ||||||||||||||||||||||||||
use tokio::sync::mpsc::{channel, Receiver, Sender}; | ||||||||||||||||||||||||||
use tokio::sync::RwLock; | ||||||||||||||||||||||||||
use tokio::time::interval; | ||||||||||||||||||||||||||
use torii_core::cache::ModelCache; | ||||||||||||||||||||||||||
use torii_core::error::{Error, ParseError}; | ||||||||||||||||||||||||||
use torii_core::model::build_sql_query; | ||||||||||||||||||||||||||
|
@@ -75,10 +77,8 @@ | |||||||||||||||||||||||||
#[must_use = "Service does nothing unless polled"] | ||||||||||||||||||||||||||
#[allow(missing_debug_implementations)] | ||||||||||||||||||||||||||
pub struct Service { | ||||||||||||||||||||||||||
pool: Pool<Sqlite>, | ||||||||||||||||||||||||||
subs_manager: Arc<EventMessageManager>, | ||||||||||||||||||||||||||
model_cache: Arc<ModelCache>, | ||||||||||||||||||||||||||
simple_broker: Pin<Box<dyn Stream<Item = EventMessage> + Send>>, | ||||||||||||||||||||||||||
update_sender: Sender<EventMessage>, | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
impl Service { | ||||||||||||||||||||||||||
|
@@ -87,11 +87,47 @@ | |||||||||||||||||||||||||
subs_manager: Arc<EventMessageManager>, | ||||||||||||||||||||||||||
model_cache: Arc<ModelCache>, | ||||||||||||||||||||||||||
) -> Self { | ||||||||||||||||||||||||||
Self { | ||||||||||||||||||||||||||
pool, | ||||||||||||||||||||||||||
subs_manager, | ||||||||||||||||||||||||||
model_cache, | ||||||||||||||||||||||||||
let (update_sender, update_receiver) = channel(100); | ||||||||||||||||||||||||||
let service = Self { | ||||||||||||||||||||||||||
simple_broker: Box::pin(SimpleBroker::<EventMessage>::subscribe()), | ||||||||||||||||||||||||||
update_sender, | ||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
// Spawn a task to process event message updates | ||||||||||||||||||||||||||
tokio::spawn(Self::process_updates( | ||||||||||||||||||||||||||
Arc::clone(&subs_manager), | ||||||||||||||||||||||||||
Arc::clone(&model_cache), | ||||||||||||||||||||||||||
pool, | ||||||||||||||||||||||||||
update_receiver, | ||||||||||||||||||||||||||
)); | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
Comment on lines
+96
to
+103
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handle potential errors when spawning When spawning the |
||||||||||||||||||||||||||
service | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
async fn process_updates( | ||||||||||||||||||||||||||
subs: Arc<EventMessageManager>, | ||||||||||||||||||||||||||
cache: Arc<ModelCache>, | ||||||||||||||||||||||||||
pool: Pool<Sqlite>, | ||||||||||||||||||||||||||
mut update_receiver: Receiver<EventMessage>, | ||||||||||||||||||||||||||
) { | ||||||||||||||||||||||||||
let mut interval = interval(Duration::from_millis(100)); | ||||||||||||||||||||||||||
let mut pending_updates = Vec::new(); | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
loop { | ||||||||||||||||||||||||||
tokio::select! { | ||||||||||||||||||||||||||
_ = interval.tick() => { | ||||||||||||||||||||||||||
if !pending_updates.is_empty() { | ||||||||||||||||||||||||||
for event_message in pending_updates.drain(..) { | ||||||||||||||||||||||||||
if let Err(e) = Self::publish_updates(Arc::clone(&subs), Arc::clone(&cache), pool.clone(), &event_message).await { | ||||||||||||||||||||||||||
error!(target = LOG_TARGET, error = %e, "Publishing event message update."); | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
Comment on lines
+121
to
+124
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Improve error handling in In the |
||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
Some(event_message) = update_receiver.recv() => { | ||||||||||||||||||||||||||
pending_updates.push(event_message); | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
Comment on lines
+107
to
+130
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Potential accumulation of pending updates may affect timing The |
||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
|
@@ -241,13 +277,11 @@ | |||||||||||||||||||||||||
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> { | ||||||||||||||||||||||||||
let pin = self.get_mut(); | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) { | ||||||||||||||||||||||||||
let subs = Arc::clone(&pin.subs_manager); | ||||||||||||||||||||||||||
let cache = Arc::clone(&pin.model_cache); | ||||||||||||||||||||||||||
let pool = pin.pool.clone(); | ||||||||||||||||||||||||||
while let Poll::Ready(Some(event_message)) = pin.simple_broker.poll_next_unpin(cx) { | ||||||||||||||||||||||||||
let sender = pin.update_sender.clone(); | ||||||||||||||||||||||||||
tokio::spawn(async move { | ||||||||||||||||||||||||||
if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await { | ||||||||||||||||||||||||||
error!(target = LOG_TARGET, error = %e, "Publishing entity update."); | ||||||||||||||||||||||||||
if let Err(e) = sender.send(event_message).await { | ||||||||||||||||||||||||||
error!(target = LOG_TARGET, error = %e, "Sending event message update to channel."); | ||||||||||||||||||||||||||
Comment on lines
+280
to
+284
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid spawning a new task for each event message to enhance performance Currently, a new asynchronous task is spawned for each event message received in the Apply this diff to refactor the code: while let Poll::Ready(Some(event_message)) = pin.simple_broker.poll_next_unpin(cx) {
- let sender = pin.update_sender.clone();
- tokio::spawn(async move {
- if let Err(e) = sender.send(event_message).await {
- error!(target = LOG_TARGET, error = %e, "Sending event message update to channel.");
- }
- });
+ if let Err(e) = pin.update_sender.try_send(event_message) {
+ error!(target = LOG_TARGET, error = %e, "Sending event message update to channel.");
+ }
} This modification replaces the spawned task with a direct Committable suggestion
Suggested change
|
||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo sensei! Potential unbounded task spawning without backpressure control.
In the
new
method, theprocess_updates
task is spawned to handle entity updates. However, since it's consuming from a channel with a buffer size of 100 (channel(100)
), there's a risk of theupdate_sender
becoming saturated if entities are produced faster than they are processed, which could lead to unbounded memory growth.Consider implementing backpressure or increasing the channel's capacity to prevent potential bottlenecks. You might also monitor the channel's usage to adjust its size dynamically based on load.