From da15db9624b719e9cf076a11010d563531ff9673 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Wed, 7 Feb 2024 17:05:24 +0800 Subject: [PATCH] fix batch two phase topn --- .../tests/testdata/input/topn.yaml | 18 +++++++++ .../tests/testdata/output/topn.yaml | 28 +++++++++++++ .../src/optimizer/plan_node/batch_topn.rs | 40 ++++++++++++++----- 3 files changed, 76 insertions(+), 10 deletions(-) create mode 100644 src/frontend/planner_test/tests/testdata/input/topn.yaml create mode 100644 src/frontend/planner_test/tests/testdata/output/topn.yaml diff --git a/src/frontend/planner_test/tests/testdata/input/topn.yaml b/src/frontend/planner_test/tests/testdata/input/topn.yaml new file mode 100644 index 0000000000000..676ac327efc90 --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/input/topn.yaml @@ -0,0 +1,18 @@ +- sql: | + CREATE TABLE t1 (pk int, a int, b int, c bigint, d int); + CREATE MATERIALIZED VIEW t1_mv AS SELECT SUM(a) as sa, SUM(b) as sb, SUM(c) as sc, SUM(d) as sd FROM t1; + SELECT sa, count(*) as cnt2 FROM t1_mv GROUP BY sa ORDER BY cnt2 DESC LIMIT 50 OFFSET 50; + expected_outputs: + - batch_plan +- sql: | + CREATE TABLE t1 (pk int, a int, b int, c bigint, d int); + CREATE MATERIALIZED VIEW t1_mv AS SELECT * from t1; + SELECT * FROM t1_mv ORDER BY a DESC LIMIT 50 OFFSET 50; + expected_outputs: + - batch_plan +- sql: | + CREATE TABLE t1 (pk int, a int, b int, c bigint, d int); + CREATE MATERIALIZED VIEW t1_mv AS SELECT * from t1 order by a desc; + SELECT * FROM t1_mv ORDER BY a DESC LIMIT 50 OFFSET 50; + expected_outputs: + - batch_plan diff --git a/src/frontend/planner_test/tests/testdata/output/topn.yaml b/src/frontend/planner_test/tests/testdata/output/topn.yaml new file mode 100644 index 0000000000000..d3bc247fb11df --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/output/topn.yaml @@ -0,0 +1,28 @@ +# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. 
+- sql: | + CREATE TABLE t1 (pk int, a int, b int, c bigint, d int); + CREATE MATERIALIZED VIEW t1_mv AS SELECT SUM(a) as sa, SUM(b) as sb, SUM(c) as sc, SUM(d) as sd FROM t1; + SELECT sa, count(*) as cnt2 FROM t1_mv GROUP BY sa ORDER BY cnt2 DESC LIMIT 50 OFFSET 50; + batch_plan: |- + BatchExchange { order: [count DESC], dist: Single } + └─BatchTopN { order: [count DESC], limit: 50, offset: 50 } + └─BatchSimpleAgg { aggs: [internal_last_seen_value(t1_mv.sa), count] } + └─BatchScan { table: t1_mv, columns: [t1_mv.sa], distribution: Single } +- sql: | + CREATE TABLE t1 (pk int, a int, b int, c bigint, d int); + CREATE MATERIALIZED VIEW t1_mv AS SELECT * from t1; + SELECT * FROM t1_mv ORDER BY a DESC LIMIT 50 OFFSET 50; + batch_plan: |- + BatchTopN { order: [t1_mv.a DESC], limit: 50, offset: 50 } + └─BatchExchange { order: [], dist: Single } + └─BatchTopN { order: [t1_mv.a DESC], limit: 100, offset: 0 } + └─BatchScan { table: t1_mv, columns: [t1_mv.pk, t1_mv.a, t1_mv.b, t1_mv.c, t1_mv.d], distribution: SomeShard } +- sql: | + CREATE TABLE t1 (pk int, a int, b int, c bigint, d int); + CREATE MATERIALIZED VIEW t1_mv AS SELECT * from t1 order by a desc; + SELECT * FROM t1_mv ORDER BY a DESC LIMIT 50 OFFSET 50; + batch_plan: |- + BatchTopN { order: [t1_mv.a DESC], limit: 50, offset: 50 } + └─BatchExchange { order: [], dist: Single } + └─BatchLimit { limit: 100, offset: 0 } + └─BatchScan { table: t1_mv, columns: [t1_mv.pk, t1_mv.a, t1_mv.b, t1_mv.c, t1_mv.d], limit: 100, distribution: SomeShard } diff --git a/src/frontend/src/optimizer/plan_node/batch_topn.rs b/src/frontend/src/optimizer/plan_node/batch_topn.rs index 3b0072821e389..d8f96668f88a7 100644 --- a/src/frontend/src/optimizer/plan_node/batch_topn.rs +++ b/src/frontend/src/optimizer/plan_node/batch_topn.rs @@ -52,7 +52,9 @@ impl BatchTopN { self.core.limit_attr.with_ties(), ); let new_offset = 0; - let partial_input: PlanRef = if input.order().satisfies(&self.core.order) { + let partial_input: PlanRef = if 
input.order().satisfies(&self.core.order) + && !self.core.limit_attr.with_ties() + { let logical_partial_limit = generic::Limit::new(input, new_limit.limit(), new_offset); let batch_partial_limit = BatchLimit::new(logical_partial_limit); batch_partial_limit.into() @@ -63,17 +65,23 @@ impl BatchTopN { batch_partial_topn.into() }; - let single_dist = RequiredDist::single(); - let ensure_single_dist = if !partial_input.distribution().satisfies(&single_dist) { - single_dist.enforce_if_not_satisfies(partial_input, &Order::any())? - } else { - // The input's distribution is singleton, so use one phase topn is enough. - return Ok(partial_input); - }; + let ensure_single_dist = + RequiredDist::single().enforce_if_not_satisfies(partial_input, &Order::any())?; let batch_global_topn = self.clone_with_input(ensure_single_dist); Ok(batch_global_topn.into()) } + + fn one_phase_topn(&self, input: PlanRef) -> Result<PlanRef> { + if input.order().satisfies(&self.core.order) && !self.core.limit_attr.with_ties() { + let logical_limit = + generic::Limit::new(input, self.core.limit_attr.limit(), self.core.offset); + let batch_limit = BatchLimit::new(logical_limit); + Ok(batch_limit.into()) + } else { + Ok(self.clone_with_input(input).into()) + } + } } impl_distill_by_unit!(BatchTopN, core, "BatchTopN"); @@ -94,7 +102,13 @@ impl_plan_tree_node_for_unary! {BatchTopN} impl ToDistributedBatch for BatchTopN { fn to_distributed(&self) -> Result<PlanRef> { - self.two_phase_topn(self.input().to_distributed()?) + let input = self.input().to_distributed()?; + let single_dist = RequiredDist::single(); + if input.distribution().satisfies(&single_dist) { + self.one_phase_topn(input) + } else { + self.two_phase_topn(input) + } } } @@ -112,7 +126,13 @@ impl ToBatchPb for BatchTopN { impl ToLocalBatch for BatchTopN { fn to_local(&self) -> Result<PlanRef> { - self.two_phase_topn(self.input().to_local()?) 
+ let input = self.input().to_local()?; + let single_dist = RequiredDist::single(); + if input.distribution().satisfies(&single_dist) { + self.one_phase_topn(input) + } else { + self.two_phase_topn(input) + } } }