Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(optimizer): fix batch two phase topn (#15045) #15055

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/topn.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
- sql: |
CREATE TABLE t1 (pk int, a int, b int, c bigint, d int);
CREATE MATERIALIZED VIEW t1_mv AS SELECT SUM(a) as sa, SUM(b) as sb, SUM(c) as sc, SUM(d) as sd FROM t1;
SELECT sa, count(*) as cnt2 FROM t1_mv GROUP BY sa ORDER BY cnt2 DESC LIMIT 50 OFFSET 50;
expected_outputs:
- batch_plan
- sql: |
CREATE TABLE t1 (pk int, a int, b int, c bigint, d int);
CREATE MATERIALIZED VIEW t1_mv AS SELECT * from t1;
SELECT * FROM t1_mv ORDER BY a DESC LIMIT 50 OFFSET 50;
expected_outputs:
- batch_plan
- sql: |
CREATE TABLE t1 (pk int, a int, b int, c bigint, d int);
CREATE MATERIALIZED VIEW t1_mv AS SELECT * from t1 order by a desc;
SELECT * FROM t1_mv ORDER BY a DESC LIMIT 50 OFFSET 50;
expected_outputs:
- batch_plan
28 changes: 28 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/topn.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- sql: |
CREATE TABLE t1 (pk int, a int, b int, c bigint, d int);
CREATE MATERIALIZED VIEW t1_mv AS SELECT SUM(a) as sa, SUM(b) as sb, SUM(c) as sc, SUM(d) as sd FROM t1;
SELECT sa, count(*) as cnt2 FROM t1_mv GROUP BY sa ORDER BY cnt2 DESC LIMIT 50 OFFSET 50;
batch_plan: |-
BatchExchange { order: [count DESC], dist: Single }
└─BatchTopN { order: [count DESC], limit: 50, offset: 50 }
└─BatchSimpleAgg { aggs: [internal_last_seen_value(t1_mv.sa), count] }
└─BatchScan { table: t1_mv, columns: [t1_mv.sa], distribution: Single }
- sql: |
CREATE TABLE t1 (pk int, a int, b int, c bigint, d int);
CREATE MATERIALIZED VIEW t1_mv AS SELECT * from t1;
SELECT * FROM t1_mv ORDER BY a DESC LIMIT 50 OFFSET 50;
batch_plan: |-
BatchTopN { order: [t1_mv.a DESC], limit: 50, offset: 50 }
└─BatchExchange { order: [], dist: Single }
└─BatchTopN { order: [t1_mv.a DESC], limit: 100, offset: 0 }
└─BatchScan { table: t1_mv, columns: [t1_mv.pk, t1_mv.a, t1_mv.b, t1_mv.c, t1_mv.d], distribution: SomeShard }
- sql: |
CREATE TABLE t1 (pk int, a int, b int, c bigint, d int);
CREATE MATERIALIZED VIEW t1_mv AS SELECT * from t1 order by a desc;
SELECT * FROM t1_mv ORDER BY a DESC LIMIT 50 OFFSET 50;
batch_plan: |-
BatchTopN { order: [t1_mv.a DESC], limit: 50, offset: 50 }
└─BatchExchange { order: [], dist: Single }
└─BatchLimit { limit: 100, offset: 0 }
└─BatchScan { table: t1_mv, columns: [t1_mv.pk, t1_mv.a, t1_mv.b, t1_mv.c, t1_mv.d], limit: 100, distribution: SomeShard }
40 changes: 30 additions & 10 deletions src/frontend/src/optimizer/plan_node/batch_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ impl BatchTopN {
self.core.limit_attr.with_ties(),
);
let new_offset = 0;
let partial_input: PlanRef = if input.order().satisfies(&self.core.order) {
let partial_input: PlanRef = if input.order().satisfies(&self.core.order)
&& !self.core.limit_attr.with_ties()
{
let logical_partial_limit = generic::Limit::new(input, new_limit.limit(), new_offset);
let batch_partial_limit = BatchLimit::new(logical_partial_limit);
batch_partial_limit.into()
Expand All @@ -63,17 +65,23 @@ impl BatchTopN {
batch_partial_topn.into()
};

let single_dist = RequiredDist::single();
let ensure_single_dist = if !partial_input.distribution().satisfies(&single_dist) {
single_dist.enforce_if_not_satisfies(partial_input, &Order::any())?
} else {
// The input's distribution is already singleton, so a one-phase top-n is enough.
return Ok(partial_input);
};
let ensure_single_dist =
RequiredDist::single().enforce_if_not_satisfies(partial_input, &Order::any())?;

let batch_global_topn = self.clone_with_input(ensure_single_dist);
Ok(batch_global_topn.into())
}

/// Plans this top-n as a single operator placed directly on `input`.
///
/// If `input` already satisfies the required order and the limit is not
/// `WITH TIES`, a `BatchLimit` built from this node's limit and offset is
/// returned. In every other case the top-n node itself is re-created on
/// top of `input`.
fn one_phase_topn(&self, input: PlanRef) -> Result<PlanRef> {
    // Guard clause: fall back to a top-n whenever the order is not already
    // provided by the input, or ties have to be kept.
    if !input.order().satisfies(&self.core.order) || self.core.limit_attr.with_ties() {
        return Ok(self.clone_with_input(input).into());
    }

    let limit = self.core.limit_attr.limit();
    let offset = self.core.offset;
    let plain_limit = BatchLimit::new(generic::Limit::new(input, limit, offset));
    Ok(plain_limit.into())
}
}

impl_distill_by_unit!(BatchTopN, core, "BatchTopN");
Expand All @@ -94,7 +102,13 @@ impl_plan_tree_node_for_unary! {BatchTopN}

/// `ToDistributedBatch` implementation for `BatchTopN`.
impl ToDistributedBatch for BatchTopN {
/// Converts the input with `to_distributed` and then chooses the top-n strategy from the
/// converted input's distribution: `one_phase_topn` when it satisfies
/// `RequiredDist::single()`, `two_phase_topn` otherwise.
fn to_distributed(&self) -> Result<PlanRef> {
// NOTE(review): the next line and the statements after it look like the pre-change and
// post-change sides of a diff hunk shown together — confirm which side belongs here.
self.two_phase_topn(self.input().to_distributed()?)
let input = self.input().to_distributed()?;
let single_dist = RequiredDist::single();
if input.distribution().satisfies(&single_dist) {
// The converted input already meets the single-distribution requirement.
self.one_phase_topn(input)
} else {
self.two_phase_topn(input)
}
}
}

Expand All @@ -112,7 +126,13 @@ impl ToBatchPb for BatchTopN {

/// `ToLocalBatch` implementation for `BatchTopN`.
impl ToLocalBatch for BatchTopN {
/// Converts the input with `to_local` and then chooses the top-n strategy from the
/// converted input's distribution: `one_phase_topn` when it satisfies
/// `RequiredDist::single()`, `two_phase_topn` otherwise.
fn to_local(&self) -> Result<PlanRef> {
// NOTE(review): the next line and the statements after it look like the pre-change and
// post-change sides of a diff hunk shown together — confirm which side belongs here.
self.two_phase_topn(self.input().to_local()?)
let input = self.input().to_local()?;
let single_dist = RequiredDist::single();
if input.distribution().satisfies(&single_dist) {
// The converted input already meets the single-distribution requirement.
self.one_phase_topn(input)
} else {
self.two_phase_topn(input)
}
}
}

Expand Down
Loading