From 1e195fc4af8f863182ede93fbe8bb7e4e42c091a Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Wed, 24 Jul 2024 16:18:14 +0800 Subject: [PATCH] refactor SeparatedAggInfo + reuse Field::with_name --- src/common/src/catalog/schema.rs | 9 ---- .../src/optimizer/plan_node/logical_agg.rs | 43 ++++++++++++------- .../stream_global_approx_percentile.rs | 5 ++- .../stream_local_approx_percentile.rs | 4 +- 4 files changed, 33 insertions(+), 28 deletions(-) diff --git a/src/common/src/catalog/schema.rs b/src/common/src/catalog/schema.rs index e4da4b48ddf56..113d9f804b3d4 100644 --- a/src/common/src/catalog/schema.rs +++ b/src/common/src/catalog/schema.rs @@ -47,15 +47,6 @@ impl Field { name: self.name.to_string(), } } - - pub fn new(name: impl Into, data_type: DataType) -> Self { - Self { - data_type, - name: name.into(), - sub_fields: vec![], - type_name: String::new(), - } - } } impl From<&ColumnDesc> for Field { diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index f70098bbb091c..b73f014f13186 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -45,11 +45,15 @@ use crate::utils::{ ColIndexMapping, ColIndexMappingRewriteExt, Condition, GroupBy, IndexSet, Substitute, }; +pub struct AggInfo { + pub calls: Vec, + pub col_mapping: ColIndexMapping, +} + +/// `SeparatedAggInfo` is used to separate normal and approx percentile aggs. pub struct SeparatedAggInfo { - pub approx_percentile_agg_calls: Vec, - pub non_approx_percentile_agg_calls: Vec, - pub approx_percentile_col_mapping: ColIndexMapping, - pub non_approx_percentile_col_mapping: ColIndexMapping, + normal: AggInfo, + approx: AggInfo, } /// `LogicalAgg` groups input data by their group key and computes aggregation functions. @@ -72,12 +76,16 @@ impl LogicalAgg { let mut core = self.core.clone(); // ====== Handle approx percentile aggs - let SeparatedAggInfo { - approx_percentile_agg_calls, - non_approx_percentile_agg_calls, - approx_percentile_col_mapping, - non_approx_percentile_col_mapping, - } = self.separate_normal_and_special_agg(); + let SeparatedAggInfo { normal, approx } = self.separate_normal_and_special_agg(); + + let AggInfo { + calls: non_approx_percentile_agg_calls, + col_mapping: non_approx_percentile_col_mapping, + } = normal; + let AggInfo { + calls: approx_percentile_agg_calls, + col_mapping: approx_percentile_col_mapping, + } = approx; let needs_keyed_merge = (!non_approx_percentile_agg_calls.is_empty() && !approx_percentile_agg_calls.is_empty()) @@ -305,18 +313,21 @@ impl LogicalAgg { non_approx_percentile_col_mapping.push(Some(output_idx)); } } - SeparatedAggInfo { - approx_percentile_agg_calls, - non_approx_percentile_agg_calls, - approx_percentile_col_mapping: ColIndexMapping::new( + let normal = AggInfo { + calls: approx_percentile_agg_calls, + col_mapping: ColIndexMapping::new( approx_percentile_col_mapping, self.agg_calls().len(), ), - non_approx_percentile_col_mapping: ColIndexMapping::new( + }; + let approx = AggInfo { + calls: non_approx_percentile_agg_calls, + col_mapping: ColIndexMapping::new( non_approx_percentile_col_mapping, self.agg_calls().len(), ), - } + }; + SeparatedAggInfo { normal, approx } } fn build_approx_percentile_agg( diff --git a/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs b/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs index 4741c28661bbd..22fe3b33ab69b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs +++ b/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs @@ -42,7 +42,10 @@ pub struct StreamGlobalApproxPercentile { impl StreamGlobalApproxPercentile { pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Self { - let schema = Schema::new(vec![Field::new("approx_percentile", DataType::Float64)]); + let schema = Schema::new(vec![Field::with_name( + DataType::Float64, + "approx_percentile", + )]); let watermark_columns = FixedBitSet::with_capacity(1); let base = PlanBase::new_stream( input.ctx(), diff --git a/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs b/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs index 4f312524ee409..a4fb2a6029179 100644 --- a/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs +++ b/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs @@ -43,8 +43,8 @@ pub struct StreamLocalApproxPercentile { impl StreamLocalApproxPercentile { pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Self { let schema = Schema::new(vec![ - Field::new("bucket_id", DataType::Int64), - Field::new("count", DataType::Int64), + Field::with_name(DataType::Int64, "bucket_id"), + Field::with_name(DataType::Int64, "count"), ]); // FIXME(kwannoel): How does watermark work with FixedBitSet let watermark_columns = FixedBitSet::with_capacity(2);