diff --git a/src/frontend/planner_test/tests/testdata/input/agg.yaml b/src/frontend/planner_test/tests/testdata/input/agg.yaml index e02e071c70aba..b5b8e182703f8 100644 --- a/src/frontend/planner_test/tests/testdata/input/agg.yaml +++ b/src/frontend/planner_test/tests/testdata/input/agg.yaml @@ -923,3 +923,9 @@ SELECT count(DISTINCT i) FROM integers; expected_outputs: - batch_plan +- sql: | + CREATE TABLE t(id int primary key, a int, b int); + SELECT count(*) FROM t group by a, id, b; + expected_outputs: + - batch_plan + - stream_plan diff --git a/src/frontend/planner_test/tests/testdata/output/agg.yaml b/src/frontend/planner_test/tests/testdata/output/agg.yaml index fd57f0dfe9fdb..d62ce89d0ed3b 100644 --- a/src/frontend/planner_test/tests/testdata/output/agg.yaml +++ b/src/frontend/planner_test/tests/testdata/output/agg.yaml @@ -1395,21 +1395,21 @@ sq_1.col_2; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchProject { exprs: [max(max(lineitem.l_commitdate))] } - └─BatchHashAgg { group_key: [lineitem.l_commitdate], aggs: [max(max(lineitem.l_commitdate))] } - └─BatchExchange { order: [], dist: HashShard(lineitem.l_commitdate) } - └─BatchHashAgg { group_key: [lineitem.l_commitdate], aggs: [max(lineitem.l_commitdate)] } - └─BatchHashAgg { group_key: [lineitem.l_orderkey, lineitem.l_tax, lineitem.l_commitdate, lineitem.l_shipinstruct], aggs: [] } - └─BatchScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_tax, lineitem.l_commitdate, lineitem.l_shipinstruct], distribution: UpstreamHashShard(lineitem.l_orderkey) } + └─BatchProject { exprs: [max(max(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))))] } + └─BatchHashAgg { group_key: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))], aggs: [max(max(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))))] } + └─BatchExchange { order: [], dist: HashShard(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))) } + └─BatchHashAgg { group_key: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))], aggs: [max(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC)))] } + └─BatchSortAgg { group_key: [lineitem.l_orderkey], aggs: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))] } + └─BatchScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_commitdate], distribution: UpstreamHashShard(lineitem.l_orderkey) } stream_plan: |- - StreamMaterialize { columns: [col_0, lineitem.l_commitdate(hidden)], stream_key: [lineitem.l_commitdate], pk_columns: [lineitem.l_commitdate], pk_conflict: NoCheck } - └─StreamProject { exprs: [max(max(lineitem.l_commitdate)), lineitem.l_commitdate] } - └─StreamHashAgg { group_key: [lineitem.l_commitdate], aggs: [max(max(lineitem.l_commitdate)), count] } - └─StreamExchange { dist: HashShard(lineitem.l_commitdate) } - └─StreamHashAgg { group_key: [lineitem.l_commitdate, $expr1], aggs: [max(lineitem.l_commitdate), count] } - └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_tax, lineitem.l_commitdate, lineitem.l_shipinstruct, Vnode(lineitem.l_orderkey) as $expr1] } - └─StreamHashAgg { group_key: [lineitem.l_orderkey, lineitem.l_tax, lineitem.l_commitdate, lineitem.l_shipinstruct], aggs: [count] } - └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_tax, lineitem.l_commitdate, lineitem.l_shipinstruct], pk: [lineitem.l_orderkey], dist: UpstreamHashShard(lineitem.l_orderkey) } + StreamMaterialize { columns: [col_0, first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))(hidden)], stream_key: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))], pk_columns: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))], pk_conflict: NoCheck } + └─StreamProject { exprs: [max(max(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC)))), first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))] } + └─StreamHashAgg { group_key: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))], aggs: [max(max(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC)))), count] } + └─StreamExchange { dist: HashShard(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))) } + └─StreamHashAgg { group_key: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC)), $expr1], aggs: [max(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))), count] } + └─StreamProject { exprs: [lineitem.l_orderkey, first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC)), Vnode(lineitem.l_orderkey) as $expr1] } + └─StreamHashAgg { group_key: [lineitem.l_orderkey], aggs: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC)), count] } + └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_commitdate], pk: [lineitem.l_orderkey], dist: UpstreamHashShard(lineitem.l_orderkey) } - name: two phase agg on hop window input should use two phase agg sql: | SET QUERY_MODE TO DISTRIBUTED; @@ -1636,3 +1636,16 @@ └─BatchHashAgg { group_key: [integers.i], aggs: [] } └─BatchExchange { order: [], dist: HashShard(integers.i) } └─BatchScan { table: integers, columns: [integers.i], distribution: SomeShard } +- sql: | + CREATE TABLE t(id int primary key, a int, b int); + SELECT count(*) FROM t group by a, id, b; + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [count] } + └─BatchSortAgg { group_key: [t.id], aggs: [count] } + └─BatchScan { table: t, columns: [t.id], distribution: UpstreamHashShard(t.id) } + stream_plan: |- + StreamMaterialize { columns: [count, t.id(hidden)], stream_key: [t.id], pk_columns: [t.id], pk_conflict: NoCheck } + └─StreamProject { exprs: [count, t.id] } + └─StreamHashAgg { group_key: [t.id], aggs: [count] } + └─StreamTableScan { table: t, columns: [t.id], pk: [t.id], dist: UpstreamHashShard(t.id) } diff --git a/src/frontend/planner_test/tests/testdata/output/batch_index_join.yaml b/src/frontend/planner_test/tests/testdata/output/batch_index_join.yaml index 945c4a7e707b1..236bc31b2503e 100644 --- a/src/frontend/planner_test/tests/testdata/output/batch_index_join.yaml +++ b/src/frontend/planner_test/tests/testdata/output/batch_index_join.yaml @@ -60,9 +60,9 @@ select t2.c, t2.d, count(distinct t.a) from t join t2 on t.a = t2.c group by t2.c, t2.d; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchHashAgg { group_key: [t2.c, t2.d], aggs: [count(t.a)] } - └─BatchExchange { order: [], dist: HashShard(t2.c, t2.d) } - └─BatchHashAgg { group_key: [t2.c, t2.d, t.a], aggs: [] } + └─BatchHashAgg { group_key: [first_value(t2.c order_by(t2.c ASC))], aggs: [first_value(first_value(t2.d order_by(t2.d ASC)) order_by(first_value(t2.d order_by(t2.d ASC)) ASC)), count(t.a)] } + └─BatchExchange { order: [], dist: HashShard(first_value(t2.c order_by(t2.c ASC))) } + └─BatchHashAgg { group_key: [t.a], aggs: [first_value(t2.c order_by(t2.c ASC)), first_value(t2.d order_by(t2.d ASC))] } └─BatchLookupJoin { type: Inner, predicate: t.a = t2.c, output: [t2.c, t2.d, t.a] } └─BatchExchange { order: [], dist: UpstreamHashShard(t.a) } └─BatchScan { table: t, columns: [t.a], distribution: SomeShard } diff --git a/src/frontend/planner_test/tests/testdata/output/except.yaml b/src/frontend/planner_test/tests/testdata/output/except.yaml index e57bf061d5335..58cde9cb9db38 100644 --- a/src/frontend/planner_test/tests/testdata/output/except.yaml +++ b/src/frontend/planner_test/tests/testdata/output/except.yaml @@ -108,69 +108,105 @@ create table t2 (a int, b numeric, c bigint, primary key(a)); select * from t1 except select * from t2; optimized_logical_plan_for_batch: |- - LogicalAgg { group_key: [t1.a, t1.b, t1.c], aggs: [] } + LogicalAgg { group_key: [t1.a], aggs: [first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC))] } └─LogicalJoin { type: LeftAnti, on: IsNotDistinctFrom(t1.a, t2.a) AND IsNotDistinctFrom(t1.b, t2.b) AND IsNotDistinctFrom(t1.c, t2.c), output: all } ├─LogicalScan { table: t1, columns: [t1.a, t1.b, t1.c] } └─LogicalScan { table: t2, columns: [t2.a, t2.b, t2.c] } batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [] } + └─BatchHashAgg { group_key: [t1.a], aggs: [first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC))] } └─BatchLookupJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } └─BatchExchange { order: [], dist: UpstreamHashShard(t1.a) } └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], distribution: UpstreamHashShard(t1.a) } stream_plan: |- - StreamMaterialize { columns: [a, b, c], stream_key: [a, b, c], pk_columns: [a, b, c], pk_conflict: NoCheck } - └─StreamProject { exprs: [t1.a, t1.b, t1.c] } - └─StreamHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [count] } - └─StreamHashJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } - ├─StreamExchange { dist: HashShard(t1.a, t1.b, t1.c) } - │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) } - └─StreamExchange { dist: HashShard(t2.a, t2.b, t2.c) } - └─StreamTableScan { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.a], dist: UpstreamHashShard(t2.a) } + StreamMaterialize { columns: [a, b, c], stream_key: [a], pk_columns: [a], pk_conflict: NoCheck } + └─StreamProject { exprs: [t1.a, first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC))] } + └─StreamHashAgg { group_key: [t1.a], aggs: [first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC)), count] } + └─StreamExchange { dist: HashShard(t1.a) } + └─StreamHashJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } + ├─StreamExchange { dist: HashShard(t1.a, t1.b, t1.c) } + │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) } + └─StreamExchange { dist: HashShard(t2.a, t2.b, t2.c) } + └─StreamTableScan { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.a], dist: UpstreamHashShard(t2.a) } stream_dist_plan: |+ Fragment 0 - StreamMaterialize { columns: [a, b, c], stream_key: [a, b, c], pk_columns: [a, b, c], pk_conflict: NoCheck } { materialized table: 4294967294 } - └── StreamProject { exprs: [t1.a, t1.b, t1.c] } - └── StreamHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [count] } { intermediate state table: 0, state tables: [], distinct tables: [] } - └── StreamHashJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } - ├── left table: 1 - ├── right table: 3 - ├── left degree table: 2 - ├── right degree table: 4 - ├── StreamExchange Hash([0, 1, 2]) from 1 - └── StreamExchange Hash([0, 1, 2]) from 2 + StreamMaterialize { columns: [a, b, c], stream_key: [a], pk_columns: [a], pk_conflict: NoCheck } { materialized table: 4294967294 } + └── StreamProject { exprs: [t1.a, first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC))] } + └── StreamHashAgg { group_key: [t1.a], aggs: [first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC)), count] } + ├── intermediate state table: 2 + ├── state tables: [ 0, 1 ] + ├── distinct tables: [] + └── StreamExchange Hash([0]) from 1 Fragment 1 - Chain { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) } { state table: 5 } + StreamHashJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } + ├── left table: 3 + ├── right table: 5 + ├── left degree table: 4 + ├── right degree table: 6 + ├── StreamExchange Hash([0, 1, 2]) from 2 + └── StreamExchange Hash([0, 1, 2]) from 3 + + Fragment 2 + Chain { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) } { state table: 7 } ├── Upstream └── BatchPlanNode - Fragment 2 - Chain { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.a], dist: UpstreamHashShard(t2.a) } { state table: 6 } + Fragment 3 + Chain { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.a], dist: UpstreamHashShard(t2.a) } { state table: 8 } ├── Upstream └── BatchPlanNode - Table 0 { columns: [ t1_a, t1_b, t1_c, count ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 } + Table 0 + ├── columns: [ t1_a, t1_b, t1_c ] + ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ] + ├── value indices: [ 0, 1, 2 ] + ├── distribution key: [ 0 ] + └── read pk prefix len hint: 1 - Table 1 { columns: [ t1_a, t1_b, t1_c ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 } + Table 1 + ├── columns: [ t1_a, t1_c, t1_b ] + ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ] + ├── value indices: [ 0, 1, 2 ] + ├── distribution key: [ 0 ] + └── read pk prefix len hint: 1 Table 2 + ├── columns: [ t1_a, first_value(t1_b order_by(t1_b ASC)), first_value(t1_c order_by(t1_c ASC)), count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + └── read pk prefix len hint: 1 + + Table 3 + ├── columns: [ t1_a, t1_b, t1_c ] + ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ] + ├── value indices: [ 0, 1, 2 ] + ├── distribution key: [ 0, 1, 2 ] + └── read pk prefix len hint: 3 + + Table 4 ├── columns: [ t1_a, t1_b, t1_c, _degree ] ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ] ├── value indices: [ 3 ] ├── distribution key: [ 0, 1, 2 ] └── read pk prefix len hint: 3 - Table 3 { columns: [ t2_a, t2_b, t2_c ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 } + Table 5 + ├── columns: [ t2_a, t2_b, t2_c ] + ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ] + ├── value indices: [ 0, 1, 2 ] + ├── distribution key: [ 0, 1, 2 ] + └── read pk prefix len hint: 3 - Table 4 + Table 6 ├── columns: [ t2_a, t2_b, t2_c, _degree ] ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ] ├── value indices: [ 3 ] ├── distribution key: [ 0, 1, 2 ] └── read pk prefix len hint: 3 - Table 5 + Table 7 ├── columns: [ vnode, a, t1_backfill_finished ] ├── primary key: [ $0 ASC ] ├── value indices: [ 1, 2 ] @@ -178,7 +214,7 @@ ├── read pk prefix len hint: 1 └── vnode column idx: 0 - Table 6 + Table 8 ├── columns: [ vnode, a, t2_backfill_finished ] ├── primary key: [ $0 ASC ] ├── value indices: [ 1, 2 ] @@ -186,7 +222,7 @@ ├── read pk prefix len hint: 1 └── vnode column idx: 0 - Table 4294967294 { columns: [ a, b, c ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 } + Table 4294967294 { columns: [ a, b, c ], primary key: [ $0 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 } - sql: | create table t1 (a int, b numeric, c bigint); diff --git a/src/frontend/planner_test/tests/testdata/output/intersect.yaml b/src/frontend/planner_test/tests/testdata/output/intersect.yaml index 06804444a4f9b..405966685afaf 100644 --- a/src/frontend/planner_test/tests/testdata/output/intersect.yaml +++ b/src/frontend/planner_test/tests/testdata/output/intersect.yaml @@ -108,69 +108,105 @@ create table t2 (a int, b numeric, c bigint, primary key(a)); select * from t1 intersect select * from t2; optimized_logical_plan_for_batch: |- - LogicalAgg { group_key: [t1.a, t1.b, t1.c], aggs: [] } + LogicalAgg { group_key: [t1.a], aggs: [first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC))] } └─LogicalJoin { type: LeftSemi, on: IsNotDistinctFrom(t1.a, t2.a) AND IsNotDistinctFrom(t1.b, t2.b) AND IsNotDistinctFrom(t1.c, t2.c), output: all } ├─LogicalScan { table: t1, columns: [t1.a, t1.b, t1.c] } └─LogicalScan { table: t2, columns: [t2.a, t2.b, t2.c] } batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [] } + └─BatchHashAgg { group_key: [t1.a], aggs: [first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC))] } └─BatchLookupJoin { type: LeftSemi, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } └─BatchExchange { order: [], dist: UpstreamHashShard(t1.a) } └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], distribution: UpstreamHashShard(t1.a) } stream_plan: |- - StreamMaterialize { columns: [a, b, c], stream_key: [a, b, c], pk_columns: [a, b, c], pk_conflict: NoCheck } - └─StreamProject { exprs: [t1.a, t1.b, t1.c] } - └─StreamHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [count] } - └─StreamHashJoin { type: LeftSemi, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } - ├─StreamExchange { dist: HashShard(t1.a, t1.b, t1.c) } - │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) } - └─StreamExchange { dist: HashShard(t2.a, t2.b, t2.c) } - └─StreamTableScan { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.a], dist: UpstreamHashShard(t2.a) } + StreamMaterialize { columns: [a, b, c], stream_key: [a], pk_columns: [a], pk_conflict: NoCheck } + └─StreamProject { exprs: [t1.a, first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC))] } + └─StreamHashAgg { group_key: [t1.a], aggs: [first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC)), count] } + └─StreamExchange { dist: HashShard(t1.a) } + └─StreamHashJoin { type: LeftSemi, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } + ├─StreamExchange { dist: HashShard(t1.a, t1.b, t1.c) } + │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) } + └─StreamExchange { dist: HashShard(t2.a, t2.b, t2.c) } + └─StreamTableScan { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.a], dist: UpstreamHashShard(t2.a) } stream_dist_plan: |+ Fragment 0 - StreamMaterialize { columns: [a, b, c], stream_key: [a, b, c], pk_columns: [a, b, c], pk_conflict: NoCheck } { materialized table: 4294967294 } - └── StreamProject { exprs: [t1.a, t1.b, t1.c] } - └── StreamHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [count] } { intermediate state table: 0, state tables: [], distinct tables: [] } - └── StreamHashJoin { type: LeftSemi, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } - ├── left table: 1 - ├── right table: 3 - ├── left degree table: 2 - ├── right degree table: 4 - ├── StreamExchange Hash([0, 1, 2]) from 1 - └── StreamExchange Hash([0, 1, 2]) from 2 + StreamMaterialize { columns: [a, b, c], stream_key: [a], pk_columns: [a], pk_conflict: NoCheck } { materialized table: 4294967294 } + └── StreamProject { exprs: [t1.a, first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC))] } + └── StreamHashAgg { group_key: [t1.a], aggs: [first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC)), count] } + ├── intermediate state table: 2 + ├── state tables: [ 0, 1 ] + ├── distinct tables: [] + └── StreamExchange Hash([0]) from 1 Fragment 1 - Chain { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) } { state table: 5 } + StreamHashJoin { type: LeftSemi, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } + ├── left table: 3 + ├── right table: 5 + ├── left degree table: 4 + ├── right degree table: 6 + ├── StreamExchange Hash([0, 1, 2]) from 2 + └── StreamExchange Hash([0, 1, 2]) from 3 + + Fragment 2 + Chain { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) } { state table: 7 } ├── Upstream └── BatchPlanNode - Fragment 2 - Chain { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.a], dist: UpstreamHashShard(t2.a) } { state table: 6 } + Fragment 3 + Chain { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.a], dist: UpstreamHashShard(t2.a) } { state table: 8 } ├── Upstream └── BatchPlanNode - Table 0 { columns: [ t1_a, t1_b, t1_c, count ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 } + Table 0 + ├── columns: [ t1_a, t1_b, t1_c ] + ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ] + ├── value indices: [ 0, 1, 2 ] + ├── distribution key: [ 0 ] + └── read pk prefix len hint: 1 - Table 1 { columns: [ t1_a, t1_b, t1_c ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 } + Table 1 + ├── columns: [ t1_a, t1_c, t1_b ] + ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ] + ├── value indices: [ 0, 1, 2 ] + ├── distribution key: [ 0 ] + └── read pk prefix len hint: 1 Table 2 + ├── columns: [ t1_a, first_value(t1_b order_by(t1_b ASC)), first_value(t1_c order_by(t1_c ASC)), count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + └── read pk prefix len hint: 1 + + Table 3 + ├── columns: [ t1_a, t1_b, t1_c ] + ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ] + ├── value indices: [ 0, 1, 2 ] + ├── distribution key: [ 0, 1, 2 ] + └── read pk prefix len hint: 3 + + Table 4 ├── columns: [ t1_a, t1_b, t1_c, _degree ] ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ] ├── value indices: [ 3 ] ├── distribution key: [ 0, 1, 2 ] └── read pk prefix len hint: 3 - Table 3 { columns: [ t2_a, t2_b, t2_c ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 } + Table 5 + ├── columns: [ t2_a, t2_b, t2_c ] + ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ] + ├── value indices: [ 0, 1, 2 ] + ├── distribution key: [ 0, 1, 2 ] + └── read pk prefix len hint: 3 - Table 4 + Table 6 ├── columns: [ t2_a, t2_b, t2_c, _degree ] ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ] ├── value indices: [ 3 ] ├── distribution key: [ 0, 1, 2 ] └── read pk prefix len hint: 3 - Table 5 + Table 7 ├── columns: [ vnode, a, t1_backfill_finished ] ├── primary key: [ $0 ASC ] ├── value indices: [ 1, 2 ] @@ -178,7 +214,7 @@ ├── read pk prefix len hint: 1 └── vnode column idx: 0 - Table 6 + Table 8 ├── columns: [ vnode, a, t2_backfill_finished ] ├── primary key: [ $0 ASC ] ├── value indices: [ 1, 2 ] @@ -186,7 +222,7 @@ ├── read pk prefix len hint: 1 └── vnode column idx: 0 - Table 4294967294 { columns: [ a, b, c ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 } + Table 4294967294 { columns: [ a, b, c ], primary key: [ $0 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 } - sql: | create table t1 (a int, b numeric, c bigint); diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml index c3bed3a331fc6..5e451505c1d2e 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml @@ -805,9 +805,9 @@ AND P.endtime = A.endtime; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchHashJoin { type: Inner, predicate: person.id = auction.seller AND $expr1 = $expr3 AND $expr2 = $expr4, output: [person.id, person.name, $expr1] } + └─BatchHashJoin { type: Inner, predicate: person.id = auction.seller AND $expr1 = $expr3 AND $expr2 = $expr4, output: [person.id, first_value(person.name order_by(person.name ASC)), $expr1] } ├─BatchExchange { order: [], dist: HashShard(person.id, $expr1, $expr2) } - │ └─BatchHashAgg { group_key: [person.id, person.name, $expr1, $expr2], aggs: [] } + │ └─BatchHashAgg { group_key: [person.id, $expr1, $expr2], aggs: [first_value(person.name order_by(person.name ASC))] } │ └─BatchProject { exprs: [person.id, person.name, $expr1, ($expr1 + '00:00:10':Interval) as $expr2] } │ └─BatchProject { exprs: [person.id, person.name, person.date_time, TumbleStart(person.date_time, '00:00:10':Interval) as $expr1] } │ └─BatchScan { table: person, columns: [person.id, person.name, person.date_time], distribution: UpstreamHashShard(person.id) } @@ -817,11 +817,11 @@ └─BatchProject { exprs: [auction.date_time, auction.seller, TumbleStart(auction.date_time, '00:00:10':Interval) as $expr3] } └─BatchScan { table: auction, columns: [auction.date_time, auction.seller], distribution: SomeShard } stream_plan: |- - StreamMaterialize { columns: [id, name, starttime, $expr2(hidden), auction.seller(hidden), $expr3(hidden), $expr4(hidden)], stream_key: [id, name, starttime, $expr2, auction.seller, $expr3, $expr4], pk_columns: [id, name, starttime, $expr2, auction.seller, $expr3, $expr4], pk_conflict: NoCheck } - └─StreamHashJoin { type: Inner, predicate: person.id = auction.seller AND $expr1 = $expr3 AND $expr2 = $expr4, output: all } + StreamMaterialize { columns: [id, name, starttime, $expr2(hidden), auction.seller(hidden), $expr3(hidden), $expr4(hidden)], stream_key: [id, starttime, $expr2, auction.seller, $expr3, $expr4], pk_columns: [id, starttime, $expr2, auction.seller, $expr3, $expr4], pk_conflict: NoCheck } + └─StreamHashJoin { type: Inner, predicate: person.id = auction.seller AND $expr1 = $expr3 AND $expr2 = $expr4, output: [person.id, first_value(person.name order_by(person.name ASC)), $expr1, $expr2, auction.seller, $expr3, $expr4] } ├─StreamExchange { dist: HashShard(person.id, $expr1, $expr2) } - │ └─StreamProject { exprs: [person.id, person.name, $expr1, $expr2] } - │ └─StreamHashAgg { group_key: [person.id, person.name, $expr1, $expr2], aggs: [count] } + │ └─StreamProject { exprs: [person.id, $expr1, $expr2, first_value(person.name order_by(person.name ASC))] } + │ └─StreamHashAgg { group_key: [person.id, $expr1, $expr2], aggs: [first_value(person.name order_by(person.name ASC)), count] } │ └─StreamProject { exprs: [person.id, person.name, $expr1, ($expr1 + '00:00:10':Interval) as $expr2] } │ └─StreamProject { exprs: [person.id, person.name, person.date_time, TumbleStart(person.date_time, '00:00:10':Interval) as $expr1] } │ └─StreamTableScan { table: person, columns: [person.id, person.name, person.date_time], pk: [person.id], dist: UpstreamHashShard(person.id) } @@ -833,47 +833,53 @@ └─StreamTableScan { table: auction, columns: [auction.date_time, auction.seller, auction.id], pk: [auction.id], dist: UpstreamHashShard(auction.id) } stream_dist_plan: |+ Fragment 0 - StreamMaterialize { columns: [id, name, starttime, $expr2(hidden), auction.seller(hidden), $expr3(hidden), $expr4(hidden)], stream_key: [id, name, starttime, $expr2, auction.seller, $expr3, $expr4], pk_columns: [id, name, starttime, $expr2, auction.seller, $expr3, $expr4], pk_conflict: NoCheck } + StreamMaterialize { columns: [id, name, starttime, $expr2(hidden), auction.seller(hidden), $expr3(hidden), $expr4(hidden)], stream_key: [id, starttime, $expr2, auction.seller, $expr3, $expr4], pk_columns: [id, starttime, $expr2, auction.seller, $expr3, $expr4], pk_conflict: NoCheck } ├── materialized table: 4294967294 - └── StreamHashJoin { type: Inner, predicate: person.id = auction.seller AND $expr1 = $expr3 AND $expr2 = $expr4, output: all } { left table: 0, right table: 2, left degree table: 1, right degree table: 3 } - ├── StreamExchange Hash([0, 2, 3]) from 1 + └── StreamHashJoin { type: Inner, predicate: person.id = auction.seller AND $expr1 = $expr3 AND $expr2 = $expr4, output: [person.id, first_value(person.name order_by(person.name ASC)), $expr1, $expr2, auction.seller, $expr3, $expr4] } + ├── left table: 0 + ├── right table: 2 + ├── left degree table: 1 + ├── right degree table: 3 + ├── StreamExchange Hash([0, 1, 2]) from 1 └── StreamProject { exprs: [auction.seller, $expr3, $expr4] } - └── StreamHashAgg { group_key: [auction.seller, $expr3, $expr4], aggs: [count] } { intermediate state table: 6, state tables: [], distinct tables: [] } + └── StreamHashAgg { group_key: [auction.seller, $expr3, $expr4], aggs: [count] } { intermediate state table: 7, state tables: [], distinct tables: [] } └── StreamExchange Hash([0, 1, 2]) from 2 Fragment 1 - StreamProject { exprs: [person.id, person.name, $expr1, $expr2] } - └── StreamHashAgg { group_key: [person.id, person.name, $expr1, $expr2], aggs: [count] } { intermediate state table: 4, state tables: [], distinct tables: [] } + StreamProject { exprs: [person.id, $expr1, $expr2, first_value(person.name order_by(person.name ASC))] } + └── StreamHashAgg { group_key: [person.id, $expr1, $expr2], aggs: [first_value(person.name order_by(person.name ASC)), count] } { intermediate state table: 5, state tables: [ 4 ], distinct tables: [] } └── StreamProject { exprs: [person.id, person.name, $expr1, ($expr1 + '00:00:10':Interval) as $expr2] } └── StreamProject { exprs: [person.id, person.name, person.date_time, TumbleStart(person.date_time, '00:00:10':Interval) as $expr1] } - └── Chain { table: person, columns: [person.id, person.name, person.date_time], pk: [person.id], dist: UpstreamHashShard(person.id) } { state table: 5 } + └── Chain { table: person, columns: [person.id, person.name, person.date_time], pk: [person.id], dist: UpstreamHashShard(person.id) } { state table: 6 } ├── Upstream └── BatchPlanNode Fragment 2 StreamProject { exprs: [auction.seller, $expr3, ($expr3 + '00:00:10':Interval) as $expr4, auction.id] } └── StreamProject { exprs: [auction.date_time, auction.seller, TumbleStart(auction.date_time, '00:00:10':Interval) as $expr3, auction.id] } - └── Chain { table: auction, columns: [auction.date_time, auction.seller, auction.id], pk: [auction.id], dist: UpstreamHashShard(auction.id) } { state table: 7 } + └── Chain { table: auction, columns: [auction.date_time, auction.seller, auction.id], pk: [auction.id], dist: UpstreamHashShard(auction.id) } { state table: 8 } ├── Upstream └── BatchPlanNode - Table 0 { columns: [ person_id, person_name, $expr1, $expr2 ], primary key: [ $0 ASC, $2 ASC, $3 ASC, $1 ASC ], value indices: [ 0, 1, 2, 3 ], distribution key: [ 0, 2, 3 ], read pk prefix len hint: 3 } + Table 0 { columns: [ person_id, $expr1, $expr2, first_value(person_name order_by(person_name ASC)) ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 0, 1, 2, 3 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 } - Table 1 { columns: [ person_id, $expr1, $expr2, person_name, _degree ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC ], value indices: [ 4 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 } + Table 1 { columns: [ person_id, $expr1, $expr2, _degree ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 } Table 2 { columns: [ auction_seller, $expr3, $expr4 ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 } Table 3 { columns: [ auction_seller, $expr3, $expr4, _degree ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 } - Table 4 { columns: [ person_id, person_name, $expr1, $expr2, count ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC ], value indices: [ 4 ], distribution key: [ 0 ], read pk prefix len hint: 4 } + Table 4 { columns: [ person_id, $expr1, $expr2, person_name ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC ], value indices: [ 0, 3 ], distribution key: [ 0 ], read pk prefix len hint: 3 } - Table 5 { columns: [ vnode, id, person_backfill_finished ], primary key: [ $0 ASC ], value indices: [ 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + Table 5 { columns: [ person_id, $expr1, $expr2, first_value(person_name order_by(person_name ASC)), count ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3, 4 ], distribution key: [ 0 ], read pk prefix len hint: 3 } - Table 6 { columns: [ auction_seller, $expr3, $expr4, count ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 } + Table 6 { columns: [ vnode, id, person_backfill_finished ], primary key: [ $0 ASC ], value indices: [ 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } - Table 7 { columns: [ vnode, id, auction_backfill_finished ], primary key: [ $0 ASC ], value indices: [ 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + Table 7 { columns: [ auction_seller, $expr3, $expr4, count ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 } - Table 4294967294 { columns: [ id, name, starttime, $expr2, auction.seller, $expr3, $expr4 ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC, $4 ASC, $5 ASC, $6 ASC ], value indices: [ 0, 1, 2, 3, 4, 5, 6 ], distribution key: [ 0, 2, 3 ], read pk prefix len hint: 7 } + Table 8 { columns: [ vnode, id, auction_backfill_finished ], primary key: [ $0 ASC ], value indices: [ 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + + Table 4294967294 { columns: [ id, name, starttime, $expr2, auction.seller, $expr3, $expr4 ], primary key: [ $0 ASC, $2 ASC, $3 ASC, $4 ASC, $5 ASC, $6 ASC ], value indices: [ 0, 1, 2, 3, 4, 5, 6 ], distribution key: [ 0, 2, 3 ], read pk prefix len hint: 6 } - id: nexmark_q9 before: @@ -1962,9 +1968,9 @@ SELECT COUNT(*) / COUNT(DISTINCT auction) FROM bid ) batch_plan: |- - BatchNestedLoopJoin { type: Inner, predicate: (count(bid.auction) >= $expr1), output: [auction.id, auction.item_name, count(bid.auction)] } + BatchNestedLoopJoin { type: Inner, predicate: (count(bid.auction) >= $expr1), output: [auction.id, first_value(auction.item_name order_by(auction.item_name ASC)), count(bid.auction)] } ├─BatchExchange { order: [], dist: Single } - │ └─BatchHashAgg { group_key: [auction.id, auction.item_name], aggs: [count(bid.auction)] } + │ └─BatchHashAgg { group_key: [auction.id], aggs: [first_value(auction.item_name order_by(auction.item_name ASC)), count(bid.auction)] } │ └─BatchHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all } │ ├─BatchExchange { order: [], dist: HashShard(auction.id) } │ │ └─BatchScan { table: auction, columns: [auction.id, auction.item_name], distribution: UpstreamHashShard(auction.id) } @@ -1978,10 +1984,10 @@ └─BatchExchange { order: [], dist: HashShard(bid.auction) } └─BatchScan { table: bid, columns: [bid.auction], distribution: SomeShard } stream_plan: |- - StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [auction_id, auction_item_name], pk_conflict: NoCheck } - └─StreamDynamicFilter { predicate: (count(bid.auction) >= $expr1), output: [auction.id, auction.item_name, count(bid.auction)] } - ├─StreamProject { exprs: [auction.id, auction.item_name, count(bid.auction)] } - │ └─StreamHashAgg { group_key: [auction.id, auction.item_name], aggs: [count(bid.auction), count] } + StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id], pk_columns: [auction_id], pk_conflict: NoCheck } + └─StreamDynamicFilter { predicate: (count(bid.auction) >= $expr1), output: [auction.id, first_value(auction.item_name order_by(auction.item_name ASC)), count(bid.auction)] } + ├─StreamProject { exprs: [auction.id, first_value(auction.item_name order_by(auction.item_name ASC)), count(bid.auction)] } + │ └─StreamHashAgg { group_key: [auction.id], aggs: [first_value(auction.item_name order_by(auction.item_name ASC)), count(bid.auction), count] } │ └─StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all } │ ├─StreamExchange { dist: HashShard(auction.id) } │ │ └─StreamTableScan { table: auction, columns: [auction.id, auction.item_name], pk: [auction.id], dist: UpstreamHashShard(auction.id) } @@ -1997,44 +2003,53 @@ └─StreamTableScan { table: bid, columns: [bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } stream_dist_plan: |+ Fragment 0 - StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [auction_id, auction_item_name], pk_conflict: NoCheck } + StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id], pk_columns: [auction_id], pk_conflict: NoCheck } ├── materialized table: 4294967294 - └── StreamDynamicFilter { predicate: (count(bid.auction) >= $expr1), output: [auction.id, auction.item_name, count(bid.auction)] } { left table: 0, right table: 1 } - ├── StreamProject { exprs: [auction.id, auction.item_name, count(bid.auction)] } - │ └── StreamHashAgg { group_key: [auction.id, auction.item_name], aggs: [count(bid.auction), count] } { intermediate state table: 2, state tables: [], distinct tables: [] } - │ └── StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all } { left table: 3, right table: 5, left degree table: 4, right degree table: 6 } + └── StreamDynamicFilter { predicate: (count(bid.auction) >= $expr1), output: [auction.id, first_value(auction.item_name order_by(auction.item_name ASC)), count(bid.auction)] } + ├── left table: 0 + ├── right table: 1 + ├── StreamProject { exprs: [auction.id, first_value(auction.item_name order_by(auction.item_name ASC)), count(bid.auction)] } + │ └── StreamHashAgg { group_key: [auction.id], aggs: [first_value(auction.item_name order_by(auction.item_name ASC)), count(bid.auction), count] } + │ ├── intermediate state table: 3 + │ ├── state tables: [ 2 ] + │ ├── distinct tables: [] + │ └── StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all } + │ ├── left table: 4 + │ ├── right table: 6 + │ ├── left degree table: 5 + │ ├── right degree table: 7 │ ├── StreamExchange Hash([0]) from 1 │ └── StreamExchange Hash([0]) from 2 └── StreamExchange Broadcast from 3 Fragment 1 - Chain { table: auction, columns: [auction.id, auction.item_name], pk: [auction.id], dist: UpstreamHashShard(auction.id) } { state table: 7 } + Chain { table: auction, columns: [auction.id, auction.item_name], pk: [auction.id], dist: UpstreamHashShard(auction.id) } { state table: 8 } ├── Upstream └── BatchPlanNode Fragment 2 - Chain { table: bid, columns: [bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } { state table: 8 } + Chain { table: bid, columns: [bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } { state table: 9 } ├── Upstream └── BatchPlanNode Fragment 3 StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(bid.auction))) as $expr1] } - └── StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(bid.auction)), count] } { intermediate state table: 9, state tables: [], distinct tables: [] } + └── StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(bid.auction)), count] } { intermediate state table: 10, state tables: [], distinct tables: [] } └── StreamExchange Single from 4 Fragment 4 StreamStatelessSimpleAgg { aggs: [sum0(count), count(bid.auction)] } - └── StreamHashAgg [append_only] { group_key: [bid.auction], aggs: [count] } { intermediate state table: 10, state tables: [], distinct tables: [] } + └── StreamHashAgg [append_only] { group_key: [bid.auction], aggs: [count] } { intermediate state table: 11, state tables: [], distinct tables: [] } └── StreamExchange Hash([0]) from 5 Fragment 5 - Chain { table: bid, columns: [bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } { state table: 11 } + Chain { table: bid, columns: [bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } { state table: 12 } ├── Upstream └── BatchPlanNode Table 0 - ├── columns: [ auction_id, auction_item_name, count(bid_auction) ] - ├── primary key: [ $2 ASC, $0 ASC, $1 ASC ] + ├── columns: [ auction_id, first_value(auction_item_name order_by(auction_item_name ASC)), count(bid_auction) ] + ├── primary key: [ $2 ASC, $0 ASC ] ├── value indices: [ 0, 1, 2 ] ├── distribution key: [ 0 ] └── read pk prefix len hint: 1 @@ -2042,23 +2057,36 @@ Table 1 { columns: [ $expr1 ], primary key: [], value indices: [ 0 ], distribution key: [], read pk prefix len hint: 0 } Table 2 - ├── columns: [ auction_id, auction_item_name, count(bid_auction), count ] - ├── primary key: [ $0 ASC, $1 ASC ] - ├── value indices: [ 2, 3 ] + ├── columns: [ auction_id, auction_item_name, bid__row_id ] + ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ] + ├── value indices: [ 0, 1, 2 ] ├── distribution key: [ 0 ] - └── read pk prefix len hint: 2 + └── read pk prefix len hint: 1 - Table 3 { columns: [ auction_id, auction_item_name ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 } + Table 3 + ├── columns: [ auction_id, first_value(auction_item_name order_by(auction_item_name ASC)), count(bid_auction), count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + └── read pk prefix len hint: 1 - Table 4 { columns: [ auction_id, _degree ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 } + Table 4 { columns: [ auction_id, auction_item_name ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 } - Table 5 { columns: [ bid_auction, bid__row_id ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 } + Table 5 { columns: [ auction_id, _degree ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 } - Table 6 { columns: [ bid_auction, bid__row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 } + Table 6 { columns: [ bid_auction, bid__row_id ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 } - Table 7 { columns: [ vnode, id, auction_backfill_finished ], primary key: [ $0 ASC ], value indices: [ 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + Table 7 { columns: [ bid_auction, bid__row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 } Table 8 + ├── columns: [ vnode, id, auction_backfill_finished ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 9 ├── columns: [ vnode, _row_id, bid_backfill_finished ] ├── primary key: [ $0 ASC ] ├── value indices: [ 1, 2 ] @@ -2066,11 +2094,11 @@ ├── read pk prefix len hint: 1 └── vnode column idx: 0 - Table 9 { columns: [ sum0(sum0(count)), sum0(count(bid_auction)), count ], primary key: [], value indices: [ 0, 1, 2 ], distribution key: [], read pk prefix len hint: 0 } + Table 10 { columns: [ sum0(sum0(count)), sum0(count(bid_auction)), count ], primary key: [], value indices: [ 0, 1, 2 ], distribution key: [], read pk prefix len hint: 0 } - Table 10 { columns: [ bid_auction, count ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 } + Table 11 { columns: [ bid_auction, count ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 } - Table 11 + Table 12 ├── columns: [ vnode, _row_id, bid_backfill_finished ] ├── primary key: [ $0 ASC ] ├── value indices: [ 1, 2 ] @@ -2080,10 +2108,10 @@ Table 4294967294 ├── columns: [ auction_id, auction_item_name, bid_count ] - ├── primary key: [ $0 ASC, $1 ASC ] + ├── primary key: [ $0 ASC ] ├── value indices: [ 0, 1, 2 ] ├── distribution key: [ 0 ] - └── read pk prefix len hint: 2 + └── read pk prefix len hint: 1 - id: nexmark_q103 before: @@ -2341,20 +2369,20 @@ BatchTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0 } └─BatchExchange { order: [], dist: Single } └─BatchTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0 } - └─BatchHashAgg { group_key: [auction.id, auction.item_name], aggs: [count(bid.auction)] } + └─BatchHashAgg { group_key: [auction.id], aggs: [first_value(auction.item_name order_by(auction.item_name ASC)), count(bid.auction)] } └─BatchHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all } ├─BatchExchange { order: [], dist: HashShard(auction.id) } │ └─BatchScan { table: auction, columns: [auction.id, auction.item_name], distribution: UpstreamHashShard(auction.id) } └─BatchExchange { order: [], dist: HashShard(bid.auction) } └─BatchScan { table: bid, columns: [bid.auction], distribution: SomeShard } stream_plan: |- - StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [bid_count, auction_id, auction_item_name], pk_conflict: NoCheck } - └─StreamProject { exprs: [auction.id, auction.item_name, count(bid.auction)] } + StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id], pk_columns: [bid_count, auction_id], pk_conflict: NoCheck } + └─StreamProject { exprs: [auction.id, first_value(auction.item_name order_by(auction.item_name ASC)), count(bid.auction)] } └─StreamTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0 } └─StreamExchange { dist: Single } └─StreamGroupTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0, group_key: [$expr1] } - └─StreamProject { exprs: [auction.id, auction.item_name, count(bid.auction), Vnode(auction.id) as $expr1] } - └─StreamHashAgg { group_key: [auction.id, auction.item_name], aggs: [count(bid.auction), count] } + └─StreamProject { exprs: [auction.id, first_value(auction.item_name order_by(auction.item_name ASC)), count(bid.auction), Vnode(auction.id) as $expr1] } + └─StreamHashAgg { group_key: [auction.id], aggs: [first_value(auction.item_name order_by(auction.item_name ASC)), count(bid.auction), count] } └─StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all } ├─StreamExchange { dist: HashShard(auction.id) } │ └─StreamTableScan { table: auction, columns: [auction.id, auction.item_name], pk: [auction.id], dist: UpstreamHashShard(auction.id) } @@ -2362,60 +2390,106 @@ └─StreamTableScan { table: bid, columns: [bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } stream_dist_plan: |+ Fragment 0 - StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [bid_count, auction_id, auction_item_name], pk_conflict: NoCheck } + StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id], pk_columns: [bid_count, auction_id], pk_conflict: NoCheck } ├── materialized table: 4294967294 - └── StreamProject { exprs: [auction.id, auction.item_name, count(bid.auction)] } + └── StreamProject { exprs: [auction.id, first_value(auction.item_name order_by(auction.item_name ASC)), count(bid.auction)] } └── StreamTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0 } { state table: 0 } └── StreamExchange Single from 1 Fragment 1 StreamGroupTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0, group_key: [$expr1] } { state table: 1 } - └── StreamProject { exprs: [auction.id, auction.item_name, count(bid.auction), Vnode(auction.id) as $expr1] } - └── StreamHashAgg { group_key: [auction.id, auction.item_name], aggs: [count(bid.auction), count] } { intermediate state table: 2, state tables: [], distinct tables: [] } - └── StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all } { left table: 3, right table: 5, left degree table: 4, right degree table: 6 } + └── StreamProject { exprs: [auction.id, first_value(auction.item_name order_by(auction.item_name ASC)), count(bid.auction), Vnode(auction.id) as $expr1] } + └── StreamHashAgg { group_key: [auction.id], aggs: [first_value(auction.item_name order_by(auction.item_name ASC)), count(bid.auction), count] } + ├── intermediate state table: 3 + ├── state tables: [ 2 ] + ├── distinct tables: [] + └── StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all } + ├── left table: 4 + ├── right table: 6 + ├── left degree table: 5 + ├── right degree table: 7 ├── StreamExchange Hash([0]) from 2 └── StreamExchange Hash([0]) from 3 Fragment 2 - Chain { table: auction, columns: [auction.id, auction.item_name], pk: [auction.id], dist: UpstreamHashShard(auction.id) } { state table: 7 } + Chain { table: auction, columns: [auction.id, auction.item_name], pk: [auction.id], dist: UpstreamHashShard(auction.id) } { state table: 8 } ├── Upstream └── BatchPlanNode Fragment 3 - Chain { table: bid, columns: [bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } { state table: 8 } + Chain { table: bid, columns: [bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } { state table: 9 } ├── Upstream └── BatchPlanNode Table 0 - ├── columns: [ auction_id, auction_item_name, count(bid_auction), $expr1 ] - ├── primary key: [ $2 DESC, $0 ASC, $1 ASC, $3 ASC ] + ├── columns: [ auction_id, first_value(auction_item_name order_by(auction_item_name ASC)), count(bid_auction), $expr1 ] + ├── primary key: [ $2 DESC, $0 ASC, $3 ASC ] ├── value indices: [ 0, 1, 2, 3 ] ├── distribution key: [] └── read pk prefix len hint: 0 Table 1 - ├── columns: [ auction_id, auction_item_name, count(bid_auction), $expr1 ] - ├── primary key: [ $3 ASC, $2 DESC, $0 ASC, $1 ASC ] + ├── columns: [ auction_id, first_value(auction_item_name order_by(auction_item_name ASC)), count(bid_auction), $expr1 ] + ├── primary key: [ $3 ASC, $2 DESC, $0 ASC ] ├── value indices: [ 0, 1, 2, 3 ] ├── distribution key: [ 0 ] ├── read pk prefix len hint: 1 └── vnode column idx: 3 - Table 2 { columns: [ auction_id, auction_item_name, count(bid_auction), count ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 2 } + Table 2 + ├── columns: [ auction_id, auction_item_name, bid__row_id ] + ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ] + ├── value indices: [ 0, 1, 2 ] + ├── distribution key: [ 0 ] + └── read pk prefix len hint: 1 - Table 3 { columns: [ auction_id, auction_item_name ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 } + Table 3 + ├── columns: [ auction_id, first_value(auction_item_name order_by(auction_item_name ASC)), count(bid_auction), count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + └── read pk prefix len hint: 1 - Table 4 { columns: [ auction_id, _degree ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 } + Table 4 { columns: [ auction_id, auction_item_name ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 } - Table 5 { columns: [ bid_auction, bid__row_id ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 } + Table 5 { columns: [ auction_id, _degree ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 } - Table 6 { columns: [ bid_auction, bid__row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 } + Table 6 + ├── columns: [ bid_auction, bid__row_id ] + ├── primary key: [ $0 ASC, $1 ASC ] + ├── value indices: [ 0, 1 ] + ├── distribution key: [ 0 ] + └── read pk prefix len hint: 1 - Table 7 { columns: [ vnode, id, auction_backfill_finished ], primary key: [ $0 ASC ], value indices: [ 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + Table 7 + ├── columns: [ bid_auction, bid__row_id, _degree ] + ├── primary key: [ $0 ASC, $1 ASC ] + ├── value indices: [ 2 ] + ├── distribution key: [ 0 ] + └── read pk prefix len hint: 1 - Table 8 { columns: [ vnode, _row_id, bid_backfill_finished ], primary key: [ $0 ASC ], value indices: [ 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + Table 8 + ├── columns: [ vnode, id, auction_backfill_finished ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 9 + ├── columns: [ vnode, _row_id, bid_backfill_finished ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 - Table 4294967294 { columns: [ auction_id, auction_item_name, bid_count ], primary key: [ $2 DESC, $0 ASC, $1 ASC ], value indices: [ 0, 1, 2 ], distribution key: [], read pk prefix len hint: 2 } + Table 4294967294 + ├── columns: [ auction_id, auction_item_name, bid_count ] + ├── primary key: [ $2 DESC, $0 ASC ] + ├── value indices: [ 0, 1, 2 ] + ├── distribution key: [] + └── read pk prefix len hint: 1 - id: nexmark_q106 before: diff --git a/src/frontend/planner_test/tests/testdata/output/subquery.yaml b/src/frontend/planner_test/tests/testdata/output/subquery.yaml index 21c3cfb03847a..ca6375cfaace9 100644 --- a/src/frontend/planner_test/tests/testdata/output/subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/output/subquery.yaml @@ -582,29 +582,29 @@ create table t(x int[], y int[], k int primary key); select *, (select sum(i) from (select unnest(x) i, 1 c) Q where k = c ) as sum_x from t; optimized_logical_plan_for_batch: |- - LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(t.x, t.x) AND IsNotDistinctFrom(t.k, t.k), output: [t.x, t.y, t.k, sum(Unnest($0))] } + LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(t.x, first_value(t.x order_by(t.x ASC))) AND IsNotDistinctFrom(t.k, t.k), output: [t.x, t.y, t.k, sum(Unnest($0))] } ├─LogicalScan { table: t, columns: [t.x, t.y, t.k] } - └─LogicalAgg { group_key: [t.x, t.k], aggs: [sum(Unnest($0))] } - └─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(t.x, t.x) AND IsNotDistinctFrom(t.k, t.k), output: [t.x, t.k, Unnest($0)] } - ├─LogicalAgg { group_key: [t.x, t.k], aggs: [] } + └─LogicalAgg { group_key: [first_value(t.x order_by(t.x ASC)), t.k], aggs: [sum(Unnest($0))] } + └─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(first_value(t.x order_by(t.x ASC)), first_value(t.x order_by(t.x ASC))) AND IsNotDistinctFrom(t.k, t.k), output: [first_value(t.x order_by(t.x ASC)), t.k, Unnest($0)] } + ├─LogicalAgg { group_key: [t.k], aggs: [first_value(t.x order_by(t.x ASC))] } │ └─LogicalScan { table: t, columns: [t.x, t.k] } - └─LogicalProject { exprs: [t.x, t.k, Unnest($0)] } + └─LogicalProject { exprs: [first_value(t.x order_by(t.x ASC)), t.k, Unnest($0)] } └─LogicalProjectSet { select_list: [$0, $1, Unnest($0)] } - └─LogicalJoin { type: Inner, on: true, output: all } - ├─LogicalAgg { group_key: [t.x, t.k], aggs: [] } + └─LogicalJoin { type: Inner, on: true, output: [first_value(t.x order_by(t.x ASC)), t.k] } + ├─LogicalAgg { group_key: [t.k], aggs: [first_value(t.x order_by(t.x ASC))] } │ └─LogicalScan { table: t, columns: [t.x, t.k], predicate: (t.k = 1:Int32) } └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } optimized_logical_plan_for_stream: |- - LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(t.x, t.x) AND IsNotDistinctFrom(t.k, t.k), output: [t.x, t.y, t.k, sum(Unnest($0))] } + LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(t.x, first_value(t.x order_by(t.x ASC))) AND IsNotDistinctFrom(t.k, t.k), output: [t.x, t.y, t.k, sum(Unnest($0))] } ├─LogicalScan { table: t, columns: [t.x, t.y, t.k] } - └─LogicalAgg { group_key: [t.x, t.k], aggs: [sum(Unnest($0))] } - └─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(t.x, t.x) AND IsNotDistinctFrom(t.k, t.k), output: [t.x, t.k, Unnest($0)] } - ├─LogicalAgg { group_key: [t.x, t.k], aggs: [] } + └─LogicalAgg { group_key: [first_value(t.x order_by(t.x ASC)), t.k], aggs: [sum(Unnest($0))] } + └─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(first_value(t.x order_by(t.x ASC)), first_value(t.x order_by(t.x ASC))) AND IsNotDistinctFrom(t.k, t.k), output: [first_value(t.x order_by(t.x ASC)), t.k, Unnest($0)] } + ├─LogicalAgg { group_key: [t.k], aggs: [first_value(t.x order_by(t.x ASC))] } │ └─LogicalScan { table: t, columns: [t.x, t.k] } - └─LogicalProject { exprs: [t.x, t.k, Unnest($0)] } + └─LogicalProject { exprs: [first_value(t.x order_by(t.x ASC)), t.k, Unnest($0)] } └─LogicalProjectSet { select_list: [$0, $1, Unnest($0)] } - └─LogicalJoin { type: Inner, on: true, output: all } - ├─LogicalAgg { group_key: [t.x, t.k], aggs: [] } + └─LogicalJoin { type: Inner, on: true, output: [first_value(t.x order_by(t.x ASC)), t.k] } + ├─LogicalAgg { group_key: [t.k], aggs: [first_value(t.x order_by(t.x ASC))] } │ └─LogicalScan { table: t, columns: [t.x, t.k], predicate: (t.k = 1:Int32) } └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } - name: CorrelatedInputRef in ProjectSet and apply on condition refers to table function. diff --git a/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml b/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml index 797bdda0f5bf0..6d216ad9c81c4 100644 --- a/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml +++ b/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml @@ -466,14 +466,14 @@ └─LogicalScan { table: c, columns: [c.c1, c.c2, c.c3, c._row_id] } optimized_logical_plan_for_batch: |- LogicalAgg { aggs: [count] } - └─LogicalJoin { type: Inner, on: IsNotDistinctFrom(a.a3, a.a3) AND IsNotDistinctFrom(b.b2, b.b2), output: [] } + └─LogicalJoin { type: Inner, on: IsNotDistinctFrom(a.a3, first_value(a.a3 order_by(a.a3 ASC))) AND IsNotDistinctFrom(b.b2, b.b2), output: [] } ├─LogicalJoin { type: Inner, on: (a.a3 = b.b2), output: all } │ ├─LogicalScan { table: a, columns: [a.a3] } │ └─LogicalScan { table: b, columns: [b.b2] } └─LogicalFilter { predicate: (3:Int32 = count(1:Int32)) } - └─LogicalAgg { group_key: [a.a3, b.b2], aggs: [count(1:Int32)] } - └─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(a.a3, c.c3) AND IsNotDistinctFrom(b.b2, c.c2), output: [a.a3, b.b2, 1:Int32] } - ├─LogicalAgg { group_key: [a.a3, b.b2], aggs: [] } + └─LogicalAgg { group_key: [first_value(a.a3 order_by(a.a3 ASC)), b.b2], aggs: [count(1:Int32)] } + └─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(first_value(a.a3 order_by(a.a3 ASC)), c.c3) AND IsNotDistinctFrom(b.b2, c.c2), output: [first_value(a.a3 order_by(a.a3 ASC)), b.b2, 1:Int32] } + ├─LogicalAgg { group_key: [b.b2], aggs: [first_value(a.a3 order_by(a.a3 ASC))] } │ └─LogicalJoin { type: Inner, on: (a.a3 = b.b2), output: all } │ ├─LogicalScan { table: a, columns: [a.a3] } │ └─LogicalScan { table: b, columns: [b.b2] } diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs index 4d958d21ec044..b292d229347ac 100644 --- a/src/frontend/src/optimizer/logical_optimization.rs +++ b/src/frontend/src/optimizer/logical_optimization.rs @@ -239,7 +239,11 @@ static PUSH_CALC_OF_JOIN: LazyLock = LazyLock::new(|| { static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock = LazyLock::new(|| { OptimizationStage::new( "Convert Distinct Aggregation", - vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)], + vec![ + UnionToDistinctRule::create(), + DistinctAggRule::create(true), + AggGroupBySimplifyRule::create(), + ], ApplyOrder::TopDown, ) }); @@ -250,6 +254,7 @@ static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock = LazyLock::n vec![ UnionToDistinctRule::create(), DistinctAggRule::create(false), + AggGroupBySimplifyRule::create(), ], ApplyOrder::TopDown, ) diff --git a/src/frontend/src/optimizer/rule/agg_group_by_simplify_rule.rs b/src/frontend/src/optimizer/rule/agg_group_by_simplify_rule.rs new file mode 100644 index 0000000000000..29471a8f64b63 --- /dev/null +++ b/src/frontend/src/optimizer/rule/agg_group_by_simplify_rule.rs @@ -0,0 +1,95 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; +use risingwave_expr::agg::AggKind; +use crate::expr::InputRef; +use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef}; + +use super::super::plan_node::*; +use super::{BoxedRule, Rule}; +use crate::utils::{Condition, IndexSet}; + +/// Use functional dependencies to simplify aggregation's group by +/// Before: +/// group by = [a, b, c], where b -> [a, c] +/// After +/// group by b, `first_value`(a), `first_value`(c), +pub struct AggGroupBySimplifyRule {} +impl Rule for AggGroupBySimplifyRule { + fn apply(&self, plan: PlanRef) -> Option { + let agg: &LogicalAgg = plan.as_logical_agg()?; + let (agg_calls, group_key, grouping_sets, agg_input, _two_phase) = agg.clone().decompose(); + if !grouping_sets.is_empty() { + return None; + } + let functional_dependency = agg_input.functional_dependency(); + let group_key = group_key.to_vec(); + if !functional_dependency.is_key(&group_key) { + return None; + } + let minimized_group_key = functional_dependency.minimize_key(&group_key); + if minimized_group_key.len() < group_key.len() { + let new_group_key = IndexSet::from(minimized_group_key); + let new_group_key_len = new_group_key.len(); + let mut new_agg_calls = vec![]; + for &i in &group_key { + if !new_group_key.contains(i) { + let data_type = agg_input.schema().fields[i].data_type(); + new_agg_calls.push(PlanAggCall { + agg_kind: AggKind::FirstValue, + return_type: data_type.clone(), + inputs: vec![InputRef::new(i, data_type)], + distinct: false, + order_by: vec![ColumnOrder::new(i, OrderType::ascending())], + filter: Condition::true_cond(), + direct_args: vec![], + }); + } + } + new_agg_calls.extend(agg_calls); + + // Use project to align schema type + let mut out_fields = vec![]; + let mut remained_group_key_offset = 0; + let mut removed_group_key_offset = new_group_key_len; + for &i in &group_key { + if new_group_key.contains(i) { + out_fields.push(remained_group_key_offset); + remained_group_key_offset += 1; + } else { + out_fields.push(removed_group_key_offset); + removed_group_key_offset += 1; + } + } + for i in group_key.len()..agg.base.schema().len() { + out_fields.push(i); + } + let new_agg = Agg::new(new_agg_calls, new_group_key, agg.input()); + + Some(LogicalProject::with_out_col_idx( + new_agg.into(), + out_fields.into_iter(), + ).into()) + } else { + None + } + } +} + +impl AggGroupBySimplifyRule { + pub fn create() -> BoxedRule { + Box::new(AggGroupBySimplifyRule {}) + } +} diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs index 6542ac8e163bb..388a8ea632c89 100644 --- a/src/frontend/src/optimizer/rule/mod.rs +++ b/src/frontend/src/optimizer/rule/mod.rs @@ -144,6 +144,8 @@ mod apply_expand_transpose_rule; pub use apply_expand_transpose_rule::*; mod expand_to_project_rule; pub use expand_to_project_rule::*; +mod agg_group_by_simplify_rule; +pub use agg_group_by_simplify_rule::*; #[macro_export] macro_rules! for_all_rules { @@ -206,6 +208,7 @@ macro_rules! for_all_rules { , { ApplyOverWindowTransposeRule } , { ApplyExpandTransposeRule } , { ExpandToProjectRule } + , { AggGroupBySimplifyRule } } }; }