diff --git a/src/frontend/planner_test/tests/testdata/input/agg.yaml b/src/frontend/planner_test/tests/testdata/input/agg.yaml index 3013a056ec346..154680c8acbc8 100644 --- a/src/frontend/planner_test/tests/testdata/input/agg.yaml +++ b/src/frontend/planner_test/tests/testdata/input/agg.yaml @@ -180,6 +180,7 @@ expected_outputs: - batch_plan - stream_plan + - sql: | create table t (v1 real); select v1, count(*) from t group by v1; @@ -192,24 +193,31 @@ select v1, max(v2) from mv group by v1; expected_outputs: - batch_plan -- name: Use BatchSortAgg, when output requires order - sql: | +- sql: | create table t(v1 int, v2 int); select v1, max(v2) from t group by v1 order by v1 desc; expected_outputs: - batch_plan -- name: Use BatchSortAgg, when required order satisfies input order - sql: | +- sql: | create table t(k1 int, k2 int, v1 int); SELECT max(v1), k1, k2 from t group by k1, k2 order by k1; expected_outputs: - batch_plan -- name: Use BatchSortAgg, when output requires order with swapped output - sql: | +- sql: | create table t(v1 int, v2 int); select max(v2), v1 from t group by v1 order by v1 desc; expected_outputs: - batch_plan +- sql: | + create table t (a int, b int, c int, primary key (a, b, c)); + select a, c, first_value(b order by b), count(*) from t group by a, c; + expected_outputs: + - batch_plan +- sql: | + create table t (a int, b int, c int, primary key (a, b, c)); + select a, c, first_value(b order by b), count(*) from t group by a, c having a = 1;; + expected_outputs: + - batch_plan - name: Not use BatchSortAgg, when input provides order sql: | create table t(v1 int, v2 int); @@ -243,6 +251,7 @@ RW_BATCH_ENABLE_SORT_AGG: 'false' expected_outputs: - batch_plan + - sql: | create table t (v1 real); select count(*) from t; diff --git a/src/frontend/planner_test/tests/testdata/output/agg.yaml b/src/frontend/planner_test/tests/testdata/output/agg.yaml index 48217f78635f3..b66da18839e2f 100644 --- a/src/frontend/planner_test/tests/testdata/output/agg.yaml +++ b/src/frontend/planner_test/tests/testdata/output/agg.yaml @@ -314,8 +314,7 @@ └─BatchSortAgg { group_key: [mv.v1], aggs: [max(mv.v2)] } └─BatchExchange { order: [mv.v1 DESC], dist: HashShard(mv.v1) } └─BatchScan { table: mv, columns: [mv.v1, mv.v2], distribution: SomeShard } -- name: Use BatchSortAgg, when output requires order - sql: | +- sql: | create table t(v1 int, v2 int); select v1, max(v2) from t group by v1 order by v1 desc; batch_plan: |- @@ -324,8 +323,7 @@ └─BatchHashAgg { group_key: [t.v1], aggs: [max(t.v2)] } └─BatchExchange { order: [], dist: HashShard(t.v1) } └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } -- name: Use BatchSortAgg, when required order satisfies input order - sql: | +- sql: | create table t(k1 int, k2 int, v1 int); SELECT max(v1), k1, k2 from t group by k1, k2 order by k1; batch_plan: |- @@ -335,8 +333,7 @@ └─BatchHashAgg { group_key: [t.k1, t.k2], aggs: [max(t.v1)] } └─BatchExchange { order: [], dist: HashShard(t.k1, t.k2) } └─BatchScan { table: t, columns: [t.k1, t.k2, t.v1], distribution: SomeShard } -- name: Use BatchSortAgg, when output requires order with swapped output - sql: | +- sql: | create table t(v1 int, v2 int); select max(v2), v1 from t group by v1 order by v1 desc; batch_plan: |- @@ -346,6 +343,22 @@ └─BatchHashAgg { group_key: [t.v1], aggs: [max(t.v2)] } └─BatchExchange { order: [], dist: HashShard(t.v1) } └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } +- sql: | + create table t (a int, b int, c int, primary key (a, b, c)); + select a, c, first_value(b order by b), count(*) from t group by a, c; + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchHashAgg { group_key: [t.a, t.c], aggs: [first_value(t.b order_by(t.b ASC)), count] } + └─BatchExchange { order: [], dist: HashShard(t.a, t.c) } + └─BatchScan { table: t, columns: [t.a, t.b, t.c], distribution: UpstreamHashShard(t.a, t.b, t.c) } +- sql: | + create table t (a int, b int, c int, primary key (a, b, c)); + select a, c, first_value(b order by b), count(*) from t group by a, c having a = 1;; + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchHashAgg { group_key: [t.a, t.c], aggs: [first_value(t.b order_by(t.b ASC)), count] } + └─BatchExchange { order: [], dist: HashShard(t.a, t.c) } + └─BatchScan { table: t, columns: [t.a, t.b, t.c], scan_ranges: [t.a = Int32(1)], distribution: UpstreamHashShard(t.a, t.b, t.c) } - name: Not use BatchSortAgg, when input provides order sql: | create table t(v1 int, v2 int); diff --git a/src/frontend/src/optimizer/plan_node/generic/agg.rs b/src/frontend/src/optimizer/plan_node/generic/agg.rs index 985d85906fc7f..33f6889e953b1 100644 --- a/src/frontend/src/optimizer/plan_node/generic/agg.rs +++ b/src/frontend/src/optimizer/plan_node/generic/agg.rs @@ -168,13 +168,14 @@ impl Agg { impl Agg { // Check if the input is already sorted on group keys. pub(crate) fn input_provides_order_on_group_keys(&self) -> bool { - self.group_key.indices().all(|group_by_idx| { - self.input - .order() - .column_orders - .iter() - .any(|order| order.column_index == group_by_idx) - }) + let mut input_order_prefix = IndexSet::empty(); + for input_order_col in &self.input.order().column_orders { + if !self.group_key.contains(input_order_col.column_index) { + break; + } + input_order_prefix.insert(input_order_col.column_index); + } + self.group_key == input_order_prefix } }