minimize order key while making sure dist key is not pruned
kwannoel committed Apr 10, 2024
1 parent 73221fe commit 1ab4f59
Showing 6 changed files with 23 additions and 33 deletions.
@@ -4,20 +4,20 @@
  create table t1 (id int primary key, i int);
  select id, i from t1 order by id, i limit 2;
  logical_plan: |-
- LogicalTopN { order: [t1.id ASC], limit: 2, offset: 0 }
+ LogicalTopN { order: [t1.id ASC, t1.i ASC], limit: 2, offset: 0 }
  └─LogicalProject { exprs: [t1.id, t1.i] }
  └─LogicalScan { table: t1, columns: [t1.id, t1.i] }
  batch_plan: |-
- BatchTopN { order: [t1.id ASC], limit: 2, offset: 0 }
+ BatchTopN { order: [t1.id ASC, t1.i ASC], limit: 2, offset: 0 }
  └─BatchExchange { order: [], dist: Single }
- └─BatchLimit { limit: 2, offset: 0 }
- └─BatchScan { table: t1, columns: [t1.id, t1.i], limit: 2, distribution: UpstreamHashShard(t1.id) }
+ └─BatchTopN { order: [t1.id ASC, t1.i ASC], limit: 2, offset: 0 }
+ └─BatchScan { table: t1, columns: [t1.id, t1.i], distribution: UpstreamHashShard(t1.id) }
  stream_plan: |-
  StreamMaterialize { columns: [id, i], stream_key: [id], pk_columns: [id], pk_conflict: NoCheck }
  └─StreamProject { exprs: [t1.id, t1.i] }
- └─StreamTopN { order: [t1.id ASC], limit: 2, offset: 0 }
+ └─StreamTopN { order: [t1.id ASC, t1.i ASC], limit: 2, offset: 0 }
  └─StreamExchange { dist: Single }
- └─StreamGroupTopN { order: [t1.id ASC], limit: 2, offset: 0, group_key: [_vnode] }
+ └─StreamGroupTopN { order: [t1.id ASC, t1.i ASC], limit: 2, offset: 0, group_key: [_vnode] }
  └─StreamProject { exprs: [t1.id, t1.i, Vnode(t1.id) as _vnode] }
  └─StreamTableScan { table: t1, columns: [t1.id, t1.i], stream_scan_type: ArrangementBackfill, stream_key: [t1.id], pk: [id], dist: UpstreamHashShard(t1.id) }
  - name: test functional dependency for order key pruning (order by - suffix fd)
@@ -50,8 +50,9 @@
  LogicalProject { exprs: [v.cnt] }
  └─LogicalScan { table: v, columns: [v.cnt], cardinality: 0..=1 }
  batch_plan: |-
- BatchExchange { order: [], dist: Single }
- └─BatchScan { table: v, columns: [v.cnt], distribution: Single }
+ BatchExchange { order: [v.cnt ASC], dist: Single }
+ └─BatchSort { order: [v.cnt ASC] }
+ └─BatchScan { table: v, columns: [v.cnt], distribution: Single }
  stream_plan: |-
  StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
  └─StreamTableScan { table: v, columns: [v.cnt], stream_scan_type: ArrangementBackfill, stream_key: [], pk: [], dist: Single }
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/output/limit.yaml
@@ -140,13 +140,13 @@
  create table t (a int);
  select count(*) from t order by 1 limit 1;
  logical_plan: |-
- LogicalLimit { limit: 1, offset: 0 }
+ LogicalTopN { order: [count ASC], limit: 1, offset: 0 }
  └─LogicalProject { exprs: [count] }
  └─LogicalAgg { aggs: [count] }
  └─LogicalProject { exprs: [] }
  └─LogicalScan { table: t, columns: [t.a, t._row_id] }
  batch_plan: |-
- BatchLimit { limit: 1, offset: 0 }
+ BatchTopN { order: [sum0(count) ASC], limit: 1, offset: 0 }
  └─BatchSimpleAgg { aggs: [sum0(count)] }
  └─BatchExchange { order: [], dist: Single }
  └─BatchSimpleAgg { aggs: [count] }
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/output/topn.yaml
@@ -4,8 +4,8 @@
  CREATE MATERIALIZED VIEW t1_mv AS SELECT SUM(a) as sa, SUM(b) as sb, SUM(c) as sc, SUM(d) as sd FROM t1;
  SELECT sa, count(*) as cnt2 FROM t1_mv GROUP BY sa ORDER BY cnt2 DESC LIMIT 50 OFFSET 50;
  batch_plan: |-
- BatchExchange { order: [], dist: Single }
- └─BatchLimit { limit: 50, offset: 50 }
+ BatchExchange { order: [count DESC], dist: Single }
+ └─BatchTopN { order: [count DESC], limit: 50, offset: 50 }
  └─BatchSimpleAgg { aggs: [internal_last_seen_value(t1_mv.sa), count] }
  └─BatchScan { table: t1_mv, columns: [t1_mv.sa], distribution: Single }
  - sql: |
17 changes: 2 additions & 15 deletions src/frontend/src/optimizer/plan_node/derive.rs
@@ -108,7 +108,8 @@ pub(crate) fn derive_pk(
      let mut pk = vec![];

      let func_dep = input.functional_dependency();
-     let user_order_by = func_dep.minimize_order_key(user_order_by);
+     let user_order_by =
+         func_dep.minimize_order_key(user_order_by, input.distribution().dist_column_indices());

      for order in &user_order_by.column_orders {
          let idx = order.column_index;
@@ -124,19 +125,5 @@
          in_order.insert(idx);
      }

-     // We need to ensure distribution key is part of pk.
-     // If it is not part of either stream_key or order_key,
-     // It must mean that it is only necessary for storage distribution.
-     // Such a case is rare, but it is possible,
-     // for example in the case of index created on singleton mv.
-     // In this case, we can simply append these columns to pk.
-     for &idx in input.distribution().dist_column_indices() {
-         if in_order.contains(idx) {
-             continue;
-         }
-         pk.push(ColumnOrder::new(idx, OrderType::ascending()));
-         in_order.insert(idx);
-     }
-
      (pk, stream_key)
  }
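The block removed above appended distribution-key columns to the pk as a final pass; with the new `minimize_order_key` signature those columns already survive minimization, so `derive_pk` only has to dedupe the order columns and then fill in any remaining stream-key columns. Below is a minimal, self-contained sketch of that shape — hypothetical stand-in types, and the stream-key loop that is collapsed in the hunk above is assumed here, so this is not the actual plan-node code:

```rust
use std::collections::HashSet;

#[derive(Clone, Debug)]
struct ColumnOrder {
    column_index: usize,
    ascending: bool,
}

/// Sketch: the order key handed in is assumed to be minimized with the
/// distribution key preserved, so no extra pass appending dist-key columns
/// to the pk is needed anymore.
fn derive_pk_sketch(minimized_order_by: &[ColumnOrder], stream_key: &[usize]) -> Vec<ColumnOrder> {
    let mut pk = Vec::new();
    let mut in_order = HashSet::new();
    // Order-key columns first, deduplicated by column index.
    for order in minimized_order_by {
        if in_order.insert(order.column_index) {
            pk.push(order.clone());
        }
    }
    // Then any stream-key columns not already covered, ascending by default.
    for &idx in stream_key {
        if in_order.insert(idx) {
            pk.push(ColumnOrder { column_index: idx, ascending: true });
        }
    }
    pk
}

fn main() {
    // Order key [col 1 ASC] already contains the dist-key column 1; stream key is [0].
    let order = vec![ColumnOrder { column_index: 1, ascending: true }];
    let pk = derive_pk_sketch(&order, &[0]);
    assert_eq!(pk.iter().map(|o| o.column_index).collect::<Vec<_>>(), vec![1, 0]);
    assert!(pk[1].ascending);
}
```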
10 changes: 8 additions & 2 deletions src/frontend/src/optimizer/property/func_dep.rs
@@ -327,7 +327,11 @@ impl FunctionalDependencySet {

      /// Wrapper around `Self::minimize_order_key_bitset` to minimize `order_key`.
      /// View the documentation of `Self::minimize_order_key_bitset` for more information.
-     pub fn minimize_order_key(&self, order_key: Order) -> Order {
+     /// In the process of minimizing the order key,
+     /// we must ensure that if the indices are part of
+     /// distribution key, they must not be pruned.
+     pub fn minimize_order_key(&self, order_key: Order, dist_key_indices: &[usize]) -> Order {
+         let dist_key_bitset = FixedBitSet::from_iter(dist_key_indices.iter().copied());
          let order_key_indices = order_key
              .column_orders
              .iter()
@@ -337,7 +341,9 @@
          let order = order_key
              .column_orders
              .iter()
-             .filter(|o| min_bitset.contains(o.column_index))
+             .filter(|o| {
+                 min_bitset.contains(o.column_index) || dist_key_bitset.contains(o.column_index)
+             })
              .cloned()
              .collect_vec();
          Order::new(order)
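The filter above keeps an order column if either the functional-dependency minimizer retained it or it belongs to the distribution key. A standalone sketch of just that predicate, reduced to bare column indices and using the same `fixedbitset` crate — a simplified illustration, not the `FunctionalDependencySet` API itself:

```rust
use fixedbitset::FixedBitSet;

/// Keep an order column if the minimizer retained it (`min_bitset`) or if it
/// is a distribution-key column, mirroring the filter in `minimize_order_key`.
fn minimize_keeping_dist_key(
    order_key: &[usize],
    min_bitset: &FixedBitSet,
    dist_key_indices: &[usize],
) -> Vec<usize> {
    let dist_key_bitset: FixedBitSet = dist_key_indices.iter().copied().collect();
    order_key
        .iter()
        .copied()
        .filter(|&idx| min_bitset.contains(idx) || dist_key_bitset.contains(idx))
        .collect()
}

fn main() {
    // Order key on columns [0, 1]; functional dependencies say column 0 alone
    // is enough, but column 1 is the distribution key, so it must be kept.
    let min_bitset: FixedBitSet = [0usize].into_iter().collect();
    assert_eq!(minimize_keeping_dist_key(&[0, 1], &min_bitset, &[1]), vec![0, 1]);
    // With no distribution key to protect, column 1 is pruned as before.
    assert_eq!(minimize_keeping_dist_key(&[0, 1], &min_bitset, &[]), vec![0]);
}
```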
4 changes: 0 additions & 4 deletions src/frontend/src/planner/query.rs
@@ -41,10 +41,6 @@ impl Planner {
          let order = Order {
              column_orders: order,
          };
-         // Optimize order key before using it for TopN.
-         let func_dep = plan.functional_dependency();
-         let order = func_dep.minimize_order_key(order);
-
          if limit.is_some() || offset.is_some() {
              let limit = limit.unwrap_or(LIMIT_ALL_COUNT);
              let offset = offset.unwrap_or_default();
