From 1ab4f59917883b2afd5df91aede7de7f51128703 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Wed, 10 Apr 2024 22:49:59 +0800 Subject: [PATCH] minimize order key while making sure dist key is not pruned --- .../testdata/output/functional_dependency.yaml | 17 +++++++++-------- .../tests/testdata/output/limit.yaml | 4 ++-- .../tests/testdata/output/topn.yaml | 4 ++-- src/frontend/src/optimizer/plan_node/derive.rs | 17 ++--------------- src/frontend/src/optimizer/property/func_dep.rs | 10 ++++++++-- src/frontend/src/planner/query.rs | 4 ---- 6 files changed, 23 insertions(+), 33 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/output/functional_dependency.yaml b/src/frontend/planner_test/tests/testdata/output/functional_dependency.yaml index bb3234cfcf1dd..4a36aaa26dcc3 100644 --- a/src/frontend/planner_test/tests/testdata/output/functional_dependency.yaml +++ b/src/frontend/planner_test/tests/testdata/output/functional_dependency.yaml @@ -4,20 +4,20 @@ create table t1 (id int primary key, i int); select id, i from t1 order by id, i limit 2; logical_plan: |- - LogicalTopN { order: [t1.id ASC], limit: 2, offset: 0 } + LogicalTopN { order: [t1.id ASC, t1.i ASC], limit: 2, offset: 0 } └─LogicalProject { exprs: [t1.id, t1.i] } └─LogicalScan { table: t1, columns: [t1.id, t1.i] } batch_plan: |- - BatchTopN { order: [t1.id ASC], limit: 2, offset: 0 } + BatchTopN { order: [t1.id ASC, t1.i ASC], limit: 2, offset: 0 } └─BatchExchange { order: [], dist: Single } - └─BatchLimit { limit: 2, offset: 0 } - └─BatchScan { table: t1, columns: [t1.id, t1.i], limit: 2, distribution: UpstreamHashShard(t1.id) } + └─BatchTopN { order: [t1.id ASC, t1.i ASC], limit: 2, offset: 0 } + └─BatchScan { table: t1, columns: [t1.id, t1.i], distribution: UpstreamHashShard(t1.id) } stream_plan: |- StreamMaterialize { columns: [id, i], stream_key: [id], pk_columns: [id], pk_conflict: NoCheck } └─StreamProject { exprs: [t1.id, t1.i] } - └─StreamTopN { order: [t1.id ASC], limit: 2, offset: 0 } + └─StreamTopN { order: [t1.id ASC, t1.i ASC], limit: 2, offset: 0 } └─StreamExchange { dist: Single } - └─StreamGroupTopN { order: [t1.id ASC], limit: 2, offset: 0, group_key: [_vnode] } + └─StreamGroupTopN { order: [t1.id ASC, t1.i ASC], limit: 2, offset: 0, group_key: [_vnode] } └─StreamProject { exprs: [t1.id, t1.i, Vnode(t1.id) as _vnode] } └─StreamTableScan { table: t1, columns: [t1.id, t1.i], stream_scan_type: ArrangementBackfill, stream_key: [t1.id], pk: [id], dist: UpstreamHashShard(t1.id) } - name: test functional dependency for order key pruning (order by - suffix fd) @@ -50,8 +50,9 @@ LogicalProject { exprs: [v.cnt] } └─LogicalScan { table: v, columns: [v.cnt], cardinality: 0..=1 } batch_plan: |- - BatchExchange { order: [], dist: Single } - └─BatchScan { table: v, columns: [v.cnt], distribution: Single } + BatchExchange { order: [v.cnt ASC], dist: Single } + └─BatchSort { order: [v.cnt ASC] } + └─BatchScan { table: v, columns: [v.cnt], distribution: Single } stream_plan: |- StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [], pk_conflict: NoCheck } └─StreamTableScan { table: v, columns: [v.cnt], stream_scan_type: ArrangementBackfill, stream_key: [], pk: [], dist: Single } diff --git a/src/frontend/planner_test/tests/testdata/output/limit.yaml b/src/frontend/planner_test/tests/testdata/output/limit.yaml index 22fb2add9d30c..d5bf755b96c5b 100644 --- a/src/frontend/planner_test/tests/testdata/output/limit.yaml +++ b/src/frontend/planner_test/tests/testdata/output/limit.yaml @@ -140,13 +140,13 @@ create table t (a int); select count(*) from t order by 1 limit 1; logical_plan: |- - LogicalLimit { limit: 1, offset: 0 } + LogicalTopN { order: [count ASC], limit: 1, offset: 0 } └─LogicalProject { exprs: [count] } └─LogicalAgg { aggs: [count] } └─LogicalProject { exprs: [] } └─LogicalScan { table: t, columns: [t.a, t._row_id] } batch_plan: |- - BatchLimit { limit: 1, offset: 0 } + BatchTopN { order: [sum0(count) ASC], limit: 1, offset: 0 } └─BatchSimpleAgg { aggs: [sum0(count)] } └─BatchExchange { order: [], dist: Single } └─BatchSimpleAgg { aggs: [count] } diff --git a/src/frontend/planner_test/tests/testdata/output/topn.yaml b/src/frontend/planner_test/tests/testdata/output/topn.yaml index 52e15c35a8f7f..d3bc247fb11df 100644 --- a/src/frontend/planner_test/tests/testdata/output/topn.yaml +++ b/src/frontend/planner_test/tests/testdata/output/topn.yaml @@ -4,8 +4,8 @@ CREATE MATERIALIZED VIEW t1_mv AS SELECT SUM(a) as sa, SUM(b) as sb, SUM(c) as sc, SUM(d) as sd FROM t1; SELECT sa, count(*) as cnt2 FROM t1_mv GROUP BY sa ORDER BY cnt2 DESC LIMIT 50 OFFSET 50; batch_plan: |- - BatchExchange { order: [], dist: Single } - └─BatchLimit { limit: 50, offset: 50 } + BatchExchange { order: [count DESC], dist: Single } + └─BatchTopN { order: [count DESC], limit: 50, offset: 50 } └─BatchSimpleAgg { aggs: [internal_last_seen_value(t1_mv.sa), count] } └─BatchScan { table: t1_mv, columns: [t1_mv.sa], distribution: Single } - sql: | diff --git a/src/frontend/src/optimizer/plan_node/derive.rs b/src/frontend/src/optimizer/plan_node/derive.rs index 07096782c53f3..8f1fff48d60c2 100644 --- a/src/frontend/src/optimizer/plan_node/derive.rs +++ b/src/frontend/src/optimizer/plan_node/derive.rs @@ -108,7 +108,8 @@ pub(crate) fn derive_pk( let mut pk = vec![]; let func_dep = input.functional_dependency(); - let user_order_by = func_dep.minimize_order_key(user_order_by); + let user_order_by = + func_dep.minimize_order_key(user_order_by, input.distribution().dist_column_indices()); for order in &user_order_by.column_orders { let idx = order.column_index; @@ -124,19 +125,5 @@ pub(crate) fn derive_pk( in_order.insert(idx); } - // We need to ensure distribution key is part of pk. - // If it is not part of either stream_key or order_key, - // It must mean that it is only necessary for storage distribution. - // Such a case is rare, but it is possible, - // for example in the case of index created on singleton mv. - // In this case, we can simply append these columns to pk. - for &idx in input.distribution().dist_column_indices() { - if in_order.contains(idx) { - continue; - } - pk.push(ColumnOrder::new(idx, OrderType::ascending())); - in_order.insert(idx); - } - (pk, stream_key) } diff --git a/src/frontend/src/optimizer/property/func_dep.rs b/src/frontend/src/optimizer/property/func_dep.rs index ba4b389a0257a..edbbc0cbf751c 100644 --- a/src/frontend/src/optimizer/property/func_dep.rs +++ b/src/frontend/src/optimizer/property/func_dep.rs @@ -327,7 +327,11 @@ impl FunctionalDependencySet { /// Wrapper around `Self::minimize_order_key_bitset` to minimize `order_key`. /// View the documentation of `Self::minimize_order_key_bitset` for more information. - pub fn minimize_order_key(&self, order_key: Order) -> Order { + /// In the process of minimizing the order key, + /// we must ensure that if the indices are part of + /// distribution key, they must not be pruned. + pub fn minimize_order_key(&self, order_key: Order, dist_key_indices: &[usize]) -> Order { + let dist_key_bitset = FixedBitSet::from_iter(dist_key_indices.iter().copied()); let order_key_indices = order_key .column_orders .iter() @@ -337,7 +341,9 @@ impl FunctionalDependencySet { let order = order_key .column_orders .iter() - .filter(|o| min_bitset.contains(o.column_index)) + .filter(|o| { + min_bitset.contains(o.column_index) || dist_key_bitset.contains(o.column_index) + }) .cloned() .collect_vec(); Order::new(order) diff --git a/src/frontend/src/planner/query.rs b/src/frontend/src/planner/query.rs index 96b5ccece3c92..8be6a41178056 100644 --- a/src/frontend/src/planner/query.rs +++ b/src/frontend/src/planner/query.rs @@ -41,10 +41,6 @@ impl Planner { let order = Order { column_orders: order, }; - // Optimize order key before using it for TopN. - let func_dep = plan.functional_dependency(); - let order = func_dep.minimize_order_key(order); - if limit.is_some() || offset.is_some() { let limit = limit.unwrap_or(LIMIT_ALL_COUNT); let offset = offset.unwrap_or_default();