Skip to content

Commit

Permalink
dont prune index
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Apr 9, 2024
1 parent 986df89 commit c7112c9
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
- name: test functional dependency for order key pruning
- name: test functional dependency for order key pruning (order by)
sql: |
create table t1 (id int primary key, i int);
select id, i from t1 order by id, i limit 2;
expected_outputs:
- logical_plan
- stream_plan
- batch_plan
- batch_plan
- name: test functional dependency for order key pruning (index)
sql: |
create table t1 (v1 int, v2 int);
create materialized view v as select count(*) cnt from t1;
explain (verbose) create index mv_idx on v(cnt);
expected_outputs:
- explain_output
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- name: test functional dependency for order key pruning
- name: test functional dependency for order key pruning (order by)
sql: |
create table t1 (id int primary key, i int);
select id, i from t1 order by id, i limit 2;
Expand All @@ -21,3 +21,12 @@
└─StreamGroupTopN { order: [t1.id ASC], limit: 2, offset: 0, group_key: [_vnode] }
└─StreamProject { exprs: [t1.id, t1.i, Vnode(t1.id) as _vnode] }
└─StreamTableScan { table: t1, columns: [t1.id, t1.i], stream_scan_type: ArrangementBackfill, stream_key: [t1.id], pk: [id], dist: UpstreamHashShard(t1.id) }
- name: test functional dependency for order key pruning (index)
sql: |
create table t1 (v1 int, v2 int);
create materialized view v as select count(*) cnt from t1;
explain (verbose) create index mv_idx on v(cnt);
explain_output: |
StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [cnt], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(v.cnt) }
└─StreamTableScan { table: v, columns: [v.cnt], stream_scan_type: ArrangementBackfill, stream_key: [], pk: [], dist: Single }
9 changes: 7 additions & 2 deletions src/frontend/src/optimizer/plan_node/derive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub(crate) fn derive_pk(
input: PlanRef,
user_order_by: Order,
columns: &[ColumnCatalog],
prune_order_by: bool,
) -> (Vec<ColumnOrder>, Vec<usize>) {
// Note(congyi): avoid pk duplication
let stream_key = input
Expand All @@ -106,8 +107,12 @@ pub(crate) fn derive_pk(
let mut in_order = FixedBitSet::with_capacity(schema.len());
let mut pk = vec![];

let func_dep = input.functional_dependency();
let user_order_by = func_dep.minimize_order_key(user_order_by);
let user_order_by = if prune_order_by {
let func_dep = input.functional_dependency();
func_dep.minimize_order_key(user_order_by)
} else {
user_order_by
};

for order in &user_order_by.column_orders {
let idx = order.column_index;
Expand Down
7 changes: 6 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,12 @@ impl StreamMaterialize {
// No order by for create table, so stream key is identical to table pk.
(table_pk, pk_column_indices)
} else {
derive_pk(input, user_order_by, &columns)
derive_pk(
input,
user_order_by,
&columns,
table_type != TableType::Index,
)
};
// assert: `stream_key` is a subset of `table_pk`

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl StreamSink {
) -> Result<(PlanRef, SinkDesc)> {
let sink_type =
Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?;
let (pk, _) = derive_pk(input.clone(), user_order_by, &columns);
let (pk, _) = derive_pk(input.clone(), user_order_by, &columns, true);
let mut downstream_pk =
Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?;
let mut extra_partition_col_idx = None;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl StreamSubscription {
properties: WithOptions,
user_id: UserId,
) -> Result<(PlanRef, SubscriptionCatalog)> {
let (pk, _) = derive_pk(input.clone(), user_order_by, &columns);
let (pk, _) = derive_pk(input.clone(), user_order_by, &columns, true);
let required_dist = match input.distribution() {
Distribution::Single => RequiredDist::single(),
_ => {
Expand Down

0 comments on commit c7112c9

Please sign in to comment.