diff --git a/src/frontend/src/binder/relation/share.rs b/src/frontend/src/binder/relation/share.rs index a7450096cd973..0c5ea7627cc53 100644 --- a/src/frontend/src/binder/relation/share.rs +++ b/src/frontend/src/binder/relation/share.rs @@ -20,6 +20,7 @@ use crate::binder::bind_context::RecursiveUnion; use crate::binder::statement::RewriteExprsRecursive; use crate::binder::{BoundQuery, Relation, ShareId}; use crate::error::{ErrorCode, Result}; +use crate::optimizer::plan_node::generic::{CHANGE_LOG_OP, _CHANGE_LOG_ROW_ID}; /// Share a relation during binding and planning. /// It could be used to share a (recursive) CTE, a source, a view and so on. @@ -73,14 +74,14 @@ impl BoundShareInput { false, Field::with_name( risingwave_common::types::DataType::Int16, - "op".to_string(), + CHANGE_LOG_OP.to_string(), ), ), ( true, Field::with_name( risingwave_common::types::DataType::Serial, - "_changelog_row_id".to_string(), + _CHANGE_LOG_ROW_ID.to_string(), ), ), ]) diff --git a/src/frontend/src/binder/select.rs b/src/frontend/src/binder/select.rs index d9848ed769732..4c6097ba9e9f8 100644 --- a/src/frontend/src/binder/select.rs +++ b/src/frontend/src/binder/select.rs @@ -25,11 +25,12 @@ use risingwave_sqlparser::ast::{ use super::bind_context::{Clause, ColumnBinding}; use super::statement::RewriteExprsRecursive; -use super::UNNAMED_COLUMN; +use super::{BoundShareInput, UNNAMED_COLUMN}; use crate::binder::{Binder, Relation}; use crate::catalog::check_valid_column_name; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{CorrelatedId, Depth, Expr as _, ExprImpl, ExprType, FunctionCall, InputRef}; +use crate::optimizer::plan_node::generic::CHANGE_LOG_OP; use crate::utils::group_by::GroupBy; #[derive(Debug, Clone)] @@ -282,6 +283,17 @@ impl Binder { }) .collect::>>()?; + if let Some(Relation::Share(bound)) = &from { + if matches!(bound.input, BoundShareInput::ChangeLog(_)) + && fields.iter().filter(|&x| x.name.eq(CHANGE_LOG_OP)).count() > 1 + { + return Err(ErrorCode::BindError( + "The source table of changelog cannot have `change_log_op`, please rename it first".to_string() + ) + .into()); + } + } + Ok(BoundSelect { distinct, select_items, diff --git a/src/frontend/src/optimizer/plan_node/generic/change_log.rs b/src/frontend/src/optimizer/plan_node/generic/change_log.rs index cb4d6df05b1ad..e505a451d824c 100644 --- a/src/frontend/src/optimizer/plan_node/generic/change_log.rs +++ b/src/frontend/src/optimizer/plan_node/generic/change_log.rs @@ -23,6 +23,8 @@ use crate::optimizer::property::FunctionalDependencySet; use crate::utils::ColIndexMappingRewriteExt; use crate::OptimizerContextRef; +pub const CHANGE_LOG_OP: &str = "change_log_op"; +pub const _CHANGE_LOG_ROW_ID: &str = "_change_log_row_id"; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ChangeLog { pub input: PlanRef, @@ -57,13 +59,13 @@ impl GenericPlanNode for ChangeLog { if self.need_op { fields.push(Field::with_name( risingwave_common::types::DataType::Int16, - "op", + CHANGE_LOG_OP, )); } if self.need_change_log_row_id { fields.push(Field::with_name( risingwave_common::types::DataType::Serial, - "_change_log_row_id", + _CHANGE_LOG_ROW_ID, )); } Schema::new(fields) diff --git a/src/frontend/src/optimizer/plan_node/logical_change_log.rs b/src/frontend/src/optimizer/plan_node/logical_change_log.rs index 5b8ed3a5e85f0..fe4f920e92fa9 100644 --- a/src/frontend/src/optimizer/plan_node/logical_change_log.rs +++ b/src/frontend/src/optimizer/plan_node/logical_change_log.rs @@ -13,7 +13,7 @@ // limitations under the License. use super::expr_visitable::ExprVisitable; -use super::generic::GenericPlanRef; +use super::generic::{GenericPlanRef, CHANGE_LOG_OP, _CHANGE_LOG_ROW_ID}; use super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical, @@ -101,10 +101,10 @@ impl ColPrunable for LogicalChangeLog { .iter() .filter_map(|a| { if let Some(f) = fields.get(*a) { - if f.name == "op" { + if f.name == CHANGE_LOG_OP { need_op = true; None - } else if f.name == "_change_log_row_id" { + } else if f.name == _CHANGE_LOG_ROW_ID { need_change_log_row_id = true; None } else { diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs index 565e3082c004f..d4dd87b508342 100644 --- a/src/frontend/src/optimizer/plan_node/logical_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs @@ -538,7 +538,7 @@ impl ToStream for LogicalScan { &self, ctx: &mut RewriteStreamContext, ) -> Result<(PlanRef, ColIndexMapping)> { - match self.base.stream_key().is_none() && ctx.get_with_stream_key(){ + match self.base.stream_key().is_none() && ctx.get_with_stream_key() { true => { let mut col_ids = HashSet::new(); diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 3993cdf9a359d..b72cc5474036b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -34,7 +34,7 @@ use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder}; use crate::optimizer::property::{Distribution, DistributionDisplay}; use crate::scheduler::SchedulerResult; use crate::stream_fragmenter::BuildFragmentGraphState; -use crate::{Explain, TableCatalog}; +use crate::TableCatalog; /// `StreamTableScan` is a virtual plan node to represent a stream table scan. It will be converted /// to stream scan + merge node (for upstream materialize) + batch table scan when converting to `MView` @@ -240,9 +240,7 @@ impl StreamTableScan { let stream_key = self .stream_key() - .unwrap_or_else(|| { - &[] - }) + .unwrap_or(&[]) .iter() .map(|x| *x as u32) .collect_vec();