Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: column index mapping bug (#13090) #13107

Merged
merged 2 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions e2e_test/streaming/bug_fixes/issue_12620.slt

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -804,8 +804,8 @@
create table t(a int, b int, c int);
select a, count(distinct b) as distinct_b_num, sum(distinct c) filter(where c < 100) as distinct_c_sum from t group by a;
optimized_logical_plan_for_batch: |-
LogicalAgg { group_key: [t.a], aggs: [count(t.b) filter((flag = 0:Int64)), sum(t.c) filter((count filter((t.c < 100:Int32)) > 0:Int64) AND (flag = 1:Int64))] }
└─LogicalAgg { group_key: [t.a, t.b, t.c, flag], aggs: [count filter((t.c < 100:Int32))] }
LogicalAgg { group_key: [t.a_expanded], aggs: [count(t.b_expanded) filter((flag = 0:Int64)), sum(t.c_expanded) filter((count filter((t.c < 100:Int32)) > 0:Int64) AND (flag = 1:Int64))] }
└─LogicalAgg { group_key: [t.a_expanded, t.b_expanded, t.c_expanded, flag], aggs: [count filter((t.c < 100:Int32))] }
└─LogicalExpand { column_subsets: [[t.a, t.b], [t.a, t.c]] }
└─LogicalScan { table: t, columns: [t.a, t.b, t.c] }
- name: single distinct agg and non-distinct agg
Expand Down Expand Up @@ -834,16 +834,16 @@
create table t(a int, b int, c int);
select a, count(distinct b) as distinct_b_num, count(distinct c) as distinct_c_sum, sum(c) as sum_c from t group by a;
optimized_logical_plan_for_batch: |-
LogicalAgg { group_key: [t.a], aggs: [count(t.b) filter((flag = 1:Int64)), count(t.c) filter((flag = 0:Int64)), sum(sum(t.c)) filter((flag = 0:Int64))] }
└─LogicalAgg { group_key: [t.a, t.b, t.c, flag], aggs: [sum(t.c)] }
LogicalAgg { group_key: [t.a_expanded], aggs: [count(t.b_expanded) filter((flag = 1:Int64)), count(t.c_expanded) filter((flag = 0:Int64)), sum(sum(t.c_expanded)) filter((flag = 0:Int64))] }
└─LogicalAgg { group_key: [t.a_expanded, t.b_expanded, t.c_expanded, flag], aggs: [sum(t.c_expanded)] }
└─LogicalExpand { column_subsets: [[t.a, t.c], [t.a, t.b]] }
└─LogicalScan { table: t, columns: [t.a, t.b, t.c] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashAgg { group_key: [t.a], aggs: [count(t.b) filter((flag = 1:Int64)), count(t.c) filter((flag = 0:Int64)), sum(sum(t.c)) filter((flag = 0:Int64))] }
└─BatchExchange { order: [], dist: HashShard(t.a) }
└─BatchHashAgg { group_key: [t.a, t.b, t.c, flag], aggs: [sum(t.c)] }
└─BatchExchange { order: [], dist: HashShard(t.a, t.b, t.c, flag) }
└─BatchHashAgg { group_key: [t.a_expanded], aggs: [count(t.b_expanded) filter((flag = 1:Int64)), count(t.c_expanded) filter((flag = 0:Int64)), sum(sum(t.c_expanded)) filter((flag = 0:Int64))] }
└─BatchExchange { order: [], dist: HashShard(t.a_expanded) }
└─BatchHashAgg { group_key: [t.a_expanded, t.b_expanded, t.c_expanded, flag], aggs: [sum(t.c_expanded)] }
└─BatchExchange { order: [], dist: HashShard(t.a_expanded, t.b_expanded, t.c_expanded, flag) }
└─BatchExpand { column_subsets: [[t.a, t.c], [t.a, t.b]] }
└─BatchScan { table: t, columns: [t.a, t.b, t.c], distribution: SomeShard }
stream_plan: |-
Expand Down Expand Up @@ -1039,13 +1039,13 @@
create table t(x int, y int);
select count(distinct x), sum(distinct y) from t;
optimized_logical_plan_for_batch: |-
LogicalAgg { aggs: [count(t.x) filter((flag = 0:Int64)), sum(t.y) filter((flag = 1:Int64))] }
└─LogicalAgg { group_key: [t.x, t.y, flag], aggs: [] }
LogicalAgg { aggs: [count(t.x_expanded) filter((flag = 0:Int64)), sum(t.y_expanded) filter((flag = 1:Int64))] }
└─LogicalAgg { group_key: [t.x_expanded, t.y_expanded, flag], aggs: [] }
└─LogicalExpand { column_subsets: [[t.x], [t.y]] }
└─LogicalScan { table: t, columns: [t.x, t.y] }
optimized_logical_plan_for_stream: |-
LogicalAgg { aggs: [count(t.x) filter((flag = 0:Int64)), sum(t.y) filter((flag = 1:Int64))] }
└─LogicalAgg { group_key: [t.x, t.y, flag], aggs: [] }
LogicalAgg { aggs: [count(t.x_expanded) filter((flag = 0:Int64)), sum(t.y_expanded) filter((flag = 1:Int64))] }
└─LogicalAgg { group_key: [t.x_expanded, t.y_expanded, flag], aggs: [] }
└─LogicalExpand { column_subsets: [[t.x], [t.y]] }
└─LogicalScan { table: t, columns: [t.x, t.y, t._row_id] }
with_config_map:
Expand Down
116 changes: 58 additions & 58 deletions src/frontend/planner_test/tests/testdata/output/grouping_sets.yaml

Large diffs are not rendered by default.

58 changes: 29 additions & 29 deletions src/frontend/planner_test/tests/testdata/output/nexmark.yaml

Large diffs are not rendered by default.

88 changes: 44 additions & 44 deletions src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1278,10 +1278,10 @@
GROUP BY to_char(date_time, 'yyyy-MM-dd');
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashAgg { group_key: [$expr2], aggs: [sum0(count) filter((flag = 0:Int64)), sum0(count filter(($expr3 < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr3 >= 1000000:Int32))) filter((flag = 0:Int64)), count($expr4) filter((flag = 1:Int64)), count($expr4) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr4) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr4) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr5) filter((flag = 2:Int64)), count($expr5) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr5) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr5) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] }
└─BatchExchange { order: [], dist: HashShard($expr2) }
└─BatchHashAgg { group_key: [$expr2, $expr4, $expr5, flag], aggs: [count, count filter(($expr3 < 10000:Int32)), count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count filter(($expr3 >= 1000000:Int32))] }
└─BatchExchange { order: [], dist: HashShard($expr2, $expr4, $expr5, flag) }
└─BatchHashAgg { group_key: [$expr2_expanded], aggs: [sum0(count) filter((flag = 0:Int64)), sum0(count filter(($expr3 < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr3 >= 1000000:Int32))) filter((flag = 0:Int64)), count($expr4_expanded) filter((flag = 1:Int64)), count($expr4_expanded) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr4_expanded) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr4_expanded) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr5_expanded) filter((flag = 2:Int64)), count($expr5_expanded) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr5_expanded) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr5_expanded) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] }
└─BatchExchange { order: [], dist: HashShard($expr2_expanded) }
└─BatchHashAgg { group_key: [$expr2_expanded, $expr4_expanded, $expr5_expanded, flag], aggs: [count, count filter(($expr3 < 10000:Int32)), count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count filter(($expr3 >= 1000000:Int32))] }
└─BatchExchange { order: [], dist: HashShard($expr2_expanded, $expr4_expanded, $expr5_expanded, flag) }
└─BatchExpand { column_subsets: [[$expr2], [$expr2, $expr4], [$expr2, $expr5]] }
└─BatchProject { exprs: [ToChar($expr1, 'yyyy-MM-dd':Varchar) as $expr2, Field(bid, 2:Int32) as $expr3, Field(bid, 1:Int32) as $expr4, Field(bid, 0:Int32) as $expr5] }
└─BatchFilter { predicate: (event_type = 2:Int32) }
Expand Down Expand Up @@ -1358,10 +1358,10 @@
GROUP BY channel, to_char(date_time, 'yyyy-MM-dd');
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashAgg { group_key: [$expr2, $expr3], aggs: [max(max($expr4)) filter((flag = 0:Int64)), sum0(count) filter((flag = 0:Int64)), sum0(count filter(($expr5 < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr5 >= 1000000:Int32))) filter((flag = 0:Int64)), count($expr6) filter((flag = 1:Int64)), count($expr6) filter((count filter(($expr5 < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr6) filter((count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr6) filter((count filter(($expr5 >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr7) filter((flag = 2:Int64)), count($expr7) filter((count filter(($expr5 < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr7) filter((count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr7) filter((count filter(($expr5 >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] }
└─BatchExchange { order: [], dist: HashShard($expr2, $expr3) }
└─BatchHashAgg { group_key: [$expr2, $expr3, $expr6, $expr7, flag], aggs: [max($expr4), count, count filter(($expr5 < 10000:Int32)), count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count filter(($expr5 >= 1000000:Int32))] }
└─BatchExchange { order: [], dist: HashShard($expr2, $expr3, $expr6, $expr7, flag) }
└─BatchHashAgg { group_key: [$expr2_expanded, $expr3_expanded], aggs: [max(max($expr4_expanded)) filter((flag = 0:Int64)), sum0(count) filter((flag = 0:Int64)), sum0(count filter(($expr5 < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr5 >= 1000000:Int32))) filter((flag = 0:Int64)), count($expr6_expanded) filter((flag = 1:Int64)), count($expr6_expanded) filter((count filter(($expr5 < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr6_expanded) filter((count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr6_expanded) filter((count filter(($expr5 >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr7_expanded) filter((flag = 2:Int64)), count($expr7_expanded) filter((count filter(($expr5 < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr7_expanded) filter((count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr7_expanded) filter((count filter(($expr5 >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] }
└─BatchExchange { order: [], dist: HashShard($expr2_expanded, $expr3_expanded) }
└─BatchHashAgg { group_key: [$expr2_expanded, $expr3_expanded, $expr6_expanded, $expr7_expanded, flag], aggs: [max($expr4_expanded), count, count filter(($expr5 < 10000:Int32)), count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count filter(($expr5 >= 1000000:Int32))] }
└─BatchExchange { order: [], dist: HashShard($expr2_expanded, $expr3_expanded, $expr6_expanded, $expr7_expanded, flag) }
└─BatchExpand { column_subsets: [[$expr2, $expr3, $expr4], [$expr2, $expr3, $expr6], [$expr2, $expr3, $expr7]] }
└─BatchProject { exprs: [Field(bid, 3:Int32) as $expr2, ToChar($expr1, 'yyyy-MM-dd':Varchar) as $expr3, ToChar($expr1, 'HH:mm':Varchar) as $expr4, Field(bid, 2:Int32) as $expr5, Field(bid, 1:Int32) as $expr6, Field(bid, 0:Int32) as $expr7] }
└─BatchFilter { predicate: (event_type = 2:Int32) }
Expand Down
12 changes: 6 additions & 6 deletions src/frontend/planner_test/tests/testdata/output/subquery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -798,15 +798,15 @@
select * from integers where 2 in (select count(distinct k) + count(distinct v) from rows where correlated_col = integers.correlated_col);
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashJoin { type: LeftSemi, predicate: integers.correlated_col IS NOT DISTINCT FROM integers.correlated_col AND 2:Int64 = $expr1, output: [integers.i, integers.correlated_col] }
└─BatchHashJoin { type: LeftSemi, predicate: integers.correlated_col IS NOT DISTINCT FROM integers.correlated_col_expanded AND 2:Int64 = $expr1, output: [integers.i, integers.correlated_col] }
├─BatchExchange { order: [], dist: HashShard(integers.correlated_col) }
│ └─BatchProject { exprs: [integers.i, integers.correlated_col, 2:Int64] }
│ └─BatchScan { table: integers, columns: [integers.i, integers.correlated_col], distribution: SomeShard }
└─BatchProject { exprs: [integers.correlated_col, (count(rows.k) filter((flag = 0:Int64)) + count(rows.v) filter((flag = 1:Int64))) as $expr1] }
└─BatchHashAgg { group_key: [integers.correlated_col], aggs: [count(rows.k) filter((flag = 0:Int64)), count(rows.v) filter((flag = 1:Int64))] }
└─BatchExchange { order: [], dist: HashShard(integers.correlated_col) }
└─BatchHashAgg { group_key: [integers.correlated_col, rows.k, rows.v, flag], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(integers.correlated_col, rows.k, rows.v, flag) }
└─BatchProject { exprs: [integers.correlated_col_expanded, (count(rows.k_expanded) filter((flag = 0:Int64)) + count(rows.v_expanded) filter((flag = 1:Int64))) as $expr1] }
└─BatchHashAgg { group_key: [integers.correlated_col_expanded], aggs: [count(rows.k_expanded) filter((flag = 0:Int64)), count(rows.v_expanded) filter((flag = 1:Int64))] }
└─BatchExchange { order: [], dist: HashShard(integers.correlated_col_expanded) }
└─BatchHashAgg { group_key: [integers.correlated_col_expanded, rows.k_expanded, rows.v_expanded, flag], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(integers.correlated_col_expanded, rows.k_expanded, rows.v_expanded, flag) }
└─BatchExpand { column_subsets: [[integers.correlated_col, rows.k], [integers.correlated_col, rows.v]] }
└─BatchHashJoin { type: LeftOuter, predicate: integers.correlated_col IS NOT DISTINCT FROM rows.correlated_col, output: [integers.correlated_col, rows.k, rows.v, 1:Int32] }
├─BatchHashAgg { group_key: [integers.correlated_col], aggs: [] }
Expand Down
10 changes: 8 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,14 @@ impl<PlanRef: GenericPlanRef> Expand<PlanRef> {

impl<PlanRef: GenericPlanRef> GenericPlanNode for Expand<PlanRef> {
fn schema(&self) -> Schema {
let mut fields = self.input.schema().clone().into_fields();
fields.extend(fields.clone());
let mut fields = self
.input
.schema()
.fields()
.iter()
.map(|f| Field::with_name(f.data_type(), format!("{}_expanded", f.name)))
.collect::<Vec<_>>();
fields.extend(self.input.schema().fields().iter().cloned());
fields.push(Field::with_name(DataType::Int64, "flag"));
Schema::new(fields)
}
Expand Down
32 changes: 24 additions & 8 deletions src/frontend/src/optimizer/plan_node/logical_expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,30 @@ impl PlanTreeNodeUnary for LogicalExpand {
.collect_vec()
})
.collect_vec();
let (mut mapping, new_input_col_num) = input_col_change.into_parts();
mapping.extend({
mapping
.iter()
.map(|p| p.map(|i| i + new_input_col_num))
.collect_vec()
});
mapping.push(Some(2 * new_input_col_num));

let old_out_len = self.schema().len();
let old_in_len = self.input().schema().len();
let new_in_len = input.schema().len();
assert_eq!(
old_out_len,
old_in_len * 2 + 1 // expanded input cols + real input cols + flag
);

let mut mapping = Vec::with_capacity(old_out_len);
// map the expanded input columns
for i in 0..old_in_len {
mapping.push(input_col_change.try_map(i));
}
// map the real input columns
for i in 0..old_in_len {
mapping.push(
input_col_change
.try_map(i)
.map(|x| x + new_in_len /* # of new expanded input cols */),
);
}
// map the flag column
mapping.push(Some(2 * new_in_len));

let expand = Self::new(input, column_subsets);
let output_col_num = expand.schema().len();
Expand Down
Loading