feat(frontend): support two phase vnode based simple agg with approx_percentile #18007
select * from m1;
----
-982.5779489474152 0 0 2001 982.5779489474152
We don't need this extra recovery step. It was already done in the preceding lines, so we can just remove it.
let approx_percentile =
    self.build_approx_percentile_aggs(core.input.clone(), &approx_percentile_agg_calls)?;

// ====== Handle normal aggs
Refactored into a separate function.
    }
} else {
    Ok(global_agg.into())
}
Refactored into a separate function.
        return Ok(approx_percentile);
    };
    bail!("expected at least one agg call");
}
We can reuse the logic from two-phase stateless agg via the refactored functions above.
    global_agg,
    approx_percentile_col_mapping,
    non_approx_percentile_col_mapping,
)
We can reuse the logic from two-phase stateless agg via the refactored functions above.
/// Prepares metadata and the `approx_percentile` plan, if there's one present.
fn prepare_approx_percentile(
    &self,
    core: &mut Agg<PlanRef>,
    stream_input: PlanRef,
) -> Result<(bool, ColIndexMapping, ColIndexMapping, Option<PlanRef>)> {
    let SeparatedAggInfo { normal, approx } = self.separate_normal_and_special_agg();

    let AggInfo {
        calls: non_approx_percentile_agg_calls,
        col_mapping: non_approx_percentile_col_mapping,
    } = normal;
    let AggInfo {
        calls: approx_percentile_agg_calls,
        col_mapping: approx_percentile_col_mapping,
    } = approx;
    if !self.group_key().is_empty() && !non_approx_percentile_agg_calls.is_empty() {
        bail_not_implemented!("two-phase approx percentile agg with group key, please use single phase agg for approx_percentile with group key");
    }

    let needs_row_merge = (!non_approx_percentile_agg_calls.is_empty()
        && !approx_percentile_agg_calls.is_empty())
        || approx_percentile_agg_calls.len() >= 2;
    core.input = if needs_row_merge {
        // If there's row merge, we need to share the input.
        StreamShare::new_from_input(stream_input.clone()).into()
    } else {
        stream_input
    };
    core.agg_calls = non_approx_percentile_agg_calls;

    let approx_percentile =
        self.build_approx_percentile_aggs(core.input.clone(), &approx_percentile_agg_calls)?;
    Ok((
        needs_row_merge,
        non_approx_percentile_col_mapping,
        approx_percentile_col_mapping,
        approx_percentile,
    ))
}

/// Add `RowMerge` if needed
fn add_row_merge_if_needed(
    needs_row_merge: bool,
    approx_percentile: Option<PlanRef>,
    global_agg: PlanRef,
    approx_percentile_col_mapping: ColIndexMapping,
    non_approx_percentile_col_mapping: ColIndexMapping,
) -> Result<PlanRef> {
    if let Some(approx_percentile) = approx_percentile {
        if needs_row_merge {
            let row_merge = StreamRowMerge::new(
                approx_percentile,
                global_agg,
                approx_percentile_col_mapping,
                non_approx_percentile_col_mapping,
            )?;
            Ok(row_merge.into())
        } else {
            Ok(approx_percentile)
        }
    } else {
        Ok(global_agg)
    }
}
These are the refactored functions mentioned in the comments above.
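The `needs_row_merge` condition in `prepare_approx_percentile` can be read in isolation. The sketch below restates it over plain call counts; the free function and its `usize` parameters are hypothetical stand-ins for illustration, not part of the PR.

```rust
/// Hypothetical stand-in for the condition computed in
/// `prepare_approx_percentile`. It takes only the number of normal agg calls
/// and the number of `approx_percentile` agg calls.
///
/// A row merge is needed when the results of more than one plan have to be
/// combined into a single row:
/// - normal aggs and `approx_percentile` aggs are both present, or
/// - there are two or more `approx_percentile` aggs.
fn needs_row_merge(normal_calls: usize, approx_calls: usize) -> bool {
    (normal_calls > 0 && approx_calls > 0) || approx_calls >= 2
}
```

Under this reading, a query with a single `approx_percentile` call and no other aggregates does not need a row merge, while the `m1` test view (one `approx_percentile` plus `min` and others) does.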
create materialized view m1 as
select
    approx_percentile(0.01, 0.01) within group (order by p_col) as p01,
    min(p_col),
Here we test a stateful agg, so we can test the two phase vnode based simple agg path.
Can we add a planner test for this case as well?
Fixed.
Rest LGTM
        )?;
        Ok(row_merge.into())
    } else {
        Ok(approx_percentile)
Is this a reachable path? The inputs indicate that we have a `global_agg`, but we discard it here.
Fixed.
    bail!("row_merge not needed, but approx_percentile and normal agg are present");
}
If that is the case, does it mean we don't need the `needs_row_merge` flag? We can determine whether to merge from `approx_percentile: Option<PlanRef>`.
Yeah, that's true. Simplified it in 21019ae.
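One way to read the reviewer's suggestion is sketched below: drop the flag and let the `Option` alone decide whether to merge. This is only an illustration under stated assumptions, not the contents of commit 21019ae. The `Plan` enum is a hypothetical stand-in for `PlanRef`, and the column mappings and `Result` wrapper are omitted.

```rust
/// Hypothetical stand-in for `PlanRef`, just enough to show the control flow.
#[derive(Debug, PartialEq)]
enum Plan {
    GlobalAgg,
    ApproxPercentile,
    RowMerge(Box<Plan>, Box<Plan>),
}

/// Sketch of `add_row_merge_if_needed` without the `needs_row_merge` flag.
/// Assumption: whenever an `approx_percentile` plan is present alongside a
/// global agg, the two must be merged; otherwise the global agg is returned.
fn add_row_merge_if_needed(approx_percentile: Option<Plan>, global_agg: Plan) -> Plan {
    match approx_percentile {
        Some(approx) => Plan::RowMerge(Box::new(approx), Box::new(global_agg)),
        None => global_agg,
    }
}
```

With this shape there is no branch that silently discards `global_agg`, which was the concern raised in the earlier comment.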
@@ -304,6 +291,68 @@ impl LogicalAgg {
        }
    }

    /// Prepares metadata and the `approx_percentile` plan, if there's one present.
    fn prepare_approx_percentile(
This function modifies `core.agg_calls` and `core.input`. We'd better document this behavior.
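A doc comment along the following lines would make the mutation explicit. The sketch uses a hypothetical `Core` struct with string fields in place of `Agg<PlanRef>`, and a `StreamShare(...)` string in place of the real `StreamShare` node, so only the documented side effects mirror the PR.

```rust
/// Hypothetical stand-in for `Agg<PlanRef>`.
struct Core {
    agg_calls: Vec<String>,
    input: String,
}

/// Splits out the `approx_percentile` calls and returns them.
///
/// # Side effects
///
/// This function mutates `core`:
/// - `core.agg_calls` is replaced with only the non-`approx_percentile` calls.
/// - `core.input` is wrapped in a shared input when a row merge is needed.
fn prepare_approx_percentile(core: &mut Core) -> Vec<String> {
    // Take ownership of the calls and split them into the two groups.
    let (approx, normal): (Vec<String>, Vec<String>) = std::mem::take(&mut core.agg_calls)
        .into_iter()
        .partition(|call| call.starts_with("approx_percentile"));

    // Same condition as in the PR, expressed over the two groups.
    let needs_row_merge = (!normal.is_empty() && !approx.is_empty()) || approx.len() >= 2;
    if needs_row_merge {
        core.input = format!("StreamShare({})", core.input);
    }
    core.agg_calls = normal;
    approx
}
```

Callers that read `core.agg_calls` after the call will no longer see the `approx_percentile` entries, which is the behavior the comment asks to have documented.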
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
As per title. Previously, only stateless agg was supported. Now two-phase vnode-based agg is supported too.
Checklist
`./risedev check` (or alias, `./risedev c`)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.