
Implement batch loading of event sourcing aggregates (Flows, Tasks) #1024

Draft · wants to merge 6 commits into base: master

Conversation

@demusdev (Contributor) commented on Jan 4, 2025

Description

Closes: #issue

Checklist before requesting a review

@demusdev changed the title from "mplement batch loading of event sourcing aggregates (Flows, Tasks)" to "Implement batch loading of event sourcing aggregates (Flows, Tasks)" on Jan 4, 2025
@demusdev self-assigned this on Jan 7, 2025
```
@@ -46,6 +46,18 @@ impl FlowQueryServiceImpl {
            agent_config,
        }
    }
    fn flow_state_stream<'a>(&'a self, flow_ids: FlowIDStream<'a>) -> FlowStateStream<'a> {
```
Contributor commented:

Formatting: missing blank line

```
@@ -46,6 +46,18 @@ impl FlowQueryServiceImpl {
            agent_config,
        }
    }
    fn flow_state_stream<'a>(&'a self, flow_ids: FlowIDStream<'a>) -> FlowStateStream<'a> {
        Box::pin(async_stream::try_stream! {
            let mut chunks = flow_ids.try_chunks(32);
```
Contributor commented:

Make this a named constant. I don't know why 32; a constant name or a comment would explain it.
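
A minimal sketch of the suggested change, assuming a constant name of our choosing (FLOW_ID_BATCH_SIZE and its doc comment are illustrative, not part of the PR):

```rust
/// How many flow IDs are loaded from the event store per batch.
/// The value 32 comes from the PR; the name and rationale here are assumed.
const FLOW_ID_BATCH_SIZE: usize = 32;

fn flow_state_stream<'a>(&'a self, flow_ids: FlowIDStream<'a>) -> FlowStateStream<'a> {
    Box::pin(async_stream::try_stream! {
        let mut chunks = flow_ids.try_chunks(FLOW_ID_BATCH_SIZE);
        // ...rest of the stream body unchanged...
    })
}
```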

```
            .get_all_flow_ids_by_dataset(&dataset_id, &filters, pagination)
            .try_collect()
            .await?;
        let relevant_flow_ids = self
```
Contributor commented:
Minor: a name like this looks as if it were a Vec. Since this is a stream, I recommend naming it with streaming in mind, here and in other places.
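
For illustration only, one way to apply the suggestion; the name relevant_flow_id_stream and the continuation of the statement are assumptions, not the actual change:

```rust
// Hypothetical rename: the `_stream` suffix makes it clear this binding is a lazy
// stream of flow IDs, not a collected Vec.
let relevant_flow_id_stream = self
    .flow_event_store
    .get_all_flow_ids_by_dataset(&dataset_id, &filters, pagination);
```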

Comment on lines +25 to +41
```rust
fn get_events_multi(
    &self,
    queries: Vec<Proj::Query>,
) -> MultiEventStream<Proj::Query, Proj::Event> {
    use tokio_stream::StreamExt;
    let queries = queries.clone();

    Box::pin(async_stream::try_stream! {
        for query in queries {
            let mut stream = self.get_events(&query, GetEventsOpts::default());
            while let Some(event) = stream.next().await {
                let (event_id, event) = event?;
                yield (query.clone(), event_id, event)
            }
        }
    })
}
```
Contributor commented:

Why is this method not using multi-load from the event store?

@demusdev (Contributor, Author) replied:

The idea was to have a custom implementation for Postgres-based event stores, but use the default one for in-memory and SQLite stores (where performance is not critical).
Would you like to have a multi-load-based implementation for the non-prod cases as well?
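
For context, a rough sketch of the kind of Postgres-specific multi-load being discussed: one query fetches the events for a whole batch of aggregates instead of one query per aggregate. The table, column, and function names below are assumptions for illustration, not kamu's actual schema or event-store traits.

```rust
use sqlx::PgPool;

// Hypothetical batched load: fetch all event rows for a set of flow IDs in one
// round trip, ordered so the caller can regroup events per aggregate.
async fn load_flow_events_batched(
    pool: &PgPool,
    flow_ids: &[i64],
) -> Result<Vec<(i64, i64, serde_json::Value)>, sqlx::Error> {
    sqlx::query_as::<_, (i64, i64, serde_json::Value)>(
        r#"
        SELECT flow_id, event_id, event_payload
        FROM flow_events
        WHERE flow_id = ANY($1)
        ORDER BY flow_id, event_id
        "#,
    )
    .bind(flow_ids)
    .fetch_all(pool)
    .await
}
```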
