From 8c6ea397f7830c96d45b71fd1fb29c7aa2469fe2 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 22 Jul 2024 02:13:27 +0800 Subject: [PATCH] test case of approx_percentile alone --- .../tests/testdata/output/agg.yaml | 16 ++-- .../src/optimizer/plan_node/generic/agg.rs | 1 + .../src/optimizer/plan_node/logical_agg.rs | 76 +++++++++++++++---- .../stream_global_approx_percentile.rs | 7 +- .../optimizer/plan_node/stream_keyed_merge.rs | 1 + .../stream_local_approx_percentile.rs | 10 +-- 6 files changed, 81 insertions(+), 30 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/output/agg.yaml b/src/frontend/planner_test/tests/testdata/output/agg.yaml index f4f170d0ba1bf..05bf361e88494 100644 --- a/src/frontend/planner_test/tests/testdata/output/agg.yaml +++ b/src/frontend/planner_test/tests/testdata/output/agg.yaml @@ -1867,14 +1867,14 @@ CREATE TABLE t (v1 int); SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) from t; logical_plan: |- - LogicalProject { exprs: [approx_percentile($expr1 order_by(t.v1 ASC))] } - └─LogicalAgg { aggs: [approx_percentile($expr1 order_by(t.v1 ASC))] } - └─LogicalProject { exprs: [t.v1::Float64 as $expr1, t.v1] } + LogicalProject { exprs: [approx_percentile($expr1)] } + └─LogicalAgg { aggs: [approx_percentile($expr1)] } + └─LogicalProject { exprs: [t.v1::Float64 as $expr1] } └─LogicalScan { table: t, columns: [t.v1, t._row_id] } stream_plan: |- - StreamMaterialize { columns: [approx_percentile], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [approx_percentile($expr1 order_by(t.v1 ASC))] } - └─StreamSimpleAgg { aggs: [approx_percentile($expr1 order_by(t.v1 ASC)), count] } - └─StreamExchange { dist: Single } - └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v1, t._row_id] } + StreamMaterialize { columns: [approx_percentile], stream_key: [approx_percentile], pk_columns: [approx_percentile], pk_conflict: NoCheck } + └─StreamExchange { dist: HashShard(approx_percentile) } + └─StreamGlobalApproxPercentile + └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Decimal, relative_error: 0.01:Decimal } + └─StreamProject { exprs: [t.v1::Float64 as $expr1, t._row_id] } └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } diff --git a/src/frontend/src/optimizer/plan_node/generic/agg.rs b/src/frontend/src/optimizer/plan_node/generic/agg.rs index 8f999988c4824..6b59f790ad4ef 100644 --- a/src/frontend/src/optimizer/plan_node/generic/agg.rs +++ b/src/frontend/src/optimizer/plan_node/generic/agg.rs @@ -135,6 +135,7 @@ impl Agg { self.agg_calls.iter().all(|c| { matches!(c.agg_kind, agg_kinds::single_value_state!()) || (matches!(c.agg_kind, agg_kinds::single_value_state_iff_in_append_only!() if stream_input_append_only)) + || (matches!(c.agg_kind, AggKind::ApproxPercentile)) }) } diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index 178bf3cc9ba48..d573096d2304e 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -61,9 +61,39 @@ impl LogicalAgg { /// Generate plan for stateless 2-phase streaming agg. /// Should only be used iff input is distributed. Input must be converted to stream form. fn gen_stateless_two_phase_streaming_agg_plan(&self, stream_input: PlanRef) -> Result { + println!("generating stateless simple agg plan"); debug_assert!(self.group_key().is_empty()); let mut core = self.core.clone(); - core.input = stream_input; + + // First, handle approx percentile. + let has_approx_percentile = self + .agg_calls() + .iter() + .any(|agg_call| agg_call.agg_kind == AggKind::ApproxPercentile); + let approx_percentile_info = if has_approx_percentile { + let ( + approx_percentile_agg_call, + non_approx_percentile_agg_calls, + lhs_mapping, + rhs_mapping, + ) = self.extract_approx_percentile(); + if non_approx_percentile_agg_calls.is_empty() { + let approx_percentile_agg: PlanRef = + self.build_approx_percentile_agg(stream_input.clone(), &approx_percentile_agg_call); + return Ok(approx_percentile_agg); + } else { + let stream_input: PlanRef = StreamShare::new_from_input(stream_input).into(); + let approx_percentile_agg: PlanRef = + self.build_approx_percentile_agg(stream_input.clone(), &approx_percentile_agg_call); + + core.input = stream_input; + core.agg_calls = non_approx_percentile_agg_calls; + Some((approx_percentile_agg, lhs_mapping, rhs_mapping)) + } + } else { + core.input = stream_input; + None + }; let local_agg = StreamStatelessSimpleAgg::new(core); let exchange = RequiredDist::single().enforce_if_not_satisfies(local_agg.into(), &Order::any())?; @@ -78,7 +108,17 @@ impl LogicalAgg { IndexSet::empty(), exchange, )); - Ok(global_agg.into()) + if let Some((approx_percentile_agg, lhs_mapping, rhs_mapping)) = approx_percentile_info { + let keyed_merge = StreamKeyedMerge::new( + global_agg.into(), + approx_percentile_agg, + lhs_mapping, + rhs_mapping, + ); + Ok(keyed_merge.into()) + } else { + Ok(global_agg.into()) + } } /// Generate plan for stateless/stateful 2-phase streaming agg. @@ -198,15 +238,14 @@ impl LogicalAgg { } else { self.core.can_two_phase_agg() }); - - // Handle 2-phase approx_percentile aggregation. - if self - .agg_calls() - .iter() - .any(|agg_call| agg_call.agg_kind == AggKind::ApproxPercentile) - { - return self.gen_approx_percentile_plan(stream_input, ctx); - } + // // Handle 2-phase approx_percentile aggregation. + // if self + // .agg_calls() + // .iter() + // .any(|agg_call| agg_call.agg_kind == AggKind::ApproxPercentile) + // { + // return self.gen_two_phase_approx_percentile_plan(stream_input, ctx); + // } // Stateless 2-phase simple agg // can be applied on stateless simple agg calls, @@ -277,7 +316,7 @@ impl LogicalAgg { // for the agg step, we need to count by bucket id, and not just dispatch a single record // downstream. So we need a custom executor for this logic. // TODO: Ban distinct agg? - fn gen_approx_percentile_plan( + fn gen_two_phase_approx_percentile_plan( &self, stream_input: PlanRef, ctx: &mut ToStreamContext, @@ -295,7 +334,7 @@ impl LogicalAgg { self.extract_approx_percentile(); let approx_percentile_agg = self.build_approx_percentile_agg(shared_input.clone(), &approx_percentile_agg_call); - let simple_agg_without_approx_percentile = Agg::new( + let simple_agg_without_approx_percentile: PlanRef = Agg::new( non_approx_percentile_agg_calls, self.group_key().clone(), shared_input, @@ -303,6 +342,8 @@ impl LogicalAgg { .with_grouping_sets(self.grouping_sets().clone()) .with_enable_two_phase(self.core().enable_two_phase) .into(); + let simple_agg_without_approx_percentile = + simple_agg_without_approx_percentile.to_stream(ctx)?; let agg_merge = StreamKeyedMerge::new( simple_agg_without_approx_percentile, approx_percentile_agg, @@ -1246,6 +1287,15 @@ impl ToStream for LogicalAgg { }, final_agg.agg_calls().len(), ) + } else if let Some(approx_percentile_agg) = plan.as_stream_global_approx_percentile() { + if eowc { + return Err(ErrorCode::InvalidInputSyntax( + "`EMIT ON WINDOW CLOSE` cannot be used for aggregation without `GROUP BY`" + .to_string(), + ) + .into()); + } + (plan.clone(), 1) } else { panic!("the root PlanNode must be either StreamHashAgg or StreamSimpleAgg"); }; diff --git a/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs b/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs index 4729bddfd1e03..62d93138856b8 100644 --- a/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs +++ b/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use fixedbitset::FixedBitSet; use pretty_xmlish::XmlNode; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; @@ -38,15 +39,16 @@ pub struct StreamGlobalApproxPercentile { impl StreamGlobalApproxPercentile { pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Self { let schema = Schema::new(vec![Field::new("approx_percentile", DataType::Float64)]); + let watermark_columns = FixedBitSet::with_capacity(1); let base = PlanBase::new_stream( input.ctx(), schema, - input.stream_key().map(|k| k.to_vec()), + Some(vec![0]), input.functional_dependency().clone(), input.distribution().clone(), input.append_only(), input.emit_on_window_close(), - input.watermark_columns().clone(), + watermark_columns, input.columns_monotonicity().clone(), ); Self { base, input } @@ -92,6 +94,5 @@ impl ExprRewritable for StreamGlobalApproxPercentile { impl ExprVisitable for StreamGlobalApproxPercentile { fn visit_exprs(&self, v: &mut dyn ExprVisitor) { - unimplemented!() } } diff --git a/src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs b/src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs index c917f0c5522b2..493b10400e03c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs +++ b/src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs @@ -47,6 +47,7 @@ impl StreamKeyedMerge { lhs_mapping: ColIndexMapping, rhs_mapping: ColIndexMapping, ) -> Self { + // FIXME: schema is wrong. let base = PlanBase::new_stream( lhs_input.ctx(), lhs_input.schema().clone(), diff --git a/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs b/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs index 30ee60fcd3223..64a355c471a75 100644 --- a/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs +++ b/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use fixedbitset::FixedBitSet; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; @@ -41,7 +42,6 @@ pub struct StreamLocalApproxPercentile { quantile: Literal, relative_error: Literal, percentile_col: InputRef, - order_type: OrderType, } impl StreamLocalApproxPercentile { @@ -50,6 +50,8 @@ impl StreamLocalApproxPercentile { Field::new("bucket_id", DataType::Int64), Field::new("count", DataType::Int64), ]); + // FIXME(kwannoel): How does watermark work with FixedBitSet + let watermark_columns = FixedBitSet::with_capacity(2); let base = PlanBase::new_stream( input.ctx(), schema, @@ -58,7 +60,7 @@ impl StreamLocalApproxPercentile { input.distribution().clone(), input.append_only(), input.emit_on_window_close(), - input.watermark_columns().clone(), + watermark_columns, input.columns_monotonicity().clone(), ); Self { @@ -67,7 +69,6 @@ impl StreamLocalApproxPercentile { quantile: approx_percentile_agg_call.direct_args[0].clone(), relative_error: approx_percentile_agg_call.direct_args[1].clone(), percentile_col: approx_percentile_agg_call.inputs[0].clone(), - order_type: approx_percentile_agg_call.order_by[0].order_type, } } } @@ -84,7 +85,6 @@ impl Distill for StreamLocalApproxPercentile { )); out.push(("quantile", Pretty::debug(&self.quantile))); out.push(("relative_error", Pretty::debug(&self.relative_error))); - out.push(("order_type", Pretty::display(&self.order_type))); if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) { out.push(("output_watermarks", ow)); } @@ -104,7 +104,6 @@ impl PlanTreeNodeUnary for StreamLocalApproxPercentile { quantile: self.quantile.clone(), relative_error: self.relative_error.clone(), percentile_col: self.percentile_col.clone(), - order_type: self.order_type, } } } @@ -129,6 +128,5 @@ impl ExprRewritable for StreamLocalApproxPercentile { impl ExprVisitable for StreamLocalApproxPercentile { fn visit_exprs(&self, v: &mut dyn ExprVisitor) { - unimplemented!() } }