diff --git a/Cargo.lock b/Cargo.lock index 6135ee839bfe7..138d43545f078 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9872,6 +9872,7 @@ dependencies = [ "educe 0.5.7", "either", "enum-as-inner", + "expect-test", "fancy-regex", "fixedbitset 0.5.0", "futures", diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 2c688179698b6..962677cf86ccd 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -102,6 +102,7 @@ workspace-hack = { path = "../workspace-hack" } [dev-dependencies] assert_matches = "1" +expect-test = "1" risingwave_expr_impl = { workspace = true } tempfile = "3" diff --git a/src/frontend/src/binder/bind_context.rs b/src/frontend/src/binder/bind_context.rs index 386ed55c05aa7..21a987cbfac01 100644 --- a/src/frontend/src/binder/bind_context.rs +++ b/src/frontend/src/binder/bind_context.rs @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cell::RefCell; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; use std::rc::Rc; +use either::Either; use parse_display::Display; -use risingwave_common::catalog::Field; +use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::TableAlias; @@ -25,6 +27,8 @@ use crate::error::{ErrorCode, Result}; type LiteResult = std::result::Result; +use super::statement::RewriteExprsRecursive; +use super::BoundSetExpr; use crate::binder::{BoundQuery, ShareId, COLUMN_GROUP_PREFIX}; #[derive(Debug, Clone)] @@ -67,6 +71,68 @@ pub struct LateralBindContext { pub context: BindContext, } +/// For recursive CTE, we may need to store it in `cte_to_relation` first, +/// and then bind it *step by step*. +/// +/// note: the below sql example is to illustrate when we get the +/// corresponding binding state when handling a recursive CTE like this. 
+/// +/// ```sql +/// WITH RECURSIVE t(n) AS ( +/// # -------------^ => Init +/// VALUES (1) +/// # ----------^ => BaseResolved (after binding the base term) +/// UNION ALL +/// SELECT n+1 FROM t WHERE n < 100 +/// # ------------------^ => Bound (we know exactly what the entire cte looks like) +/// ) +/// SELECT sum(n) FROM t; +/// ``` +#[derive(Default, Debug, Clone)] +pub enum BindingCteState { + /// We know nothing about the CTE before resolving the body. + #[default] + Init, + /// We know the schema form after the base term resolved. + BaseResolved { schema: Schema }, + /// We get the whole bound result of the (recursive) CTE. + Bound { + query: Either, + }, +} + +/// the entire `RecursiveUnion` represents a *bound* recursive cte. +/// reference: +#[derive(Debug, Clone)] +pub struct RecursiveUnion { + /// currently this *must* be true, + /// otherwise binding will fail. + pub all: bool, + /// lhs part of the `UNION ALL` operator + pub base: Box, + /// rhs part of the `UNION ALL` operator + pub recursive: Box, + /// the aligned schema for this union + /// will be the *same* schema as recursive's + /// this is just for a better readability + pub schema: Schema, +} + +impl RewriteExprsRecursive for RecursiveUnion { + fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) { + // rewrite `base` and `recursive` separately + self.base.rewrite_exprs_recursive(rewriter); + self.recursive.rewrite_exprs_recursive(rewriter); + } +} + +#[derive(Clone, Debug)] +pub struct BindingCte { + pub share_id: ShareId, + pub state: BindingCteState, + pub alias: TableAlias, +} + #[derive(Default, Debug, Clone)] pub struct BindContext { // Columns of all tables. @@ -79,9 +145,9 @@ pub struct BindContext { pub clause: Option, // The `BindContext`'s data on its column groups pub column_group_context: ColumnGroupContext, - /// Map the cte's name to its `Relation::Subquery`. - /// The `ShareId` of the value is used to help the planner identify the share plan. 
- pub cte_to_relation: HashMap>, + /// Map the cte's name to its binding state. + /// The `ShareId` in `BindingCte` of the value is used to help the planner identify the share plan. + pub cte_to_relation: HashMap>>, /// Current lambda functions's arguments pub lambda_args: Option>, } @@ -305,8 +371,16 @@ impl BindContext { entry.extend(v.into_iter().map(|x| x + begin)); } for (k, (x, y)) in other.range_of { - match self.range_of.entry(k) { + match self.range_of.entry(k.clone()) { Entry::Occupied(e) => { + // check if this is a merge with recursive cte + if let Some(r) = self.cte_to_relation.get(&k) { + if let BindingCteState::Bound { .. } = r.borrow().state { + // no-op + continue; + } + } + // otherwise this merge is invalid return Err(ErrorCode::InternalError(format!( "Duplicated table name while merging adjacent contexts: {}", e.key() diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index aa62e8f772e03..2b2765bdba97a 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -535,3 +535,202 @@ pub mod test_utils { Binder::new_with_param_types(&SessionImpl::mock(), param_types) } } + +#[cfg(test)] +mod tests { + use expect_test::expect; + + use super::test_utils::*; + + #[tokio::test] + async fn test_rcte() { + let stmt = risingwave_sqlparser::parser::Parser::parse_sql( + "WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT * FROM t1", + ).unwrap().into_iter().next().unwrap(); + let mut binder = mock_binder(); + let bound = binder.bind(stmt).unwrap(); + + let expected = expect![[r#" + Query( + BoundQuery { + body: Select( + BoundSelect { + distinct: All, + select_items: [ + InputRef( + InputRef { + index: 0, + data_type: Int32, + }, + ), + InputRef( + InputRef { + index: 1, + data_type: Int32, + }, + ), + ], + aliases: [ + Some( + "a", + ), + Some( + "a", + ), + ], + from: Some( + Share( + BoundShare { + share_id: 0, + input: Right( + RecursiveUnion { + all: true, + base: 
Select( + BoundSelect { + distinct: All, + select_items: [ + Literal( + Literal { + data: Some( + Int32( + 1, + ), + ), + data_type: Some( + Int32, + ), + }, + ), + ], + aliases: [ + Some( + "a", + ), + ], + from: None, + where_clause: None, + group_by: GroupKey( + [], + ), + having: None, + schema: Schema { + fields: [ + a:Int32, + ], + }, + }, + ), + recursive: Select( + BoundSelect { + distinct: All, + select_items: [ + FunctionCall( + FunctionCall { + func_type: Add, + return_type: Int32, + inputs: [ + InputRef( + InputRef { + index: 0, + data_type: Int32, + }, + ), + Literal( + Literal { + data: Some( + Int32( + 1, + ), + ), + data_type: Some( + Int32, + ), + }, + ), + ], + }, + ), + ], + aliases: [ + None, + ], + from: Some( + BackCteRef( + BoundBackCteRef { + share_id: 0, + }, + ), + ), + where_clause: Some( + FunctionCall( + FunctionCall { + func_type: LessThan, + return_type: Boolean, + inputs: [ + InputRef( + InputRef { + index: 0, + data_type: Int32, + }, + ), + Literal( + Literal { + data: Some( + Int32( + 10, + ), + ), + data_type: Some( + Int32, + ), + }, + ), + ], + }, + ), + ), + group_by: GroupKey( + [], + ), + having: None, + schema: Schema { + fields: [ + ?column?:Int32, + ], + }, + }, + ), + schema: Schema { + fields: [ + a:Int32, + ], + }, + }, + ), + }, + ), + ), + where_clause: None, + group_by: GroupKey( + [], + ), + having: None, + schema: Schema { + fields: [ + a:Int32, + a:Int32, + ], + }, + }, + ), + order: [], + limit: None, + offset: None, + with_ties: false, + extra_order_exprs: [], + }, + )"#]]; + + expected.assert_eq(&format!("{:#?}", bound)); + } +} diff --git a/src/frontend/src/binder/query.rs b/src/frontend/src/binder/query.rs index fe2008f50f3eb..fbacc1178d8d3 100644 --- a/src/frontend/src/binder/query.rs +++ b/src/frontend/src/binder/query.rs @@ -12,18 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::cell::RefCell; use std::collections::HashMap; use std::rc::Rc; -use risingwave_common::bail_not_implemented; use risingwave_common::catalog::Schema; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; -use risingwave_sqlparser::ast::{Cte, Expr, Fetch, OrderByExpr, Query, Value, With}; +use risingwave_sqlparser::ast::{ + Cte, Expr, Fetch, OrderByExpr, Query, SetExpr, SetOperator, Value, With, +}; use thiserror_ext::AsReport; +use super::bind_context::BindingCteState; use super::statement::RewriteExprsRecursive; use super::BoundValues; +use crate::binder::bind_context::{BindingCte, RecursiveUnion}; use crate::binder::{Binder, BoundSetExpr}; use crate::error::{ErrorCode, Result}; use crate::expr::{CorrelatedId, Depth, ExprImpl, ExprRewriter}; @@ -279,20 +283,113 @@ impl Binder { } fn bind_with(&mut self, with: With) -> Result<()> { - if with.recursive { - bail_not_implemented!("recursive cte"); - } else { - for cte_table in with.cte_tables { - let Cte { alias, query, .. } = cte_table; - let table_name = alias.name.real_value(); - let bound_query = self.bind_query(query)?; - let share_id = self.next_share_id(); - self.context + for cte_table in with.cte_tables { + let share_id = self.next_share_id(); + let Cte { alias, query, .. } = cte_table; + let table_name = alias.name.real_value(); + + if with.recursive { + let Query { + with, + body, + order_by, + limit, + offset, + fetch, + } = query; + + /// the input clause should not be supported. 
+ fn should_be_empty(v: Option, clause: &str) -> Result<()> { + if v.is_some() { + return Err(ErrorCode::BindError(format!( + "`{clause}` is not supported in recursive CTE" + )) + .into()); + } + Ok(()) + } + + should_be_empty(order_by.first(), "ORDER BY")?; + should_be_empty(limit, "LIMIT")?; + should_be_empty(offset, "OFFSET")?; + should_be_empty(fetch, "FETCH")?; + + let SetExpr::SetOperation { + op: SetOperator::Union, + all, + left, + right, + } = body + else { + return Err(ErrorCode::BindError( + "`UNION` is required in recursive CTE".to_string(), + ) + .into()); + }; + + if !all { + return Err(ErrorCode::BindError( + "only `UNION ALL` is supported in recursive CTE now".to_string(), + ) + .into()); + } + + let entry = self + .context .cte_to_relation - .insert(table_name, Rc::new((share_id, bound_query, alias))); + .entry(table_name) + .insert_entry(Rc::new(RefCell::new(BindingCte { + share_id, + state: BindingCteState::Init, + alias, + }))) + .get() + .clone(); + + if let Some(with) = with { + self.bind_with(with)?; + } + + // We assume `left` is the base term, otherwise the implementation may be very hard. + // The behavior is the same as PostgreSQL's. 
+ // reference: + let mut base = self.bind_set_expr(*left)?; + + entry.borrow_mut().state = BindingCteState::BaseResolved { + schema: base.schema().clone(), + }; + + // bind the rest of the recursive cte + let mut recursive = self.bind_set_expr(*right)?; + + Self::align_schema(&mut base, &mut recursive, SetOperator::Union)?; + let schema = base.schema().clone(); + + let recursive_union = RecursiveUnion { + all, + base: Box::new(base), + recursive: Box::new(recursive), + schema, + }; + + entry.borrow_mut().state = BindingCteState::Bound { + query: either::Either::Right(recursive_union), + }; + } else { + let bound_query = self.bind_query(query)?; + self.context.cte_to_relation.insert( + table_name, + Rc::new(RefCell::new(BindingCte { + share_id, + state: BindingCteState::Bound { + query: either::Either::Left(bound_query), + }, + alias, + })), + ); } - Ok(()) } + Ok(()) } } diff --git a/src/frontend/src/binder/relation/cte_ref.rs b/src/frontend/src/binder/relation/cte_ref.rs new file mode 100644 index 0000000000000..86c40cc096dd1 --- /dev/null +++ b/src/frontend/src/binder/relation/cte_ref.rs @@ -0,0 +1,28 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::binder::statement::RewriteExprsRecursive; +use crate::binder::ShareId; + +/// A CTE reference, currently only used in the back reference of recursive CTE. +/// For the non-recursive one, see [`BoundShare`](super::BoundShare). 
+#[derive(Debug, Clone)] +pub struct BoundBackCteRef { + #[expect(dead_code)] + pub(crate) share_id: ShareId, +} + +impl RewriteExprsRecursive for BoundBackCteRef { + fn rewrite_exprs_recursive(&mut self, _rewriter: &mut impl crate::expr::ExprRewriter) {} +} diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index 1675c9bc762ce..b7c288551191c 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -15,6 +15,7 @@ use std::collections::hash_map::Entry; use std::ops::Deref; +use either::Either; use itertools::{EitherOrBoth, Itertools}; use risingwave_common::bail; use risingwave_common::catalog::{Field, TableId, DEFAULT_SCHEMA_NAME}; @@ -25,12 +26,15 @@ use risingwave_sqlparser::ast::{ use thiserror::Error; use thiserror_ext::AsReport; +use self::cte_ref::BoundBackCteRef; use super::bind_context::ColumnBinding; use super::statement::RewriteExprsRecursive; +use crate::binder::bind_context::{BindingCte, BindingCteState}; use crate::binder::Binder; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{ExprImpl, InputRef}; +mod cte_ref; mod join; mod share; mod subquery; @@ -65,7 +69,9 @@ pub enum Relation { with_ordinality: bool, }, Watermark(Box), + /// rcte is implicitly included in share Share(Box), + BackCteRef(Box), } impl RewriteExprsRecursive for Relation { @@ -80,6 +86,7 @@ impl RewriteExprsRecursive for Relation { Relation::TableFunction { expr: inner, .. } => { *inner = rewriter.rewrite_expr(inner.take()) } + Relation::BackCteRef(inner) => inner.rewrite_exprs_recursive(rewriter), _ => {} } } @@ -337,13 +344,21 @@ impl Binder { as_of: Option, ) -> Result { let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?; + if schema_name.is_none() + // the `table_name` here is the name of the currently binding cte. 
&& let Some(item) = self.context.cte_to_relation.get(&table_name) { // Handles CTE - let (share_id, query, mut original_alias) = item.deref().clone(); - debug_assert_eq!(original_alias.name.real_value(), table_name); // The original CTE alias ought to be its table name. + let BindingCte { + share_id, + state: cte_state, + alias: mut original_alias, + } = item.deref().borrow().clone(); + + // The original CTE alias ought to be its table name. + debug_assert_eq!(original_alias.name.real_value(), table_name); if let Some(from_alias) = alias { original_alias.name = from_alias.name; @@ -355,27 +370,34 @@ impl Binder { .collect(); } - self.bind_table_to_context( - query - .body - .schema() - .fields - .iter() - .map(|f| (false, f.clone())), - table_name.clone(), - Some(original_alias), - )?; - - // Share the CTE. - let input_relation = Relation::Subquery(Box::new(BoundSubquery { - query, - lateral: false, - })); - let share_relation = Relation::Share(Box::new(BoundShare { - share_id, - input: input_relation, - })); - Ok(share_relation) + match cte_state { + BindingCteState::Init => { + Err(ErrorCode::BindError("Base term of recursive CTE not found, consider writing it to left side of the `UNION ALL` operator".to_string()).into()) + } + BindingCteState::BaseResolved { schema } => { + self.bind_table_to_context( + schema.fields.iter().map(|f| (false, f.clone())), + table_name.clone(), + Some(original_alias), + )?; + Ok(Relation::BackCteRef(Box::new(BoundBackCteRef { share_id }))) + } + BindingCteState::Bound { query } => { + let schema = match &query { + Either::Left(normal) => normal.schema(), + Either::Right(recursive) => &recursive.schema, + }; + self.bind_table_to_context( + schema.fields.iter().map(|f| (false, f.clone())), + table_name.clone(), + Some(original_alias), + )?; + // we could always share the cte, + // no matter it's recursive or not. 
+ let input = query; + Ok(Relation::Share(Box::new(BoundShare { share_id, input }))) + } + } } else { self.bind_relation_by_name_inner(schema_name.as_deref(), &table_name, alias, as_of) } diff --git a/src/frontend/src/binder/relation/share.rs b/src/frontend/src/binder/relation/share.rs index 28b2306f3d9b5..3a64189bd8861 100644 --- a/src/frontend/src/binder/relation/share.rs +++ b/src/frontend/src/binder/relation/share.rs @@ -12,19 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. +use either::Either; + +use crate::binder::bind_context::RecursiveUnion; use crate::binder::statement::RewriteExprsRecursive; -use crate::binder::{Relation, ShareId}; +use crate::binder::{BoundQuery, ShareId}; /// Share a relation during binding and planning. -/// It could be used to share a CTE, a source, a view and so on. +/// It could be used to share a (recursive) CTE, a source, a view and so on. #[derive(Debug, Clone)] pub struct BoundShare { pub(crate) share_id: ShareId, - pub(crate) input: Relation, + pub(crate) input: Either, } impl RewriteExprsRecursive for BoundShare { fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) { - self.input.rewrite_exprs_recursive(rewriter); + match &mut self.input { + Either::Left(q) => q.rewrite_exprs_recursive(rewriter), + Either::Right(r) => r.rewrite_exprs_recursive(rewriter), + }; } } diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs index 782c533cd76d4..c5283a2cc592a 100644 --- a/src/frontend/src/binder/relation/table_or_source.rs +++ b/src/frontend/src/binder/relation/table_or_source.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use either::Either; use itertools::Itertools; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{is_system_schema, Field}; @@ -24,7 +25,6 @@ use risingwave_sqlparser::parser::Parser; use thiserror_ext::AsReport; use 
super::BoundShare; -use crate::binder::relation::BoundSubquery; use crate::binder::{Binder, Relation}; use crate::catalog::root_catalog::SchemaPath; use crate::catalog::source_catalog::SourceCatalog; @@ -278,10 +278,7 @@ impl Binder { share_id } }; - let input = Relation::Subquery(Box::new(BoundSubquery { - query, - lateral: false, - })); + let input = Either::Left(query); Ok(( Relation::Share(Box::new(BoundShare { share_id, input })), columns.iter().map(|c| (false, c.clone())).collect_vec(), diff --git a/src/frontend/src/binder/set_expr.rs b/src/frontend/src/binder/set_expr.rs index e1905c2b9df9d..be4943d59defd 100644 --- a/src/frontend/src/binder/set_expr.rs +++ b/src/frontend/src/binder/set_expr.rs @@ -23,7 +23,7 @@ use crate::error::{ErrorCode, Result}; use crate::expr::{align_types, CorrelatedId, Depth}; /// Part of a validated query, without order or limit clause. It may be composed of smaller -/// `BoundSetExpr`s via set operators (e.g. union). +/// `BoundSetExpr`(s) via set operators (e.g., union). #[derive(Debug, Clone)] pub enum BoundSetExpr { Select(Box), @@ -122,6 +122,78 @@ impl BoundSetExpr { } impl Binder { + /// note: `align_schema` only works when the `left` and `right` + /// are both select expression(s). 
+ pub(crate) fn align_schema( + mut left: &mut BoundSetExpr, + mut right: &mut BoundSetExpr, + op: SetOperator, + ) -> Result<()> { + if left.schema().fields.len() != right.schema().fields.len() { + return Err(ErrorCode::InvalidInputSyntax(format!( + "each {} query must have the same number of columns", + op + )) + .into()); + } + + // handle type alignment for select union select + // e.g., select 1 UNION ALL select NULL + if let (BoundSetExpr::Select(l_select), BoundSetExpr::Select(r_select)) = + (&mut left, &mut right) + { + for (i, (l, r)) in l_select + .select_items + .iter_mut() + .zip_eq_fast(r_select.select_items.iter_mut()) + .enumerate() + { + let Ok(column_type) = align_types(vec![l, r].into_iter()) else { + return Err(ErrorCode::InvalidInputSyntax(format!( + "{} types {} and {} cannot be matched. Columns' name are `{}` and `{}`.", + op, + l_select.schema.fields[i].data_type, + r_select.schema.fields[i].data_type, + l_select.schema.fields[i].name, + r_select.schema.fields[i].name, + )) + .into()); + }; + l_select.schema.fields[i].data_type = column_type.clone(); + r_select.schema.fields[i].data_type = column_type; + } + } + + Self::validate(left, right, op) + } + + /// validate the schema, should be called after aligning. + pub(crate) fn validate( + left: &BoundSetExpr, + right: &BoundSetExpr, + op: SetOperator, + ) -> Result<()> { + for (a, b) in left + .schema() + .fields + .iter() + .zip_eq_fast(right.schema().fields.iter()) + { + if a.data_type != b.data_type { + return Err(ErrorCode::InvalidInputSyntax(format!( + "{} types {} and {} cannot be matched. 
Columns' name are {} and {}.", + op, + a.data_type.prost_type_name().as_str_name(), + b.data_type.prost_type_name().as_str_name(), + a.name, + b.name, + )) + .into()); + } + } + Ok(()) + } + pub(super) fn bind_set_expr(&mut self, set_expr: SetExpr) -> Result { match set_expr { SetExpr::Select(s) => Ok(BoundSetExpr::Select(Box::new(self.bind_select(*s)?))), @@ -133,7 +205,7 @@ impl Binder { left, right, } => { - match op { + match op.clone() { SetOperator::Union | SetOperator::Intersect | SetOperator::Except => { let mut left = self.bind_set_expr(*left)?; // Reset context for right side, but keep `cte_to_relation`. @@ -151,51 +223,7 @@ impl Binder { .into()); } - // Handle type alignment for select union select - // E.g. Select 1 UNION ALL Select NULL - if let (BoundSetExpr::Select(l_select), BoundSetExpr::Select(r_select)) = - (&mut left, &mut right) - { - for (i, (l, r)) in l_select - .select_items - .iter_mut() - .zip_eq_fast(r_select.select_items.iter_mut()) - .enumerate() - { - let Ok(column_type) = align_types(vec![l, r].into_iter()) else { - return Err(ErrorCode::InvalidInputSyntax(format!( - "{} types {} and {} cannot be matched. Columns' name are `{}` and `{}`.", - op, - l_select.schema.fields[i].data_type, - r_select.schema.fields[i].data_type, - l_select.schema.fields[i].name, - r_select.schema.fields[i].name, - )) - .into()); - }; - l_select.schema.fields[i].data_type = column_type.clone(); - r_select.schema.fields[i].data_type = column_type; - } - } - - for (a, b) in left - .schema() - .fields - .iter() - .zip_eq_fast(right.schema().fields.iter()) - { - if a.data_type != b.data_type { - return Err(ErrorCode::InvalidInputSyntax(format!( - "{} types {} and {} cannot be matched. 
Columns' name are {} and {}.", - op, - a.data_type.prost_type_name().as_str_name(), - b.data_type.prost_type_name().as_str_name(), - a.name, - b.name, - )) - .into()); - } - } + Self::align_schema(&mut left, &mut right, op.clone())?; if all { match op { diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index b79aaee3cb4ab..7f41de5cdc190 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -35,6 +35,7 @@ #![feature(round_ties_even)] #![feature(iterator_try_collect)] #![feature(used_with_arg)] +#![feature(entry_insert)] #![recursion_limit = "256"] #[cfg(test)] diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 62e32ad4dc3d5..d593a7308f39f 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -14,6 +14,7 @@ use std::rc::Rc; +use either::Either; use itertools::Itertools; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{Field, Schema}; @@ -54,7 +55,11 @@ impl Planner { with_ordinality, } => self.plan_table_function(tf, with_ordinality), Relation::Watermark(tf) => self.plan_watermark(*tf), + // note that rcte (i.e., RecursiveUnion) is included *implicitly* in share. Relation::Share(share) => self.plan_share(*share), + Relation::BackCteRef(..) => { + bail_not_implemented!(issue = 15135, "recursive CTE is not supported") + } } } @@ -213,12 +218,17 @@ impl Planner { } pub(super) fn plan_share(&mut self, share: BoundShare) -> Result { - match self.share_cache.get(&share.share_id) { + let Either::Left(nonrecursive_query) = share.input else { + bail_not_implemented!(issue = 15135, "recursive CTE is not supported"); + }; + let id = share.share_id; + match self.share_cache.get(&id) { None => { - let result = self.plan_relation(share.input)?; + let result = self + .plan_query(nonrecursive_query)? 
+ .into_unordered_subplan(); let logical_share = LogicalShare::create(result); - self.share_cache - .insert(share.share_id, logical_share.clone()); + self.share_cache.insert(id, logical_share.clone()); Ok(logical_share) } Some(result) => Ok(result.clone()), @@ -243,14 +253,11 @@ impl Planner { .iter() .map(|col| col.data_type().clone()) .collect(), - Relation::Subquery(q) => q - .query - .schema() - .fields - .iter() - .map(|f| f.data_type()) - .collect(), - Relation::Share(share) => Self::collect_col_data_types_for_tumble_window(&share.input)?, + Relation::Subquery(q) => q.query.schema().data_types(), + Relation::Share(share) => match &share.input { + Either::Left(nonrecursive) => nonrecursive.schema().data_types(), + Either::Right(recursive) => recursive.schema.data_types(), + }, r => { return Err(ErrorCode::BindError(format!( "Invalid input relation to tumble: {r:?}"