Skip to content

Commit

Permalink
refactor SeparatedAggInfo + reuse Field::with_name
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jul 24, 2024
1 parent 1dd7c2b commit 1e195fc
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 28 deletions.
9 changes: 0 additions & 9 deletions src/common/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,6 @@ impl Field {
name: self.name.to_string(),
}
}

pub fn new(name: impl Into<String>, data_type: DataType) -> Self {
Self {
data_type,
name: name.into(),
sub_fields: vec![],
type_name: String::new(),
}
}
}

impl From<&ColumnDesc> for Field {
Expand Down
43 changes: 27 additions & 16 deletions src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,15 @@ use crate::utils::{
ColIndexMapping, ColIndexMappingRewriteExt, Condition, GroupBy, IndexSet, Substitute,
};

pub struct AggInfo {
pub calls: Vec<PlanAggCall>,
pub col_mapping: ColIndexMapping,
}

/// `SeparatedAggInfo` is used to separate normal and approx percentile aggs.
pub struct SeparatedAggInfo {
pub approx_percentile_agg_calls: Vec<PlanAggCall>,
pub non_approx_percentile_agg_calls: Vec<PlanAggCall>,
pub approx_percentile_col_mapping: ColIndexMapping,
pub non_approx_percentile_col_mapping: ColIndexMapping,
normal: AggInfo,
approx: AggInfo,
}

/// `LogicalAgg` groups input data by their group key and computes aggregation functions.
Expand All @@ -72,12 +76,16 @@ impl LogicalAgg {
let mut core = self.core.clone();

// ====== Handle approx percentile aggs
let SeparatedAggInfo {
approx_percentile_agg_calls,
non_approx_percentile_agg_calls,
approx_percentile_col_mapping,
non_approx_percentile_col_mapping,
} = self.separate_normal_and_special_agg();
let SeparatedAggInfo { normal, approx } = self.separate_normal_and_special_agg();

let AggInfo {
calls: non_approx_percentile_agg_calls,
col_mapping: non_approx_percentile_col_mapping,
} = normal;
let AggInfo {
calls: approx_percentile_agg_calls,
col_mapping: approx_percentile_col_mapping,
} = approx;

let needs_keyed_merge = (!non_approx_percentile_agg_calls.is_empty()
&& !approx_percentile_agg_calls.is_empty())
Expand Down Expand Up @@ -305,18 +313,21 @@ impl LogicalAgg {
non_approx_percentile_col_mapping.push(Some(output_idx));
}
}
SeparatedAggInfo {
approx_percentile_agg_calls,
non_approx_percentile_agg_calls,
approx_percentile_col_mapping: ColIndexMapping::new(
let normal = AggInfo {
calls: approx_percentile_agg_calls,
col_mapping: ColIndexMapping::new(
approx_percentile_col_mapping,
self.agg_calls().len(),
),
non_approx_percentile_col_mapping: ColIndexMapping::new(
};
let approx = AggInfo {
calls: non_approx_percentile_agg_calls,
col_mapping: ColIndexMapping::new(
non_approx_percentile_col_mapping,
self.agg_calls().len(),
),
}
};
SeparatedAggInfo { normal, approx }
}

fn build_approx_percentile_agg(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ pub struct StreamGlobalApproxPercentile {

impl StreamGlobalApproxPercentile {
pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Self {
let schema = Schema::new(vec![Field::new("approx_percentile", DataType::Float64)]);
let schema = Schema::new(vec![Field::with_name(
DataType::Float64,
"approx_percentile",
)]);
let watermark_columns = FixedBitSet::with_capacity(1);
let base = PlanBase::new_stream(
input.ctx(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ pub struct StreamLocalApproxPercentile {
impl StreamLocalApproxPercentile {
pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Self {
let schema = Schema::new(vec![
Field::new("bucket_id", DataType::Int64),
Field::new("count", DataType::Int64),
Field::with_name(DataType::Int64, "bucket_id"),
Field::with_name(DataType::Int64, "count"),
]);
// FIXME(kwannoel): How does watermark work with FixedBitSet
let watermark_columns = FixedBitSet::with_capacity(2);
Expand Down

0 comments on commit 1e195fc

Please sign in to comment.