From 802a5f80862ba85e55bb281c54dded3148468269 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Sun, 21 Jul 2024 23:42:06 +0800 Subject: [PATCH] fix stream share --- src/expr/core/src/aggregate/def.rs | 1 + src/frontend/planner_test/tests/testdata/input/agg.yaml | 3 ++- src/frontend/planner_test/tests/testdata/output/agg.yaml | 7 +++++++ src/frontend/src/optimizer/plan_node/logical_agg.rs | 2 +- src/frontend/src/optimizer/plan_node/stream_share.rs | 9 +++++++++ 5 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/expr/core/src/aggregate/def.rs b/src/expr/core/src/aggregate/def.rs index b807d2b88b5e0..c7cdecd08b406 100644 --- a/src/expr/core/src/aggregate/def.rs +++ b/src/expr/core/src/aggregate/def.rs @@ -366,6 +366,7 @@ pub mod agg_kinds { | AggKind::Count | AggKind::Avg | AggKind::ApproxCountDistinct + | AggKind::ApproxPercentile | AggKind::VarPop | AggKind::VarSamp | AggKind::StddevPop diff --git a/src/frontend/planner_test/tests/testdata/input/agg.yaml b/src/frontend/planner_test/tests/testdata/input/agg.yaml index 7318e21b50eac..6ec3b6d3706d1 100644 --- a/src/frontend/planner_test/tests/testdata/input/agg.yaml +++ b/src/frontend/planner_test/tests/testdata/input/agg.yaml @@ -1004,4 +1004,5 @@ CREATE TABLE t (v1 int); SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) from t; expected_outputs: - - logical_plan \ No newline at end of file + - logical_plan + - stream_plan \ No newline at end of file diff --git a/src/frontend/planner_test/tests/testdata/output/agg.yaml b/src/frontend/planner_test/tests/testdata/output/agg.yaml index cc4cee22cb949..f4f170d0ba1bf 100644 --- a/src/frontend/planner_test/tests/testdata/output/agg.yaml +++ b/src/frontend/planner_test/tests/testdata/output/agg.yaml @@ -1871,3 +1871,10 @@ └─LogicalAgg { aggs: [approx_percentile($expr1 order_by(t.v1 ASC))] } └─LogicalProject { exprs: [t.v1::Float64 as $expr1, t.v1] } └─LogicalScan { table: t, columns: [t.v1, t._row_id] } + stream_plan: |- + StreamMaterialize { columns: [approx_percentile], stream_key: [], pk_columns: [], pk_conflict: NoCheck } + └─StreamProject { exprs: [approx_percentile($expr1 order_by(t.v1 ASC))] } + └─StreamSimpleAgg { aggs: [approx_percentile($expr1 order_by(t.v1 ASC)), count] } + └─StreamExchange { dist: Single } + └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v1, t._row_id] } + └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index ed24b88927479..178bf3cc9ba48 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -290,7 +290,7 @@ impl LogicalAgg { .into()); } - let shared_input = LogicalShare::new(stream_input).to_stream(ctx)?; + let shared_input: PlanRef = StreamShare::new_from_input(stream_input).into(); let (approx_percentile_agg_call, non_approx_percentile_agg_calls, lhs_mapping, rhs_mapping) = self.extract_approx_percentile(); let approx_percentile_agg = diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs index b082d82b022d6..7e6f87fa5c271 100644 --- a/src/frontend/src/optimizer/plan_node/stream_share.rs +++ b/src/frontend/src/optimizer/plan_node/stream_share.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cell::RefCell; + use pretty_xmlish::XmlNode; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::PbStreamNode; @@ -50,6 +52,13 @@ impl StreamShare { StreamShare { base, core } } + + pub fn new_from_input(input: PlanRef) -> Self { + let core = generic::Share { + input: RefCell::new(input), + }; + Self::new(core) + } } impl Distill for StreamShare {