From 2a1d9bf3b38bc4e3dee579a17491817cd00946ed Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Fri, 14 Jun 2024 16:07:42 +0800 Subject: [PATCH] fix comm --- e2e_test/streaming/changed_log.slt | 34 +++-- proto/stream_plan.proto | 4 +- src/frontend/src/binder/bind_context.rs | 2 +- src/frontend/src/binder/query.rs | 136 +++++++++--------- src/frontend/src/binder/relation/mod.rs | 4 +- src/frontend/src/binder/relation/share.rs | 8 +- src/frontend/src/binder/select.rs | 2 +- .../generic/{changed_log.rs => change_log.rs} | 43 +++--- .../src/optimizer/plan_node/generic/mod.rs | 4 +- ...l_changed_log.rs => logical_change_log.rs} | 82 ++++++----- src/frontend/src/optimizer/plan_node/mod.rs | 16 +-- ...am_changed_log.rs => stream_change_log.rs} | 26 ++-- .../planner/{changed_log.rs => change_log.rs} | 6 +- src/frontend/src/planner/mod.rs | 2 +- src/frontend/src/planner/relation.rs | 4 +- src/frontend/src/planner/select.rs | 2 +- src/meta/src/controller/rename.rs | 9 +- src/sqlparser/src/ast/mod.rs | 6 +- src/sqlparser/src/ast/query.rs | 20 +-- src/sqlparser/src/parser.rs | 45 +++--- src/sqlparser/tests/sqlparser_common.rs | 78 ++++++++++ .../{changed_log.rs => change_log.rs} | 38 ++--- src/stream/src/executor/mod.rs | 4 +- .../{changed_log.rs => change_log.rs} | 12 +- src/stream/src/from_proto/mod.rs | 6 +- src/tests/sqlsmith/src/reducer.rs | 6 +- src/tests/sqlsmith/src/sql_gen/query.rs | 4 +- 27 files changed, 344 insertions(+), 259 deletions(-) rename src/frontend/src/optimizer/plan_node/generic/{changed_log.rs => change_log.rs} (69%) rename src/frontend/src/optimizer/plan_node/{logical_changed_log.rs => logical_change_log.rs} (62%) rename src/frontend/src/optimizer/plan_node/{stream_changed_log.rs => stream_change_log.rs} (78%) rename src/frontend/src/planner/{changed_log.rs => change_log.rs} (80%) rename src/stream/src/executor/{changed_log.rs => change_log.rs} (63%) rename src/stream/src/from_proto/{changed_log.rs => change_log.rs} (76%) diff --git a/e2e_test/streaming/changed_log.slt b/e2e_test/streaming/changed_log.slt index be502683d9e7..199cbf381313 100644 --- a/e2e_test/streaming/changed_log.slt +++ b/e2e_test/streaming/changed_log.slt @@ -11,24 +11,27 @@ statement ok create table t3 (v1 int primary key, v2 int); statement ok -create materialized view mv1 as with sub as changedlog from t1 select * from sub; +create materialized view mv1 as with sub as changelog from t1 select * from sub; statement ok -create materialized view mv2 as with sub as changedlog from t2 select * from sub; +create materialized view mv2 as with sub as changelog from t2 select * from sub; statement ok -create materialized view mv3 as with sub as changedlog from t1 select v1, v2 from sub; +create materialized view mv3 as with sub as changelog from t1 select v1, v2 from sub; statement ok -create materialized view mv4 as with sub1 as changedlog from t1, sub2 as changedlog from t2 +create materialized view mv4 as with sub1 as changelog from t1, sub2 as changelog from t2 select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22 from sub1 inner join sub2 on sub1.v1 = sub2.v1; statement ok -create materialized view mv5 as with sub1 as changedlog from t1, sub2 as changedlog from t2 +create materialized view mv5 as with sub1 as changelog from t1, sub2 as changelog from t2 select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22, sub1.op as op1, sub2.op as op2 from sub1 inner join sub2 on sub1.v1 = sub2.v1; statement ok -create materialized view mv6 as with sub as changedlog from t3 
select * from sub; +create materialized view mv6 as with sub as changelog from t3 select * from sub; + +statement ok +create materialized view mv7(col1,col2,col3) as with sub as changelog from t3 select * from sub; statement ok insert into t1 values(1,1),(2,2); @@ -51,6 +54,12 @@ update t3 set v2 = 500 where v1 = 5; statement ok delete from t1 where v1 = 2; +statement ok +alter materialized view mv7 rename to mv7_rename; + +statement ok +alter table t3 rename to t3_rename; + query III rowsort select * from mv1 order by v1; ---- @@ -116,6 +125,16 @@ select * from mv6 order by v1; 5 500 3 6 6 1 +query III rowsort +select * from mv7_rename order by col1; +---- +5 5 1 +5 5 4 +5 500 3 +6 6 1 + +statement ok +drop materialized view mv7_rename; statement ok drop materialized view mv6; @@ -135,9 +154,8 @@ drop materialized view mv2; statement ok drop materialized view mv1; - statement ok -drop table t3; +drop table t3_rename; statement ok drop table t2; diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 2ca844c1363c..febbf4dad7e8 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -287,7 +287,7 @@ message FilterNode { expr.ExprNode search_condition = 1; } -message ChangedLogNode { +message ChangeLogNode { bool need_op = 1; } @@ -816,7 +816,7 @@ message StreamNode { StreamCdcScanNode stream_cdc_scan = 139; CdcFilterNode cdc_filter = 140; SourceBackfillNode source_backfill = 142; - ChangedLogNode changed_log = 143; + ChangeLogNode change_log = 143; } // The id for the operator. This is local per mview. // TODO: should better be a uint32. diff --git a/src/frontend/src/binder/bind_context.rs b/src/frontend/src/binder/bind_context.rs index 07bd65dce42c..ad9ca59ae44f 100644 --- a/src/frontend/src/binder/bind_context.rs +++ b/src/frontend/src/binder/bind_context.rs @@ -103,7 +103,7 @@ pub enum BindingCteState { query: Either, }, - ChangedLog { + ChangeLog { table: Relation, }, } diff --git a/src/frontend/src/binder/query.rs b/src/frontend/src/binder/query.rs index 8350b86b18e5..459e1b7921e9 100644 --- a/src/frontend/src/binder/query.rs +++ b/src/frontend/src/binder/query.rs @@ -20,7 +20,7 @@ use risingwave_common::catalog::Schema; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_sqlparser::ast::{ - Cte, Expr, Fetch, ObjectName, OrderByExpr, Query, SetExpr, SetOperator, Value, With, + Cte, CteInner, Expr, Fetch, ObjectName, OrderByExpr, Query, SetExpr, SetOperator, Value, With, }; use thiserror_ext::AsReport; @@ -145,7 +145,7 @@ impl Binder { /// After finishing binding, we pop the previous context from the stack. pub fn bind_query(&mut self, query: Query) -> Result { self.push_context(); - let result = self.bind_query_inner(query.clone()); + let result = self.bind_query_inner(query); self.pop_context()?; result } @@ -286,77 +286,81 @@ impl Binder { for cte_table in with.cte_tables { // note that the new `share_id` for the rcte is generated here let share_id = self.next_share_id(); - let Cte { alias, query, from } = cte_table; + let Cte { alias, cte_inner } = cte_table; let table_name = alias.name.real_value(); if with.recursive { - let query = query.ok_or_else(|| { - ErrorCode::BindError("RECURSIVE CTE don't support changedlog from".to_string()) - })?; - let ( - SetExpr::SetOperation { - op: SetOperator::Union, - all, - left, - right, - }, - with, - ) = Self::validate_rcte(query)? 
- else { + if let CteInner::Query(query) = cte_inner { + let ( + SetExpr::SetOperation { + op: SetOperator::Union, + all, + left, + right, + }, + with, + ) = Self::validate_rcte(query)? + else { + return Err(ErrorCode::BindError( + "expect `SetOperation` as the return type of validation".into(), + ) + .into()); + }; + + let entry = self + .context + .cte_to_relation + .entry(table_name) + .insert_entry(Rc::new(RefCell::new(BindingCte { + share_id, + state: BindingCteState::Init, + alias, + }))) + .get() + .clone(); + + self.bind_rcte(with, entry, *left, *right, all)?; + } else { return Err(ErrorCode::BindError( - "expect `SetOperation` as the return type of validation".into(), + "RECURSIVE CTE only support query".to_string(), ) .into()); - }; - - let entry = self - .context - .cte_to_relation - .entry(table_name) - .insert_entry(Rc::new(RefCell::new(BindingCte { - share_id, - state: BindingCteState::Init, - alias, - }))) - .get() - .clone(); - - self.bind_rcte(with, entry, *left, *right, all)?; - } else if let Some(query) = query { - let bound_query = self.bind_query(query)?; - self.context.cte_to_relation.insert( - table_name, - Rc::new(RefCell::new(BindingCte { - share_id, - state: BindingCteState::Bound { - query: either::Either::Left(bound_query), - }, - alias, - })), - ); + } } else { - let from_table_name = from.ok_or_else(|| { - ErrorCode::BindError( - "CTE with changedlog from must have a table/mv".to_string(), - ) - })?; - self.push_context(); - let from_table_relation = self.bind_relation_by_name( - ObjectName::from(vec![from_table_name]), - None, - None, - )?; - self.pop_context()?; - self.context.cte_to_relation.insert( - table_name, - Rc::new(RefCell::new(BindingCte { - share_id, - state: BindingCteState::ChangedLog { - table: from_table_relation, - }, - alias, - })), - ); + match cte_inner { + CteInner::Query(query) => { + let bound_query = self.bind_query(query)?; + self.context.cte_to_relation.insert( + table_name, + Rc::new(RefCell::new(BindingCte { + share_id, + state: BindingCteState::Bound { + query: either::Either::Left(bound_query), + }, + alias, + })), + ); + } + CteInner::ChangeLog(from_table_name) => { + self.push_context(); + let from_table_relation = self.bind_relation_by_name( + ObjectName::from(vec![from_table_name]), + None, + None, + )?; + self.pop_context()?; + self.context.cte_to_relation.insert( + table_name, + Rc::new(RefCell::new(BindingCte { + share_id, + state: BindingCteState::ChangeLog { + table: from_table_relation, + }, + alias, + })), + ); + } + } } } Ok(()) diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index dd9a12e10b47..2cdf3ea07db4 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -392,8 +392,8 @@ impl Binder { // no matter it's recursive or not. 
Ok(Relation::Share(Box::new(BoundShare { share_id, input}))) } - BindingCteState::ChangedLog { table } => { - let input = BoundShareInput::ChangedLog(table); + BindingCteState::ChangeLog { table } => { + let input = BoundShareInput::ChangeLog(table); self.bind_table_to_context( input.fields()?, table_name.clone(), diff --git a/src/frontend/src/binder/relation/share.rs b/src/frontend/src/binder/relation/share.rs index 56f20e830a91..a7450096cd97 100644 --- a/src/frontend/src/binder/relation/share.rs +++ b/src/frontend/src/binder/relation/share.rs @@ -27,7 +27,7 @@ use crate::error::{ErrorCode, Result}; #[derive(Debug, Clone)] pub enum BoundShareInput { Query(Either), - ChangedLog(Relation), + ChangeLog(Relation), } impl BoundShareInput { pub fn fields(&self) -> Result> { @@ -48,7 +48,7 @@ impl BoundShareInput { .map(|f| (false, f)) .collect_vec()), }, - BoundShareInput::ChangedLog(r) => { + BoundShareInput::ChangeLog(r) => { let (fields, _name) = if let Relation::BaseTable(bound_base_table) = r { ( bound_base_table.table_catalog.columns().to_vec(), @@ -80,7 +80,7 @@ impl BoundShareInput { true, Field::with_name( risingwave_common::types::DataType::Serial, - "_changedlog_row_id".to_string(), + "_changelog_row_id".to_string(), ), ), ]) @@ -103,7 +103,7 @@ impl RewriteExprsRecursive for BoundShare { Either::Left(q) => q.rewrite_exprs_recursive(rewriter), Either::Right(r) => r.rewrite_exprs_recursive(rewriter), }, - BoundShareInput::ChangedLog(r) => r.rewrite_exprs_recursive(rewriter), + BoundShareInput::ChangeLog(r) => r.rewrite_exprs_recursive(rewriter), }; } } diff --git a/src/frontend/src/binder/select.rs b/src/frontend/src/binder/select.rs index 514eb165e64f..d9848ed76973 100644 --- a/src/frontend/src/binder/select.rs +++ b/src/frontend/src/binder/select.rs @@ -363,7 +363,7 @@ impl Binder { && !except_indices.contains(&c.index) })); - select_list.extend(exprs.clone()); + select_list.extend(exprs); aliases.extend(names); // TODO: we will need to be able to handle wildcard expressions bound to // aliases in the future. 
We'd then need a diff --git a/src/frontend/src/optimizer/plan_node/generic/changed_log.rs b/src/frontend/src/optimizer/plan_node/generic/change_log.rs similarity index 69% rename from src/frontend/src/optimizer/plan_node/generic/changed_log.rs rename to src/frontend/src/optimizer/plan_node/generic/change_log.rs index e6539df3b371..cb4d6df05b1a 100644 --- a/src/frontend/src/optimizer/plan_node/generic/changed_log.rs +++ b/src/frontend/src/optimizer/plan_node/generic/change_log.rs @@ -24,22 +24,24 @@ use crate::utils::ColIndexMappingRewriteExt; use crate::OptimizerContextRef; #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct ChangedLog { +pub struct ChangeLog { pub input: PlanRef, + // If there is no op in the output result, it is false, example 'create materialized view mv1 as with sub as changelog from t1 select v1 from sub;' pub need_op: bool, - pub need_changed_log_row_id: bool, + // False before rewrite, true after rewrite + pub need_change_log_row_id: bool, } -impl DistillUnit for ChangedLog { +impl DistillUnit for ChangeLog { fn distill_with_name<'a>(&self, name: impl Into>) -> XmlNode<'a> { childless_record(name, vec![]) } } -impl ChangedLog { - pub fn new(input: PlanRef, need_op: bool, need_changed_log_row_id: bool) -> Self { - ChangedLog { +impl ChangeLog { + pub fn new(input: PlanRef, need_op: bool, need_change_log_row_id: bool) -> Self { + ChangeLog { input, need_op, - need_changed_log_row_id, + need_change_log_row_id, } } @@ -49,7 +51,7 @@ impl ChangedLog { ColIndexMapping::new(map, self.schema().len()) } } -impl GenericPlanNode for ChangedLog { +impl GenericPlanNode for ChangeLog { fn schema(&self) -> Schema { let mut fields = self.input.schema().fields.clone(); if self.need_op { @@ -58,32 +60,21 @@ impl GenericPlanNode for ChangedLog { "op", )); } - if self.need_changed_log_row_id { + if self.need_change_log_row_id { fields.push(Field::with_name( risingwave_common::types::DataType::Serial, - "_changed_log_row_id", + "_change_log_row_id", )); } Schema::new(fields) } fn stream_key(&self) -> Option> { - match self.input.stream_key() { - Some(keys) => { - let mut keys = keys.to_vec(); - if self.need_changed_log_row_id { - keys.push(self.schema().len() - 1); - } - Some(keys) - } - None => { - if self.need_changed_log_row_id { - let keys = vec![self.schema().len() - 1]; - Some(keys) - } else { - None - } - } + if self.need_change_log_row_id { + let keys = vec![self.schema().len() - 1]; + Some(keys) + } else { + None } } diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index a84e4e1e35b0..26645c9955fc 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -78,8 +78,8 @@ mod cte_ref; pub use cte_ref::*; mod recursive_union; pub use recursive_union::*; -mod changed_log; -pub use changed_log::*; +mod change_log; +pub use change_log::*; pub trait DistillUnit { fn distill_with_name<'a>(&self, name: impl Into>) -> XmlNode<'a>; diff --git a/src/frontend/src/optimizer/plan_node/logical_changed_log.rs b/src/frontend/src/optimizer/plan_node/logical_change_log.rs similarity index 62% rename from src/frontend/src/optimizer/plan_node/logical_changed_log.rs rename to src/frontend/src/optimizer/plan_node/logical_change_log.rs index f4153b1a1dfc..8c7d6365f035 100644 --- a/src/frontend/src/optimizer/plan_node/logical_changed_log.rs +++ b/src/frontend/src/optimizer/plan_node/logical_change_log.rs @@ -17,7 +17,7 @@ use super::generic::GenericPlanRef; use 
super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical, - PlanBase, PlanTreeNodeUnary, PredicatePushdown, RewriteStreamContext, StreamChangedLog, + PlanBase, PlanTreeNodeUnary, PredicatePushdown, RewriteStreamContext, StreamChangeLog, StreamRowIdGen, ToBatch, ToStream, ToStreamContext, }; use crate::error::ErrorCode::BindError; @@ -27,34 +27,34 @@ use crate::utils::{ColIndexMapping, Condition}; use crate::PlanRef; #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct LogicalChangedLog { +pub struct LogicalChangeLog { pub base: PlanBase, - core: generic::ChangedLog, + core: generic::ChangeLog, } -impl LogicalChangedLog { +impl LogicalChangeLog { pub fn create(input: PlanRef) -> PlanRef { Self::new(input, true, true).into() } - pub fn new(input: PlanRef, need_op: bool, need_changed_log_row_id: bool) -> Self { - let core = generic::ChangedLog::new(input, need_op, need_changed_log_row_id); + pub fn new(input: PlanRef, need_op: bool, need_change_log_row_id: bool) -> Self { + let core = generic::ChangeLog::new(input, need_op, need_change_log_row_id); Self::with_core(core) } - pub fn with_core(core: generic::ChangedLog) -> Self { + pub fn with_core(core: generic::ChangeLog) -> Self { let base = PlanBase::new_logical_with_core(&core); - LogicalChangedLog { base, core } + LogicalChangeLog { base, core } } } -impl PlanTreeNodeUnary for LogicalChangedLog { +impl PlanTreeNodeUnary for LogicalChangeLog { fn input(&self) -> PlanRef { self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new(input, self.core.need_op, self.core.need_changed_log_row_id) + Self::new(input, self.core.need_op, self.core.need_change_log_row_id) } fn rewrite_with_input( @@ -62,27 +62,27 @@ impl PlanTreeNodeUnary for LogicalChangedLog { input: PlanRef, input_col_change: ColIndexMapping, ) -> (Self, ColIndexMapping) { - let changed_log = Self::new(input, self.core.need_op, true); + let change_log = Self::new(input, self.core.need_op, true); if self.core.need_op { let mut output_vec = input_col_change.to_parts().0.to_vec(); let len = input_col_change.to_parts().1; output_vec.push(Some(len)); let out_col_change = ColIndexMapping::new(output_vec, len + 1); - (changed_log, out_col_change) + (change_log, out_col_change) } else { - (changed_log, input_col_change) + (change_log, input_col_change) } } } -impl_plan_tree_node_for_unary! {LogicalChangedLog} -impl_distill_by_unit!(LogicalChangedLog, core, "LogicalChangedLog"); +impl_plan_tree_node_for_unary! 
{LogicalChangeLog} +impl_distill_by_unit!(LogicalChangeLog, core, "LogicalChangeLog"); -impl ExprRewritable for LogicalChangedLog {} +impl ExprRewritable for LogicalChangeLog {} -impl ExprVisitable for LogicalChangedLog {} +impl ExprVisitable for LogicalChangeLog {} -impl PredicatePushdown for LogicalChangedLog { +impl PredicatePushdown for LogicalChangeLog { fn predicate_pushdown( &self, predicate: Condition, @@ -92,42 +92,48 @@ impl PredicatePushdown for LogicalChangedLog { } } -impl ColPrunable for LogicalChangedLog { +impl ColPrunable for LogicalChangeLog { fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef { - let mut new_required_cols = required_cols.to_vec(); let fields = self.schema().fields(); let mut need_op = false; - let mut need_changed_log_row_id = false; - required_cols.iter().for_each(|a| { - if let Some(f) = fields.get(*a) { - if f.name == "op" { - new_required_cols.pop(); - need_op = true; - } else if f.name == "_changed_log_row_id" { - new_required_cols.pop(); - need_changed_log_row_id = true; + let mut need_change_log_row_id = false; + let new_required_cols: Vec<_> = required_cols + .iter() + .filter_map(|a| { + if let Some(f) = fields.get(*a) { + if f.name == "op" { + need_op = true; + None + } else if f.name == "_change_log_row_id" { + need_change_log_row_id = true; + None + } else { + Some(*a) + } + } else { + Some(*a) } - } - }); + }) + .collect(); let new_input = self.input().prune_col(&new_required_cols, ctx); - Self::new(new_input, need_op, need_changed_log_row_id).into() + Self::new(new_input, need_op, need_change_log_row_id).into() } } -impl ToBatch for LogicalChangedLog { +impl ToBatch for LogicalChangeLog { fn to_batch(&self) -> Result { - Err(BindError("With changedlog cte only support with create mv/sink".to_string()).into()) + Err(BindError("With changelog cte only support with create mv/sink".to_string()).into()) } } -impl ToStream for LogicalChangedLog { +impl ToStream for LogicalChangeLog { fn to_stream(&self, ctx: &mut ToStreamContext) -> Result { let new_input = self.input().to_stream(ctx)?; let mut new_logical = self.core.clone(); new_logical.input = new_input; - let plan = StreamChangedLog::new(new_logical).into(); + let plan = StreamChangeLog::new(new_logical).into(); let row_id_index = self.schema().fields().len() - 1; let plan = StreamRowIdGen::new_with_dist( plan, @@ -144,7 +150,7 @@ impl ToStream for LogicalChangedLog { ctx: &mut RewriteStreamContext, ) -> Result<(PlanRef, ColIndexMapping)> { let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?; - let (changed_log, out_col_change) = self.rewrite_with_input(input, input_col_change); - Ok((changed_log.into(), out_col_change)) + let (change_log, out_col_change) = self.rewrite_with_input(input, input_col_change); + Ok((change_log.into(), out_col_change)) } } diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 68a0c6766759..fa5ad199c0f1 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -848,7 +848,7 @@ mod batch_values; mod logical_agg; mod logical_apply; mod logical_cdc_scan; -mod logical_changed_log; +mod logical_change_log; mod logical_cte_ref; mod logical_dedup; mod logical_delete; @@ -877,7 +877,7 @@ mod logical_topn; mod logical_union; mod logical_update; mod logical_values; -mod stream_changed_log; +mod stream_change_log; mod stream_dedup; mod stream_delta_join; mod stream_dml; @@ -951,7 +951,7 @@ pub use 
batch_values::BatchValues; pub use logical_agg::LogicalAgg; pub use logical_apply::LogicalApply; pub use logical_cdc_scan::LogicalCdcScan; -pub use logical_changed_log::LogicalChangedLog; +pub use logical_change_log::LogicalChangeLog; pub use logical_cte_ref::LogicalCteRef; pub use logical_dedup::LogicalDedup; pub use logical_delete::LogicalDelete; @@ -982,7 +982,7 @@ pub use logical_union::LogicalUnion; pub use logical_update::LogicalUpdate; pub use logical_values::LogicalValues; pub use stream_cdc_table_scan::StreamCdcTableScan; -pub use stream_changed_log::StreamChangedLog; +pub use stream_change_log::StreamChangeLog; pub use stream_dedup::StreamDedup; pub use stream_delta_join::StreamDeltaJoin; pub use stream_dml::StreamDml; @@ -1073,7 +1073,7 @@ macro_rules! for_all_plan_nodes { , { Logical, IcebergScan } , { Logical, RecursiveUnion } , { Logical, CteRef } - , { Logical, ChangedLog } + , { Logical, ChangeLog } , { Batch, SimpleAgg } , { Batch, HashAgg } , { Batch, SortAgg } @@ -1137,7 +1137,7 @@ macro_rules! for_all_plan_nodes { , { Stream, EowcSort } , { Stream, OverWindow } , { Stream, FsFetch } - , { Stream, ChangedLog } + , { Stream, ChangeLog } } }; } @@ -1179,7 +1179,7 @@ macro_rules! for_logical_plan_nodes { , { Logical, IcebergScan } , { Logical, RecursiveUnion } , { Logical, CteRef } - , { Logical, ChangedLog } + , { Logical, ChangeLog } } }; } @@ -1261,7 +1261,7 @@ macro_rules! for_stream_plan_nodes { , { Stream, EowcSort } , { Stream, OverWindow } , { Stream, FsFetch } - , { Stream, ChangedLog } + , { Stream, ChangeLog } } }; } diff --git a/src/frontend/src/optimizer/plan_node/stream_changed_log.rs b/src/frontend/src/optimizer/plan_node/stream_change_log.rs similarity index 78% rename from src/frontend/src/optimizer/plan_node/stream_changed_log.rs rename to src/frontend/src/optimizer/plan_node/stream_change_log.rs index c6e3898c0a84..1f91bd16085d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_changed_log.rs +++ b/src/frontend/src/optimizer/plan_node/stream_change_log.rs @@ -13,7 +13,7 @@ // limitations under the License. use risingwave_pb::stream_plan::stream_node::PbNodeBody; -use risingwave_pb::stream_plan::ChangedLogNode; +use risingwave_pb::stream_plan::ChangeLogNode; use super::expr_visitable::ExprVisitable; use super::stream::prelude::PhysicalPlanRef; @@ -24,13 +24,13 @@ use crate::stream_fragmenter::BuildFragmentGraphState; use crate::PlanRef; #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct StreamChangedLog { +pub struct StreamChangeLog { pub base: PlanBase, - core: generic::ChangedLog, + core: generic::ChangeLog, } -impl StreamChangedLog { - pub fn new(core: generic::ChangedLog) -> Self { +impl StreamChangeLog { + pub fn new(core: generic::ChangeLog) -> Self { let input = core.input.clone(); let dist = input.distribution().clone(); // Filter executor won't change the append-only behavior of the stream. @@ -47,11 +47,11 @@ impl StreamChangedLog { input.emit_on_window_close(), watermark_columns, ); - StreamChangedLog { base, core } + StreamChangeLog { base, core } } } -impl PlanTreeNodeUnary for StreamChangedLog { +impl PlanTreeNodeUnary for StreamChangeLog { fn input(&self) -> PlanRef { self.core.input.clone() } @@ -63,17 +63,17 @@ impl PlanTreeNodeUnary for StreamChangedLog { } } -impl_plan_tree_node_for_unary! { StreamChangedLog } -impl_distill_by_unit!(StreamChangedLog, core, "StreamChangedLog"); +impl_plan_tree_node_for_unary! 
{ StreamChangeLog } +impl_distill_by_unit!(StreamChangeLog, core, "StreamChangeLog"); -impl StreamNode for StreamChangedLog { +impl StreamNode for StreamChangeLog { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { - PbNodeBody::ChangedLog(ChangedLogNode { + PbNodeBody::ChangeLog(ChangeLogNode { need_op: self.core.need_op, }) } } -impl ExprRewritable for StreamChangedLog {} +impl ExprRewritable for StreamChangeLog {} -impl ExprVisitable for StreamChangedLog {} +impl ExprVisitable for StreamChangeLog {} diff --git a/src/frontend/src/planner/changed_log.rs b/src/frontend/src/planner/change_log.rs similarity index 80% rename from src/frontend/src/planner/changed_log.rs rename to src/frontend/src/planner/change_log.rs index 3bf2e2aed7aa..d01fe71f7499 100644 --- a/src/frontend/src/planner/changed_log.rs +++ b/src/frontend/src/planner/change_log.rs @@ -13,13 +13,13 @@ // limitations under the License. use crate::binder::Relation; use crate::error::Result; -use crate::optimizer::plan_node::LogicalChangedLog; +use crate::optimizer::plan_node::LogicalChangeLog; use crate::{PlanRef, Planner}; impl Planner { - pub(super) fn plan_changed_log(&mut self, relation: Relation) -> Result { + pub(super) fn plan_change_log(&mut self, relation: Relation) -> Result { let root = self.plan_relation(relation)?; - let plan = LogicalChangedLog::create(root); + let plan = LogicalChangeLog::create(root); Ok(plan) } } diff --git a/src/frontend/src/planner/mod.rs b/src/frontend/src/planner/mod.rs index c65172d5578d..2dc76a671b9e 100644 --- a/src/frontend/src/planner/mod.rs +++ b/src/frontend/src/planner/mod.rs @@ -18,7 +18,7 @@ use crate::binder::{BoundStatement, ShareId}; use crate::error::Result; use crate::optimizer::{OptimizerContextRef, PlanRoot}; -mod changed_log; +mod change_log; mod delete; mod insert; mod query; diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index a07b752b7b09..b8d27d2d9bc0 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -244,9 +244,9 @@ impl Planner { *recursive_union.recursive, share.share_id, ), - BoundShareInput::ChangedLog(relation) => { + BoundShareInput::ChangeLog(relation) => { let id = share.share_id; - let result = self.plan_changed_log(relation)?; + let result = self.plan_change_log(relation)?; let logical_share = LogicalShare::create(result); self.share_cache.insert(id, logical_share.clone()); Ok(logical_share) diff --git a/src/frontend/src/planner/select.rs b/src/frontend/src/planner/select.rs index 70bd358f7332..a9e7dd3526ed 100644 --- a/src/frontend/src/planner/select.rs +++ b/src/frontend/src/planner/select.rs @@ -153,7 +153,7 @@ impl Planner { let need_restore_select_items = select_items.len() > original_select_items_len; - root = LogicalProjectSet::create(root, select_items.clone()); + root = LogicalProjectSet::create(root, select_items); if matches!(&distinct, BoundDistinct::DistinctOn(_)) { root = if order.is_empty() { diff --git a/src/meta/src/controller/rename.rs b/src/meta/src/controller/rename.rs index 3a7b05bdc1f5..bbddc6461db4 100644 --- a/src/meta/src/controller/rename.rs +++ b/src/meta/src/controller/rename.rs @@ -153,10 +153,11 @@ impl QueryRewriter<'_> { fn visit_query(&self, query: &mut Query) { if let Some(with) = &mut query.with { for cte_table in &mut with.cte_tables { - if let Some(query) = cte_table.query.as_mut() { - self.visit_query(query); - } else { - cte_table.from = Some(Ident::new_unchecked(self.to)); + match &mut 
cte_table.cte_inner { + risingwave_sqlparser::ast::CteInner::Query(query) => self.visit_query(query), + risingwave_sqlparser::ast::CteInner::ChangeLog(from) => { + *from = Ident::new_unchecked(self.to) + } } } } diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 0ce72be34bf8..46bf544d75bf 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -46,9 +46,9 @@ pub use self::legacy_source::{ }; pub use self::operator::{BinaryOperator, QualifiedOperator, UnaryOperator}; pub use self::query::{ - Cte, Distinct, Fetch, Join, JoinConstraint, JoinOperator, LateralView, OrderByExpr, Query, - Select, SelectItem, SetExpr, SetOperator, TableAlias, TableFactor, TableWithJoins, Top, Values, - With, + Cte, CteInner, Distinct, Fetch, Join, JoinConstraint, JoinOperator, LateralView, OrderByExpr, + Query, Select, SelectItem, SetExpr, SetOperator, TableAlias, TableFactor, TableWithJoins, Top, + Values, With, }; pub use self::statement::*; pub use self::value::{ diff --git a/src/sqlparser/src/ast/query.rs b/src/sqlparser/src/ast/query.rs index 6805133b7816..83e84907a109 100644 --- a/src/sqlparser/src/ast/query.rs +++ b/src/sqlparser/src/ast/query.rs @@ -282,24 +282,28 @@ impl fmt::Display for With { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct Cte { pub alias: TableAlias, - pub query: Option, - pub from: Option, + pub cte_inner: CteInner, } impl fmt::Display for Cte { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if let Some(query) = &self.query { - write!(f, "{} AS ({})", self.alias, query)?; - if let Some(ref fr) = self.from { - write!(f, " FROM {}", fr)?; + match &self.cte_inner { + CteInner::Query(query) => write!(f, "{} AS ({})", self.alias, query)?, + CteInner::ChangeLog(ident) => { + write!(f, "{} AS changelog from {}", self.alias, ident.value)? } - } else { - write!(f, "{} AS changedlog from {:?}", self.alias, self.from)?; } Ok(()) } } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum CteInner { + Query(Query), + ChangeLog(Ident), +} + /// One item of the comma-separated list following `SELECT` #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index b984c96f3f47..df51791ec5c3 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3965,43 +3965,36 @@ impl Parser<'_> { /// Parse a CTE (`alias [( col1, col2, ... 
)] AS (subquery)`)
     fn parse_cte(&mut self) -> PResult<Cte> {
         let name = self.parse_identifier_non_reserved()?;
-
-        let mut cte = if self.parse_keyword(Keyword::AS) {
-            let query = if let Ok(()) = self.expect_token(&Token::LParen) {
-                let query = self.parse_query()?;
-                self.expect_token(&Token::RParen)?;
-                Some(query)
-            } else {
-                let changed_log = self.parse_identifier_non_reserved()?;
-                assert!(changed_log.to_string().to_lowercase() == "changedlog");
-                None
-            };
+        let cte = if self.parse_keyword(Keyword::AS) {
+            let cte_inner = self.parse_cte_inner()?;
             let alias = TableAlias {
                 name,
                 columns: vec![],
             };
-            Cte {
-                alias,
-                query,
-                from: None,
-            }
+            Cte { alias, cte_inner }
         } else {
             let columns = self.parse_parenthesized_column_list(Optional)?;
             self.expect_keyword(Keyword::AS)?;
-            self.expect_token(&Token::LParen)?;
+            let cte_inner = self.parse_cte_inner()?;
+            let alias = TableAlias { name, columns };
+            Cte { alias, cte_inner }
+        };
+        Ok(cte)
+    }
+
+    fn parse_cte_inner(&mut self) -> PResult<CteInner> {
+        if let Ok(()) = self.expect_token(&Token::LParen) {
             let query = self.parse_query()?;
             self.expect_token(&Token::RParen)?;
-            let alias = TableAlias { name, columns };
-            Cte {
-                alias,
-                query: Some(query),
-                from: None,
+            Ok(CteInner::Query(query))
+        } else {
+            let change_log = self.parse_identifier_non_reserved()?;
+            if change_log.to_string().to_lowercase() != "changelog" {
+                parser_err!("Expected 'changelog' but found '{}'", change_log);
             }
-        };
-        if self.parse_keyword(Keyword::FROM) {
-            cte.from = Some(self.parse_identifier()?);
+            self.expect_keyword(Keyword::FROM)?;
+            Ok(CteInner::ChangeLog(self.parse_identifier()?))
         }
-        Ok(cte)
     }
 
     /// Parse a "query body", which is an expression with roughly the
diff --git a/src/sqlparser/tests/sqlparser_common.rs b/src/sqlparser/tests/sqlparser_common.rs
index d7e323e1cef7..43f014a65b5e 100644
--- a/src/sqlparser/tests/sqlparser_common.rs
+++ b/src/sqlparser/tests/sqlparser_common.rs
@@ -2740,6 +2740,69 @@ fn parse_ctes() {
     );
 }
 
+#[test]
+fn parse_change_log_ctes() {
+    let cte_sqls = vec!["foo", "bar"];
+    let with = &format!(
+        "WITH a AS changelog from {}, b AS changelog from {} SELECT foo + bar FROM a, b",
+        cte_sqls[0], cte_sqls[1]
+    );
+
+    fn assert_change_log_ctes(expected: &[&str], sel: &Query) {
+        for (i, exp) in expected.iter().enumerate() {
+            let Cte { alias, cte_inner } = &sel.with.as_ref().unwrap().cte_tables[i];
+            let CteInner::ChangeLog(from) = cte_inner else { panic!("expected changelog CTE") };
+            assert_eq!(*exp, from.to_string());
+            assert_eq!(
+                if i == 0 {
+                    Ident::new_unchecked("a")
+                } else {
+                    Ident::new_unchecked("b")
+                },
+                alias.name
+            );
+            assert!(alias.columns.is_empty());
+        }
+    }
+
+    // Top-level CTE
+    assert_change_log_ctes(&cte_sqls, &verified_query(with));
+    // CTE in a subquery
+    let sql = &format!("SELECT ({})", with);
+    let select = verified_only_select(sql);
+    match expr_from_projection(only(&select.projection)) {
+        Expr::Subquery(ref subquery) => {
+            assert_change_log_ctes(&cte_sqls, subquery.as_ref());
+        }
+        _ => panic!("expected subquery"),
+    }
+    // CTE in a derived table
+    let sql = &format!("SELECT * FROM ({})", with);
+    let select = verified_only_select(sql);
+    match only(select.from).relation {
+        TableFactor::Derived { subquery, .. } => {
+            assert_change_log_ctes(&cte_sqls, subquery.as_ref())
+        }
+        _ => panic!("expected derived table"),
+    }
+    // CTE in a view
+    let sql = &format!("CREATE VIEW v AS {}", with);
+    match verified_stmt(sql) {
+        Statement::CreateView { query, .. } => assert_change_log_ctes(&cte_sqls, &query),
+        _ => panic!("expected CREATE VIEW"),
+    }
+    // CTE in a CTE...
+    let sql = &format!("WITH outer_cte AS ({}) SELECT * FROM outer_cte", with);
+    let select = verified_query(sql);
+    match &only(&select.with.unwrap().cte_tables).cte_inner {
+        CteInner::Query(inner_query) => {
+            assert_change_log_ctes(&cte_sqls, inner_query)
+        }
+        _ => panic!("expected query CTE"),
+    }
+}
+
 #[test]
 fn parse_cte_renamed_columns() {
     let sql = "WITH cte (col1, col2) AS (SELECT foo, bar FROM baz) SELECT * FROM cte";
@@ -2755,6 +2818,21 @@ fn parse_cte_renamed_columns() {
         .alias
         .columns
     );
+
+    let sql_change_log = "WITH cte (col1, col2) AS changelog from baz SELECT * FROM cte";
+
+    let query_change_log = verified_query(sql_change_log);
+    assert_eq!(
+        vec![Ident::new_unchecked("col1"), Ident::new_unchecked("col2")],
+        query_change_log
+            .with
+            .unwrap()
+            .cte_tables
+            .first()
+            .unwrap()
+            .alias
+            .columns
+    );
 }
 
 #[test]
diff --git a/src/stream/src/executor/changed_log.rs b/src/stream/src/executor/change_log.rs
similarity index 63%
rename from src/stream/src/executor/changed_log.rs
rename to src/stream/src/executor/change_log.rs
index 0ef0873c4314..1cdfd5b0f55e 100644
--- a/src/stream/src/executor/changed_log.rs
+++ b/src/stream/src/executor/change_log.rs
@@ -13,34 +13,34 @@
 // limitations under the License.
 
 use core::fmt::Formatter;
+use core::iter::repeat;
 use std::fmt::Debug;
 use std::sync::Arc;
 
 use futures::prelude::stream::StreamExt;
 use futures_async_stream::try_stream;
 use risingwave_common::array::{ArrayImpl, I16Array, Op, SerialArray, StreamChunk};
-use risingwave_common::types::Serial;
 
 use super::{ActorContextRef, BoxedMessageStream, Execute, Executor, Message, StreamExecutorError};
 
-pub struct ChangedLogExecutor {
+pub struct ChangeLogExecutor {
     _ctx: ActorContextRef,
     input: Executor,
     need_op: bool,
 }
 
-impl Debug for ChangedLogExecutor {
+impl Debug for ChangeLogExecutor {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("ChangedLogExecutor").finish()
+        f.debug_struct("ChangeLogExecutor").finish()
     }
 }
 
-impl Execute for ChangedLogExecutor {
+impl Execute for ChangeLogExecutor {
     fn execute(self: Box<Self>) -> BoxedMessageStream {
         self.execute_inner().boxed()
     }
 }
 
-impl ChangedLogExecutor {
+impl ChangeLogExecutor {
     pub fn new(ctx: ActorContextRef, input: Executor, need_op: bool) -> Self {
         Self {
             _ctx: ctx,
@@ -57,30 +57,22 @@ impl ChangedLogExecutor {
             let msg = msg?;
             match msg {
                 Message::Chunk(chunk) => {
-                    let (ops, columns, bitmap) = chunk.into_inner();
+                    let (ops, mut columns, bitmap) = chunk.into_inner();
                     let new_ops = vec![Op::Insert; ops.len()];
                     // They are all 0, will be add in row id gen executor.
- let changed_log_row_id_array = - Arc::new(ArrayImpl::Serial(SerialArray::from_iter(vec![ - Serial::from( - 0 - ); - ops.len() - ]))); + let change_log_row_id_array = Arc::new(ArrayImpl::Serial( + SerialArray::from_iter(repeat(None).take(ops.len())), + )); let new_chunk = if self.need_op { let ops: Vec> = ops.iter().map(|op| Some(op.to_i16())).collect(); let ops_array = Arc::new(ArrayImpl::Int16(I16Array::from_iter(ops))); - let mut new_columns = Vec::with_capacity(columns.len() + 2); - new_columns.extend_from_slice(&columns); - new_columns.push(ops_array); - new_columns.push(changed_log_row_id_array); - StreamChunk::with_visibility(new_ops, new_columns, bitmap) + columns.push(ops_array); + columns.push(change_log_row_id_array); + StreamChunk::with_visibility(new_ops, columns, bitmap) } else { - let mut new_columns = Vec::with_capacity(columns.len() + 1); - new_columns.extend_from_slice(&columns); - new_columns.push(changed_log_row_id_array); - StreamChunk::with_visibility(new_ops, new_columns, bitmap) + columns.push(change_log_row_id_array); + StreamChunk::with_visibility(new_ops, columns, bitmap) }; yield Message::Chunk(new_chunk); } diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 45b27b0cf602..f81684d6e01c 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -61,7 +61,7 @@ mod backfill; mod barrier_recv; mod batch_query; mod chain; -mod changed_log; +mod change_log; mod dedup; mod dispatch; pub mod dml; @@ -115,7 +115,7 @@ pub use backfill::no_shuffle_backfill::*; pub use barrier_recv::BarrierRecvExecutor; pub use batch_query::BatchQueryExecutor; pub use chain::ChainExecutor; -pub use changed_log::ChangedLogExecutor; +pub use change_log::ChangeLogExecutor; pub use dedup::AppendOnlyDedupExecutor; pub use dispatch::{DispatchExecutor, DispatcherImpl}; pub use dynamic_filter::DynamicFilterExecutor; diff --git a/src/stream/src/from_proto/changed_log.rs b/src/stream/src/from_proto/change_log.rs similarity index 76% rename from src/stream/src/from_proto/changed_log.rs rename to src/stream/src/from_proto/change_log.rs index 9ce129d2043a..231d894ccdd2 100644 --- a/src/stream/src/from_proto/changed_log.rs +++ b/src/stream/src/from_proto/change_log.rs @@ -12,18 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use risingwave_pb::stream_plan::ChangedLogNode; +use risingwave_pb::stream_plan::ChangeLogNode; use risingwave_storage::StateStore; use super::ExecutorBuilder; use crate::error::StreamResult; -use crate::executor::{ChangedLogExecutor, Executor}; +use crate::executor::{ChangeLogExecutor, Executor}; use crate::task::ExecutorParams; -pub struct ChangedLogExecutorBuilder; +pub struct ChangeLogExecutorBuilder; -impl ExecutorBuilder for ChangedLogExecutorBuilder { - type Node = ChangedLogNode; +impl ExecutorBuilder for ChangeLogExecutorBuilder { + type Node = ChangeLogNode; async fn new_boxed_executor( params: ExecutorParams, @@ -32,7 +32,7 @@ impl ExecutorBuilder for ChangedLogExecutorBuilder { ) -> StreamResult { let [input]: [_; 1] = params.input.try_into().unwrap(); - let exec = ChangedLogExecutor::new(params.actor_context, input, node.need_op); + let exec = ChangeLogExecutor::new(params.actor_context, input, node.need_op); Ok((params.info, exec).into()) } } diff --git a/src/stream/src/from_proto/mod.rs b/src/stream/src/from_proto/mod.rs index 332c4b3a21cf..67b9b6972dc7 100644 --- a/src/stream/src/from_proto/mod.rs +++ b/src/stream/src/from_proto/mod.rs @@ -19,7 +19,7 @@ mod append_only_dedup; mod barrier_recv; mod batch_query; mod cdc_filter; -mod changed_log; +mod change_log; mod dml; mod dynamic_filter; mod eowc_over_window; @@ -96,7 +96,7 @@ use self::union::*; use self::watermark_filter::WatermarkFilterBuilder; use crate::error::StreamResult; use crate::executor::{Execute, Executor, ExecutorInfo}; -use crate::from_proto::changed_log::ChangedLogExecutorBuilder; +use crate::from_proto::change_log::ChangeLogExecutorBuilder; use crate::from_proto::values::ValuesExecutorBuilder; use crate::task::ExecutorParams; @@ -174,6 +174,6 @@ pub async fn create_executor( NodeBody::OverWindow => OverWindowExecutorBuilder, NodeBody::StreamFsFetch => FsFetchExecutorBuilder, NodeBody::SourceBackfill => SourceBackfillExecutorBuilder, - NodeBody::ChangedLog => ChangedLogExecutorBuilder, + NodeBody::ChangeLog => ChangeLogExecutorBuilder, } } diff --git a/src/tests/sqlsmith/src/reducer.rs b/src/tests/sqlsmith/src/reducer.rs index bb200f6abb9e..a2875de6ae9a 100644 --- a/src/tests/sqlsmith/src/reducer.rs +++ b/src/tests/sqlsmith/src/reducer.rs @@ -20,7 +20,7 @@ use anyhow::anyhow; use itertools::Itertools; use regex::Regex; use risingwave_sqlparser::ast::{ - Cte, Expr, FunctionArgExpr, Join, Query, Select, SetExpr, Statement, TableFactor, + Cte, CteInner, Expr, FunctionArgExpr, Join, Query, Select, SetExpr, Statement, TableFactor, TableWithJoins, With, }; @@ -123,8 +123,8 @@ pub(crate) fn find_ddl_references(sql_statements: &[Statement]) -> HashSet) { let Query { with, body, .. } = query; if let Some(With { cte_tables, .. }) = with { - for Cte { query, .. } in cte_tables { - if let Some(query) = query { + for Cte { cte_inner, .. 
} in cte_tables { + if let CteInner::Query(query) = cte_inner { find_ddl_references_for_query(query, ddl_references) } } diff --git a/src/tests/sqlsmith/src/sql_gen/query.rs b/src/tests/sqlsmith/src/sql_gen/query.rs index 22345c328743..dcead4275c7a 100644 --- a/src/tests/sqlsmith/src/sql_gen/query.rs +++ b/src/tests/sqlsmith/src/sql_gen/query.rs @@ -130,11 +130,9 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { fn gen_with_inner(&mut self) -> (With, Vec) { let alias = self.gen_table_alias_with_prefix("with"); let (query, query_schema) = self.gen_local_query(); - let from = None; let cte = Cte { alias: alias.clone(), - query: Some(query), - from, + cte_inner: risingwave_sqlparser::ast::CteInner::Query(query), }; let with_tables = vec![Table::new(alias.name.real_value(), query_schema)];
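
For readers of the new syntax, a minimal sketch (not part of the patch) of how a `changelog` CTE surfaces in the AST after this change. It assumes `Parser::parse_sql` is still the crate's parsing entry point and that the statement comes back as `Statement::Query`; everything else mirrors the `CteInner` enum introduced above.

    use risingwave_sqlparser::ast::{CteInner, Statement};
    use risingwave_sqlparser::parser::Parser;

    fn main() {
        let sql = "WITH sub AS changelog from t1 SELECT * FROM sub";
        let stmts = Parser::parse_sql(sql).expect("parse failed");
        let Statement::Query(query) = &stmts[0] else { panic!("expected a query") };
        // A CTE body is no longer an Option<Query> plus an Option<Ident>; it is a
        // single CteInner that is either a subquery or a changelog source table.
        match &query.with.as_ref().unwrap().cte_tables[0].cte_inner {
            CteInner::ChangeLog(table) => assert_eq!(table.value, "t1"),
            CteInner::Query(_) => unreachable!("parsed as a plain subquery"),
        }
    }

This is the same shape that `QueryRewriter::visit_query` in src/meta/src/controller/rename.rs now walks when a table referenced by a changelog CTE is renamed.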
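
As a reading aid for the stream side, here is the core chunk rewrite performed by `ChangeLogExecutor`, pulled out of the stream-processing loop into a free function. The types and calls mirror src/stream/src/executor/change_log.rs; the function name `rewrite_chunk` is illustrative only and does not exist in the codebase.

    use std::iter::repeat;
    use std::sync::Arc;

    use risingwave_common::array::{ArrayImpl, I16Array, Op, SerialArray, StreamChunk};

    // Every row is re-emitted as an Insert; the original op is kept only as a
    // plain i16 column when `need_op` is set, and a column of null serials is
    // appended so the downstream RowIdGen executor can fill in row ids later.
    fn rewrite_chunk(chunk: StreamChunk, need_op: bool) -> StreamChunk {
        let (ops, mut columns, bitmap) = chunk.into_inner();
        let new_ops = vec![Op::Insert; ops.len()];
        let change_log_row_id_array = Arc::new(ArrayImpl::Serial(SerialArray::from_iter(
            repeat(None).take(ops.len()),
        )));
        if need_op {
            let op_values: Vec<Option<i16>> = ops.iter().map(|op| Some(op.to_i16())).collect();
            columns.push(Arc::new(ArrayImpl::Int16(I16Array::from_iter(op_values))));
        }
        columns.push(change_log_row_id_array);
        StreamChunk::with_visibility(new_ops, columns, bitmap)
    }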