Skip to content

Commit

Permalink
Improve flow queries perfomance (#1023)
Browse files Browse the repository at this point in the history
* Improve flow queries perfomance
  • Loading branch information
rmn-boiko authored Jan 8, 2025
1 parent a574f62 commit 8f2aa8e
Show file tree
Hide file tree
Showing 9 changed files with 411 additions and 341 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ Recommendation: for ease of reading, use the following order:
- Fixed
-->

## [Unreleased]
### Fixed
- GQL api flows queries now fetch dataset polling source only once per dataset(and only if Ingest flow type is here)

## [0.216.0] - 2024-12-30
### Changed
- Flight SQL protocol now supports anonymous and bearer token authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ impl DatasetFlowRunsMut {
})?;

Ok(TriggerFlowResult::Success(TriggerFlowSuccess {
flow: Flow::new(flow_state),
flow: Flow::build_batch(vec![flow_state], ctx)
.await?
.pop()
.unwrap(),
}))
}

Expand Down Expand Up @@ -139,7 +142,10 @@ impl DatasetFlowRunsMut {

Ok(CancelScheduledTasksResult::Success(
CancelScheduledTasksSuccess {
flow: Flow::new(flow_state),
flow: Flow::build_batch(vec![flow_state], ctx)
.await?
.pop()
.unwrap(),
},
))
}
Expand Down
7 changes: 2 additions & 5 deletions src/adapter/graphql/src/queries/accounts/account_flow_runs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,9 @@ impl AccountFlowRuns {
.await
.int_err()?;

let matched_flows: Vec<_> = flows_state_listing
.matched_stream
.map_ok(Flow::new)
.try_collect()
.await?;
let matched_flow_states: Vec<_> = flows_state_listing.matched_stream.try_collect().await?;
let total_count = flows_state_listing.total_count;
let matched_flows = Flow::build_batch(matched_flow_states, ctx).await?;

Ok(FlowConnection::new(
matched_flows,
Expand Down
12 changes: 6 additions & 6 deletions src/adapter/graphql/src/queries/datasets/dataset_flow_runs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ impl DatasetFlowRuns {
.int_err()?;

Ok(GetFlowResult::Success(GetFlowSuccess {
flow: Flow::new(flow_state),
flow: Flow::build_batch(vec![flow_state], ctx)
.await?
.pop()
.unwrap(),
}))
}

Expand Down Expand Up @@ -106,12 +109,9 @@ impl DatasetFlowRuns {
.await
.int_err()?;

let matched_flows: Vec<_> = flows_state_listing
.matched_stream
.map_ok(Flow::new)
.try_collect()
.await?;
let matched_flow_states: Vec<_> = flows_state_listing.matched_stream.try_collect().await?;
let total_count = flows_state_listing.total_count;
let matched_flows = Flow::build_batch(matched_flow_states, ctx).await?;

Ok(FlowConnection::new(
matched_flows,
Expand Down
Loading

0 comments on commit 8f2aa8e

Please sign in to comment.