From 27b5439636eb732b3566306add89685fa6d81129 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Thu, 7 Mar 2024 20:13:56 +0800 Subject: [PATCH 01/18] feat(binder): bind RCTE Signed-off-by: TennyZhuang --- src/frontend/src/binder/bind_context.rs | 29 ++++- src/frontend/src/binder/query.rs | 113 +++++++++++++++++--- src/frontend/src/binder/relation/cte_ref.rs | 28 +++++ src/frontend/src/binder/relation/mod.rs | 63 +++++++---- src/frontend/src/binder/set_expr.rs | 24 +++++ src/frontend/src/expr/mod.rs | 14 +++ src/frontend/src/lib.rs | 1 + src/frontend/src/planner/relation.rs | 3 + src/frontend/src/planner/set_expr.rs | 4 + 9 files changed, 242 insertions(+), 37 deletions(-) create mode 100644 src/frontend/src/binder/relation/cte_ref.rs diff --git a/src/frontend/src/binder/bind_context.rs b/src/frontend/src/binder/bind_context.rs index 0dc03464fbe62..e787d4ae24970 100644 --- a/src/frontend/src/binder/bind_context.rs +++ b/src/frontend/src/binder/bind_context.rs @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cell::RefCell; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; use std::rc::Rc; use parse_display::Display; -use risingwave_common::catalog::Field; +use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::TableAlias; @@ -66,6 +67,30 @@ pub struct LateralBindContext { pub context: BindContext, } +/// If the CTE a recursive one, we may need store it in `cte_to_relation` first, and bind it step by step. +/// +/// ```sql +/// WITH RECURSIVE t(n) AS ( +/// ---------------^ Init +/// VALUES (1) +/// UNION ALL +/// SELECT n+1 FROM t WHERE n < 100 +/// # ------------------^BaseResolved +/// ) +/// SELECT sum(n) FROM t; +/// # -----------------^Bound +/// ``` +#[derive(Default, Debug, Clone)] +pub enum BindingCteState { + /// We know nothing about the CTE before resolve the body. + #[default] + Init, + /// We know the schema from after the base term resolved. + BaseResolved { schema: Schema }, + /// We get the whole bound result. + Bound { query: BoundQuery }, +} + #[derive(Default, Debug, Clone)] pub struct BindContext { // Columns of all tables. @@ -80,7 +105,7 @@ pub struct BindContext { pub column_group_context: ColumnGroupContext, /// Map the cte's name to its Relation::Subquery. /// The `ShareId` of the value is used to help the planner identify the share plan. - pub cte_to_relation: HashMap>, + pub cte_to_relation: HashMap>>, /// Current lambda functions's arguments pub lambda_args: Option>, } diff --git a/src/frontend/src/binder/query.rs b/src/frontend/src/binder/query.rs index fe2008f50f3eb..8a45550b056f5 100644 --- a/src/frontend/src/binder/query.rs +++ b/src/frontend/src/binder/query.rs @@ -12,16 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cell::RefCell; use std::collections::HashMap; use std::rc::Rc; -use risingwave_common::bail_not_implemented; use risingwave_common::catalog::Schema; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; -use risingwave_sqlparser::ast::{Cte, Expr, Fetch, OrderByExpr, Query, Value, With}; +use risingwave_sqlparser::ast::{ + Cte, Expr, Fetch, OrderByExpr, Query, SetExpr, SetOperator, Value, With, +}; use thiserror_ext::AsReport; +use super::bind_context::BindingCteState; use super::statement::RewriteExprsRecursive; use super::BoundValues; use crate::binder::{Binder, BoundSetExpr}; @@ -279,20 +282,104 @@ impl Binder { } fn bind_with(&mut self, with: With) -> Result<()> { - if with.recursive { - bail_not_implemented!("recursive cte"); - } else { - for cte_table in with.cte_tables { - let Cte { alias, query, .. } = cte_table; - let table_name = alias.name.real_value(); - let bound_query = self.bind_query(query)?; - let share_id = self.next_share_id(); - self.context + for cte_table in with.cte_tables { + let share_id = self.next_share_id(); + let Cte { alias, query, .. } = cte_table; + let table_name = alias.name.real_value(); + if with.recursive { + let Query { + with, + body, + order_by, + limit, + offset, + fetch, + } = query; + fn should_be_empty(v: Option, clause: &str) -> Result<()> { + if !v.is_none() { + return Err(ErrorCode::BindError(format!( + "`{clause}` is not supported in recursive CTE" + )) + .into()); + } + Ok(()) + } + should_be_empty(order_by.first(), "ORDER BY")?; + should_be_empty(limit, "LIMIT")?; + should_be_empty(offset, "OFFSET")?; + should_be_empty(fetch, "FETCH")?; + + let SetExpr::SetOperation { + op: SetOperator::Union, + all, + left, + right, + } = body + else { + return Err(ErrorCode::BindError(format!( + "`UNION` is required in recursive CTE" + )) + .into()); + }; + + if !all { + return Err(ErrorCode::BindError(format!( + "only `UNION ALL` is supported in recursive CTE now" + )) + .into()); + } + + let entry = self + .context .cte_to_relation - .insert(table_name, Rc::new((share_id, bound_query, alias))); + .entry(table_name) + .insert_entry(Rc::new(RefCell::new(( + share_id, + BindingCteState::Init, + alias, + )))) + .get() + .clone(); + + if let Some(with) = with { + self.bind_with(with)?; + } + + // We assume `left` is base term, otherwise the implementation may be very hard. + let bound_base = self.bind_set_expr(*left)?; + + entry.borrow_mut().1 = BindingCteState::BaseResolved { + schema: bound_base.schema().clone(), + }; + + let bound_recursive = self.bind_set_expr(*right)?; + + let bound_query = BoundQuery { + body: BoundSetExpr::RecursiveUnion { + base: Box::new(bound_base), + recursive: Box::new(bound_recursive), + }, + order: vec![], + limit: None, + offset: None, + with_ties: false, + extra_order_exprs: vec![], + }; + + entry.borrow_mut().1 = BindingCteState::Bound { query: bound_query }; + } else { + let bound_query = self.bind_query(query)?; + self.context.cte_to_relation.insert( + table_name, + Rc::new(RefCell::new(( + share_id, + BindingCteState::Bound { query: bound_query }, + alias, + ))), + ); } - Ok(()) } + Ok(()) } } diff --git a/src/frontend/src/binder/relation/cte_ref.rs b/src/frontend/src/binder/relation/cte_ref.rs new file mode 100644 index 0000000000000..86c40cc096dd1 --- /dev/null +++ b/src/frontend/src/binder/relation/cte_ref.rs @@ -0,0 +1,28 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::binder::statement::RewriteExprsRecursive; +use crate::binder::ShareId; + +/// A CTE reference, currently only used in the back reference of recursive CTE. +/// For the non-recursive one, see [`BoundShare`](super::BoundShare). +#[derive(Debug, Clone)] +pub struct BoundBackCteRef { + #[expect(dead_code)] + pub(crate) share_id: ShareId, +} + +impl RewriteExprsRecursive for BoundBackCteRef { + fn rewrite_exprs_recursive(&mut self, _rewriter: &mut impl crate::expr::ExprRewriter) {} +} diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index 69eb6787d47a0..aabf5ba719dd7 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -24,12 +24,15 @@ use risingwave_sqlparser::ast::{ use thiserror::Error; use thiserror_ext::AsReport; +use self::cte_ref::BoundBackCteRef; use super::bind_context::ColumnBinding; use super::statement::RewriteExprsRecursive; +use crate::binder::bind_context::BindingCteState; use crate::binder::Binder; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{ExprImpl, InputRef}; +mod cte_ref; mod join; mod share; mod subquery; @@ -65,6 +68,7 @@ pub enum Relation { }, Watermark(Box), Share(Box), + BackCteRef(Box), } impl RewriteExprsRecursive for Relation { @@ -79,6 +83,7 @@ impl RewriteExprsRecursive for Relation { Relation::TableFunction { expr: inner, .. } => { *inner = rewriter.rewrite_expr(inner.take()) } + Relation::BackCteRef(inner) => inner.rewrite_exprs_recursive(rewriter), _ => {} } } @@ -336,7 +341,7 @@ impl Binder { { // Handles CTE - let (share_id, query, mut original_alias) = item.deref().clone(); + let (share_id, cte_state, mut original_alias) = item.deref().borrow().clone(); debug_assert_eq!(original_alias.name.real_value(), table_name); // The original CTE alias ought to be its table name. if let Some(from_alias) = alias { @@ -349,27 +354,41 @@ impl Binder { .collect(); } - self.bind_table_to_context( - query - .body - .schema() - .fields - .iter() - .map(|f| (false, f.clone())), - table_name.clone(), - Some(original_alias), - )?; - - // Share the CTE. - let input_relation = Relation::Subquery(Box::new(BoundSubquery { - query, - lateral: false, - })); - let share_relation = Relation::Share(Box::new(BoundShare { - share_id, - input: input_relation, - })); - Ok(share_relation) + match cte_state { + BindingCteState::Init => { + Err(ErrorCode::BindError(format!("Base term of recursive CTE not found, consider write it to left side of the `UNION` operator")).into()) + } + BindingCteState::BaseResolved { schema } => { + self.bind_table_to_context( + schema.fields.iter().map(|f| (false, f.clone())), + table_name.clone(), + Some(original_alias), + )?; + Ok(Relation::BackCteRef(Box::new(BoundBackCteRef { share_id }))) + } + BindingCteState::Bound { query } => { + self.bind_table_to_context( + query + .body + .schema() + .fields + .iter() + .map(|f| (false, f.clone())), + table_name.clone(), + Some(original_alias), + )?; + // Share the CTE. + let input_relation = Relation::Subquery(Box::new(BoundSubquery { + query, + lateral: false, + })); + let share_relation = Relation::Share(Box::new(BoundShare { + share_id, + input: input_relation, + })); + Ok(share_relation) + } + } } else { self.bind_relation_by_name_inner( schema_name.as_deref(), diff --git a/src/frontend/src/binder/set_expr.rs b/src/frontend/src/binder/set_expr.rs index 99ec66ac0b725..289043cd10029 100644 --- a/src/frontend/src/binder/set_expr.rs +++ b/src/frontend/src/binder/set_expr.rs @@ -36,6 +36,11 @@ pub enum BoundSetExpr { left: Box, right: Box, }, + /// UNION in recursive CTE definition + RecursiveUnion { + base: Box, + recursive: Box, + }, } impl RewriteExprsRecursive for BoundSetExpr { @@ -48,6 +53,10 @@ impl RewriteExprsRecursive for BoundSetExpr { left.rewrite_exprs_recursive(rewriter); right.rewrite_exprs_recursive(rewriter); } + BoundSetExpr::RecursiveUnion { base, recursive } => { + base.rewrite_exprs_recursive(rewriter); + recursive.rewrite_exprs_recursive(rewriter); + } } } } @@ -78,6 +87,7 @@ impl BoundSetExpr { BoundSetExpr::Values(v) => v.schema(), BoundSetExpr::Query(q) => q.schema(), BoundSetExpr::SetOperation { left, .. } => left.schema(), + BoundSetExpr::RecursiveUnion { base, .. } => base.schema(), } } @@ -89,6 +99,9 @@ impl BoundSetExpr { BoundSetExpr::SetOperation { left, right, .. } => { left.is_correlated(depth) || right.is_correlated(depth) } + BoundSetExpr::RecursiveUnion { base, recursive } => { + base.is_correlated(depth) || recursive.is_correlated(depth) + } } } @@ -117,6 +130,17 @@ impl BoundSetExpr { ); correlated_indices } + BoundSetExpr::RecursiveUnion { base, recursive } => { + let mut correlated_indices = vec![]; + correlated_indices.extend( + base.collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id), + ); + correlated_indices.extend( + recursive + .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id), + ); + correlated_indices + } } } } diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index 78ae2db726a39..db8a438aa1755 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -483,6 +483,12 @@ impl ExprImpl { self.visit_bound_set_expr(left); self.visit_bound_set_expr(right); } + BoundSetExpr::RecursiveUnion { + base, recursive, .. + } => { + self.visit_bound_set_expr(base); + self.visit_bound_set_expr(recursive); + } }; } } @@ -524,6 +530,10 @@ impl ExprImpl { self.visit_bound_set_expr(left); self.visit_bound_set_expr(right); } + BoundSetExpr::RecursiveUnion { base, recursive } => { + self.visit_bound_set_expr(base); + self.visit_bound_set_expr(recursive); + } } } } @@ -593,6 +603,10 @@ impl ExprImpl { self.visit_bound_set_expr(&mut *left); self.visit_bound_set_expr(&mut *right); } + BoundSetExpr::RecursiveUnion { base, recursive } => { + self.visit_bound_set_expr(&mut *base); + self.visit_bound_set_expr(&mut *recursive); + } } } } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 9dc64983671d3..59271dfd18d4d 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -35,6 +35,7 @@ #![feature(round_ties_even)] #![feature(iterator_try_collect)] #![feature(used_with_arg)] +#![feature(entry_insert)] #![recursion_limit = "256"] #[cfg(test)] diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 3f64a8fde4405..5101e263b299b 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -54,6 +54,9 @@ impl Planner { } => self.plan_table_function(tf, with_ordinality), Relation::Watermark(tf) => self.plan_watermark(*tf), Relation::Share(share) => self.plan_share(*share), + Relation::BackCteRef(..) => { + bail_not_implemented!(issue = 15135, "recursive CTE is not supported") + } } } diff --git a/src/frontend/src/planner/set_expr.rs b/src/frontend/src/planner/set_expr.rs index e2ff43a2c211b..eeb789e9d9ded 100644 --- a/src/frontend/src/planner/set_expr.rs +++ b/src/frontend/src/planner/set_expr.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common::bail_not_implemented; use risingwave_common::util::sort_util::ColumnOrder; use crate::binder::BoundSetExpr; @@ -37,6 +38,9 @@ impl Planner { left, right, } => self.plan_set_operation(op, all, *left, *right), + BoundSetExpr::RecursiveUnion { .. } => { + bail_not_implemented!(issue = 15135, "recursive CTE is not supported") + } } } } From 20493ab4ba17f5b125834408d4312cb487c9af15 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Thu, 14 Mar 2024 14:59:03 +0800 Subject: [PATCH 02/18] Update src/frontend/src/binder/bind_context.rs --- src/frontend/src/binder/bind_context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frontend/src/binder/bind_context.rs b/src/frontend/src/binder/bind_context.rs index 37259c3b9ec4d..3d740b21166ab 100644 --- a/src/frontend/src/binder/bind_context.rs +++ b/src/frontend/src/binder/bind_context.rs @@ -72,7 +72,7 @@ pub struct LateralBindContext { /// /// ```sql /// WITH RECURSIVE t(n) AS ( -/// ---------------^ Init +/// # -------------^ Init /// VALUES (1) /// UNION ALL /// SELECT n+1 FROM t WHERE n < 100 From cfb9920ee7744b3d0a9ddacffc6f1e0248bb048a Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Thu, 14 Mar 2024 18:01:10 +0800 Subject: [PATCH 03/18] Update src/frontend/src/expr/mod.rs Co-authored-by: xiangjinwu <17769960+xiangjinwu@users.noreply.github.com> --- src/frontend/src/expr/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index db8a438aa1755..9ad24c2fe429c 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -484,7 +484,7 @@ impl ExprImpl { self.visit_bound_set_expr(right); } BoundSetExpr::RecursiveUnion { - base, recursive, .. + base, recursive, } => { self.visit_bound_set_expr(base); self.visit_bound_set_expr(recursive); From 76cc79c4ee3ac052e71f347aa61286a0142c7d6a Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Fri, 29 Mar 2024 14:49:11 +0800 Subject: [PATCH 04/18] update Signed-off-by: TennyZhuang --- src/frontend/src/binder/bind_context.rs | 2 +- src/frontend/src/binder/query.rs | 14 +++++++------- src/frontend/src/binder/relation/mod.rs | 2 +- src/frontend/src/expr/mod.rs | 4 +--- 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/src/frontend/src/binder/bind_context.rs b/src/frontend/src/binder/bind_context.rs index 003232b209e58..cb81b445f96e0 100644 --- a/src/frontend/src/binder/bind_context.rs +++ b/src/frontend/src/binder/bind_context.rs @@ -104,7 +104,7 @@ pub struct BindContext { pub clause: Option, // The `BindContext`'s data on its column groups pub column_group_context: ColumnGroupContext, - /// Map the cte's name to its `Relation::Subquery`. + /// Map the cte's name to its binding state. /// The `ShareId` of the value is used to help the planner identify the share plan. pub cte_to_relation: HashMap>>, /// Current lambda functions's arguments diff --git a/src/frontend/src/binder/query.rs b/src/frontend/src/binder/query.rs index 8a45550b056f5..c83d70cd9cf18 100644 --- a/src/frontend/src/binder/query.rs +++ b/src/frontend/src/binder/query.rs @@ -296,7 +296,7 @@ impl Binder { fetch, } = query; fn should_be_empty(v: Option, clause: &str) -> Result<()> { - if !v.is_none() { + if v.is_some() { return Err(ErrorCode::BindError(format!( "`{clause}` is not supported in recursive CTE" )) @@ -316,16 +316,16 @@ impl Binder { right, } = body else { - return Err(ErrorCode::BindError(format!( - "`UNION` is required in recursive CTE" - )) + return Err(ErrorCode::BindError( + "`UNION` is required in recursive CTE".to_string(), + ) .into()); }; if !all { - return Err(ErrorCode::BindError(format!( - "only `UNION ALL` is supported in recursive CTE now" - )) + return Err(ErrorCode::BindError( + "only `UNION ALL` is supported in recursive CTE now".to_string(), + ) .into()); } diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index de76d4e59d7b9..a70323add1322 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -362,7 +362,7 @@ impl Binder { match cte_state { BindingCteState::Init => { - Err(ErrorCode::BindError(format!("Base term of recursive CTE not found, consider write it to left side of the `UNION` operator")).into()) + Err(ErrorCode::BindError("Base term of recursive CTE not found, consider write it to left side of the `UNION` operator".to_string()).into()) } BindingCteState::BaseResolved { schema } => { self.bind_table_to_context( diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index 81fe3b77615cd..e6b2c0d382b90 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -483,9 +483,7 @@ impl ExprImpl { self.visit_bound_set_expr(left); self.visit_bound_set_expr(right); } - BoundSetExpr::RecursiveUnion { - base, recursive, - } => { + BoundSetExpr::RecursiveUnion { base, recursive } => { self.visit_bound_set_expr(base); self.visit_bound_set_expr(recursive); } From 8e5b68a96d37de9bc8511b8425d82e5065fb733a Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Fri, 29 Mar 2024 17:07:46 +0800 Subject: [PATCH 05/18] refine Signed-off-by: TennyZhuang --- src/frontend/src/binder/bind_context.rs | 9 ++++++++- src/frontend/src/binder/query.rs | 19 +++++++++++-------- src/frontend/src/binder/relation/mod.rs | 8 ++++++-- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/src/frontend/src/binder/bind_context.rs b/src/frontend/src/binder/bind_context.rs index cb81b445f96e0..309bf9b43c105 100644 --- a/src/frontend/src/binder/bind_context.rs +++ b/src/frontend/src/binder/bind_context.rs @@ -92,6 +92,13 @@ pub enum BindingCteState { Bound { query: BoundQuery }, } +#[derive(Clone, Debug)] +pub struct BindingCte { + pub share_id: ShareId, + pub state: BindingCteState, + pub alias: TableAlias, +} + #[derive(Default, Debug, Clone)] pub struct BindContext { // Columns of all tables. @@ -106,7 +113,7 @@ pub struct BindContext { pub column_group_context: ColumnGroupContext, /// Map the cte's name to its binding state. /// The `ShareId` of the value is used to help the planner identify the share plan. - pub cte_to_relation: HashMap>>, + pub cte_to_relation: HashMap>>, /// Current lambda functions's arguments pub lambda_args: Option>, } diff --git a/src/frontend/src/binder/query.rs b/src/frontend/src/binder/query.rs index c83d70cd9cf18..ef957225c7b47 100644 --- a/src/frontend/src/binder/query.rs +++ b/src/frontend/src/binder/query.rs @@ -27,6 +27,7 @@ use thiserror_ext::AsReport; use super::bind_context::BindingCteState; use super::statement::RewriteExprsRecursive; use super::BoundValues; +use crate::binder::bind_context::BindingCte; use crate::binder::{Binder, BoundSetExpr}; use crate::error::{ErrorCode, Result}; use crate::expr::{CorrelatedId, Depth, ExprImpl, ExprRewriter}; @@ -333,11 +334,11 @@ impl Binder { .context .cte_to_relation .entry(table_name) - .insert_entry(Rc::new(RefCell::new(( + .insert_entry(Rc::new(RefCell::new(BindingCte { share_id, - BindingCteState::Init, + state: BindingCteState::Init, alias, - )))) + }))) .get() .clone(); @@ -346,9 +347,11 @@ impl Binder { } // We assume `left` is base term, otherwise the implementation may be very hard. + // The behavior is same as PostgreSQL. + // https://www.postgresql.org/docs/16/sql-select.html#:~:text=the%20recursive%20self%2Dreference%20must%20appear%20on%20the%20right%2Dhand%20side%20of%20the%20UNION let bound_base = self.bind_set_expr(*left)?; - entry.borrow_mut().1 = BindingCteState::BaseResolved { + entry.borrow_mut().state = BindingCteState::BaseResolved { schema: bound_base.schema().clone(), }; @@ -366,16 +369,16 @@ impl Binder { extra_order_exprs: vec![], }; - entry.borrow_mut().1 = BindingCteState::Bound { query: bound_query }; + entry.borrow_mut().state = BindingCteState::Bound { query: bound_query }; } else { let bound_query = self.bind_query(query)?; self.context.cte_to_relation.insert( table_name, - Rc::new(RefCell::new(( + Rc::new(RefCell::new(BindingCte { share_id, - BindingCteState::Bound { query: bound_query }, + state: BindingCteState::Bound { query: bound_query }, alias, - ))), + })), ); } } diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index a70323add1322..d8d6ad7df08bc 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -28,7 +28,7 @@ use thiserror_ext::AsReport; use self::cte_ref::BoundBackCteRef; use super::bind_context::ColumnBinding; use super::statement::RewriteExprsRecursive; -use crate::binder::bind_context::BindingCteState; +use crate::binder::bind_context::{BindingCte, BindingCteState}; use crate::binder::Binder; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{ExprImpl, InputRef}; @@ -347,7 +347,11 @@ impl Binder { { // Handles CTE - let (share_id, cte_state, mut original_alias) = item.deref().borrow().clone(); + let BindingCte { + share_id, + state: cte_state, + alias: mut original_alias, + } = item.deref().borrow().clone(); debug_assert_eq!(original_alias.name.real_value(), table_name); // The original CTE alias ought to be its table name. if let Some(from_alias) = alias { From e2292a31012d3d1b61ff9eed02bb6d015bd3a3d3 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Fri, 29 Mar 2024 23:35:59 -0400 Subject: [PATCH 06/18] update comment; fix typo(s) --- src/frontend/src/binder/bind_context.rs | 18 +++++++++++------- src/frontend/src/binder/set_expr.rs | 2 +- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/frontend/src/binder/bind_context.rs b/src/frontend/src/binder/bind_context.rs index 309bf9b43c105..3229f4d08be02 100644 --- a/src/frontend/src/binder/bind_context.rs +++ b/src/frontend/src/binder/bind_context.rs @@ -68,27 +68,31 @@ pub struct LateralBindContext { pub context: BindContext, } -/// If the CTE a recursive one, we may need store it in `cte_to_relation` first, and bind it step by step. +/// For recursive CTE, we may need to store it in `cte_to_relation` first, +/// and then bind it *step by step*. +/// +/// note: the below sql example is to illustrate when we get the +/// corresponding binding state when handling a recursive CTE like this. /// /// ```sql /// WITH RECURSIVE t(n) AS ( -/// # -------------^ Init +/// # -------------^ => Init /// VALUES (1) /// UNION ALL /// SELECT n+1 FROM t WHERE n < 100 -/// # ------------------^BaseResolved +/// # ------------------^ => BaseResolved /// ) /// SELECT sum(n) FROM t; -/// # -----------------^Bound +/// # -----------------^ => Bound /// ``` #[derive(Default, Debug, Clone)] pub enum BindingCteState { - /// We know nothing about the CTE before resolve the body. + /// We know nothing about the CTE before resolving the body. #[default] Init, - /// We know the schema from after the base term resolved. + /// We know the schema form after the base term resolved. BaseResolved { schema: Schema }, - /// We get the whole bound result. + /// We get the whole bound result of the (recursive) CTE. Bound { query: BoundQuery }, } diff --git a/src/frontend/src/binder/set_expr.rs b/src/frontend/src/binder/set_expr.rs index e64fa7dca2a15..64c38657b3b4b 100644 --- a/src/frontend/src/binder/set_expr.rs +++ b/src/frontend/src/binder/set_expr.rs @@ -23,7 +23,7 @@ use crate::error::{ErrorCode, Result}; use crate::expr::{align_types, CorrelatedId, Depth}; /// Part of a validated query, without order or limit clause. It may be composed of smaller -/// `BoundSetExpr`s via set operators (e.g. union). +/// `BoundSetExpr`(s) via set operators (e.g., union). #[derive(Debug, Clone)] pub enum BoundSetExpr { Select(Box), From 78a2422cbada0c9c0ca437fba9718af55f414303 Mon Sep 17 00:00:00 2001 From: Zihao Xu Date: Wed, 3 Apr 2024 14:38:04 -0400 Subject: [PATCH 07/18] feat(binder): correctly bind rcte in `bind_with` & `bind_relation_by_name` (#16023) --- src/frontend/src/binder/bind_context.rs | 47 ++++-- src/frontend/src/binder/query.rs | 61 +++++--- src/frontend/src/binder/relation/mod.rs | 46 ++++-- .../src/binder/relation/recursive_union.rs | 35 +++++ src/frontend/src/binder/set_expr.rs | 144 +++++++++--------- src/frontend/src/expr/mod.rs | 12 -- src/frontend/src/planner/relation.rs | 5 + src/frontend/src/planner/set_expr.rs | 4 - 8 files changed, 227 insertions(+), 127 deletions(-) create mode 100644 src/frontend/src/binder/relation/recursive_union.rs diff --git a/src/frontend/src/binder/bind_context.rs b/src/frontend/src/binder/bind_context.rs index 3229f4d08be02..fbe2789179c79 100644 --- a/src/frontend/src/binder/bind_context.rs +++ b/src/frontend/src/binder/bind_context.rs @@ -17,6 +17,7 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; use std::rc::Rc; +use either::Either; use parse_display::Display; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; @@ -26,6 +27,7 @@ use crate::error::{ErrorCode, Result}; type LiteResult = std::result::Result; +use super::BoundSetExpr; use crate::binder::{BoundQuery, ShareId, COLUMN_GROUP_PREFIX}; #[derive(Debug, Clone)] @@ -78,12 +80,12 @@ pub struct LateralBindContext { /// WITH RECURSIVE t(n) AS ( /// # -------------^ => Init /// VALUES (1) +/// # ----------^ => BaseResolved (after binding the base term) /// UNION ALL /// SELECT n+1 FROM t WHERE n < 100 -/// # ------------------^ => BaseResolved +/// # ------------------^ => Bound (we know exactly what the entire cte looks like) /// ) /// SELECT sum(n) FROM t; -/// # -----------------^ => Bound /// ``` #[derive(Default, Debug, Clone)] pub enum BindingCteState { @@ -93,7 +95,26 @@ pub enum BindingCteState { /// We know the schema form after the base term resolved. BaseResolved { schema: Schema }, /// We get the whole bound result of the (recursive) CTE. - Bound { query: BoundQuery }, + Bound { + query: Either, + }, +} + +/// the entire `RecursiveUnion` represents a *bound* recursive cte. +/// reference: +#[derive(Debug, Clone)] +pub struct RecursiveUnion { + /// currently this *must* be true, + /// otherwise binding will fail. + pub all: bool, + /// lhs part of the `UNION ALL` operator + pub base: Box, + /// rhs part of the `UNION ALL` operator + pub recursive: Box, + /// the aligned schema for this union + /// will be the *same* schema as recursive's + /// this is just for a better readability + pub schema: Schema, } #[derive(Clone, Debug)] @@ -116,7 +137,7 @@ pub struct BindContext { // The `BindContext`'s data on its column groups pub column_group_context: ColumnGroupContext, /// Map the cte's name to its binding state. - /// The `ShareId` of the value is used to help the planner identify the share plan. + /// The `ShareId` in `BindingCte` of the value is used to help the planner identify the share plan. pub cte_to_relation: HashMap>>, /// Current lambda functions's arguments pub lambda_args: Option>, @@ -341,13 +362,19 @@ impl BindContext { entry.extend(v.into_iter().map(|x| x + begin)); } for (k, (x, y)) in other.range_of { - match self.range_of.entry(k) { + match self.range_of.entry(k.clone()) { Entry::Occupied(e) => { - return Err(ErrorCode::InternalError(format!( - "Duplicated table name while merging adjacent contexts: {}", - e.key() - )) - .into()); + if let BindingCteState::Bound { .. } = + self.cte_to_relation.get(&k).unwrap().borrow().state.clone() + { + // do nothing + } else { + return Err(ErrorCode::InternalError(format!( + "Duplicated table name while merging adjacent contexts: {}", + e.key() + )) + .into()); + } } Entry::Vacant(entry) => { entry.insert((begin + x, begin + y)); diff --git a/src/frontend/src/binder/query.rs b/src/frontend/src/binder/query.rs index ef957225c7b47..37a97e8bf365e 100644 --- a/src/frontend/src/binder/query.rs +++ b/src/frontend/src/binder/query.rs @@ -27,7 +27,7 @@ use thiserror_ext::AsReport; use super::bind_context::BindingCteState; use super::statement::RewriteExprsRecursive; use super::BoundValues; -use crate::binder::bind_context::BindingCte; +use crate::binder::bind_context::{BindingCte, RecursiveUnion}; use crate::binder::{Binder, BoundSetExpr}; use crate::error::{ErrorCode, Result}; use crate::expr::{CorrelatedId, Depth, ExprImpl, ExprRewriter}; @@ -287,6 +287,7 @@ impl Binder { let share_id = self.next_share_id(); let Cte { alias, query, .. } = cte_table; let table_name = alias.name.real_value(); + if with.recursive { let Query { with, @@ -296,6 +297,8 @@ impl Binder { offset, fetch, } = query; + + /// the input clause should not be supported. fn should_be_empty(v: Option, clause: &str) -> Result<()> { if v.is_some() { return Err(ErrorCode::BindError(format!( @@ -305,6 +308,7 @@ impl Binder { } Ok(()) } + should_be_empty(order_by.first(), "ORDER BY")?; should_be_empty(limit, "LIMIT")?; should_be_empty(offset, "OFFSET")?; @@ -315,7 +319,7 @@ impl Binder { all, left, right, - } = body + } = body.clone() else { return Err(ErrorCode::BindError( "`UNION` is required in recursive CTE".to_string(), @@ -346,37 +350,52 @@ impl Binder { self.bind_with(with)?; } - // We assume `left` is base term, otherwise the implementation may be very hard. - // The behavior is same as PostgreSQL. - // https://www.postgresql.org/docs/16/sql-select.html#:~:text=the%20recursive%20self%2Dreference%20must%20appear%20on%20the%20right%2Dhand%20side%20of%20the%20UNION - let bound_base = self.bind_set_expr(*left)?; + // We assume `left` is the base term, otherwise the implementation may be very hard. + // The behavior is the same as PostgreSQL's. + // reference: + let mut base = self.bind_set_expr(*left)?; entry.borrow_mut().state = BindingCteState::BaseResolved { - schema: bound_base.schema().clone(), + schema: base.schema().clone(), }; - let bound_recursive = self.bind_set_expr(*right)?; - - let bound_query = BoundQuery { - body: BoundSetExpr::RecursiveUnion { - base: Box::new(bound_base), - recursive: Box::new(bound_recursive), - }, - order: vec![], - limit: None, - offset: None, - with_ties: false, - extra_order_exprs: vec![], + // bind the rest of the recursive cte + let mut recursive = self.bind_set_expr(*right)?; + + // todo: add validate check here for *bound* `base` and `recursive` + Self::align_schema(&mut base, &mut recursive, SetOperator::Union)?; + + // please note that even after aligning, the schema of `left` + // may not be the same as `right`; this is because there may + // be case(s) where the `base` term is just a value, and the + // `recursive` term is a select expression / statement. + let schema = recursive.schema().clone(); + // yet another sanity check + assert_eq!( + schema, + recursive.schema().clone(), + "expect `schema` to be the same as recursive's" + ); + + let recursive_union = RecursiveUnion { + all, + base: Box::new(base), + recursive: Box::new(recursive), + schema, }; - entry.borrow_mut().state = BindingCteState::Bound { query: bound_query }; + entry.borrow_mut().state = BindingCteState::Bound { + query: either::Either::Right(recursive_union), + }; } else { let bound_query = self.bind_query(query)?; self.context.cte_to_relation.insert( table_name, Rc::new(RefCell::new(BindingCte { share_id, - state: BindingCteState::Bound { query: bound_query }, + state: BindingCteState::Bound { + query: either::Either::Left(bound_query), + }, alias, })), ); diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index d8d6ad7df08bc..17ea96b49cce9 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -15,6 +15,7 @@ use std::collections::hash_map::Entry; use std::ops::Deref; +use either::Either::{Left, Right}; use itertools::{EitherOrBoth, Itertools}; use risingwave_common::bail; use risingwave_common::catalog::{Field, TableId, DEFAULT_SCHEMA_NAME}; @@ -26,6 +27,7 @@ use thiserror::Error; use thiserror_ext::AsReport; use self::cte_ref::BoundBackCteRef; +use self::recursive_union::BoundRecursiveUnion; use super::bind_context::ColumnBinding; use super::statement::RewriteExprsRecursive; use crate::binder::bind_context::{BindingCte, BindingCteState}; @@ -35,6 +37,7 @@ use crate::expr::{ExprImpl, InputRef}; mod cte_ref; mod join; +mod recursive_union; mod share; mod subquery; mod table_function; @@ -70,6 +73,7 @@ pub enum Relation { Watermark(Box), Share(Box), BackCteRef(Box), + RecursiveUnion(Box), } impl RewriteExprsRecursive for Relation { @@ -85,6 +89,7 @@ impl RewriteExprsRecursive for Relation { *inner = rewriter.rewrite_expr(inner.take()) } Relation::BackCteRef(inner) => inner.rewrite_exprs_recursive(rewriter), + Relation::RecursiveUnion(inner) => inner.rewrite_exprs_recursive(rewriter), _ => {} } } @@ -342,7 +347,9 @@ impl Binder { as_of: Option, ) -> Result { let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?; + if schema_name.is_none() + // the `table_name` here is the name of the currently binding cte. && let Some(item) = self.context.cte_to_relation.get(&table_name) { // Handles CTE @@ -352,7 +359,9 @@ impl Binder { state: cte_state, alias: mut original_alias, } = item.deref().borrow().clone(); - debug_assert_eq!(original_alias.name.real_value(), table_name); // The original CTE alias ought to be its table name. + + // The original CTE alias ought to be its table name. + debug_assert_eq!(original_alias.name.real_value(), table_name); if let Some(from_alias) = alias { original_alias.name = from_alias.name; @@ -366,7 +375,7 @@ impl Binder { match cte_state { BindingCteState::Init => { - Err(ErrorCode::BindError("Base term of recursive CTE not found, consider write it to left side of the `UNION` operator".to_string()).into()) + Err(ErrorCode::BindError("Base term of recursive CTE not found, consider writing it to left side of the `UNION ALL` operator".to_string()).into()) } BindingCteState::BaseResolved { schema } => { self.bind_table_to_context( @@ -377,23 +386,40 @@ impl Binder { Ok(Relation::BackCteRef(Box::new(BoundBackCteRef { share_id }))) } BindingCteState::Bound { query } => { + let schema = match query.clone() { + Left(normal) => normal.body.schema().clone(), + Right(recursive) => recursive.schema.clone(), + }; self.bind_table_to_context( - query - .body - .schema() + schema .fields .iter() .map(|f| (false, f.clone())), table_name.clone(), Some(original_alias), )?; - // Share the CTE. - let input_relation = Relation::Subquery(Box::new(BoundSubquery { - query, - lateral: false, - })); + // todo: to be further reviewed + let input_relation = match query { + // normal cte with union + Left(query) => { + Relation::Subquery(Box::new(BoundSubquery { + query, + lateral: false, + })) + } + // recursive cte + Right(recursive) => { + Relation::RecursiveUnion(Box::new(BoundRecursiveUnion { + base: *recursive.base, + recursive: *recursive.recursive, + })) + } + }; + // we could always share the cte, + // no matter it's recursive or not. let share_relation = Relation::Share(Box::new(BoundShare { share_id, + // should either be a *bound* `subquery` or `recursive union` input: input_relation, })); Ok(share_relation) diff --git a/src/frontend/src/binder/relation/recursive_union.rs b/src/frontend/src/binder/relation/recursive_union.rs new file mode 100644 index 0000000000000..2181403f49dbf --- /dev/null +++ b/src/frontend/src/binder/relation/recursive_union.rs @@ -0,0 +1,35 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::binder::statement::RewriteExprsRecursive; +use crate::binder::BoundSetExpr; + +/// a *bound* recursive union representation. +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub struct BoundRecursiveUnion { + /// the *bound* base case + pub(crate) base: BoundSetExpr, + /// the *bound* recursive case + pub(crate) recursive: BoundSetExpr, +} + +impl RewriteExprsRecursive for BoundRecursiveUnion { + fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) { + // rewrite base case + self.base.rewrite_exprs_recursive(rewriter); + // rewrite recursive case + self.recursive.rewrite_exprs_recursive(rewriter); + } +} diff --git a/src/frontend/src/binder/set_expr.rs b/src/frontend/src/binder/set_expr.rs index 64c38657b3b4b..de6e3f28468f1 100644 --- a/src/frontend/src/binder/set_expr.rs +++ b/src/frontend/src/binder/set_expr.rs @@ -36,11 +36,6 @@ pub enum BoundSetExpr { left: Box, right: Box, }, - /// UNION in recursive CTE definition - RecursiveUnion { - base: Box, - recursive: Box, - }, } impl RewriteExprsRecursive for BoundSetExpr { @@ -53,10 +48,6 @@ impl RewriteExprsRecursive for BoundSetExpr { left.rewrite_exprs_recursive(rewriter); right.rewrite_exprs_recursive(rewriter); } - BoundSetExpr::RecursiveUnion { base, recursive } => { - base.rewrite_exprs_recursive(rewriter); - recursive.rewrite_exprs_recursive(rewriter); - } } } } @@ -87,7 +78,6 @@ impl BoundSetExpr { BoundSetExpr::Values(v) => v.schema(), BoundSetExpr::Query(q) => q.schema(), BoundSetExpr::SetOperation { left, .. } => left.schema(), - BoundSetExpr::RecursiveUnion { base, .. } => base.schema(), } } @@ -99,9 +89,6 @@ impl BoundSetExpr { BoundSetExpr::SetOperation { left, right, .. } => { left.is_correlated(depth) || right.is_correlated(depth) } - BoundSetExpr::RecursiveUnion { base, recursive } => { - base.is_correlated(depth) || recursive.is_correlated(depth) - } } } @@ -130,22 +117,83 @@ impl BoundSetExpr { ); correlated_indices } - BoundSetExpr::RecursiveUnion { base, recursive } => { - let mut correlated_indices = vec![]; - correlated_indices.extend( - base.collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id), - ); - correlated_indices.extend( - recursive - .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id), - ); - correlated_indices - } } } } impl Binder { + /// note: align_schema only works when the `left` and `right` + /// are both select expression(s). + pub(crate) fn align_schema( + mut left: &mut BoundSetExpr, + mut right: &mut BoundSetExpr, + op: SetOperator, + ) -> Result<()> { + if left.schema().fields.len() != right.schema().fields.len() { + return Err(ErrorCode::InvalidInputSyntax(format!( + "each {} query must have the same number of columns", + op + )) + .into()); + } + + // handle type alignment for select union select + // e.g., select 1 UNION ALL select NULL + if let (BoundSetExpr::Select(l_select), BoundSetExpr::Select(r_select)) = + (&mut left, &mut right) + { + for (i, (l, r)) in l_select + .select_items + .iter_mut() + .zip_eq_fast(r_select.select_items.iter_mut()) + .enumerate() + { + let Ok(column_type) = align_types(vec![l, r].into_iter()) else { + return Err(ErrorCode::InvalidInputSyntax(format!( + "{} types {} and {} cannot be matched. Columns' name are `{}` and `{}`.", + op, + l_select.schema.fields[i].data_type, + r_select.schema.fields[i].data_type, + l_select.schema.fields[i].name, + r_select.schema.fields[i].name, + )) + .into()); + }; + l_select.schema.fields[i].data_type = column_type.clone(); + r_select.schema.fields[i].data_type = column_type; + } + } + + Self::validate(left, right, op) + } + + /// validate the schema, should be called after aligning. + pub(crate) fn validate( + left: &BoundSetExpr, + right: &BoundSetExpr, + op: SetOperator, + ) -> Result<()> { + for (a, b) in left + .schema() + .fields + .iter() + .zip_eq_fast(right.schema().fields.iter()) + { + if a.data_type != b.data_type { + return Err(ErrorCode::InvalidInputSyntax(format!( + "{} types {} and {} cannot be matched. Columns' name are {} and {}.", + op, + a.data_type.prost_type_name().as_str_name(), + b.data_type.prost_type_name().as_str_name(), + a.name, + b.name, + )) + .into()); + } + } + Ok(()) + } + pub(super) fn bind_set_expr(&mut self, set_expr: SetExpr) -> Result { match set_expr { SetExpr::Select(s) => Ok(BoundSetExpr::Select(Box::new(self.bind_select(*s)?))), @@ -157,7 +205,7 @@ impl Binder { left, right, } => { - match op { + match op.clone() { SetOperator::Union | SetOperator::Intersect | SetOperator::Except => { let mut left = self.bind_set_expr(*left)?; // Reset context for right side, but keep `cte_to_relation`. @@ -175,51 +223,7 @@ impl Binder { .into()); } - // Handle type alignment for select union select - // E.g. Select 1 UNION ALL Select NULL - if let (BoundSetExpr::Select(l_select), BoundSetExpr::Select(r_select)) = - (&mut left, &mut right) - { - for (i, (l, r)) in l_select - .select_items - .iter_mut() - .zip_eq_fast(r_select.select_items.iter_mut()) - .enumerate() - { - let Ok(column_type) = align_types(vec![l, r].into_iter()) else { - return Err(ErrorCode::InvalidInputSyntax(format!( - "{} types {} and {} cannot be matched. Columns' name are `{}` and `{}`.", - op, - l_select.schema.fields[i].data_type, - r_select.schema.fields[i].data_type, - l_select.schema.fields[i].name, - r_select.schema.fields[i].name, - )) - .into()); - }; - l_select.schema.fields[i].data_type = column_type.clone(); - r_select.schema.fields[i].data_type = column_type; - } - } - - for (a, b) in left - .schema() - .fields - .iter() - .zip_eq_fast(right.schema().fields.iter()) - { - if a.data_type != b.data_type { - return Err(ErrorCode::InvalidInputSyntax(format!( - "{} types {} and {} cannot be matched. Columns' name are {} and {}.", - op, - a.data_type.prost_type_name().as_str_name(), - b.data_type.prost_type_name().as_str_name(), - a.name, - b.name, - )) - .into()); - } - } + Self::align_schema(&mut left, &mut right, op.clone())?; if all { match op { diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index e6b2c0d382b90..893ae425b8513 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -483,10 +483,6 @@ impl ExprImpl { self.visit_bound_set_expr(left); self.visit_bound_set_expr(right); } - BoundSetExpr::RecursiveUnion { base, recursive } => { - self.visit_bound_set_expr(base); - self.visit_bound_set_expr(recursive); - } }; } } @@ -528,10 +524,6 @@ impl ExprImpl { self.visit_bound_set_expr(left); self.visit_bound_set_expr(right); } - BoundSetExpr::RecursiveUnion { base, recursive } => { - self.visit_bound_set_expr(base); - self.visit_bound_set_expr(recursive); - } } } } @@ -601,10 +593,6 @@ impl ExprImpl { self.visit_bound_set_expr(&mut *left); self.visit_bound_set_expr(&mut *right); } - BoundSetExpr::RecursiveUnion { base, recursive } => { - self.visit_bound_set_expr(&mut *base); - self.visit_bound_set_expr(&mut *recursive); - } } } } diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 4d83bee2fba2e..5393341f13f4f 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -58,6 +58,11 @@ impl Planner { Relation::BackCteRef(..) => { bail_not_implemented!(issue = 15135, "recursive CTE is not supported") } + // todo: ensure this will always be wrapped in a `Relation::Share` + // so that it will not be explicitly planned here + Relation::RecursiveUnion(..) => { + bail_not_implemented!(issue = 15135, "recursive CTE is not supported") + } } } diff --git a/src/frontend/src/planner/set_expr.rs b/src/frontend/src/planner/set_expr.rs index eeb789e9d9ded..e2ff43a2c211b 100644 --- a/src/frontend/src/planner/set_expr.rs +++ b/src/frontend/src/planner/set_expr.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::bail_not_implemented; use risingwave_common::util::sort_util::ColumnOrder; use crate::binder::BoundSetExpr; @@ -38,9 +37,6 @@ impl Planner { left, right, } => self.plan_set_operation(op, all, *left, *right), - BoundSetExpr::RecursiveUnion { .. } => { - bail_not_implemented!(issue = 15135, "recursive CTE is not supported") - } } } } From 6b84174ba50570e92aa9fb9872ffdcb5f3587154 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 4 Apr 2024 11:17:27 -0400 Subject: [PATCH 08/18] fix nit; cleanup comment --- src/frontend/src/binder/query.rs | 6 ++---- src/frontend/src/binder/relation/mod.rs | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/frontend/src/binder/query.rs b/src/frontend/src/binder/query.rs index 37a97e8bf365e..87759908b3307 100644 --- a/src/frontend/src/binder/query.rs +++ b/src/frontend/src/binder/query.rs @@ -365,10 +365,8 @@ impl Binder { // todo: add validate check here for *bound* `base` and `recursive` Self::align_schema(&mut base, &mut recursive, SetOperator::Union)?; - // please note that even after aligning, the schema of `left` - // may not be the same as `right`; this is because there may - // be case(s) where the `base` term is just a value, and the - // `recursive` term is a select expression / statement. + // the final schema should be sticked with `recursive`'s + // e.g., union all - let schema = recursive.schema().clone(); - // yet another sanity check - assert_eq!( - schema, - recursive.schema().clone(), - "expect `schema` to be the same as recursive's" - ); + let schema = base.schema().clone(); let recursive_union = RecursiveUnion { all, diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index 847f99aa22cf6..b7c288551191c 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -15,7 +15,7 @@ use std::collections::hash_map::Entry; use std::ops::Deref; -use either::Either::{self, Left, Right}; +use either::Either; use itertools::{EitherOrBoth, Itertools}; use risingwave_common::bail; use risingwave_common::catalog::{Field, TableId, DEFAULT_SCHEMA_NAME}; @@ -383,32 +383,19 @@ impl Binder { Ok(Relation::BackCteRef(Box::new(BoundBackCteRef { share_id }))) } BindingCteState::Bound { query } => { - let schema = match query.clone() { - Left(normal) => normal.schema().clone(), - Right(recursive) => recursive.schema.clone(), + let schema = match &query { + Either::Left(normal) => normal.schema(), + Either::Right(recursive) => &recursive.schema, }; self.bind_table_to_context( - schema - .fields - .iter() - .map(|f| (false, f.clone())), + schema.fields.iter().map(|f| (false, f.clone())), table_name.clone(), Some(original_alias), )?; - let input = match query { - // normal cte with union - Left(query) => Either::Left(query), - // recursive cte - Right(recursive) => Either::Right(recursive), - }; // we could always share the cte, // no matter it's recursive or not. - let share_relation = Relation::Share(Box::new(BoundShare { - share_id, - // should either be a `BoundQuery` or `RecursiveUnion` - input, - })); - Ok(share_relation) + let input = query; + Ok(Relation::Share(Box::new(BoundShare { share_id, input }))) } } } else { diff --git a/src/frontend/src/binder/relation/share.rs b/src/frontend/src/binder/relation/share.rs index 3703da211eb32..3a64189bd8861 100644 --- a/src/frontend/src/binder/relation/share.rs +++ b/src/frontend/src/binder/relation/share.rs @@ -14,7 +14,6 @@ use either::Either; -use super::{BoundSubquery, Relation}; use crate::binder::bind_context::RecursiveUnion; use crate::binder::statement::RewriteExprsRecursive; use crate::binder::{BoundQuery, ShareId}; @@ -29,30 +28,9 @@ pub struct BoundShare { impl RewriteExprsRecursive for BoundShare { fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) { - let rewrite = match self.input.clone() { - Either::Left(mut q) => { - q.rewrite_exprs_recursive(rewriter); - Either::Left(q) - } - Either::Right(mut r) => { - r.rewrite_exprs_recursive(rewriter); - Either::Right(r) - } + match &mut self.input { + Either::Left(q) => q.rewrite_exprs_recursive(rewriter), + Either::Right(r) => r.rewrite_exprs_recursive(rewriter), }; - self.input = rewrite; - } -} - -/// from inner `BoundQuery` to `Relation::Subquery` -impl From for Relation { - fn from(value: BoundShare) -> Self { - let Either::Left(q) = value.input else { - // leave it intact - return Self::Share(Box::new(value)); - }; - Self::Subquery(Box::new(BoundSubquery { - query: q, - lateral: false, - })) } } diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 4dc15e160eb39..d593a7308f39f 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -56,12 +56,7 @@ impl Planner { } => self.plan_table_function(tf, with_ordinality), Relation::Watermark(tf) => self.plan_watermark(*tf), // note that rcte (i.e., RecursiveUnion) is included *implicitly* in share. - Relation::Share(share) => { - if let Either::Right(_) = share.input.clone() { - bail_not_implemented!(issue = 15135, "recursive CTE is not supported"); - } - self.plan_share(*share) - } + Relation::Share(share) => self.plan_share(*share), Relation::BackCteRef(..) => { bail_not_implemented!(issue = 15135, "recursive CTE is not supported") } @@ -223,10 +218,15 @@ impl Planner { } pub(super) fn plan_share(&mut self, share: BoundShare) -> Result { + let Either::Left(nonrecursive_query) = share.input else { + bail_not_implemented!(issue = 15135, "recursive CTE is not supported"); + }; let id = share.share_id; match self.share_cache.get(&id) { None => { - let result = self.plan_relation(share.into())?; + let result = self + .plan_query(nonrecursive_query)? + .into_unordered_subplan(); let logical_share = LogicalShare::create(result); self.share_cache.insert(id, logical_share.clone()); Ok(logical_share) @@ -253,17 +253,11 @@ impl Planner { .iter() .map(|col| col.data_type().clone()) .collect(), - Relation::Subquery(q) => q - .query - .schema() - .fields - .iter() - .map(|f| f.data_type()) - .collect(), - Relation::Share(share) => { - let s = share.clone(); - Self::collect_col_data_types_for_tumble_window(&((*s).into()))? - } + Relation::Subquery(q) => q.query.schema().data_types(), + Relation::Share(share) => match &share.input { + Either::Left(nonrecursive) => nonrecursive.schema().data_types(), + Either::Right(recursive) => recursive.schema.data_types(), + }, r => { return Err(ErrorCode::BindError(format!( "Invalid input relation to tumble: {r:?}"