Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): prune order key using functional dependencies #16204

Merged
merged 16 commits into from
Apr 11, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
- name: test functional dependency for order key pruning (order by - prefix fd)
sql: |
create table t1 (id int primary key, i int);
select id, i from t1 order by id, i limit 2;
expected_outputs:
- logical_plan
- stream_plan
- batch_plan

# Order key should not be pruned for suffix functional dependency.
- name: test functional dependency for order key pruning (order by - suffix fd)
sql: |
create table t1 (id int primary key, i int);
select id, i from t1 order by i, id limit 2;
expected_outputs:
- logical_plan
- stream_plan
- batch_plan
# Order key can be pruned when the input is a singleton (at most one row).
- name: test functional dependency for order key pruning on singleton
sql: |
create table t1 (id int primary key, i int);
create materialized view v as select count(*) cnt from t1;
select cnt from v order by cnt;
expected_outputs:
- logical_plan
- stream_plan
- batch_plan
# Order key should not be pruned for an index,
# since the index also uses the order key as its distribution key.
- name: test functional dependency for order key pruning (index)
sql: |
create table t1 (v1 int, v2 int);
create materialized view v as select count(*) cnt from t1;
explain (verbose) create index mv_idx on v(cnt);
expected_outputs:
- explain_output
- name: test functional dependency for order key pruning (index 2)
sql: |
create table t1 (v1 int primary key, v2 int);
explain (verbose) create index mv_idx on t1(v1, v2);
expected_outputs:
- explain_output
- name: test functional dependency for order key pruning (index 3)
sql: |
create table t1 (v1 int primary key, v2 int);
explain (verbose) create index mv_idx on t1(v2, v1);
expected_outputs:
- explain_output
1 change: 1 addition & 0 deletions src/frontend/planner_test/tests/testdata/input/limit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
create table t (a int);
select count(*) from t order by 1 limit 1;
expected_outputs:
- logical_plan
- batch_plan
- stream_plan
- sql: |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- name: test functional dependency for order key pruning (order by - prefix fd)
sql: |
create table t1 (id int primary key, i int);
select id, i from t1 order by id, i limit 2;
logical_plan: |-
LogicalTopN { order: [t1.id ASC], limit: 2, offset: 0 }
└─LogicalProject { exprs: [t1.id, t1.i] }
└─LogicalScan { table: t1, columns: [t1.id, t1.i] }
batch_plan: |-
BatchTopN { order: [t1.id ASC], limit: 2, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchLimit { limit: 2, offset: 0 }
└─BatchScan { table: t1, columns: [t1.id, t1.i], limit: 2, distribution: UpstreamHashShard(t1.id) }
stream_plan: |-
StreamMaterialize { columns: [id, i], stream_key: [id], pk_columns: [id], pk_conflict: NoCheck }
└─StreamProject { exprs: [t1.id, t1.i] }
└─StreamTopN { order: [t1.id ASC], limit: 2, offset: 0 }
└─StreamExchange { dist: Single }
└─StreamGroupTopN { order: [t1.id ASC], limit: 2, offset: 0, group_key: [_vnode] }
└─StreamProject { exprs: [t1.id, t1.i, Vnode(t1.id) as _vnode] }
└─StreamTableScan { table: t1, columns: [t1.id, t1.i], stream_scan_type: ArrangementBackfill, stream_key: [t1.id], pk: [id], dist: UpstreamHashShard(t1.id) }
- name: test functional dependency for order key pruning (order by - suffix fd)
sql: |
create table t1 (id int primary key, i int);
select id, i from t1 order by i, id limit 2;
logical_plan: |-
LogicalTopN { order: [t1.i ASC, t1.id ASC], limit: 2, offset: 0 }
└─LogicalProject { exprs: [t1.id, t1.i] }
└─LogicalScan { table: t1, columns: [t1.id, t1.i] }
batch_plan: |-
BatchTopN { order: [t1.i ASC, t1.id ASC], limit: 2, offset: 0 }
└─BatchExchange { order: [], dist: Single }
└─BatchTopN { order: [t1.i ASC, t1.id ASC], limit: 2, offset: 0 }
└─BatchScan { table: t1, columns: [t1.id, t1.i], distribution: UpstreamHashShard(t1.id) }
stream_plan: |-
StreamMaterialize { columns: [id, i], stream_key: [id], pk_columns: [i, id], pk_conflict: NoCheck }
└─StreamProject { exprs: [t1.id, t1.i] }
└─StreamTopN { order: [t1.i ASC, t1.id ASC], limit: 2, offset: 0 }
└─StreamExchange { dist: Single }
└─StreamGroupTopN { order: [t1.i ASC, t1.id ASC], limit: 2, offset: 0, group_key: [_vnode] }
└─StreamProject { exprs: [t1.id, t1.i, Vnode(t1.id) as _vnode] }
└─StreamTableScan { table: t1, columns: [t1.id, t1.i], stream_scan_type: ArrangementBackfill, stream_key: [t1.id], pk: [id], dist: UpstreamHashShard(t1.id) }
- name: test functional dependency for order key pruning on singleton
sql: |
create table t1 (id int primary key, i int);
create materialized view v as select count(*) cnt from t1;
select cnt from v order by cnt;
logical_plan: |-
LogicalProject { exprs: [v.cnt] }
└─LogicalScan { table: v, columns: [v.cnt], cardinality: 0..=1 }
batch_plan: |-
BatchExchange { order: [v.cnt ASC], dist: Single }
└─BatchSort { order: [v.cnt ASC] }
└─BatchScan { table: v, columns: [v.cnt], distribution: Single }
stream_plan: |-
StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamTableScan { table: v, columns: [v.cnt], stream_scan_type: ArrangementBackfill, stream_key: [], pk: [], dist: Single }
- name: test functional dependency for order key pruning (index)
sql: |
create table t1 (v1 int, v2 int);
create materialized view v as select count(*) cnt from t1;
explain (verbose) create index mv_idx on v(cnt);
explain_output: |
StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [cnt], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(v.cnt) }
└─StreamTableScan { table: v, columns: [v.cnt], stream_scan_type: ArrangementBackfill, stream_key: [], pk: [], dist: Single }
- name: test functional dependency for order key pruning (index 2)
sql: |
create table t1 (v1 int primary key, v2 int);
explain (verbose) create index mv_idx on t1(v1, v2);
explain_output: |
StreamMaterialize { columns: [v1, v2], stream_key: [v1], pk_columns: [v1], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(t1.v1) }
└─StreamTableScan { table: t1, columns: [t1.v1, t1.v2], stream_scan_type: ArrangementBackfill, stream_key: [t1.v1], pk: [v1], dist: UpstreamHashShard(t1.v1) }
- name: test functional dependency for order key pruning (index 3)
sql: |
create table t1 (v1 int primary key, v2 int);
explain (verbose) create index mv_idx on t1(v2, v1);
explain_output: |
StreamMaterialize { columns: [v2, v1], stream_key: [v1], pk_columns: [v2, v1], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(t1.v2) }
└─StreamProject { exprs: [t1.v2, t1.v1] }
└─StreamTableScan { table: t1, columns: [t1.v1, t1.v2], stream_scan_type: ArrangementBackfill, stream_key: [t1.v1], pk: [v1], dist: UpstreamHashShard(t1.v1) }
10 changes: 8 additions & 2 deletions src/frontend/planner_test/tests/testdata/output/limit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,20 @@
- sql: |
create table t (a int);
select count(*) from t order by 1 limit 1;
logical_plan: |-
LogicalLimit { limit: 1, offset: 0 }
└─LogicalProject { exprs: [count] }
└─LogicalAgg { aggs: [count] }
└─LogicalProject { exprs: [] }
└─LogicalScan { table: t, columns: [t.a, t._row_id] }
batch_plan: |-
BatchTopN { order: [sum0(count) ASC], limit: 1, offset: 0 }
BatchLimit { limit: 1, offset: 0 }
└─BatchSimpleAgg { aggs: [sum0(count)] }
└─BatchExchange { order: [], dist: Single }
└─BatchSimpleAgg { aggs: [count] }
└─BatchScan { table: t, columns: [], distribution: SomeShard }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why pruning the TopN columns can also rewrite the BatchTopN 🤔. Regardless, I think it could be rewritten to BatchLimit here instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

stream_plan: |-
StreamMaterialize { columns: [count], stream_key: [], pk_columns: [count], pk_conflict: NoCheck }
StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamTopN { order: [sum0(count) ASC], limit: 1, offset: 0 }
└─StreamProject { exprs: [sum0(count)] }
└─StreamSimpleAgg { aggs: [sum0(count), count] }
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/output/topn.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
CREATE MATERIALIZED VIEW t1_mv AS SELECT SUM(a) as sa, SUM(b) as sb, SUM(c) as sc, SUM(d) as sd FROM t1;
SELECT sa, count(*) as cnt2 FROM t1_mv GROUP BY sa ORDER BY cnt2 DESC LIMIT 50 OFFSET 50;
batch_plan: |-
BatchExchange { order: [count DESC], dist: Single }
└─BatchTopN { order: [count DESC], limit: 50, offset: 50 }
BatchExchange { order: [], dist: Single }
└─BatchLimit { limit: 50, offset: 50 }
└─BatchSimpleAgg { aggs: [internal_last_seen_value(t1_mv.sa), count] }
└─BatchScan { table: t1_mv, columns: [t1_mv.sa], distribution: Single }
- sql: |
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,8 @@ fn assemble_materialize(

PlanRoot::new(
logical_project,
// The schema of `logical_project` places the index columns first,
// so the range `0..distributed_by_columns_len` gives the indices of the distributed-by columns.
RequiredDist::PhysicalDist(Distribution::HashShard(
(0..distributed_by_columns_len).collect(),
)),
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/optimizer/plan_node/derive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType};

use super::PlanRef;
use crate::error::{ErrorCode, Result};
use crate::optimizer::plan_node::generic::PhysicalPlanRef;
use crate::optimizer::property::Order;

pub(crate) fn derive_columns(
Expand Down Expand Up @@ -106,6 +107,10 @@ pub(crate) fn derive_pk(
let mut in_order = FixedBitSet::with_capacity(schema.len());
let mut pk = vec![];

let func_dep = input.functional_dependency();
let user_order_by =
func_dep.minimize_order_key(user_order_by, input.distribution().dist_column_indices());

for order in &user_order_by.column_orders {
let idx = order.column_index;
pk.push(order.clone());
Expand Down
Loading
Loading