diff --git a/src/frontend/planner_test/tests/testdata/input/functional_dependency.yaml b/src/frontend/planner_test/tests/testdata/input/functional_dependency.yaml
index e7e53333ef2f4..e5813047f1e49 100644
--- a/src/frontend/planner_test/tests/testdata/input/functional_dependency.yaml
+++ b/src/frontend/planner_test/tests/testdata/input/functional_dependency.yaml
@@ -1,8 +1,15 @@
-- name: test functional dependency for order key pruning
+- name: test functional dependency for order key pruning (order by)
   sql: |
     create table t1 (id int primary key, i int);
     select id, i from t1 order by id, i limit 2;
   expected_outputs:
   - logical_plan
   - stream_plan
-  - batch_plan
\ No newline at end of file
+  - batch_plan
+- name: test functional dependency for order key pruning (index)
+  sql: |
+    create table t1 (v1 int, v2 int);
+    create materialized view v as select count(*) cnt from t1;
+    explain (verbose) create index mv_idx on v(cnt);
+  expected_outputs:
+  - explain_output
diff --git a/src/frontend/planner_test/tests/testdata/output/functional_dependency.yaml b/src/frontend/planner_test/tests/testdata/output/functional_dependency.yaml
index fe6dbbcd8e608..ec76fb1fb341c 100644
--- a/src/frontend/planner_test/tests/testdata/output/functional_dependency.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/functional_dependency.yaml
@@ -1,5 +1,5 @@
 # This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
-- name: test functional dependency for order key pruning
+- name: test functional dependency for order key pruning (order by)
   sql: |
     create table t1 (id int primary key, i int);
     select id, i from t1 order by id, i limit 2;
@@ -21,3 +21,12 @@
       └─StreamGroupTopN { order: [t1.id ASC], limit: 2, offset: 0, group_key: [_vnode] }
         └─StreamProject { exprs: [t1.id, t1.i, Vnode(t1.id) as _vnode] }
           └─StreamTableScan { table: t1, columns: [t1.id, t1.i], stream_scan_type: ArrangementBackfill, stream_key: [t1.id], pk: [id], dist: UpstreamHashShard(t1.id) }
+- name: test functional dependency for order key pruning (index)
+  sql: |
+    create table t1 (v1 int, v2 int);
+    create materialized view v as select count(*) cnt from t1;
+    explain (verbose) create index mv_idx on v(cnt);
+  explain_output: |
+    StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [cnt], pk_conflict: NoCheck }
+    └─StreamExchange { dist: HashShard(v.cnt) }
+      └─StreamTableScan { table: v, columns: [v.cnt], stream_scan_type: ArrangementBackfill, stream_key: [], pk: [], dist: Single }
diff --git a/src/frontend/src/optimizer/plan_node/derive.rs b/src/frontend/src/optimizer/plan_node/derive.rs
index aff6bea2be6a3..0d190f27d6ad6 100644
--- a/src/frontend/src/optimizer/plan_node/derive.rs
+++ b/src/frontend/src/optimizer/plan_node/derive.rs
@@ -80,6 +80,7 @@ pub(crate) fn derive_pk(
     input: PlanRef,
     user_order_by: Order,
     columns: &[ColumnCatalog],
+    prune_order_by: bool,
 ) -> (Vec, Vec) {
     // Note(congyi): avoid pk duplication
     let stream_key = input
@@ -106,8 +107,12 @@
     let mut in_order = FixedBitSet::with_capacity(schema.len());
     let mut pk = vec![];
 
-    let func_dep = input.functional_dependency();
-    let user_order_by = func_dep.minimize_order_key(user_order_by);
+    let user_order_by = if prune_order_by {
+        let func_dep = input.functional_dependency();
+        func_dep.minimize_order_key(user_order_by)
+    } else {
+        user_order_by
+    };
 
     for order in &user_order_by.column_orders {
         let idx = order.column_index;
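The new `prune_order_by` flag gates the existing functional-dependency-based minimization of the user's ORDER BY columns in `derive_pk`. Below is a minimal, self-contained sketch of that pruning idea with simplified, hypothetical types (not RisingWave's actual `FunctionalDependencySet` or `Order`): an order-by column can be dropped once the columns ordered before it already determine it. For a `count(*)` materialized view the empty set determines `cnt`, so unconditional pruning would leave an index on `cnt` with no pk columns, which is why the flag is needed.

```rust
// Sketch only: simplified stand-ins for the frontend's functional-dependency
// machinery, assuming columns are identified by their index.
use std::collections::HashSet;

/// A functional dependency `determinants -> dependent`.
struct FunctionalDependency {
    determinants: HashSet<usize>,
    dependent: usize,
}

/// Drop every order-by column that is already functionally determined by the
/// columns ordered before it (the effect of `minimize_order_key`).
fn minimize_order_key(order_key: &[usize], deps: &[FunctionalDependency]) -> Vec<usize> {
    let mut kept = Vec::new();
    let mut seen: HashSet<usize> = HashSet::new();
    for &col in order_key {
        let determined = deps
            .iter()
            .any(|fd| fd.dependent == col && fd.determinants.is_subset(&seen));
        if !determined {
            kept.push(col);
        }
        seen.insert(col);
    }
    kept
}

fn main() {
    // t1(id PRIMARY KEY, i): `id` determines `i`, so `ORDER BY id, i`
    // prunes to `ORDER BY id` (the existing "order by" test case).
    let deps = vec![FunctionalDependency {
        determinants: HashSet::from([0]), // id
        dependent: 1,                     // i
    }];
    assert_eq!(minimize_order_key(&[0, 1], &deps), vec![0]);

    // `select count(*) cnt`: the relation has a single row, so the empty set
    // determines `cnt` and pruning would drop it entirely. An index on `cnt`
    // must keep it, hence `prune_order_by = false` when planning an index.
    let single_row = vec![FunctionalDependency {
        determinants: HashSet::new(),
        dependent: 0, // cnt
    }];
    assert!(minimize_order_key(&[0], &single_row).is_empty());
}
```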
diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs
index 792eaf4d7a068..29870c68f9cc8 100644
--- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs
@@ -222,7 +222,7 @@ impl StreamMaterialize {
             // No order by for create table, so stream key is identical to table pk.
             (table_pk, pk_column_indices)
         } else {
-            derive_pk(input, user_order_by, &columns)
+            derive_pk(input, user_order_by, &columns, table_type != TableType::Index)
         };
 
         // assert: `stream_key` is a subset of `table_pk`
diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs
index 7a724a20be46a..1e1679cc2f682 100644
--- a/src/frontend/src/optimizer/plan_node/stream_sink.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs
@@ -312,7 +312,7 @@ impl StreamSink {
     ) -> Result<(PlanRef, SinkDesc)> {
         let sink_type =
             Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?;
-        let (pk, _) = derive_pk(input.clone(), user_order_by, &columns);
+        let (pk, _) = derive_pk(input.clone(), user_order_by, &columns, true);
         let mut downstream_pk =
             Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?;
         let mut extra_partition_col_idx = None;
diff --git a/src/frontend/src/optimizer/plan_node/stream_subscription.rs b/src/frontend/src/optimizer/plan_node/stream_subscription.rs
index e9d8936961586..bf4a05b9e93d0 100644
--- a/src/frontend/src/optimizer/plan_node/stream_subscription.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_subscription.rs
@@ -110,7 +110,7 @@ impl StreamSubscription {
         properties: WithOptions,
         user_id: UserId,
     ) -> Result<(PlanRef, SubscriptionCatalog)> {
-        let (pk, _) = derive_pk(input.clone(), user_order_by, &columns);
+        let (pk, _) = derive_pk(input.clone(), user_order_by, &columns, true);
         let required_dist = match input.distribution() {
             Distribution::Single => RequiredDist::single(),
             _ => {
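The three call sites now differ only in whether pruning is enabled: `StreamSink` and `StreamSubscription` keep passing `true`, while `StreamMaterialize` disables it exactly when the materialize node backs an index. A condensed sketch of that gating follows, using a simplified stand-in for the frontend's `TableType` enum (not the actual definition):

```rust
// Sketch only: `TableType` here is a simplified stand-in for the frontend's enum.
#[allow(dead_code)]
#[derive(PartialEq)]
enum TableType {
    Table,
    MaterializedView,
    Index,
    Internal,
}

fn should_prune_order_by(table_type: Option<&TableType>) -> bool {
    match table_type {
        // StreamMaterialize: prune unless the node backs an index, whose pk
        // must keep every user-specified index column (e.g. `cnt` above).
        Some(t) => *t != TableType::Index,
        // StreamSink / StreamSubscription: no table type, always prune.
        None => true,
    }
}

fn main() {
    assert!(should_prune_order_by(None));
    assert!(should_prune_order_by(Some(&TableType::MaterializedView)));
    assert!(!should_prune_order_by(Some(&TableType::Index)));
}
```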