From f09f195499381ee54904bfdcdc879d4903a7366c Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Wed, 14 Aug 2024 17:36:46 +0800 Subject: [PATCH] feat(frontend): support two phase vnode based simple agg with approx_percentile (#18007) --- ...e_approx_percentile_merge_stateful_agg.slt | 80 ++++++++++ ...approx_percentile_merge_stateless_agg.slt} | 13 -- .../tests/testdata/input/agg.yaml | 7 + .../tests/testdata/output/agg.yaml | 30 ++++ .../src/optimizer/plan_node/logical_agg.rs | 148 ++++++++++++------ 5 files changed, 217 insertions(+), 61 deletions(-) create mode 100644 e2e_test/streaming/aggregate/two_phase_approx_percentile_merge_stateful_agg.slt rename e2e_test/streaming/aggregate/{two_phase_approx_percentile_merge_normal_agg.slt => two_phase_approx_percentile_merge_stateless_agg.slt} (90%) diff --git a/e2e_test/streaming/aggregate/two_phase_approx_percentile_merge_stateful_agg.slt b/e2e_test/streaming/aggregate/two_phase_approx_percentile_merge_stateful_agg.slt new file mode 100644 index 0000000000000..012b1ffffb762 --- /dev/null +++ b/e2e_test/streaming/aggregate/two_phase_approx_percentile_merge_stateful_agg.slt @@ -0,0 +1,80 @@ +# Single phase approx percentile +statement ok +create table t(p_col double, grp_col int); + +statement ok +insert into t select a, 1 from generate_series(-1000, 1000) t(a); + +statement ok +flush; + +query I +select + percentile_cont(0.01) within group (order by p_col) as p01, + min(p_col), + percentile_cont(0.5) within group (order by p_col) as p50, + count(*), + percentile_cont(0.99) within group (order by p_col) as p99 +from t; +---- +-980 -1000 0 2001 980 + +statement ok +create materialized view m1 as + select + approx_percentile(0.01, 0.01) within group (order by p_col) as p01, + min(p_col), + approx_percentile(0.5, 0.01) within group (order by p_col) as p50, + count(*), + approx_percentile(0.99, 0.01) within group (order by p_col) as p99 + from t; + +query I +select * from m1; +---- +-982.5779489474152 -1000 0 2001 982.5779489474152 + +# Test state encode / decode +onlyif can-use-recover +statement ok +recover; + +onlyif can-use-recover +sleep 10s + +query I +select * from m1; +---- +-982.5779489474152 -1000 0 2001 982.5779489474152 + +# Test 0= 2; - core.input = if needs_row_merge { - // If there's row merge, we need to share the input. - StreamShare::new_from_input(stream_input.clone()).into() - } else { - stream_input - }; - core.agg_calls = non_approx_percentile_agg_calls; + let (non_approx_percentile_col_mapping, approx_percentile_col_mapping, approx_percentile) = + self.prepare_approx_percentile(&mut core, stream_input.clone())?; - let approx_percentile = - self.build_approx_percentile_aggs(core.input.clone(), &approx_percentile_agg_calls)?; - - // ====== Handle normal aggs if core.agg_calls.is_empty() { if let Some(approx_percentile) = approx_percentile { return Ok(approx_percentile); }; bail!("expected at least one agg call"); } + + // ====== Handle normal aggs let total_agg_calls = core .agg_calls .iter() @@ -123,21 +102,12 @@ impl LogicalAgg { new_stream_simple_agg(Agg::new(total_agg_calls, IndexSet::empty(), exchange)); // ====== Merge approx percentile and normal aggs - if let Some(approx_percentile) = approx_percentile { - if needs_row_merge { - let row_merge = StreamRowMerge::new( - approx_percentile, - global_agg.into(), - approx_percentile_col_mapping, - non_approx_percentile_col_mapping, - )?; - Ok(row_merge.into()) - } else { - Ok(approx_percentile) - } - } else { - Ok(global_agg.into()) - } + Self::add_row_merge_if_needed( + approx_percentile, + global_agg.into(), + approx_percentile_col_mapping, + non_approx_percentile_col_mapping, + ) } /// Generate plan for stateless/stateful 2-phase streaming agg. @@ -148,10 +118,21 @@ impl LogicalAgg { stream_input: PlanRef, dist_key: &[usize], ) -> Result { - let input_col_num = stream_input.schema().len(); + let mut core = self.core.clone(); + + let (non_approx_percentile_col_mapping, approx_percentile_col_mapping, approx_percentile) = + self.prepare_approx_percentile(&mut core, stream_input.clone())?; + + if core.agg_calls.is_empty() { + if let Some(approx_percentile) = approx_percentile { + return Ok(approx_percentile); + }; + bail!("expected at least one agg call"); + } // Generate vnode via project // TODO(kwannoel): We should apply Project optimization rules here. + let input_col_num = stream_input.schema().len(); // get schema len before moving `stream_input`. let project = StreamProject::new(generic::Project::with_vnode_col(stream_input, dist_key)); let vnode_col_idx = project.base.schema().len() - 1; @@ -160,7 +141,7 @@ impl LogicalAgg { local_group_key.insert(vnode_col_idx); let n_local_group_key = local_group_key.len(); let local_agg = new_stream_hash_agg( - Agg::new(self.agg_calls().to_vec(), local_group_key, project.into()), + Agg::new(core.agg_calls.to_vec(), local_group_key, project.into()), Some(vnode_col_idx), ); // Global group key excludes vnode. @@ -173,11 +154,11 @@ impl LogicalAgg { .expect("some input group key could not be mapped"); // Generate global agg step - if self.group_key().is_empty() { + let global_agg = if self.group_key().is_empty() { let exchange = RequiredDist::single().enforce_if_not_satisfies(local_agg.into(), &Order::any())?; let global_agg = new_stream_simple_agg(Agg::new( - self.agg_calls() + core.agg_calls .iter() .enumerate() .map(|(partial_output_idx, agg_call)| { @@ -187,7 +168,7 @@ impl LogicalAgg { global_group_key.into_iter().collect(), exchange, )); - Ok(global_agg.into()) + global_agg.into() } else { let exchange = RequiredDist::shard_by_key(input_col_num, &global_group_key) .enforce_if_not_satisfies(local_agg.into(), &Order::any())?; @@ -195,7 +176,7 @@ impl LogicalAgg { // we can just follow it. let global_agg = new_stream_hash_agg( Agg::new( - self.agg_calls() + core.agg_calls .iter() .enumerate() .map(|(partial_output_idx, agg_call)| { @@ -208,8 +189,14 @@ impl LogicalAgg { ), None, ); - Ok(global_agg.into()) - } + global_agg.into() + }; + Self::add_row_merge_if_needed( + approx_percentile, + global_agg, + approx_percentile_col_mapping, + non_approx_percentile_col_mapping, + ) } fn gen_single_plan(&self, stream_input: PlanRef) -> Result { @@ -304,6 +291,71 @@ impl LogicalAgg { } } + /// Prepares metadata and the `approx_percentile` plan, if there's one present. + /// It may modify `core.agg_calls` to separate normal agg and approx percentile agg, + /// and `core.input` to share the input via `StreamShare`, + /// to both approx percentile agg and normal agg. + fn prepare_approx_percentile( + &self, + core: &mut Agg, + stream_input: PlanRef, + ) -> Result<(ColIndexMapping, ColIndexMapping, Option)> { + let SeparatedAggInfo { normal, approx } = self.separate_normal_and_special_agg(); + + let AggInfo { + calls: non_approx_percentile_agg_calls, + col_mapping: non_approx_percentile_col_mapping, + } = normal; + let AggInfo { + calls: approx_percentile_agg_calls, + col_mapping: approx_percentile_col_mapping, + } = approx; + if !self.group_key().is_empty() && !approx_percentile_agg_calls.is_empty() { + bail_not_implemented!("two-phase approx percentile agg with group key, please use single phase agg for approx_percentile with group key"); + } + + // Either we have approx percentile aggs and non_approx percentile aggs, + // or we have at least 2 approx percentile aggs. + let needs_row_merge = (!non_approx_percentile_agg_calls.is_empty() + && !approx_percentile_agg_calls.is_empty()) + || approx_percentile_agg_calls.len() >= 2; + core.input = if needs_row_merge { + // If there's row merge, we need to share the input. + StreamShare::new_from_input(stream_input.clone()).into() + } else { + stream_input + }; + core.agg_calls = non_approx_percentile_agg_calls; + + let approx_percentile = + self.build_approx_percentile_aggs(core.input.clone(), &approx_percentile_agg_calls)?; + Ok(( + non_approx_percentile_col_mapping, + approx_percentile_col_mapping, + approx_percentile, + )) + } + + /// Add `RowMerge` if needed + fn add_row_merge_if_needed( + approx_percentile: Option, + global_agg: PlanRef, + approx_percentile_col_mapping: ColIndexMapping, + non_approx_percentile_col_mapping: ColIndexMapping, + ) -> Result { + if let Some(approx_percentile) = approx_percentile { + let row_merge = StreamRowMerge::new( + approx_percentile, + global_agg, + approx_percentile_col_mapping, + non_approx_percentile_col_mapping, + )?; + Ok(row_merge.into()) + } else { + Ok(global_agg) + } + } + fn separate_normal_and_special_agg(&self) -> SeparatedAggInfo { let estimated_len = self.agg_calls().len() - 1; let mut approx_percentile_agg_calls = Vec::with_capacity(estimated_len);