Skip to content

Commit

Permalink
fix: incorrect batch sort agg on ordered input when group key is not prefix of input order columns (#14621)
Browse files Browse the repository at this point in the history

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Jan 17, 2024
1 parent 2c2085f commit fbfc912
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 19 deletions.
21 changes: 15 additions & 6 deletions src/frontend/planner_test/tests/testdata/input/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@
expected_outputs:
- batch_plan
- stream_plan

- sql: |
create table t (v1 real);
select v1, count(*) from t group by v1;
Expand All @@ -192,24 +193,31 @@
select v1, max(v2) from mv group by v1;
expected_outputs:
- batch_plan
- name: Use BatchSortAgg, when output requires order
sql: |
- sql: |
create table t(v1 int, v2 int);
select v1, max(v2) from t group by v1 order by v1 desc;
expected_outputs:
- batch_plan
- name: Use BatchSortAgg, when required order satisfies input order
sql: |
- sql: |
create table t(k1 int, k2 int, v1 int);
SELECT max(v1), k1, k2 from t group by k1, k2 order by k1;
expected_outputs:
- batch_plan
- name: Use BatchSortAgg, when output requires order with swapped output
sql: |
- sql: |
create table t(v1 int, v2 int);
select max(v2), v1 from t group by v1 order by v1 desc;
expected_outputs:
- batch_plan
- sql: |
create table t (a int, b int, c int, primary key (a, b, c));
select a, c, first_value(b order by b), count(*) from t group by a, c;
expected_outputs:
- batch_plan
- sql: |
create table t (a int, b int, c int, primary key (a, b, c));
select a, c, first_value(b order by b), count(*) from t group by a, c having a = 1;;
expected_outputs:
- batch_plan
- name: Not use BatchSortAgg, when input provides order
sql: |
create table t(v1 int, v2 int);
Expand Down Expand Up @@ -243,6 +251,7 @@
RW_BATCH_ENABLE_SORT_AGG: 'false'
expected_outputs:
- batch_plan

- sql: |
create table t (v1 real);
select count(*) from t;
Expand Down
25 changes: 19 additions & 6 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,7 @@
└─BatchSortAgg { group_key: [mv.v1], aggs: [max(mv.v2)] }
└─BatchExchange { order: [mv.v1 DESC], dist: HashShard(mv.v1) }
└─BatchScan { table: mv, columns: [mv.v1, mv.v2], distribution: SomeShard }
- name: Use BatchSortAgg, when output requires order
sql: |
- sql: |
create table t(v1 int, v2 int);
select v1, max(v2) from t group by v1 order by v1 desc;
batch_plan: |-
Expand All @@ -324,8 +323,7 @@
└─BatchHashAgg { group_key: [t.v1], aggs: [max(t.v2)] }
└─BatchExchange { order: [], dist: HashShard(t.v1) }
└─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard }
- name: Use BatchSortAgg, when required order satisfies input order
sql: |
- sql: |
create table t(k1 int, k2 int, v1 int);
SELECT max(v1), k1, k2 from t group by k1, k2 order by k1;
batch_plan: |-
Expand All @@ -335,8 +333,7 @@
└─BatchHashAgg { group_key: [t.k1, t.k2], aggs: [max(t.v1)] }
└─BatchExchange { order: [], dist: HashShard(t.k1, t.k2) }
└─BatchScan { table: t, columns: [t.k1, t.k2, t.v1], distribution: SomeShard }
- name: Use BatchSortAgg, when output requires order with swapped output
sql: |
- sql: |
create table t(v1 int, v2 int);
select max(v2), v1 from t group by v1 order by v1 desc;
batch_plan: |-
Expand All @@ -346,6 +343,22 @@
└─BatchHashAgg { group_key: [t.v1], aggs: [max(t.v2)] }
└─BatchExchange { order: [], dist: HashShard(t.v1) }
└─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard }
- sql: |
create table t (a int, b int, c int, primary key (a, b, c));
select a, c, first_value(b order by b), count(*) from t group by a, c;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashAgg { group_key: [t.a, t.c], aggs: [first_value(t.b order_by(t.b ASC)), count] }
└─BatchExchange { order: [], dist: HashShard(t.a, t.c) }
└─BatchScan { table: t, columns: [t.a, t.b, t.c], distribution: UpstreamHashShard(t.a, t.b, t.c) }
- sql: |
create table t (a int, b int, c int, primary key (a, b, c));
select a, c, first_value(b order by b), count(*) from t group by a, c having a = 1;;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashAgg { group_key: [t.a, t.c], aggs: [first_value(t.b order_by(t.b ASC)), count] }
└─BatchExchange { order: [], dist: HashShard(t.a, t.c) }
└─BatchScan { table: t, columns: [t.a, t.b, t.c], scan_ranges: [t.a = Int32(1)], distribution: UpstreamHashShard(t.a, t.b, t.c) }
- name: Not use BatchSortAgg, when input provides order
sql: |
create table t(v1 int, v2 int);
Expand Down
15 changes: 8 additions & 7 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,14 @@ impl<PlanRef: GenericPlanRef> Agg<PlanRef> {
impl<PlanRef: BatchPlanRef> Agg<PlanRef> {
// Check if the input is already sorted on group keys.
pub(crate) fn input_provides_order_on_group_keys(&self) -> bool {
self.group_key.indices().all(|group_by_idx| {
self.input
.order()
.column_orders
.iter()
.any(|order| order.column_index == group_by_idx)
})
let mut input_order_prefix = IndexSet::empty();
for input_order_col in &self.input.order().column_orders {
if !self.group_key.contains(input_order_col.column_index) {
break;
}
input_order_prefix.insert(input_order_col.column_index);
}
self.group_key == input_order_prefix
}
}

Expand Down

0 comments on commit fbfc912

Please sign in to comment.