Skip to content

Commit

Permalink
fix(optimizer): fix batch two phase topn (#15045) (#15055)
Browse files Browse the repository at this point in the history
Co-authored-by: Dylan <[email protected]>
  • Loading branch information
github-actions[bot] and chenzl25 authored Feb 8, 2024
1 parent 9cf1b06 commit 454bf2f
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 10 deletions.
18 changes: 18 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/topn.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
- sql: |
CREATE TABLE t1 (pk int, a int, b int, c bigint, d int);
CREATE MATERIALIZED VIEW t1_mv AS SELECT SUM(a) as sa, SUM(b) as sb, SUM(c) as sc, SUM(d) as sd FROM t1;
SELECT sa, count(*) as cnt2 FROM t1_mv GROUP BY sa ORDER BY cnt2 DESC LIMIT 50 OFFSET 50;
expected_outputs:
- batch_plan
- sql: |
CREATE TABLE t1 (pk int, a int, b int, c bigint, d int);
CREATE MATERIALIZED VIEW t1_mv AS SELECT * from t1;
SELECT * FROM t1_mv ORDER BY a DESC LIMIT 50 OFFSET 50;
expected_outputs:
- batch_plan
- sql: |
CREATE TABLE t1 (pk int, a int, b int, c bigint, d int);
CREATE MATERIALIZED VIEW t1_mv AS SELECT * from t1 order by a desc;
SELECT * FROM t1_mv ORDER BY a DESC LIMIT 50 OFFSET 50;
expected_outputs:
- batch_plan
28 changes: 28 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/topn.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- sql: |
CREATE TABLE t1 (pk int, a int, b int, c bigint, d int);
CREATE MATERIALIZED VIEW t1_mv AS SELECT SUM(a) as sa, SUM(b) as sb, SUM(c) as sc, SUM(d) as sd FROM t1;
SELECT sa, count(*) as cnt2 FROM t1_mv GROUP BY sa ORDER BY cnt2 DESC LIMIT 50 OFFSET 50;
batch_plan: |-
BatchExchange { order: [count DESC], dist: Single }
└─BatchTopN { order: [count DESC], limit: 50, offset: 50 }
└─BatchSimpleAgg { aggs: [internal_last_seen_value(t1_mv.sa), count] }
└─BatchScan { table: t1_mv, columns: [t1_mv.sa], distribution: Single }
- sql: |
CREATE TABLE t1 (pk int, a int, b int, c bigint, d int);
CREATE MATERIALIZED VIEW t1_mv AS SELECT * from t1;
SELECT * FROM t1_mv ORDER BY a DESC LIMIT 50 OFFSET 50;
batch_plan: |-
BatchTopN { order: [t1_mv.a DESC], limit: 50, offset: 50 }
└─BatchExchange { order: [], dist: Single }
└─BatchTopN { order: [t1_mv.a DESC], limit: 100, offset: 0 }
└─BatchScan { table: t1_mv, columns: [t1_mv.pk, t1_mv.a, t1_mv.b, t1_mv.c, t1_mv.d], distribution: SomeShard }
- sql: |
CREATE TABLE t1 (pk int, a int, b int, c bigint, d int);
CREATE MATERIALIZED VIEW t1_mv AS SELECT * from t1 order by a desc;
SELECT * FROM t1_mv ORDER BY a DESC LIMIT 50 OFFSET 50;
batch_plan: |-
BatchTopN { order: [t1_mv.a DESC], limit: 50, offset: 50 }
└─BatchExchange { order: [], dist: Single }
└─BatchLimit { limit: 100, offset: 0 }
└─BatchScan { table: t1_mv, columns: [t1_mv.pk, t1_mv.a, t1_mv.b, t1_mv.c, t1_mv.d], limit: 100, distribution: SomeShard }
40 changes: 30 additions & 10 deletions src/frontend/src/optimizer/plan_node/batch_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ impl BatchTopN {
self.core.limit_attr.with_ties(),
);
let new_offset = 0;
let partial_input: PlanRef = if input.order().satisfies(&self.core.order) {
let partial_input: PlanRef = if input.order().satisfies(&self.core.order)
&& !self.core.limit_attr.with_ties()
{
let logical_partial_limit = generic::Limit::new(input, new_limit.limit(), new_offset);
let batch_partial_limit = BatchLimit::new(logical_partial_limit);
batch_partial_limit.into()
Expand All @@ -63,17 +65,23 @@ impl BatchTopN {
batch_partial_topn.into()
};

let single_dist = RequiredDist::single();
let ensure_single_dist = if !partial_input.distribution().satisfies(&single_dist) {
single_dist.enforce_if_not_satisfies(partial_input, &Order::any())?
} else {
// The input's distribution is singleton, so use one phase topn is enough.
return Ok(partial_input);
};
let ensure_single_dist =
RequiredDist::single().enforce_if_not_satisfies(partial_input, &Order::any())?;

let batch_global_topn = self.clone_with_input(ensure_single_dist);
Ok(batch_global_topn.into())
}

fn one_phase_topn(&self, input: PlanRef) -> Result<PlanRef> {
if input.order().satisfies(&self.core.order) && !self.core.limit_attr.with_ties() {
let logical_limit =
generic::Limit::new(input, self.core.limit_attr.limit(), self.core.offset);
let batch_limit = BatchLimit::new(logical_limit);
Ok(batch_limit.into())
} else {
Ok(self.clone_with_input(input).into())
}
}
}

impl_distill_by_unit!(BatchTopN, core, "BatchTopN");
Expand All @@ -94,7 +102,13 @@ impl_plan_tree_node_for_unary! {BatchTopN}

impl ToDistributedBatch for BatchTopN {
fn to_distributed(&self) -> Result<PlanRef> {
self.two_phase_topn(self.input().to_distributed()?)
let input = self.input().to_distributed()?;
let single_dist = RequiredDist::single();
if input.distribution().satisfies(&single_dist) {
self.one_phase_topn(input)
} else {
self.two_phase_topn(input)
}
}
}

Expand All @@ -112,7 +126,13 @@ impl ToBatchPb for BatchTopN {

impl ToLocalBatch for BatchTopN {
fn to_local(&self) -> Result<PlanRef> {
self.two_phase_topn(self.input().to_local()?)
let input = self.input().to_local()?;
let single_dist = RequiredDist::single();
if input.distribution().satisfies(&single_dist) {
self.one_phase_topn(input)
} else {
self.two_phase_topn(input)
}
}
}

Expand Down

0 comments on commit 454bf2f

Please sign in to comment.