From 267e6c9410c13a23fe064c00ae46d9163e7de30e Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 20 Jun 2024 15:57:16 +0800 Subject: [PATCH] fix com --- .../src/optimizer/plan_node/convert.rs | 20 +--------------- .../optimizer/plan_node/logical_change_log.rs | 24 +++++++++++++++---- .../src/optimizer/plan_node/logical_scan.rs | 4 ++-- 3 files changed, 23 insertions(+), 25 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/convert.rs b/src/frontend/src/optimizer/plan_node/convert.rs index e29af79b5efb..db961b3b1e20 100644 --- a/src/frontend/src/optimizer/plan_node/convert.rs +++ b/src/frontend/src/optimizer/plan_node/convert.rs @@ -94,19 +94,9 @@ pub fn stream_enforce_eowc_requirement( } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct RewriteStreamContext { share_rewrite_map: HashMap, - with_stream_key: bool, -} - -impl Default for RewriteStreamContext { - fn default() -> Self { - Self { - share_rewrite_map: HashMap::default(), - with_stream_key: true, - } - } } impl RewriteStreamContext { @@ -128,14 +118,6 @@ impl RewriteStreamContext { ) -> Option<&(PlanRef, ColIndexMapping)> { self.share_rewrite_map.get(&plan_node_id) } - - pub fn set_with_stream_key(&mut self, with_stream_key: bool) { - self.with_stream_key = with_stream_key; - } - - pub fn get_with_stream_key(&self) -> bool { - self.with_stream_key - } } #[derive(Debug, Clone)] diff --git a/src/frontend/src/optimizer/plan_node/logical_change_log.rs b/src/frontend/src/optimizer/plan_node/logical_change_log.rs index fe4f920e92fa..05dd3f055ef0 100644 --- a/src/frontend/src/optimizer/plan_node/logical_change_log.rs +++ b/src/frontend/src/optimizer/plan_node/logical_change_log.rs @@ -12,16 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use itertools::Itertools; + use super::expr_visitable::ExprVisitable; use super::generic::{GenericPlanRef, CHANGE_LOG_OP, _CHANGE_LOG_ROW_ID}; use super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical, - PlanBase, PlanTreeNodeUnary, PredicatePushdown, RewriteStreamContext, StreamChangeLog, - StreamRowIdGen, ToBatch, ToStream, ToStreamContext, + LogicalProject, PlanBase, PlanTreeNodeUnary, PredicatePushdown, RewriteStreamContext, + StreamChangeLog, StreamRowIdGen, ToBatch, ToStream, ToStreamContext, }; use crate::error::ErrorCode::BindError; use crate::error::Result; +use crate::expr::{ExprImpl, InputRef}; use crate::optimizer::property::Distribution; use crate::utils::{ColIndexMapping, Condition}; use crate::PlanRef; @@ -149,9 +152,22 @@ impl ToStream for LogicalChangeLog { &self, ctx: &mut RewriteStreamContext, ) -> Result<(PlanRef, ColIndexMapping)> { - ctx.set_with_stream_key(false); + let original_schema = self.input().schema().clone(); let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?; - let (change_log, out_col_change) = self.rewrite_with_input(input, input_col_change); + let exprs = (0..original_schema.len()) + .map(|x| { + ExprImpl::InputRef( + InputRef::new( + input_col_change.map(x), + original_schema.fields[x].data_type.clone(), + ) + .into(), + ) + }) + .collect_vec(); + let project = LogicalProject::new(input.clone(), exprs); + let (project, out_col_change) = project.rewrite_with_input(input, input_col_change); + let (change_log, out_col_change) = self.rewrite_with_input(project.into(), out_col_change); Ok((change_log.into(), out_col_change)) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs index d4dd87b50834..e2aeaa6b9517 100644 --- a/src/frontend/src/optimizer/plan_node/logical_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs @@ -536,9 +536,9 @@ impl ToStream for LogicalScan { fn logical_rewrite_for_stream( &self, - ctx: &mut RewriteStreamContext, + _ctx: &mut RewriteStreamContext, ) -> Result<(PlanRef, ColIndexMapping)> { - match self.base.stream_key().is_none() && ctx.get_with_stream_key() { + match self.base.stream_key().is_none() { true => { let mut col_ids = HashSet::new();