-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement batch loading of event sourcing aggregates (Flows, Tasks) #1024
base: master
Are you sure you want to change the base?
Conversation
@@ -46,6 +46,18 @@ impl FlowQueryServiceImpl { | |||
agent_config, | |||
} | |||
} | |||
fn flow_state_stream<'a>(&'a self, flow_ids: FlowIDStream<'a>) -> FlowStateStream<'a> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Formatting: missing blank line
@@ -46,6 +46,18 @@ impl FlowQueryServiceImpl { | |||
agent_config, | |||
} | |||
} | |||
fn flow_state_stream<'a>(&'a self, flow_ids: FlowIDStream<'a>) -> FlowStateStream<'a> { | |||
Box::pin(async_stream::try_stream! { | |||
let mut chunks = flow_ids.try_chunks(32); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make a named constant. I.e., I don't know why 32. Maybe constant name or a comment will explain.
.get_all_flow_ids_by_dataset(&dataset_id, &filters, pagination) | ||
.try_collect() | ||
.await?; | ||
let relevant_flow_ids = self |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: A name like this looks as if it was a Vec
. Since this is a stream, I recommend to name it with streaming in mind - here and in other places.
fn get_events_multi( | ||
&self, | ||
queries: Vec<Proj::Query>, | ||
) -> MultiEventStream<Proj::Query, Proj::Event> { | ||
use tokio_stream::StreamExt; | ||
let queries = queries.clone(); | ||
|
||
Box::pin(async_stream::try_stream! { | ||
for query in queries { | ||
let mut stream = self.get_events(&query, GetEventsOpts::default()); | ||
while let Some(event) = stream.next().await { | ||
let (event_id, event) = event?; | ||
yield (query.clone(), event_id, event) | ||
} | ||
} | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this method not using multi-load from event store?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the idea was to have custom implementation for postges-based event stores, but use default one for inmem and sqlite (where performance is not critical)
would you like to have multi-load based implementation for non-prod cases as well?
Description
Closes: #issue
Checklist before requesting a review