From c9a8d63f3c1412008a420542a3f432013369e117 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Wed, 24 Apr 2024 21:48:36 -0400 Subject: [PATCH 01/13] add generic nodes and logical nodes --- .../optimizer/plan_node/generic/cte_ref.rs | 96 +++++++++++++++ .../src/optimizer/plan_node/generic/mod.rs | 4 + .../plan_node/generic/recursive_union.rs | 67 +++++++++++ .../optimizer/plan_node/logical_cte_ref.rs | 91 +++++++++++++++ .../plan_node/logical_recursive_union.rs | 109 ++++++++++++++++++ src/frontend/src/optimizer/plan_node/mod.rs | 8 ++ 6 files changed, 375 insertions(+) create mode 100644 src/frontend/src/optimizer/plan_node/generic/cte_ref.rs create mode 100644 src/frontend/src/optimizer/plan_node/generic/recursive_union.rs create mode 100644 src/frontend/src/optimizer/plan_node/logical_cte_ref.rs create mode 100644 src/frontend/src/optimizer/plan_node/logical_recursive_union.rs diff --git a/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs b/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs new file mode 100644 index 000000000000..527a978749ca --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs @@ -0,0 +1,96 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cell::RefCell; +use std::hash::Hash; +use std::rc::Weak; + +use pretty_xmlish::StrAssocArr; +use risingwave_common::catalog::Schema; + +use super::{impl_distill_unit_from_fields, GenericPlanNode, GenericPlanRef}; +use crate::binder::ShareId; +use crate::optimizer::property::FunctionalDependencySet; +use crate::OptimizerContextRef; + +#[derive(Clone, Debug)] +pub struct CteRef { + share_id: ShareId, + pub r#ref: Weak, + base: PlanRef, + derived_stream_key: RefCell>>>, + deriving: RefCell, +} + +impl PartialEq for CteRef { + fn eq(&self, other: &Self) -> bool { + self.share_id == other.share_id + } +} + +impl Eq for CteRef {} + +impl Hash for CteRef { + fn hash(&self, state: &mut H) { + self.share_id.hash(state); + } +} + +impl CteRef { + pub fn new(share_id: ShareId, r#ref: Weak, base: PlanRef) -> Self { + Self { + share_id, + r#ref, + base, + derived_stream_key: RefCell::new(None), + deriving: RefCell::new(false), + } + } +} + +impl GenericPlanNode for CteRef { + fn schema(&self) -> Schema { + self.base.schema().clone() + } + + fn stream_key(&self) -> Option> { + if let Some(derived_stream_key) = self.derived_stream_key.borrow().as_ref() { + return derived_stream_key.clone(); + } + if *self.deriving.borrow() { + return self.base.stream_key().map(Into::into); + } + *self.deriving.borrow_mut() = true; + let derived_stream_key = self.r#ref.upgrade().unwrap().stream_key().map(Into::into); + *self.deriving.borrow_mut() = false; + *self.derived_stream_key.borrow_mut() = Some(derived_stream_key.clone()); + derived_stream_key + } + + fn ctx(&self) -> OptimizerContextRef { + self.r#ref.upgrade().unwrap().ctx() + } + + fn functional_dependency(&self) -> FunctionalDependencySet { + todo!() + } +} + +impl CteRef { + pub fn fields_pretty<'a>(&self) -> StrAssocArr<'a> { + vec![] + } +} + +impl_distill_unit_from_fields! {CteRef, GenericPlanRef} \ No newline at end of file diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index 5154c84017b8..6f0d5db773f3 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -72,6 +72,10 @@ mod limit; pub use limit::*; mod max_one_row; pub use max_one_row::*; +mod cte_ref; +pub use cte_ref::*; +mod recursive_union; +pub use recursive_union::*; pub trait DistillUnit { fn distill_with_name<'a>(&self, name: impl Into>) -> XmlNode<'a>; diff --git a/src/frontend/src/optimizer/plan_node/generic/recursive_union.rs b/src/frontend/src/optimizer/plan_node/generic/recursive_union.rs new file mode 100644 index 000000000000..b5c3521cf5dc --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/generic/recursive_union.rs @@ -0,0 +1,67 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use pretty_xmlish::StrAssocArr; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::types::DataType; + +use super::{impl_distill_unit_from_fields, GenericPlanNode, GenericPlanRef}; +use crate::optimizer::plan_node::ColPrunable; +use crate::optimizer::property::FunctionalDependencySet; +use crate::OptimizerContextRef; + +/// `Union` returns the union of the rows of its inputs. +/// If `all` is false, it needs to eliminate duplicates. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct RecursiveUnion { + pub base: PlanRef, + pub recursive: PlanRef, +} + +impl GenericPlanNode for RecursiveUnion { + fn functional_dependency(&self) -> FunctionalDependencySet { + todo!() + } + + fn schema(&self) -> Schema { + let mut base = self.base.schema().clone(); + let iter_field = Field::with_name(DataType::Int16, "$iter"); + base.fields.push(iter_field); + base + } + + fn stream_key(&self) -> Option> { + let fields_len = self.base.schema().len(); + let base = self.base.stream_key(); + if let Some(base) = base { + let mut base = base.to_vec(); + base.push(fields_len); + Some(base) + } else { + None + } + } + + fn ctx(&self) -> OptimizerContextRef { + self.base.ctx() + } +} + +impl RecursiveUnion { + pub fn fields_pretty<'a>(&self) -> StrAssocArr<'a> { + vec![] + } +} + +impl_distill_unit_from_fields!(RecursiveUnion, GenericPlanRef); \ No newline at end of file diff --git a/src/frontend/src/optimizer/plan_node/logical_cte_ref.rs b/src/frontend/src/optimizer/plan_node/logical_cte_ref.rs new file mode 100644 index 000000000000..33acf21484d9 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/logical_cte_ref.rs @@ -0,0 +1,91 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::rc::Weak; + +use risingwave_common::bail_not_implemented; +use risingwave_common::util::column_index_mapping::ColIndexMapping; + +use super::expr_visitable::ExprVisitable; +use super::utils::impl_distill_by_unit; +use super::{ + gen_filter_and_pushdown, generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical, + LogicalProject, PlanBase, PredicatePushdown, PredicatePushdownContext, RewriteStreamContext, + ToBatch, ToStream, ToStreamContext, +}; +use crate::binder::ShareId; +use crate::error::Result; +use crate::utils::Condition; +use crate::PlanRef; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct LogicalCteRef { + pub base: PlanBase, + core: generic::CteRef, +} + +impl LogicalCteRef { + pub fn new(share_id: ShareId, base_plan: PlanRef, r#ref: Weak) -> Self { + let core = generic::CteRef::new(share_id, r#ref, base_plan); + let base = PlanBase::new_logical_with_core(&core); + Self { base, core } + } + + pub fn create(share_id: ShareId, base_plan: PlanRef, r#ref: Weak) -> PlanRef { + Self::new(share_id, base_plan, r#ref).into() + } +} + +impl_plan_tree_node_for_leaf! {LogicalCteRef} + +impl_distill_by_unit! {LogicalCteRef, core, "LogicalCteRef"} + +impl ExprRewritable for LogicalCteRef {} + +impl ExprVisitable for LogicalCteRef {} + +impl ColPrunable for LogicalCteRef { + fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef { + LogicalProject::with_out_col_idx(self.clone().into(), required_cols.iter().copied()).into() + } +} + +impl PredicatePushdown for LogicalCteRef { + fn predicate_pushdown( + &self, + predicate: Condition, + ctx: &mut PredicatePushdownContext, + ) -> PlanRef { + todo!() + } +} + +impl ToBatch for LogicalCteRef { + fn to_batch(&self) -> Result { + bail_not_implemented!(issue = 15135, "recursive CTE not supported") + } +} + +impl ToStream for LogicalCteRef { + fn to_stream(&self, ctx: &mut ToStreamContext) -> Result { + bail_not_implemented!(issue = 15135, "recursive CTE not supported") + } + + fn logical_rewrite_for_stream( + &self, + ctx: &mut RewriteStreamContext, + ) -> Result<(PlanRef, ColIndexMapping)> { + bail_not_implemented!(issue = 15135, "recursive CTE not supported") + } +} \ No newline at end of file diff --git a/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs b/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs new file mode 100644 index 000000000000..afa8c010a164 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs @@ -0,0 +1,109 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use itertools::Itertools; +use risingwave_common::bail_not_implemented; +use risingwave_common::util::column_index_mapping::ColIndexMapping; +use smallvec::{smallvec, SmallVec}; + +use super::expr_visitable::ExprVisitable; +use super::utils::impl_distill_by_unit; +use super::{ + gen_filter_and_pushdown, generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical, + PlanBase, PlanTreeNode, PredicatePushdown, PredicatePushdownContext, RewriteStreamContext, + ToBatch, ToStream, ToStreamContext, +}; +use crate::error::Result; +use crate::utils::Condition; +use crate::PlanRef; + +/// `LogicalUnion` returns the union of the rows of its inputs. +/// If `all` is false, it needs to eliminate duplicates. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct LogicalRecursiveUnion { + pub base: PlanBase, + core: generic::RecursiveUnion, +} + +impl LogicalRecursiveUnion { + pub fn new(base_plan: PlanRef, recursive: PlanRef) -> Self { + let core = generic::RecursiveUnion { + base: base_plan, + recursive, + }; + let base = PlanBase::new_logical_with_core(&core); + LogicalRecursiveUnion { base, core } + } + + pub fn create(base_plan: PlanRef, recursive: PlanRef) -> PlanRef { + Self::new(base_plan, recursive).into() + } +} + +impl PlanTreeNode for LogicalRecursiveUnion { + fn inputs(&self) -> SmallVec<[PlanRef; 2]> { + smallvec![self.core.base.clone(), self.core.recursive.clone()] + } + + fn clone_with_inputs(&self, inputs: &[PlanRef]) -> PlanRef { + let mut inputs = inputs.into_iter().cloned(); + Self::create(inputs.next().unwrap(), inputs.next().unwrap()) + } +} + +impl_distill_by_unit!(LogicalRecursiveUnion, core, "LogicalRecursiveUnion"); + +impl ColPrunable for LogicalRecursiveUnion { + fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef { + let new_inputs = self + .inputs() + .iter() + .map(|input| input.prune_col(required_cols, ctx)) + .collect_vec(); + self.clone_with_inputs(&new_inputs) + } +} + +impl ExprRewritable for LogicalRecursiveUnion {} + +impl ExprVisitable for LogicalRecursiveUnion {} + +impl PredicatePushdown for LogicalRecursiveUnion { + fn predicate_pushdown( + &self, + predicate: Condition, + ctx: &mut PredicatePushdownContext, + ) -> PlanRef { + todo!() + } +} + +impl ToBatch for LogicalRecursiveUnion { + fn to_batch(&self) -> Result { + bail_not_implemented!(issue = 15135, "recursive CTE not supported") + } +} + +impl ToStream for LogicalRecursiveUnion { + fn to_stream(&self, ctx: &mut ToStreamContext) -> Result { + bail_not_implemented!(issue = 15135, "recursive CTE not supported") + } + + fn logical_rewrite_for_stream( + &self, + ctx: &mut RewriteStreamContext, + ) -> Result<(PlanRef, ColIndexMapping)> { + bail_not_implemented!(issue = 15135, "recursive CTE not supported") + } +} \ No newline at end of file diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index e7ad78f373ba..2e0a99486ecd 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -873,6 +873,8 @@ mod logical_topn; mod logical_union; mod logical_update; mod logical_values; +mod logical_recursive_union; +mod logical_cte_ref; mod stream_dedup; mod stream_delta_join; mod stream_dml; @@ -973,6 +975,8 @@ pub use logical_topn::LogicalTopN; pub use logical_union::LogicalUnion; pub use logical_update::LogicalUpdate; pub use logical_values::LogicalValues; +pub use logical_recursive_union::LogicalRecursiveUnion; +pub use logical_cte_ref::LogicalCteRef; pub use stream_cdc_table_scan::StreamCdcTableScan; pub use stream_dedup::StreamDedup; pub use stream_delta_join::StreamDeltaJoin; @@ -1063,6 +1067,8 @@ macro_rules! for_all_plan_nodes { , { Logical, MaxOneRow } , { Logical, KafkaScan } , { Logical, IcebergScan } + , { Logical, RecursiveUnion } + , { Logical, CteRef } , { Batch, SimpleAgg } , { Batch, HashAgg } , { Batch, SortAgg } @@ -1165,6 +1171,8 @@ macro_rules! for_logical_plan_nodes { , { Logical, MaxOneRow } , { Logical, KafkaScan } , { Logical, IcebergScan } + , { Logical, RecursiveUnion } + , { Logical, CteRef } } }; } From 7a435271b096672d55fadcfa6e3eb5ce63c58ba0 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Wed, 24 Apr 2024 22:24:33 -0400 Subject: [PATCH 02/13] add planner related functions; update fmt --- src/frontend/src/binder/mod.rs | 5 ++- src/frontend/src/binder/relation/mod.rs | 2 +- .../optimizer/plan_node/generic/cte_ref.rs | 2 +- .../plan_node/generic/recursive_union.rs | 3 +- .../optimizer/plan_node/logical_cte_ref.rs | 14 +++---- .../plan_node/logical_recursive_union.rs | 12 +++--- src/frontend/src/optimizer/plan_node/mod.rs | 8 ++-- src/frontend/src/planner/mod.rs | 1 + src/frontend/src/planner/recursive_union.rs | 30 ++++++++++++++ src/frontend/src/planner/relation.rs | 39 ++++++++++++------- 10 files changed, 78 insertions(+), 38 deletions(-) create mode 100644 src/frontend/src/planner/recursive_union.rs diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index 3648f53d5027..37cb369c99b7 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -49,8 +49,9 @@ pub use insert::BoundInsert; use pgwire::pg_server::{Session, SessionId}; pub use query::BoundQuery; pub use relation::{ - BoundBaseTable, BoundJoin, BoundShare, BoundSource, BoundSystemTable, BoundWatermark, - BoundWindowTableFunction, Relation, ResolveQualifiedNameError, WindowTableFunctionKind, + BoundBackCteRef, BoundBaseTable, BoundJoin, BoundShare, BoundSource, BoundSystemTable, + BoundWatermark, BoundWindowTableFunction, Relation, ResolveQualifiedNameError, + WindowTableFunctionKind, }; pub use select::{BoundDistinct, BoundSelect}; pub use set_expr::*; diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index b7c288551191..0843989aff2a 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -26,7 +26,6 @@ use risingwave_sqlparser::ast::{ use thiserror::Error; use thiserror_ext::AsReport; -use self::cte_ref::BoundBackCteRef; use super::bind_context::ColumnBinding; use super::statement::RewriteExprsRecursive; use crate::binder::bind_context::{BindingCte, BindingCteState}; @@ -43,6 +42,7 @@ mod table_or_source; mod watermark; mod window_table_function; +pub use cte_ref::BoundBackCteRef; pub use join::BoundJoin; pub use share::BoundShare; pub use subquery::BoundSubquery; diff --git a/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs b/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs index 527a978749ca..5dbbf02f741d 100644 --- a/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs +++ b/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs @@ -93,4 +93,4 @@ impl CteRef { } } -impl_distill_unit_from_fields! {CteRef, GenericPlanRef} \ No newline at end of file +impl_distill_unit_from_fields! {CteRef, GenericPlanRef} diff --git a/src/frontend/src/optimizer/plan_node/generic/recursive_union.rs b/src/frontend/src/optimizer/plan_node/generic/recursive_union.rs index b5c3521cf5dc..1f1bfb4acf46 100644 --- a/src/frontend/src/optimizer/plan_node/generic/recursive_union.rs +++ b/src/frontend/src/optimizer/plan_node/generic/recursive_union.rs @@ -17,7 +17,6 @@ use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; use super::{impl_distill_unit_from_fields, GenericPlanNode, GenericPlanRef}; -use crate::optimizer::plan_node::ColPrunable; use crate::optimizer::property::FunctionalDependencySet; use crate::OptimizerContextRef; @@ -64,4 +63,4 @@ impl RecursiveUnion { } } -impl_distill_unit_from_fields!(RecursiveUnion, GenericPlanRef); \ No newline at end of file +impl_distill_unit_from_fields!(RecursiveUnion, GenericPlanRef); diff --git a/src/frontend/src/optimizer/plan_node/logical_cte_ref.rs b/src/frontend/src/optimizer/plan_node/logical_cte_ref.rs index 33acf21484d9..5e8fefb84c6d 100644 --- a/src/frontend/src/optimizer/plan_node/logical_cte_ref.rs +++ b/src/frontend/src/optimizer/plan_node/logical_cte_ref.rs @@ -20,7 +20,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use super::expr_visitable::ExprVisitable; use super::utils::impl_distill_by_unit; use super::{ - gen_filter_and_pushdown, generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical, + generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical, LogicalProject, PlanBase, PredicatePushdown, PredicatePushdownContext, RewriteStreamContext, ToBatch, ToStream, ToStreamContext, }; @@ -56,7 +56,7 @@ impl ExprRewritable for LogicalCteRef {} impl ExprVisitable for LogicalCteRef {} impl ColPrunable for LogicalCteRef { - fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef { + fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef { LogicalProject::with_out_col_idx(self.clone().into(), required_cols.iter().copied()).into() } } @@ -64,8 +64,8 @@ impl ColPrunable for LogicalCteRef { impl PredicatePushdown for LogicalCteRef { fn predicate_pushdown( &self, - predicate: Condition, - ctx: &mut PredicatePushdownContext, + _predicate: Condition, + _ctx: &mut PredicatePushdownContext, ) -> PlanRef { todo!() } @@ -78,14 +78,14 @@ impl ToBatch for LogicalCteRef { } impl ToStream for LogicalCteRef { - fn to_stream(&self, ctx: &mut ToStreamContext) -> Result { + fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { bail_not_implemented!(issue = 15135, "recursive CTE not supported") } fn logical_rewrite_for_stream( &self, - ctx: &mut RewriteStreamContext, + _ctx: &mut RewriteStreamContext, ) -> Result<(PlanRef, ColIndexMapping)> { bail_not_implemented!(issue = 15135, "recursive CTE not supported") } -} \ No newline at end of file +} diff --git a/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs b/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs index afa8c010a164..100c45ead144 100644 --- a/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs +++ b/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs @@ -20,7 +20,7 @@ use smallvec::{smallvec, SmallVec}; use super::expr_visitable::ExprVisitable; use super::utils::impl_distill_by_unit; use super::{ - gen_filter_and_pushdown, generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical, + generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical, PlanBase, PlanTreeNode, PredicatePushdown, PredicatePushdownContext, RewriteStreamContext, ToBatch, ToStream, ToStreamContext, }; @@ -82,8 +82,8 @@ impl ExprVisitable for LogicalRecursiveUnion {} impl PredicatePushdown for LogicalRecursiveUnion { fn predicate_pushdown( &self, - predicate: Condition, - ctx: &mut PredicatePushdownContext, + _predicate: Condition, + _ctx: &mut PredicatePushdownContext, ) -> PlanRef { todo!() } @@ -96,14 +96,14 @@ impl ToBatch for LogicalRecursiveUnion { } impl ToStream for LogicalRecursiveUnion { - fn to_stream(&self, ctx: &mut ToStreamContext) -> Result { + fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { bail_not_implemented!(issue = 15135, "recursive CTE not supported") } fn logical_rewrite_for_stream( &self, - ctx: &mut RewriteStreamContext, + _ctx: &mut RewriteStreamContext, ) -> Result<(PlanRef, ColIndexMapping)> { bail_not_implemented!(issue = 15135, "recursive CTE not supported") } -} \ No newline at end of file +} diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 2e0a99486ecd..b91a8235c606 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -847,6 +847,7 @@ mod batch_values; mod logical_agg; mod logical_apply; mod logical_cdc_scan; +mod logical_cte_ref; mod logical_dedup; mod logical_delete; mod logical_except; @@ -864,6 +865,7 @@ mod logical_now; mod logical_over_window; mod logical_project; mod logical_project_set; +mod logical_recursive_union; mod logical_scan; mod logical_share; mod logical_source; @@ -873,8 +875,6 @@ mod logical_topn; mod logical_union; mod logical_update; mod logical_values; -mod logical_recursive_union; -mod logical_cte_ref; mod stream_dedup; mod stream_delta_join; mod stream_dml; @@ -948,6 +948,7 @@ pub use batch_values::BatchValues; pub use logical_agg::LogicalAgg; pub use logical_apply::LogicalApply; pub use logical_cdc_scan::LogicalCdcScan; +pub use logical_cte_ref::LogicalCteRef; pub use logical_dedup::LogicalDedup; pub use logical_delete::LogicalDelete; pub use logical_except::LogicalExcept; @@ -966,6 +967,7 @@ pub use logical_now::LogicalNow; pub use logical_over_window::LogicalOverWindow; pub use logical_project::LogicalProject; pub use logical_project_set::LogicalProjectSet; +pub use logical_recursive_union::LogicalRecursiveUnion; pub use logical_scan::LogicalScan; pub use logical_share::LogicalShare; pub use logical_source::LogicalSource; @@ -975,8 +977,6 @@ pub use logical_topn::LogicalTopN; pub use logical_union::LogicalUnion; pub use logical_update::LogicalUpdate; pub use logical_values::LogicalValues; -pub use logical_recursive_union::LogicalRecursiveUnion; -pub use logical_cte_ref::LogicalCteRef; pub use stream_cdc_table_scan::StreamCdcTableScan; pub use stream_dedup::StreamDedup; pub use stream_delta_join::StreamDeltaJoin; diff --git a/src/frontend/src/planner/mod.rs b/src/frontend/src/planner/mod.rs index d2f695faa5ab..12bf25c0372d 100644 --- a/src/frontend/src/planner/mod.rs +++ b/src/frontend/src/planner/mod.rs @@ -21,6 +21,7 @@ use crate::optimizer::{OptimizerContextRef, PlanRoot}; mod delete; mod insert; mod query; +mod recursive_union; mod relation; mod select; mod set_expr; diff --git a/src/frontend/src/planner/recursive_union.rs b/src/frontend/src/planner/recursive_union.rs new file mode 100644 index 000000000000..bc6cec3a6013 --- /dev/null +++ b/src/frontend/src/planner/recursive_union.rs @@ -0,0 +1,30 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::binder::BoundSetExpr; +use crate::error::Result; +use crate::optimizer::plan_node::LogicalRecursiveUnion; +use crate::{PlanRef, Planner}; + +impl Planner { + pub(super) fn plan_recursive_union( + &mut self, + base: BoundSetExpr, + recursive: BoundSetExpr, + ) -> Result { + let base = self.plan_set_expr(base, vec![], &[])?; + let recursive = self.plan_set_expr(recursive, vec![], &[])?; + Ok(LogicalRecursiveUnion::create(base, recursive)) + } +} diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index d593a7308f39..bf6173962b9e 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -22,8 +22,8 @@ use risingwave_common::types::{DataType, Interval, ScalarImpl}; use risingwave_sqlparser::ast::AsOf; use crate::binder::{ - BoundBaseTable, BoundJoin, BoundShare, BoundSource, BoundSystemTable, BoundWatermark, - BoundWindowTableFunction, Relation, WindowTableFunctionKind, + BoundBackCteRef, BoundBaseTable, BoundJoin, BoundShare, BoundSource, BoundSystemTable, + BoundWatermark, BoundWindowTableFunction, Relation, WindowTableFunctionKind, }; use crate::error::{ErrorCode, Result}; use crate::expr::{Expr, ExprImpl, ExprType, FunctionCall, InputRef}; @@ -218,20 +218,24 @@ impl Planner { } pub(super) fn plan_share(&mut self, share: BoundShare) -> Result { - let Either::Left(nonrecursive_query) = share.input else { - bail_not_implemented!(issue = 15135, "recursive CTE is not supported"); - }; - let id = share.share_id; - match self.share_cache.get(&id) { - None => { - let result = self - .plan_query(nonrecursive_query)? - .into_unordered_subplan(); - let logical_share = LogicalShare::create(result); - self.share_cache.insert(id, logical_share.clone()); - Ok(logical_share) + match share.input { + Either::Left(nonrecursive_query) => { + let id = share.share_id; + match self.share_cache.get(&id) { + None => { + let result = self + .plan_query(nonrecursive_query)? + .into_unordered_subplan(); + let logical_share = LogicalShare::create(result); + self.share_cache.insert(id, logical_share.clone()); + Ok(logical_share) + } + Some(result) => Ok(result.clone()), + } + } + Either::Right(recursive_union) => { + self.plan_recursive_union(*recursive_union.base, *recursive_union.recursive) } - Some(result) => Ok(result.clone()), } } @@ -239,6 +243,11 @@ impl Planner { todo!("plan watermark"); } + #[allow(dead_code)] + pub(super) fn plan_cte_ref(&mut self, _cte_ref: BoundBackCteRef) -> Result { + todo!("plan cte ref"); + } + fn collect_col_data_types_for_tumble_window(relation: &Relation) -> Result> { let col_data_types = match relation { Relation::Source(s) => s From 52148e8d8300d49e331519b4ff22c1f7dddfaa82 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Wed, 24 Apr 2024 22:46:08 -0400 Subject: [PATCH 03/13] update comment; tiny refactor --- src/frontend/src/planner/relation.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index bf6173962b9e..0b66e4e743a0 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -57,9 +57,7 @@ impl Planner { Relation::Watermark(tf) => self.plan_watermark(*tf), // note that rcte (i.e., RecursiveUnion) is included *implicitly* in share. Relation::Share(share) => self.plan_share(*share), - Relation::BackCteRef(..) => { - bail_not_implemented!(issue = 15135, "recursive CTE is not supported") - } + Relation::BackCteRef(cte_ref) => self.plan_cte_ref(*cte_ref) } } @@ -233,6 +231,7 @@ impl Planner { Some(result) => Ok(result.clone()), } } + // for the recursive union in rcte Either::Right(recursive_union) => { self.plan_recursive_union(*recursive_union.base, *recursive_union.recursive) } @@ -243,9 +242,8 @@ impl Planner { todo!("plan watermark"); } - #[allow(dead_code)] pub(super) fn plan_cte_ref(&mut self, _cte_ref: BoundBackCteRef) -> Result { - todo!("plan cte ref"); + bail_not_implemented!(issue = 15135, "recursive CTE not supported") } fn collect_col_data_types_for_tumble_window(relation: &Relation) -> Result> { From e18d11a31dc2f278c09c293f99d207356bd0e1da Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Wed, 24 Apr 2024 23:08:11 -0400 Subject: [PATCH 04/13] add rcte_cache in OptimizerContext --- src/frontend/src/optimizer/optimizer_context.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/frontend/src/optimizer/optimizer_context.rs b/src/frontend/src/optimizer/optimizer_context.rs index eb6e41037874..3c3a3c696cac 100644 --- a/src/frontend/src/optimizer/optimizer_context.rs +++ b/src/frontend/src/optimizer/optimizer_context.rs @@ -21,11 +21,13 @@ use std::sync::Arc; use risingwave_sqlparser::ast::{ExplainOptions, ExplainType}; +use crate::binder::ShareId; use crate::expr::{CorrelatedId, SessionTimezone}; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::PlanNodeId; use crate::session::SessionImpl; use crate::utils::{OverwriteOptions, WithOptions}; +use crate::PlanRef; const RESERVED_ID_NUM: u16 = 10000; @@ -58,6 +60,9 @@ pub struct OptimizerContext { /// Store the configs can be overwritten in with clause /// if not specified, use the value from session variable. overwrite_options: OverwriteOptions, + /// Store the mapping between `share_id` and the corresponding + /// `PlanRef`, used by rcte's planning. (e.g., in `LogicalCteRef`) + rcte_cache: HashMap, _phantom: PhantomUnsend, } @@ -91,6 +96,7 @@ impl OptimizerContext { next_expr_display_id: RefCell::new(RESERVED_ID_NUM.into()), total_rule_applied: RefCell::new(0), overwrite_options, + rcte_cache: HashMap::new(), _phantom: Default::default(), } } @@ -113,6 +119,7 @@ impl OptimizerContext { next_expr_display_id: RefCell::new(0), total_rule_applied: RefCell::new(0), overwrite_options: OverwriteOptions::default(), + rcte_cache: HashMap::new(), _phantom: Default::default(), } .into() From c51f77a9479f5b399480efccace587f43ac034c5 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 25 Apr 2024 17:07:40 -0400 Subject: [PATCH 05/13] add base to BoundBackCteRef --- src/frontend/src/binder/bind_context.rs | 2 +- src/frontend/src/binder/query.rs | 2 +- src/frontend/src/binder/relation/cte_ref.rs | 4 ++-- src/frontend/src/binder/relation/mod.rs | 6 +++--- src/frontend/src/optimizer/optimizer_context.rs | 6 +++--- .../src/optimizer/plan_node/generic/recursive_union.rs | 4 ++-- .../src/optimizer/plan_node/logical_recursive_union.rs | 4 ++-- src/frontend/src/planner/relation.rs | 7 +++---- src/frontend/src/planner/select.rs | 1 + 9 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/frontend/src/binder/bind_context.rs b/src/frontend/src/binder/bind_context.rs index 7a52de7decec..6a08e9393206 100644 --- a/src/frontend/src/binder/bind_context.rs +++ b/src/frontend/src/binder/bind_context.rs @@ -94,7 +94,7 @@ pub enum BindingCteState { #[default] Init, /// We know the schema form after the base term resolved. - BaseResolved { schema: Schema }, + BaseResolved { base: BoundSetExpr }, /// We get the whole bound result of the (recursive) CTE. Bound { query: Either, diff --git a/src/frontend/src/binder/query.rs b/src/frontend/src/binder/query.rs index 39de52666297..35c09dab6905 100644 --- a/src/frontend/src/binder/query.rs +++ b/src/frontend/src/binder/query.rs @@ -357,7 +357,7 @@ impl Binder { let mut base = self.bind_set_expr(*left)?; entry.borrow_mut().state = BindingCteState::BaseResolved { - schema: base.schema().clone(), + base: base.clone(), }; // Reset context for right side, but keep `cte_to_relation`. diff --git a/src/frontend/src/binder/relation/cte_ref.rs b/src/frontend/src/binder/relation/cte_ref.rs index 86c40cc096dd..87e87fb1823a 100644 --- a/src/frontend/src/binder/relation/cte_ref.rs +++ b/src/frontend/src/binder/relation/cte_ref.rs @@ -13,14 +13,14 @@ // limitations under the License. use crate::binder::statement::RewriteExprsRecursive; -use crate::binder::ShareId; +use crate::binder::{BoundSetExpr, ShareId}; /// A CTE reference, currently only used in the back reference of recursive CTE. /// For the non-recursive one, see [`BoundShare`](super::BoundShare). #[derive(Debug, Clone)] pub struct BoundBackCteRef { - #[expect(dead_code)] pub(crate) share_id: ShareId, + pub(crate) base: BoundSetExpr, } impl RewriteExprsRecursive for BoundBackCteRef { diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index 0843989aff2a..4de9c3cfaa9a 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -374,13 +374,13 @@ impl Binder { BindingCteState::Init => { Err(ErrorCode::BindError("Base term of recursive CTE not found, consider writing it to left side of the `UNION ALL` operator".to_string()).into()) } - BindingCteState::BaseResolved { schema } => { + BindingCteState::BaseResolved { base } => { self.bind_table_to_context( - schema.fields.iter().map(|f| (false, f.clone())), + base.schema().fields.iter().map(|f| (false, f.clone())), table_name.clone(), Some(original_alias), )?; - Ok(Relation::BackCteRef(Box::new(BoundBackCteRef { share_id }))) + Ok(Relation::BackCteRef(Box::new(BoundBackCteRef { share_id, base }))) } BindingCteState::Bound { query } => { let schema = match &query { diff --git a/src/frontend/src/optimizer/optimizer_context.rs b/src/frontend/src/optimizer/optimizer_context.rs index 3c3a3c696cac..dcdf38b94fb3 100644 --- a/src/frontend/src/optimizer/optimizer_context.rs +++ b/src/frontend/src/optimizer/optimizer_context.rs @@ -62,7 +62,7 @@ pub struct OptimizerContext { overwrite_options: OverwriteOptions, /// Store the mapping between `share_id` and the corresponding /// `PlanRef`, used by rcte's planning. (e.g., in `LogicalCteRef`) - rcte_cache: HashMap, + // rcte_cache: HashMap, _phantom: PhantomUnsend, } @@ -96,7 +96,7 @@ impl OptimizerContext { next_expr_display_id: RefCell::new(RESERVED_ID_NUM.into()), total_rule_applied: RefCell::new(0), overwrite_options, - rcte_cache: HashMap::new(), + // rcte_cache: HashMap::new(), _phantom: Default::default(), } } @@ -119,7 +119,7 @@ impl OptimizerContext { next_expr_display_id: RefCell::new(0), total_rule_applied: RefCell::new(0), overwrite_options: OverwriteOptions::default(), - rcte_cache: HashMap::new(), + // rcte_cache: HashMap::new(), _phantom: Default::default(), } .into() diff --git a/src/frontend/src/optimizer/plan_node/generic/recursive_union.rs b/src/frontend/src/optimizer/plan_node/generic/recursive_union.rs index 1f1bfb4acf46..a57747dce277 100644 --- a/src/frontend/src/optimizer/plan_node/generic/recursive_union.rs +++ b/src/frontend/src/optimizer/plan_node/generic/recursive_union.rs @@ -20,8 +20,8 @@ use super::{impl_distill_unit_from_fields, GenericPlanNode, GenericPlanRef}; use crate::optimizer::property::FunctionalDependencySet; use crate::OptimizerContextRef; -/// `Union` returns the union of the rows of its inputs. -/// If `all` is false, it needs to eliminate duplicates. +/// `RecursiveUnion` returns the union of the rows of its inputs. +/// note: if `all` is false, it needs to eliminate duplicates. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct RecursiveUnion { pub base: PlanRef, diff --git a/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs b/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs index 100c45ead144..e7331fcfed20 100644 --- a/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs +++ b/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs @@ -28,8 +28,8 @@ use crate::error::Result; use crate::utils::Condition; use crate::PlanRef; -/// `LogicalUnion` returns the union of the rows of its inputs. -/// If `all` is false, it needs to eliminate duplicates. +/// `LogicalRecursiveUnion` returns the union of the rows of its inputs. +/// note: if `all` is false, it needs to eliminate duplicates. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct LogicalRecursiveUnion { pub base: PlanBase, diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 0b66e4e743a0..056ab148fe8e 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -29,8 +29,7 @@ use crate::error::{ErrorCode, Result}; use crate::expr::{Expr, ExprImpl, ExprType, FunctionCall, InputRef}; use crate::optimizer::plan_node::generic::SourceNodeKind; use crate::optimizer::plan_node::{ - LogicalApply, LogicalHopWindow, LogicalJoin, LogicalProject, LogicalScan, LogicalShare, - LogicalSource, LogicalSysScan, LogicalTableFunction, LogicalValues, PlanRef, + LogicalApply, LogicalCteRef, LogicalHopWindow, LogicalJoin, LogicalProject, LogicalScan, LogicalShare, LogicalSource, LogicalSysScan, LogicalTableFunction, LogicalValues, PlanRef }; use crate::optimizer::property::Cardinality; use crate::planner::Planner; @@ -242,8 +241,8 @@ impl Planner { todo!("plan watermark"); } - pub(super) fn plan_cte_ref(&mut self, _cte_ref: BoundBackCteRef) -> Result { - bail_not_implemented!(issue = 15135, "recursive CTE not supported") + pub(super) fn plan_cte_ref(&mut self, cte_ref: BoundBackCteRef) -> Result { + todo!() } fn collect_col_data_types_for_tumble_window(relation: &Relation) -> Result> { diff --git a/src/frontend/src/planner/select.rs b/src/frontend/src/planner/select.rs index a9e7dd3526ed..6c830df2a5a9 100644 --- a/src/frontend/src/planner/select.rs +++ b/src/frontend/src/planner/select.rs @@ -95,6 +95,7 @@ impl Planner { None => self.create_dummy_values(), Some(t) => self.plan_relation(t)?, }; + println!("after plan relation: {:#?}", root); // Plan the WHERE clause. if let Some(where_clause) = where_clause { root = self.plan_where(root, where_clause)?; From 1580c26d766712213839fcacea838c923f1ba682 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 25 Apr 2024 18:13:23 -0400 Subject: [PATCH 06/13] logical plan rcte --- .../src/optimizer/optimizer_context.rs | 15 +++++++-- .../optimizer/plan_node/generic/cte_ref.rs | 32 +++++++++---------- .../plan_node/generic/recursive_union.rs | 4 +-- .../optimizer/plan_node/logical_cte_ref.rs | 8 ++--- .../plan_node/logical_recursive_union.rs | 2 +- src/frontend/src/planner/recursive_union.rs | 7 ++-- src/frontend/src/planner/relation.rs | 6 ++-- 7 files changed, 44 insertions(+), 30 deletions(-) diff --git a/src/frontend/src/optimizer/optimizer_context.rs b/src/frontend/src/optimizer/optimizer_context.rs index dcdf38b94fb3..aceaadaa33ff 100644 --- a/src/frontend/src/optimizer/optimizer_context.rs +++ b/src/frontend/src/optimizer/optimizer_context.rs @@ -15,6 +15,7 @@ use core::convert::Into; use core::fmt::Formatter; use std::cell::{RefCell, RefMut}; +use std::collections::HashMap; use std::marker::PhantomData; use std::rc::Rc; use std::sync::Arc; @@ -62,7 +63,7 @@ pub struct OptimizerContext { overwrite_options: OverwriteOptions, /// Store the mapping between `share_id` and the corresponding /// `PlanRef`, used by rcte's planning. (e.g., in `LogicalCteRef`) - // rcte_cache: HashMap, + rcte_cache: RefCell>, _phantom: PhantomUnsend, } @@ -96,7 +97,7 @@ impl OptimizerContext { next_expr_display_id: RefCell::new(RESERVED_ID_NUM.into()), total_rule_applied: RefCell::new(0), overwrite_options, - // rcte_cache: HashMap::new(), + rcte_cache: RefCell::new(HashMap::new()), _phantom: Default::default(), } } @@ -119,7 +120,7 @@ impl OptimizerContext { next_expr_display_id: RefCell::new(0), total_rule_applied: RefCell::new(0), overwrite_options: OverwriteOptions::default(), - // rcte_cache: HashMap::new(), + rcte_cache: RefCell::new(HashMap::new()), _phantom: Default::default(), } .into() @@ -237,6 +238,14 @@ impl OptimizerContext { pub fn get_session_timezone(&self) -> String { self.session_timezone.borrow().timezone() } + + pub fn get_rcte_cache_plan(&self, id: &ShareId) -> Option { + self.rcte_cache.borrow().get(id).cloned() + } + + pub fn insert_rcte_cache_plan(&self, id: ShareId, plan: PlanRef) { + self.rcte_cache.borrow_mut().insert(id, plan); + } } impl std::fmt::Debug for OptimizerContext { diff --git a/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs b/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs index 5dbbf02f741d..983ab2c0e545 100644 --- a/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs +++ b/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs @@ -16,18 +16,19 @@ use std::cell::RefCell; use std::hash::Hash; use std::rc::Weak; +use itertools::Itertools; use pretty_xmlish::StrAssocArr; +use risingwave_common::array::Op; use risingwave_common::catalog::Schema; use super::{impl_distill_unit_from_fields, GenericPlanNode, GenericPlanRef}; use crate::binder::ShareId; use crate::optimizer::property::FunctionalDependencySet; -use crate::OptimizerContextRef; +use crate::{optimizer, OptimizerContextRef}; #[derive(Clone, Debug)] pub struct CteRef { share_id: ShareId, - pub r#ref: Weak, base: PlanRef, derived_stream_key: RefCell>>>, deriving: RefCell, @@ -48,10 +49,9 @@ impl Hash for CteRef { } impl CteRef { - pub fn new(share_id: ShareId, r#ref: Weak, base: PlanRef) -> Self { + pub fn new(share_id: ShareId, base: PlanRef) -> Self { Self { share_id, - r#ref, base, derived_stream_key: RefCell::new(None), deriving: RefCell::new(false), @@ -59,31 +59,31 @@ impl CteRef { } } +impl CteRef { + pub fn get_cte_ref(&self) -> Option { + self.base.ctx().get_rcte_cache_plan(&self.share_id) + } +} + impl GenericPlanNode for CteRef { fn schema(&self) -> Schema { self.base.schema().clone() } fn stream_key(&self) -> Option> { - if let Some(derived_stream_key) = self.derived_stream_key.borrow().as_ref() { - return derived_stream_key.clone(); - } - if *self.deriving.borrow() { - return self.base.stream_key().map(Into::into); + if let Some(plan_ref) = self.get_cte_ref() { + plan_ref.stream_key().map(|s| s.iter().map(|i| i.to_owned()).collect_vec()) + } else { + self.base.stream_key().map(|s| s.iter().map(|i| i.to_owned()).collect_vec()) } - *self.deriving.borrow_mut() = true; - let derived_stream_key = self.r#ref.upgrade().unwrap().stream_key().map(Into::into); - *self.deriving.borrow_mut() = false; - *self.derived_stream_key.borrow_mut() = Some(derived_stream_key.clone()); - derived_stream_key } fn ctx(&self) -> OptimizerContextRef { - self.r#ref.upgrade().unwrap().ctx() + self.base.ctx() } fn functional_dependency(&self) -> FunctionalDependencySet { - todo!() + self.base.functional_dependency().clone() } } diff --git a/src/frontend/src/optimizer/plan_node/generic/recursive_union.rs b/src/frontend/src/optimizer/plan_node/generic/recursive_union.rs index a57747dce277..ea7652d3b674 100644 --- a/src/frontend/src/optimizer/plan_node/generic/recursive_union.rs +++ b/src/frontend/src/optimizer/plan_node/generic/recursive_union.rs @@ -30,7 +30,7 @@ pub struct RecursiveUnion { impl GenericPlanNode for RecursiveUnion { fn functional_dependency(&self) -> FunctionalDependencySet { - todo!() + self.recursive.functional_dependency().clone() } fn schema(&self) -> Schema { @@ -53,7 +53,7 @@ impl GenericPlanNode for RecursiveUnion { } fn ctx(&self) -> OptimizerContextRef { - self.base.ctx() + self.recursive.ctx() } } diff --git a/src/frontend/src/optimizer/plan_node/logical_cte_ref.rs b/src/frontend/src/optimizer/plan_node/logical_cte_ref.rs index 5e8fefb84c6d..5f09c0a047c9 100644 --- a/src/frontend/src/optimizer/plan_node/logical_cte_ref.rs +++ b/src/frontend/src/optimizer/plan_node/logical_cte_ref.rs @@ -36,14 +36,14 @@ pub struct LogicalCteRef { } impl LogicalCteRef { - pub fn new(share_id: ShareId, base_plan: PlanRef, r#ref: Weak) -> Self { - let core = generic::CteRef::new(share_id, r#ref, base_plan); + pub fn new(share_id: ShareId, base_plan: PlanRef) -> Self { + let core = generic::CteRef::new(share_id, base_plan); let base = PlanBase::new_logical_with_core(&core); Self { base, core } } - pub fn create(share_id: ShareId, base_plan: PlanRef, r#ref: Weak) -> PlanRef { - Self::new(share_id, base_plan, r#ref).into() + pub fn create(share_id: ShareId, base_plan: PlanRef) -> PlanRef { + Self::new(share_id, base_plan).into() } } diff --git a/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs b/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs index e7331fcfed20..7694f5ece82b 100644 --- a/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs +++ b/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs @@ -85,7 +85,7 @@ impl PredicatePushdown for LogicalRecursiveUnion { _predicate: Condition, _ctx: &mut PredicatePushdownContext, ) -> PlanRef { - todo!() + self.clone().into() } } diff --git a/src/frontend/src/planner/recursive_union.rs b/src/frontend/src/planner/recursive_union.rs index bc6cec3a6013..44e0e27e4e56 100644 --- a/src/frontend/src/planner/recursive_union.rs +++ b/src/frontend/src/planner/recursive_union.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::binder::BoundSetExpr; +use crate::binder::{BoundSetExpr, ShareId}; use crate::error::Result; use crate::optimizer::plan_node::LogicalRecursiveUnion; use crate::{PlanRef, Planner}; @@ -22,9 +22,12 @@ impl Planner { &mut self, base: BoundSetExpr, recursive: BoundSetExpr, + id: ShareId, ) -> Result { let base = self.plan_set_expr(base, vec![], &[])?; let recursive = self.plan_set_expr(recursive, vec![], &[])?; - Ok(LogicalRecursiveUnion::create(base, recursive)) + let plan = LogicalRecursiveUnion::create(base, recursive); + self.ctx.insert_rcte_cache_plan(id, plan.clone()); + Ok(plan) } } diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 056ab148fe8e..e8752dd0b00d 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -232,7 +232,7 @@ impl Planner { } // for the recursive union in rcte Either::Right(recursive_union) => { - self.plan_recursive_union(*recursive_union.base, *recursive_union.recursive) + self.plan_recursive_union(*recursive_union.base, *recursive_union.recursive, share.share_id) } } } @@ -242,7 +242,9 @@ impl Planner { } pub(super) fn plan_cte_ref(&mut self, cte_ref: BoundBackCteRef) -> Result { - todo!() + // TODO: this is actually duplicated from `plan_recursive_union`, refactor? + let base = self.plan_set_expr(cte_ref.base, vec![], &[])?; + Ok(LogicalCteRef::create(cte_ref.share_id, base)) } fn collect_col_data_types_for_tumble_window(relation: &Relation) -> Result> { From 6740299d3849793c683b2646fc044279077525b5 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 25 Apr 2024 18:13:47 -0400 Subject: [PATCH 07/13] update fmt --- src/frontend/src/binder/query.rs | 4 +--- .../src/optimizer/plan_node/generic/cte_ref.rs | 8 ++++++-- .../src/optimizer/plan_node/logical_cte_ref.rs | 6 +++--- .../optimizer/plan_node/logical_recursive_union.rs | 6 +++--- src/frontend/src/planner/relation.rs | 13 ++++++++----- 5 files changed, 21 insertions(+), 16 deletions(-) diff --git a/src/frontend/src/binder/query.rs b/src/frontend/src/binder/query.rs index 35c09dab6905..abc9314a84ec 100644 --- a/src/frontend/src/binder/query.rs +++ b/src/frontend/src/binder/query.rs @@ -356,9 +356,7 @@ impl Binder { // reference: let mut base = self.bind_set_expr(*left)?; - entry.borrow_mut().state = BindingCteState::BaseResolved { - base: base.clone(), - }; + entry.borrow_mut().state = BindingCteState::BaseResolved { base: base.clone() }; // Reset context for right side, but keep `cte_to_relation`. let new_context = std::mem::take(&mut self.context); diff --git a/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs b/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs index 983ab2c0e545..11e5febb7293 100644 --- a/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs +++ b/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs @@ -72,9 +72,13 @@ impl GenericPlanNode for CteRef { fn stream_key(&self) -> Option> { if let Some(plan_ref) = self.get_cte_ref() { - plan_ref.stream_key().map(|s| s.iter().map(|i| i.to_owned()).collect_vec()) + plan_ref + .stream_key() + .map(|s| s.iter().map(|i| i.to_owned()).collect_vec()) } else { - self.base.stream_key().map(|s| s.iter().map(|i| i.to_owned()).collect_vec()) + self.base + .stream_key() + .map(|s| s.iter().map(|i| i.to_owned()).collect_vec()) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_cte_ref.rs b/src/frontend/src/optimizer/plan_node/logical_cte_ref.rs index 5f09c0a047c9..f0844a2e2eab 100644 --- a/src/frontend/src/optimizer/plan_node/logical_cte_ref.rs +++ b/src/frontend/src/optimizer/plan_node/logical_cte_ref.rs @@ -20,9 +20,9 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use super::expr_visitable::ExprVisitable; use super::utils::impl_distill_by_unit; use super::{ - generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical, - LogicalProject, PlanBase, PredicatePushdown, PredicatePushdownContext, RewriteStreamContext, - ToBatch, ToStream, ToStreamContext, + generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical, LogicalProject, PlanBase, + PredicatePushdown, PredicatePushdownContext, RewriteStreamContext, ToBatch, ToStream, + ToStreamContext, }; use crate::binder::ShareId; use crate::error::Result; diff --git a/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs b/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs index 7694f5ece82b..cc16ade466ab 100644 --- a/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs +++ b/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs @@ -20,9 +20,9 @@ use smallvec::{smallvec, SmallVec}; use super::expr_visitable::ExprVisitable; use super::utils::impl_distill_by_unit; use super::{ - generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical, - PlanBase, PlanTreeNode, PredicatePushdown, PredicatePushdownContext, RewriteStreamContext, - ToBatch, ToStream, ToStreamContext, + generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical, PlanBase, PlanTreeNode, + PredicatePushdown, PredicatePushdownContext, RewriteStreamContext, ToBatch, ToStream, + ToStreamContext, }; use crate::error::Result; use crate::utils::Condition; diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index e8752dd0b00d..e2f38617dc66 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -29,7 +29,8 @@ use crate::error::{ErrorCode, Result}; use crate::expr::{Expr, ExprImpl, ExprType, FunctionCall, InputRef}; use crate::optimizer::plan_node::generic::SourceNodeKind; use crate::optimizer::plan_node::{ - LogicalApply, LogicalCteRef, LogicalHopWindow, LogicalJoin, LogicalProject, LogicalScan, LogicalShare, LogicalSource, LogicalSysScan, LogicalTableFunction, LogicalValues, PlanRef + LogicalApply, LogicalCteRef, LogicalHopWindow, LogicalJoin, LogicalProject, LogicalScan, + LogicalShare, LogicalSource, LogicalSysScan, LogicalTableFunction, LogicalValues, PlanRef, }; use crate::optimizer::property::Cardinality; use crate::planner::Planner; @@ -56,7 +57,7 @@ impl Planner { Relation::Watermark(tf) => self.plan_watermark(*tf), // note that rcte (i.e., RecursiveUnion) is included *implicitly* in share. Relation::Share(share) => self.plan_share(*share), - Relation::BackCteRef(cte_ref) => self.plan_cte_ref(*cte_ref) + Relation::BackCteRef(cte_ref) => self.plan_cte_ref(*cte_ref), } } @@ -231,9 +232,11 @@ impl Planner { } } // for the recursive union in rcte - Either::Right(recursive_union) => { - self.plan_recursive_union(*recursive_union.base, *recursive_union.recursive, share.share_id) - } + Either::Right(recursive_union) => self.plan_recursive_union( + *recursive_union.base, + *recursive_union.recursive, + share.share_id, + ), } } From 799bd4304e1f67a1254b5c7038578547486f5c6c Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 25 Apr 2024 18:44:05 -0400 Subject: [PATCH 08/13] change schema of recursive union in generic plan node --- .../src/optimizer/plan_node/generic/cte_ref.rs | 12 +----------- .../optimizer/plan_node/generic/recursive_union.rs | 8 ++------ .../src/optimizer/plan_node/logical_cte_ref.rs | 4 +--- 3 files changed, 4 insertions(+), 20 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs b/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs index 11e5febb7293..fd7d4fb6c424 100644 --- a/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs +++ b/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs @@ -12,13 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::cell::RefCell; use std::hash::Hash; -use std::rc::Weak; use itertools::Itertools; use pretty_xmlish::StrAssocArr; -use risingwave_common::array::Op; use risingwave_common::catalog::Schema; use super::{impl_distill_unit_from_fields, GenericPlanNode, GenericPlanRef}; @@ -30,8 +27,6 @@ use crate::{optimizer, OptimizerContextRef}; pub struct CteRef { share_id: ShareId, base: PlanRef, - derived_stream_key: RefCell>>>, - deriving: RefCell, } impl PartialEq for CteRef { @@ -50,12 +45,7 @@ impl Hash for CteRef { impl CteRef { pub fn new(share_id: ShareId, base: PlanRef) -> Self { - Self { - share_id, - base, - derived_stream_key: RefCell::new(None), - deriving: RefCell::new(false), - } + Self { share_id, base } } } diff --git a/src/frontend/src/optimizer/plan_node/generic/recursive_union.rs b/src/frontend/src/optimizer/plan_node/generic/recursive_union.rs index ea7652d3b674..5a459349009e 100644 --- a/src/frontend/src/optimizer/plan_node/generic/recursive_union.rs +++ b/src/frontend/src/optimizer/plan_node/generic/recursive_union.rs @@ -13,8 +13,7 @@ // limitations under the License. use pretty_xmlish::StrAssocArr; -use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::types::DataType; +use risingwave_common::catalog::Schema; use super::{impl_distill_unit_from_fields, GenericPlanNode, GenericPlanRef}; use crate::optimizer::property::FunctionalDependencySet; @@ -34,10 +33,7 @@ impl GenericPlanNode for RecursiveUnion { } fn schema(&self) -> Schema { - let mut base = self.base.schema().clone(); - let iter_field = Field::with_name(DataType::Int16, "$iter"); - base.fields.push(iter_field); - base + self.recursive.schema().clone() } fn stream_key(&self) -> Option> { diff --git a/src/frontend/src/optimizer/plan_node/logical_cte_ref.rs b/src/frontend/src/optimizer/plan_node/logical_cte_ref.rs index f0844a2e2eab..7731844b22e3 100644 --- a/src/frontend/src/optimizer/plan_node/logical_cte_ref.rs +++ b/src/frontend/src/optimizer/plan_node/logical_cte_ref.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::rc::Weak; - use risingwave_common::bail_not_implemented; use risingwave_common::util::column_index_mapping::ColIndexMapping; @@ -67,7 +65,7 @@ impl PredicatePushdown for LogicalCteRef { _predicate: Condition, _ctx: &mut PredicatePushdownContext, ) -> PlanRef { - todo!() + self.clone().into() } } From 86545f84d794cb2dc405b8977cae9a0872b7ae08 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 25 Apr 2024 18:48:39 -0400 Subject: [PATCH 09/13] remove redundant debugging stmt --- src/frontend/src/planner/select.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/frontend/src/planner/select.rs b/src/frontend/src/planner/select.rs index 6c830df2a5a9..a9e7dd3526ed 100644 --- a/src/frontend/src/planner/select.rs +++ b/src/frontend/src/planner/select.rs @@ -95,7 +95,6 @@ impl Planner { None => self.create_dummy_values(), Some(t) => self.plan_relation(t)?, }; - println!("after plan relation: {:#?}", root); // Plan the WHERE clause. if let Some(where_clause) = where_clause { root = self.plan_where(root, where_clause)?; From 61fe89258d34e82caf866c6f78dde07a1f0b4694 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 29 Apr 2024 17:40:08 -0400 Subject: [PATCH 10/13] update planner tests; pretty fields for LogicalRecursiveUnion --- .../tests/testdata/input/recursive_cte.yaml | 4 ++-- .../tests/testdata/output/recursive_cte.yaml | 22 ++++++++++++++----- .../plan_node/logical_recursive_union.rs | 14 ++++++++++-- 3 files changed, 30 insertions(+), 10 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/input/recursive_cte.yaml b/src/frontend/planner_test/tests/testdata/input/recursive_cte.yaml index 411d129d415f..e4661b1b4fde 100644 --- a/src/frontend/planner_test/tests/testdata/input/recursive_cte.yaml +++ b/src/frontend/planner_test/tests/testdata/input/recursive_cte.yaml @@ -1,11 +1,11 @@ - name: basic sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT * FROM t1; expected_outputs: - - planner_error + - logical_plan - name: output column follows lhs sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT a FROM t1; expected_outputs: - - planner_error + - logical_plan - name: name a is leaked outside sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT a; expected_outputs: diff --git a/src/frontend/planner_test/tests/testdata/output/recursive_cte.yaml b/src/frontend/planner_test/tests/testdata/output/recursive_cte.yaml index 18e0e18726d3..694894778209 100644 --- a/src/frontend/planner_test/tests/testdata/output/recursive_cte.yaml +++ b/src/frontend/planner_test/tests/testdata/output/recursive_cte.yaml @@ -1,14 +1,24 @@ # This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. - name: basic sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT * FROM t1; - planner_error: |- - Feature is not yet implemented: recursive CTE is not supported - Tracking issue: https://github.com/risingwavelabs/risingwave/issues/15135 + logical_plan: |- + LogicalProject { exprs: [$expr1] } + └─LogicalRecursiveUnion { id: 4 } + ├─LogicalProject { exprs: [1:Int32] } + │ └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } + └─LogicalProject { exprs: [(1:Int32 + 1:Int32) as $expr1] } + └─LogicalFilter { predicate: (1:Int32 < 10:Int32) } + └─LogicalCteRef - name: output column follows lhs sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT a FROM t1; - planner_error: |- - Feature is not yet implemented: recursive CTE is not supported - Tracking issue: https://github.com/risingwavelabs/risingwave/issues/15135 + logical_plan: |- + LogicalProject { exprs: [$expr1] } + └─LogicalRecursiveUnion { id: 4 } + ├─LogicalProject { exprs: [1:Int32] } + │ └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } + └─LogicalProject { exprs: [(1:Int32 + 1:Int32) as $expr1] } + └─LogicalFilter { predicate: (1:Int32 < 10:Int32) } + └─LogicalCteRef - name: name a is leaked outside sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT a; binder_error: | diff --git a/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs b/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs index cc16ade466ab..9cfd3c9ad7f3 100644 --- a/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs +++ b/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs @@ -13,12 +13,14 @@ // limitations under the License. use itertools::Itertools; +use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::bail_not_implemented; use risingwave_common::util::column_index_mapping::ColIndexMapping; use smallvec::{smallvec, SmallVec}; use super::expr_visitable::ExprVisitable; -use super::utils::impl_distill_by_unit; +use super::generic::GenericPlanRef; +use super::utils::{childless_record, Distill}; use super::{ generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical, PlanBase, PlanTreeNode, PredicatePushdown, PredicatePushdownContext, RewriteStreamContext, ToBatch, ToStream, @@ -49,6 +51,10 @@ impl LogicalRecursiveUnion { pub fn create(base_plan: PlanRef, recursive: PlanRef) -> PlanRef { Self::new(base_plan, recursive).into() } + + pub(super) fn pretty_fields(base: impl GenericPlanRef, name: &str) -> XmlNode<'_> { + childless_record(name, vec![("id", Pretty::debug(&base.id().0))]) + } } impl PlanTreeNode for LogicalRecursiveUnion { @@ -62,7 +68,11 @@ impl PlanTreeNode for LogicalRecursiveUnion { } } -impl_distill_by_unit!(LogicalRecursiveUnion, core, "LogicalRecursiveUnion"); +impl Distill for LogicalRecursiveUnion { + fn distill<'a>(&self) -> XmlNode<'a> { + Self::pretty_fields(&self.base, "LogicalRecursiveUnion") + } +} impl ColPrunable for LogicalRecursiveUnion { fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef { From 4ae584fd57fe83a44a35c181db2260cf073986a9 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 29 Apr 2024 17:45:01 -0400 Subject: [PATCH 11/13] update planner tests for cte ref --- .../planner_test/tests/testdata/output/recursive_cte.yaml | 4 ++-- src/frontend/src/optimizer/plan_node/generic/cte_ref.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/output/recursive_cte.yaml b/src/frontend/planner_test/tests/testdata/output/recursive_cte.yaml index 694894778209..6f54c60ba23a 100644 --- a/src/frontend/planner_test/tests/testdata/output/recursive_cte.yaml +++ b/src/frontend/planner_test/tests/testdata/output/recursive_cte.yaml @@ -8,7 +8,7 @@ │ └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } └─LogicalProject { exprs: [(1:Int32 + 1:Int32) as $expr1] } └─LogicalFilter { predicate: (1:Int32 < 10:Int32) } - └─LogicalCteRef + └─LogicalCteRef { share_id: 0 } - name: output column follows lhs sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT a FROM t1; logical_plan: |- @@ -18,7 +18,7 @@ │ └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } └─LogicalProject { exprs: [(1:Int32 + 1:Int32) as $expr1] } └─LogicalFilter { predicate: (1:Int32 < 10:Int32) } - └─LogicalCteRef + └─LogicalCteRef { share_id: 0 } - name: name a is leaked outside sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT a; binder_error: | diff --git a/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs b/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs index fd7d4fb6c424..5e66f334eac2 100644 --- a/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs +++ b/src/frontend/src/optimizer/plan_node/generic/cte_ref.rs @@ -15,7 +15,7 @@ use std::hash::Hash; use itertools::Itertools; -use pretty_xmlish::StrAssocArr; +use pretty_xmlish::{Pretty, StrAssocArr}; use risingwave_common::catalog::Schema; use super::{impl_distill_unit_from_fields, GenericPlanNode, GenericPlanRef}; @@ -83,7 +83,7 @@ impl GenericPlanNode for CteRef { impl CteRef { pub fn fields_pretty<'a>(&self) -> StrAssocArr<'a> { - vec![] + vec![("share_id", Pretty::debug(&self.share_id))] } } From 6a2fe6020f3a4cb526dcba76e0ef7ee35829accb Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 29 Apr 2024 17:57:31 -0400 Subject: [PATCH 12/13] add case with explicit column to planner test --- .../planner_test/tests/testdata/input/recursive_cte.yaml | 6 +++++- .../tests/testdata/output/recursive_cte.yaml | 9 +++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/frontend/planner_test/tests/testdata/input/recursive_cte.yaml b/src/frontend/planner_test/tests/testdata/input/recursive_cte.yaml index e4661b1b4fde..bdd6f11f0b7b 100644 --- a/src/frontend/planner_test/tests/testdata/input/recursive_cte.yaml +++ b/src/frontend/planner_test/tests/testdata/input/recursive_cte.yaml @@ -6,7 +6,11 @@ sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT a FROM t1; expected_outputs: - logical_plan +- name: with normal column + sql: WITH RECURSIVE t(a) AS (VALUES(1) UNION ALL SELECT a + 1 FROM t WHERE a < 100) SELECT * FROM t; + expected_outputs: + - logical_plan - name: name a is leaked outside sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT a; expected_outputs: - - binder_error + - binder_error \ No newline at end of file diff --git a/src/frontend/planner_test/tests/testdata/output/recursive_cte.yaml b/src/frontend/planner_test/tests/testdata/output/recursive_cte.yaml index 6f54c60ba23a..93872d5848bd 100644 --- a/src/frontend/planner_test/tests/testdata/output/recursive_cte.yaml +++ b/src/frontend/planner_test/tests/testdata/output/recursive_cte.yaml @@ -19,6 +19,15 @@ └─LogicalProject { exprs: [(1:Int32 + 1:Int32) as $expr1] } └─LogicalFilter { predicate: (1:Int32 < 10:Int32) } └─LogicalCteRef { share_id: 0 } +- name: with normal column + sql: WITH RECURSIVE t(a) AS (VALUES(1) UNION ALL SELECT a + 1 FROM t WHERE a < 100) SELECT * FROM t; + logical_plan: |- + LogicalProject { exprs: [$expr1] } + └─LogicalRecursiveUnion { id: 3 } + ├─LogicalValues { rows: [[1:Int32]], schema: Schema { fields: [*VALUES*_0.column_0:Int32] } } + └─LogicalProject { exprs: [(*VALUES*_0.column_0 + 1:Int32) as $expr1] } + └─LogicalFilter { predicate: (*VALUES*_0.column_0 < 100:Int32) } + └─LogicalCteRef { share_id: 0 } - name: name a is leaked outside sql: WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT a; binder_error: | From 35b6d6154670f546e9a46157bd453f768d9ef081 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 29 Apr 2024 21:56:17 -0400 Subject: [PATCH 13/13] fix check --- src/frontend/src/optimizer/plan_node/logical_recursive_union.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs b/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs index 9cfd3c9ad7f3..b0bf8993ef0a 100644 --- a/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs +++ b/src/frontend/src/optimizer/plan_node/logical_recursive_union.rs @@ -63,7 +63,7 @@ impl PlanTreeNode for LogicalRecursiveUnion { } fn clone_with_inputs(&self, inputs: &[PlanRef]) -> PlanRef { - let mut inputs = inputs.into_iter().cloned(); + let mut inputs = inputs.iter().cloned(); Self::create(inputs.next().unwrap(), inputs.next().unwrap()) } }