diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index b4238f57b1f52..0df387b0a53d5 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -616,7 +616,7 @@ fn exist_and_no_exchange_before(plan: &PlanRef, is_candidate: fn(&PlanRef) -> bo fn require_additional_exchange_on_root_in_distributed_mode(plan: PlanRef) -> bool { fn is_user_table(plan: &PlanRef) -> bool { plan.as_batch_seq_scan() - .map(|node| !node.logical().is_sys_table) + .map(|node| !node.core().is_sys_table) .unwrap_or(false) } @@ -649,7 +649,7 @@ fn require_additional_exchange_on_root_in_distributed_mode(plan: PlanRef) -> boo fn require_additional_exchange_on_root_in_local_mode(plan: PlanRef) -> bool { fn is_user_table(plan: &PlanRef) -> bool { plan.as_batch_seq_scan() - .map(|node| !node.logical().is_sys_table) + .map(|node| !node.core().is_sys_table) .unwrap_or(false) } diff --git a/src/frontend/src/optimizer/plan_node/batch_delete.rs b/src/frontend/src/optimizer/plan_node/batch_delete.rs index 600ec6827e3eb..42db0a1c4a774 100644 --- a/src/frontend/src/optimizer/plan_node/batch_delete.rs +++ b/src/frontend/src/optimizer/plan_node/batch_delete.rs @@ -27,35 +27,35 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchDelete { pub base: PlanBase, - pub logical: generic::Delete, + pub core: generic::Delete, } impl BatchDelete { - pub fn new(logical: generic::Delete) -> Self { - assert_eq!(logical.input.distribution(), &Distribution::Single); + pub fn new(core: generic::Delete) -> Self { + assert_eq!(core.input.distribution(), &Distribution::Single); let base: PlanBase = PlanBase::new_batch_from_logical( - &logical, - logical.input.distribution().clone(), + &core, + core.input.distribution().clone(), Order::any(), ); - Self { base, logical } + Self { base, core } } } impl PlanTreeNodeUnary for BatchDelete { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut core = self.logical.clone(); + let mut core = self.core.clone(); core.input = input; Self::new(core) } } impl_plan_tree_node_for_unary! { BatchDelete } -impl_distill_by_unit!(BatchDelete, logical, "BatchDelete"); +impl_distill_by_unit!(BatchDelete, core, "BatchDelete"); impl ToDistributedBatch for BatchDelete { fn to_distributed(&self) -> Result { @@ -68,9 +68,9 @@ impl ToDistributedBatch for BatchDelete { impl ToBatchPb for BatchDelete { fn to_batch_prost_body(&self) -> NodeBody { NodeBody::Delete(DeleteNode { - table_id: self.logical.table_id.table_id(), - table_version_id: self.logical.table_version_id, - returning: self.logical.returning, + table_id: self.core.table_id.table_id(), + table_version_id: self.core.table_version_id, + returning: self.core.returning, }) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_expand.rs b/src/frontend/src/optimizer/plan_node/batch_expand.rs index 72caa27858be1..870368701be44 100644 --- a/src/frontend/src/optimizer/plan_node/batch_expand.rs +++ b/src/frontend/src/optimizer/plan_node/batch_expand.rs @@ -29,38 +29,38 @@ use crate::optimizer::PlanRef; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchExpand { pub base: PlanBase, - logical: generic::Expand, + core: generic::Expand, } impl BatchExpand { - pub fn new(logical: generic::Expand) -> Self { - let dist = match logical.input.distribution() { + pub fn new(core: generic::Expand) -> Self { + let dist = match core.input.distribution() { Distribution::Single => Distribution::Single, Distribution::SomeShard | Distribution::HashShard(_) | Distribution::UpstreamHashShard(_, _) => Distribution::SomeShard, Distribution::Broadcast => unreachable!(), }; - let base = PlanBase::new_batch_from_logical(&logical, dist, Order::any()); - BatchExpand { base, logical } + let base = PlanBase::new_batch_from_logical(&core, dist, Order::any()); + BatchExpand { base, core } } pub fn column_subsets(&self) -> &[Vec] { - &self.logical.column_subsets + &self.core.column_subsets } } -impl_distill_by_unit!(BatchExpand, logical, "BatchExpand"); +impl_distill_by_unit!(BatchExpand, core, "BatchExpand"); impl PlanTreeNodeUnary for BatchExpand { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_filter.rs b/src/frontend/src/optimizer/plan_node/batch_filter.rs index aadbda9800b16..6bc5086c7a29b 100644 --- a/src/frontend/src/optimizer/plan_node/batch_filter.rs +++ b/src/frontend/src/optimizer/plan_node/batch_filter.rs @@ -26,35 +26,35 @@ use crate::utils::Condition; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchFilter { pub base: PlanBase, - logical: generic::Filter, + core: generic::Filter, } impl BatchFilter { - pub fn new(logical: generic::Filter) -> Self { + pub fn new(core: generic::Filter) -> Self { // TODO: derive from input let base = PlanBase::new_batch_from_logical( - &logical, - logical.input.distribution().clone(), - logical.input.order().clone(), + &core, + core.input.distribution().clone(), + core.input.order().clone(), ); - BatchFilter { base, logical } + BatchFilter { base, core } } pub fn predicate(&self) -> &Condition { - &self.logical.predicate + &self.core.predicate } } -impl_distill_by_unit!(BatchFilter, logical, "BatchFilter"); +impl_distill_by_unit!(BatchFilter, core, "BatchFilter"); impl PlanTreeNodeUnary for BatchFilter { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } } @@ -70,7 +70,7 @@ impl ToDistributedBatch for BatchFilter { impl ToBatchPb for BatchFilter { fn to_batch_prost_body(&self) -> NodeBody { NodeBody::Filter(FilterNode { - search_condition: Some(ExprImpl::from(self.logical.predicate.clone()).to_expr_proto()), + search_condition: Some(ExprImpl::from(self.core.predicate.clone()).to_expr_proto()), }) } } @@ -88,8 +88,8 @@ impl ExprRewritable for BatchFilter { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); - Self::new(logical).into() + let mut core = self.core.clone(); + core.rewrite_exprs(r); + Self::new(core).into() } } diff --git a/src/frontend/src/optimizer/plan_node/batch_group_topn.rs b/src/frontend/src/optimizer/plan_node/batch_group_topn.rs index 1d61b4e9eb379..8f6684dc4d85b 100644 --- a/src/frontend/src/optimizer/plan_node/batch_group_topn.rs +++ b/src/frontend/src/optimizer/plan_node/batch_group_topn.rs @@ -27,36 +27,36 @@ use crate::optimizer::property::{Order, RequiredDist}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchGroupTopN { pub base: PlanBase, - logical: generic::TopN, + core: generic::TopN, } impl BatchGroupTopN { - pub fn new(logical: generic::TopN) -> Self { - assert!(!logical.group_key.is_empty()); + pub fn new(core: generic::TopN) -> Self { + assert!(!core.group_key.is_empty()); let base = PlanBase::new_batch_from_logical( - &logical, - logical.input.distribution().clone(), + &core, + core.input.distribution().clone(), Order::any(), ); - BatchGroupTopN { base, logical } + BatchGroupTopN { base, core } } fn group_key(&self) -> &[usize] { - &self.logical.group_key + &self.core.group_key } } -impl_distill_by_unit!(BatchGroupTopN, logical, "BatchGroupTopN"); +impl_distill_by_unit!(BatchGroupTopN, core, "BatchGroupTopN"); impl PlanTreeNodeUnary for BatchGroupTopN { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } } @@ -73,13 +73,13 @@ impl ToDistributedBatch for BatchGroupTopN { impl ToBatchPb for BatchGroupTopN { fn to_batch_prost_body(&self) -> NodeBody { - let column_orders = self.logical.order.to_protobuf(); + let column_orders = self.core.order.to_protobuf(); NodeBody::GroupTopN(GroupTopNNode { - limit: self.logical.limit_attr.limit(), - offset: self.logical.offset, + limit: self.core.limit_attr.limit(), + offset: self.core.offset, column_orders, group_key: self.group_key().iter().map(|c| *c as u32).collect(), - with_ties: self.logical.limit_attr.with_ties(), + with_ties: self.core.limit_attr.with_ties(), }) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_hash_agg.rs b/src/frontend/src/optimizer/plan_node/batch_hash_agg.rs index 7100125dcee99..fa14076912689 100644 --- a/src/frontend/src/optimizer/plan_node/batch_hash_agg.rs +++ b/src/frontend/src/optimizer/plan_node/batch_hash_agg.rs @@ -31,27 +31,27 @@ use crate::utils::{ColIndexMappingRewriteExt, IndexSet}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchHashAgg { pub base: PlanBase, - logical: generic::Agg, + core: generic::Agg, } impl BatchHashAgg { - pub fn new(logical: generic::Agg) -> Self { - assert!(!logical.group_key.is_empty()); - let input = logical.input.clone(); + pub fn new(core: generic::Agg) -> Self { + assert!(!core.group_key.is_empty()); + let input = core.input.clone(); let input_dist = input.distribution(); - let dist = logical + let dist = core .i2o_col_mapping() .rewrite_provided_distribution(input_dist); - let base = PlanBase::new_batch_from_logical(&logical, dist, Order::any()); - BatchHashAgg { base, logical } + let base = PlanBase::new_batch_from_logical(&core, dist, Order::any()); + BatchHashAgg { base, core } } pub fn agg_calls(&self) -> &[PlanAggCall] { - &self.logical.agg_calls + &self.core.agg_calls } pub fn group_key(&self) -> &IndexSet { - &self.logical.group_key + &self.core.group_key } fn to_two_phase_agg(&self, dist_input: PlanRef) -> Result { @@ -68,7 +68,7 @@ impl BatchHashAgg { // insert total agg let total_agg_types = self - .logical + .core .agg_calls .iter() .enumerate() @@ -95,29 +95,27 @@ impl BatchHashAgg { } } -impl_distill_by_unit!(BatchHashAgg, logical, "BatchHashAgg"); +impl_distill_by_unit!(BatchHashAgg, core, "BatchHashAgg"); impl PlanTreeNodeUnary for BatchHashAgg { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } } impl_plan_tree_node_for_unary! { BatchHashAgg } impl ToDistributedBatch for BatchHashAgg { fn to_distributed(&self) -> Result { - if self.logical.must_try_two_phase_agg() { + if self.core.must_try_two_phase_agg() { let input = self.input().to_distributed()?; let input_dist = input.distribution(); - if !self - .logical - .hash_agg_dist_satisfied_by_input_dist(input_dist) + if !self.core.hash_agg_dist_satisfied_by_input_dist(input_dist) && matches!( input_dist, Distribution::HashShard(_) @@ -162,8 +160,8 @@ impl ExprRewritable for BatchHashAgg { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); - Self::new(logical).into() + let mut core = self.core.clone(); + core.rewrite_exprs(r); + Self::new(core).into() } } diff --git a/src/frontend/src/optimizer/plan_node/batch_hash_join.rs b/src/frontend/src/optimizer/plan_node/batch_hash_join.rs index a4ecf8311a479..5adfa6f5fd622 100644 --- a/src/frontend/src/optimizer/plan_node/batch_hash_join.rs +++ b/src/frontend/src/optimizer/plan_node/batch_hash_join.rs @@ -36,7 +36,7 @@ use crate::utils::ColIndexMappingRewriteExt; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchHashJoin { pub base: PlanBase, - logical: generic::Join, + core: generic::Join, /// The join condition must be equivalent to `logical.on`, but separated into equal and /// non-equal parts to facilitate execution later @@ -44,17 +44,13 @@ pub struct BatchHashJoin { } impl BatchHashJoin { - pub fn new(logical: generic::Join, eq_join_predicate: EqJoinPredicate) -> Self { - let dist = Self::derive_dist( - logical.left.distribution(), - logical.right.distribution(), - &logical, - ); - let base = PlanBase::new_batch_from_logical(&logical, dist, Order::any()); + pub fn new(core: generic::Join, eq_join_predicate: EqJoinPredicate) -> Self { + let dist = Self::derive_dist(core.left.distribution(), core.right.distribution(), &core); + let base = PlanBase::new_batch_from_logical(&core, dist, Order::any()); Self { base, - logical, + core, eq_join_predicate, } } @@ -62,25 +58,21 @@ impl BatchHashJoin { pub(super) fn derive_dist( left: &Distribution, right: &Distribution, - logical: &generic::Join, + join: &generic::Join, ) -> Distribution { match (left, right) { (Distribution::Single, Distribution::Single) => Distribution::Single, // we can not derive the hash distribution from the side where outer join can generate a // NULL row - (Distribution::HashShard(_), Distribution::HashShard(_)) => match logical.join_type { + (Distribution::HashShard(_), Distribution::HashShard(_)) => match join.join_type { JoinType::Unspecified => unreachable!(), JoinType::FullOuter => Distribution::SomeShard, JoinType::Inner | JoinType::LeftOuter | JoinType::LeftSemi | JoinType::LeftAnti => { - let l2o = logical - .l2i_col_mapping() - .composite(&logical.i2o_col_mapping()); + let l2o = join.l2i_col_mapping().composite(&join.i2o_col_mapping()); l2o.rewrite_provided_distribution(left) } JoinType::RightSemi | JoinType::RightAnti | JoinType::RightOuter => { - let r2o = logical - .r2i_col_mapping() - .composite(&logical.i2o_col_mapping()); + let r2o = join.r2i_col_mapping().composite(&join.i2o_col_mapping()); r2o.rewrite_provided_distribution(right) } }, @@ -101,9 +93,9 @@ impl Distill for BatchHashJoin { fn distill<'a>(&self) -> XmlNode<'a> { let verbose = self.base.ctx.is_explain_verbose(); let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 }); - vec.push(("type", Pretty::debug(&self.logical.join_type))); + vec.push(("type", Pretty::debug(&self.core.join_type))); - let concat_schema = self.logical.concat_schema(); + let concat_schema = self.core.concat_schema(); vec.push(( "predicate", Pretty::debug(&EqJoinPredicateDisplay { @@ -112,7 +104,7 @@ impl Distill for BatchHashJoin { }), )); if verbose { - let data = IndicesDisplay::from_join(&self.logical, &concat_schema); + let data = IndicesDisplay::from_join(&self.core, &concat_schema); vec.push(("output", data)); } childless_record("BatchHashJoin", vec) @@ -121,18 +113,18 @@ impl Distill for BatchHashJoin { impl PlanTreeNodeBinary for BatchHashJoin { fn left(&self) -> PlanRef { - self.logical.left.clone() + self.core.left.clone() } fn right(&self) -> PlanRef { - self.logical.right.clone() + self.core.right.clone() } fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.left = left; - logical.right = right; - Self::new(logical, self.eq_join_predicate.clone()) + let mut core = self.core.clone(); + core.left = left; + core.right = right; + Self::new(core, self.eq_join_predicate.clone()) } } @@ -200,7 +192,7 @@ impl ToDistributedBatch for BatchHashJoin { impl ToBatchPb for BatchHashJoin { fn to_batch_prost_body(&self) -> NodeBody { NodeBody::HashJoin(HashJoinNode { - join_type: self.logical.join_type as i32, + join_type: self.core.join_type as i32, left_key: self .eq_join_predicate .left_eq_indexes() @@ -219,12 +211,7 @@ impl ToBatchPb for BatchHashJoin { .other_cond() .as_expr_unless_true() .map(|x| x.to_expr_proto()), - output_indices: self - .logical - .output_indices - .iter() - .map(|&x| x as u32) - .collect(), + output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), }) } } @@ -246,8 +233,8 @@ impl ExprRewritable for BatchHashJoin { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); - Self::new(logical, self.eq_join_predicate.rewrite_exprs(r)).into() + let mut core = self.core.clone(); + core.rewrite_exprs(r); + Self::new(core, self.eq_join_predicate.rewrite_exprs(r)).into() } } diff --git a/src/frontend/src/optimizer/plan_node/batch_hop_window.rs b/src/frontend/src/optimizer/plan_node/batch_hop_window.rs index c4b84b7232d1a..68381956b8a9a 100644 --- a/src/frontend/src/optimizer/plan_node/batch_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/batch_hop_window.rs @@ -30,45 +30,45 @@ use crate::utils::ColIndexMappingRewriteExt; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchHopWindow { pub base: PlanBase, - logical: generic::HopWindow, + core: generic::HopWindow, window_start_exprs: Vec, window_end_exprs: Vec, } impl BatchHopWindow { pub fn new( - logical: generic::HopWindow, + core: generic::HopWindow, window_start_exprs: Vec, window_end_exprs: Vec, ) -> Self { - let distribution = logical + let distribution = core .i2o_col_mapping() - .rewrite_provided_distribution(logical.input.distribution()); + .rewrite_provided_distribution(core.input.distribution()); let base = PlanBase::new_batch_from_logical( - &logical, + &core, distribution, - logical.get_out_column_index_order(), + core.get_out_column_index_order(), ); BatchHopWindow { base, - logical, + core, window_start_exprs, window_end_exprs, } } } -impl_distill_by_unit!(BatchHopWindow, logical, "BatchHopWindow"); +impl_distill_by_unit!(BatchHopWindow, core, "BatchHopWindow"); impl PlanTreeNodeUnary for BatchHopWindow { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; + let mut core = self.core.clone(); + core.input = input; Self::new( - logical, + core, self.window_start_exprs.clone(), self.window_end_exprs.clone(), ) @@ -92,13 +92,13 @@ impl ToDistributedBatch for BatchHopWindow { // communication. // We pass the required dist to its input. let input_required = self - .logical + .core .o2i_col_mapping() .rewrite_required_distribution(required_dist); let new_input = self .input() .to_distributed_with_required(required_order, &input_required)?; - let mut new_logical = self.logical.clone(); + let mut new_logical = self.core.clone(); new_logical.input = new_input; let batch_plan = BatchHopWindow::new( new_logical, @@ -113,15 +113,10 @@ impl ToDistributedBatch for BatchHopWindow { impl ToBatchPb for BatchHopWindow { fn to_batch_prost_body(&self) -> NodeBody { NodeBody::HopWindow(HopWindowNode { - time_col: self.logical.time_col.index() as _, - window_slide: Some(self.logical.window_slide.into()), - window_size: Some(self.logical.window_size.into()), - output_indices: self - .logical - .output_indices - .iter() - .map(|&x| x as u32) - .collect(), + time_col: self.core.time_col.index() as _, + window_slide: Some(self.core.window_slide.into()), + window_size: Some(self.core.window_size.into()), + output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), window_start_exprs: self .window_start_exprs .clone() @@ -152,7 +147,7 @@ impl ExprRewritable for BatchHopWindow { fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { Self::new( - self.logical.clone(), + self.core.clone(), self.window_start_exprs .clone() .into_iter() diff --git a/src/frontend/src/optimizer/plan_node/batch_insert.rs b/src/frontend/src/optimizer/plan_node/batch_insert.rs index 305de0e2f6eaa..0a2d2dddec8c5 100644 --- a/src/frontend/src/optimizer/plan_node/batch_insert.rs +++ b/src/frontend/src/optimizer/plan_node/batch_insert.rs @@ -28,40 +28,38 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchInsert { pub base: PlanBase, - pub logical: generic::Insert, + pub core: generic::Insert, } impl BatchInsert { - pub fn new(logical: generic::Insert) -> Self { - assert_eq!(logical.input.distribution(), &Distribution::Single); + pub fn new(core: generic::Insert) -> Self { + assert_eq!(core.input.distribution(), &Distribution::Single); let base: PlanBase = PlanBase::new_batch_from_logical( - &logical, - logical.input.distribution().clone(), + &core, + core.input.distribution().clone(), Order::any(), ); - BatchInsert { base, logical } + BatchInsert { base, core } } } impl Distill for BatchInsert { fn distill<'a>(&self) -> XmlNode<'a> { - let vec = self - .logical - .fields_pretty(self.base.ctx.is_explain_verbose()); + let vec = self.core.fields_pretty(self.base.ctx.is_explain_verbose()); childless_record("BatchInsert", vec) } } impl PlanTreeNodeUnary for BatchInsert { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } } @@ -77,14 +75,9 @@ impl ToDistributedBatch for BatchInsert { impl ToBatchPb for BatchInsert { fn to_batch_prost_body(&self) -> NodeBody { - let column_indices = self - .logical - .column_indices - .iter() - .map(|&i| i as u32) - .collect(); + let column_indices = self.core.column_indices.iter().map(|&i| i as u32).collect(); - let default_columns = &self.logical.default_columns; + let default_columns = &self.core.default_columns; let has_default_columns = !default_columns.is_empty(); let default_columns = DefaultColumns { default_columns: default_columns @@ -96,16 +89,16 @@ impl ToBatchPb for BatchInsert { .collect(), }; NodeBody::Insert(InsertNode { - table_id: self.logical.table_id.table_id(), - table_version_id: self.logical.table_version_id, + table_id: self.core.table_id.table_id(), + table_version_id: self.core.table_version_id, column_indices, default_columns: if has_default_columns { Some(default_columns) } else { None }, - row_id_index: self.logical.row_id_index.map(|index| index as _), - returning: self.logical.returning, + row_id_index: self.core.row_id_index.map(|index| index as _), + returning: self.core.returning, }) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_limit.rs b/src/frontend/src/optimizer/plan_node/batch_limit.rs index e617f1c2cd544..17ee2c3fb69f3 100644 --- a/src/frontend/src/optimizer/plan_node/batch_limit.rs +++ b/src/frontend/src/optimizer/plan_node/batch_limit.rs @@ -27,21 +27,21 @@ use crate::optimizer::property::{Order, RequiredDist}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchLimit { pub base: PlanBase, - logical: generic::Limit, + core: generic::Limit, } impl BatchLimit { - pub fn new(logical: generic::Limit) -> Self { + pub fn new(core: generic::Limit) -> Self { let base = PlanBase::new_batch_from_logical( - &logical, - logical.input.distribution().clone(), - logical.input.order().clone(), + &core, + core.input.distribution().clone(), + core.input.order().clone(), ); - BatchLimit { base, logical } + BatchLimit { base, core } } fn two_phase_limit(&self, new_input: PlanRef) -> Result { - let new_limit = self.logical.limit + self.logical.offset; + let new_limit = self.core.limit + self.core.offset; let new_offset = 0; let logical_partial_limit = generic::Limit::new(new_input.clone(), new_limit, new_offset); let batch_partial_limit = Self::new(logical_partial_limit); @@ -60,27 +60,27 @@ impl BatchLimit { } pub fn limit(&self) -> u64 { - self.logical.limit + self.core.limit } pub fn offset(&self) -> u64 { - self.logical.offset + self.core.offset } } impl PlanTreeNodeUnary for BatchLimit { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut core = self.logical.clone(); + let mut core = self.core.clone(); core.input = input; Self::new(core) } } impl_plan_tree_node_for_unary! {BatchLimit} -impl_distill_by_unit!(BatchLimit, logical, "BatchLimit"); +impl_distill_by_unit!(BatchLimit, core, "BatchLimit"); impl ToDistributedBatch for BatchLimit { fn to_distributed(&self) -> Result { @@ -91,8 +91,8 @@ impl ToDistributedBatch for BatchLimit { impl ToBatchPb for BatchLimit { fn to_batch_prost_body(&self) -> NodeBody { NodeBody::Limit(LimitNode { - limit: self.logical.limit, - offset: self.logical.offset, + limit: self.core.limit, + offset: self.core.offset, }) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs b/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs index 3098019499b76..99eb905da4661 100644 --- a/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs +++ b/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs @@ -34,7 +34,7 @@ use crate::utils::ColIndexMappingRewriteExt; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchLookupJoin { pub base: PlanBase, - logical: generic::Join, + core: generic::Join, /// The join condition must be equivalent to `logical.on`, but separated into equal and /// non-equal parts to facilitate execution later @@ -56,7 +56,7 @@ pub struct BatchLookupJoin { impl BatchLookupJoin { pub fn new( - logical: generic::Join, + core: generic::Join, eq_join_predicate: EqJoinPredicate, right_table_desc: TableDesc, right_output_column_ids: Vec, @@ -67,11 +67,11 @@ impl BatchLookupJoin { // lookup. assert!(eq_join_predicate.has_eq()); assert!(eq_join_predicate.eq_keys_are_type_aligned()); - let dist = Self::derive_dist(logical.left.distribution(), &logical); - let base = PlanBase::new_batch_from_logical(&logical, dist, Order::any()); + let dist = Self::derive_dist(core.left.distribution(), &core); + let base = PlanBase::new_batch_from_logical(&core, dist, Order::any()); Self { base, - logical, + core, eq_join_predicate, right_table_desc, right_output_column_ids, @@ -80,13 +80,11 @@ impl BatchLookupJoin { } } - fn derive_dist(left: &Distribution, logical: &generic::Join) -> Distribution { + fn derive_dist(left: &Distribution, core: &generic::Join) -> Distribution { match left { Distribution::Single => Distribution::Single, Distribution::HashShard(_) | Distribution::UpstreamHashShard(_, _) => { - let l2o = logical - .l2i_col_mapping() - .composite(&logical.i2o_col_mapping()); + let l2o = core.l2i_col_mapping().composite(&core.i2o_col_mapping()); l2o.rewrite_provided_distribution(left) } _ => unreachable!(), @@ -116,9 +114,9 @@ impl Distill for BatchLookupJoin { fn distill<'a>(&self) -> XmlNode<'a> { let verbose = self.base.ctx.is_explain_verbose(); let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 }); - vec.push(("type", Pretty::debug(&self.logical.join_type))); + vec.push(("type", Pretty::debug(&self.core.join_type))); - let concat_schema = self.logical.concat_schema(); + let concat_schema = self.core.concat_schema(); vec.push(( "predicate", Pretty::debug(&EqJoinPredicateDisplay { @@ -128,7 +126,7 @@ impl Distill for BatchLookupJoin { )); if verbose { - let data = IndicesDisplay::from_join(&self.logical, &concat_schema); + let data = IndicesDisplay::from_join(&self.core, &concat_schema); vec.push(("output", data)); } @@ -138,15 +136,15 @@ impl Distill for BatchLookupJoin { impl PlanTreeNodeUnary for BatchLookupJoin { fn input(&self) -> PlanRef { - self.logical.left.clone() + self.core.left.clone() } // Only change left side fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.left = input; + let mut core = self.core.clone(); + core.left = input; Self::new( - logical, + core, self.eq_join_predicate.clone(), self.right_table_desc.clone(), self.right_output_column_ids.clone(), @@ -199,7 +197,7 @@ impl ToBatchPb for BatchLookupJoin { fn to_batch_prost_body(&self) -> NodeBody { if self.distributed_lookup { NodeBody::DistributedLookupJoin(DistributedLookupJoinNode { - join_type: self.logical.join_type as i32, + join_type: self.core.join_type as i32, condition: self .eq_join_predicate .other_cond() @@ -223,18 +221,13 @@ impl ToBatchPb for BatchLookupJoin { .iter() .map(ColumnId::get_id) .collect(), - output_indices: self - .logical - .output_indices - .iter() - .map(|&x| x as u32) - .collect(), + output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), null_safe: self.eq_join_predicate.null_safes(), lookup_prefix_len: self.lookup_prefix_len as u32, }) } else { NodeBody::LocalLookupJoin(LocalLookupJoinNode { - join_type: self.logical.join_type as i32, + join_type: self.core.join_type as i32, condition: self .eq_join_predicate .other_cond() @@ -259,12 +252,7 @@ impl ToBatchPb for BatchLookupJoin { .iter() .map(ColumnId::get_id) .collect(), - output_indices: self - .logical - .output_indices - .iter() - .map(|&x| x as u32) - .collect(), + output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), worker_nodes: vec![], // To be filled in at local.rs null_safe: self.eq_join_predicate.null_safes(), lookup_prefix_len: self.lookup_prefix_len as u32, @@ -289,11 +277,11 @@ impl ExprRewritable for BatchLookupJoin { fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { let base = self.base.clone_with_new_plan_id(); - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); + let mut core = self.core.clone(); + core.rewrite_exprs(r); Self { base, - logical, + core, eq_join_predicate: self.eq_join_predicate.rewrite_exprs(r), ..Self::clone(self) } diff --git a/src/frontend/src/optimizer/plan_node/batch_nested_loop_join.rs b/src/frontend/src/optimizer/plan_node/batch_nested_loop_join.rs index 79d9f07d8eadc..d743523d05911 100644 --- a/src/frontend/src/optimizer/plan_node/batch_nested_loop_join.rs +++ b/src/frontend/src/optimizer/plan_node/batch_nested_loop_join.rs @@ -31,14 +31,14 @@ use crate::utils::ConditionDisplay; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchNestedLoopJoin { pub base: PlanBase, - logical: generic::Join, + core: generic::Join, } impl BatchNestedLoopJoin { - pub fn new(logical: generic::Join) -> Self { - let dist = Self::derive_dist(logical.left.distribution(), logical.right.distribution()); - let base = PlanBase::new_batch_from_logical(&logical, dist, Order::any()); - Self { base, logical } + pub fn new(core: generic::Join) -> Self { + let dist = Self::derive_dist(core.left.distribution(), core.right.distribution()); + let base = PlanBase::new_batch_from_logical(&core, dist, Order::any()); + Self { base, core } } fn derive_dist(left: &Distribution, right: &Distribution) -> Distribution { @@ -53,19 +53,19 @@ impl Distill for BatchNestedLoopJoin { fn distill<'a>(&self) -> XmlNode<'a> { let verbose = self.base.ctx.is_explain_verbose(); let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 }); - vec.push(("type", Pretty::debug(&self.logical.join_type))); + vec.push(("type", Pretty::debug(&self.core.join_type))); - let concat_schema = self.logical.concat_schema(); + let concat_schema = self.core.concat_schema(); vec.push(( "predicate", Pretty::debug(&ConditionDisplay { - condition: &self.logical.on, + condition: &self.core.on, input_schema: &concat_schema, }), )); if verbose { - let data = IndicesDisplay::from_join(&self.logical, &concat_schema); + let data = IndicesDisplay::from_join(&self.core, &concat_schema); vec.push(("output", data)); } @@ -75,18 +75,18 @@ impl Distill for BatchNestedLoopJoin { impl PlanTreeNodeBinary for BatchNestedLoopJoin { fn left(&self) -> PlanRef { - self.logical.left.clone() + self.core.left.clone() } fn right(&self) -> PlanRef { - self.logical.right.clone() + self.core.right.clone() } fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.left = left; - logical.right = right; - Self::new(logical) + let mut core = self.core.clone(); + core.left = left; + core.right = right; + Self::new(core) } } @@ -108,14 +108,9 @@ impl ToDistributedBatch for BatchNestedLoopJoin { impl ToBatchPb for BatchNestedLoopJoin { fn to_batch_prost_body(&self) -> NodeBody { NodeBody::NestedLoopJoin(NestedLoopJoinNode { - join_type: self.logical.join_type as i32, - join_cond: Some(ExprImpl::from(self.logical.on.clone()).to_expr_proto()), - output_indices: self - .logical - .output_indices - .iter() - .map(|&x| x as u32) - .collect(), + join_type: self.core.join_type as i32, + join_cond: Some(ExprImpl::from(self.core.on.clone()).to_expr_proto()), + output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), }) } } @@ -138,8 +133,8 @@ impl ExprRewritable for BatchNestedLoopJoin { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); - Self::new(logical).into() + let mut core = self.core.clone(); + core.rewrite_exprs(r); + Self::new(core).into() } } diff --git a/src/frontend/src/optimizer/plan_node/batch_over_window.rs b/src/frontend/src/optimizer/plan_node/batch_over_window.rs index aa6e53246697e..f04587059aecd 100644 --- a/src/frontend/src/optimizer/plan_node/batch_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/batch_over_window.rs @@ -28,27 +28,26 @@ use crate::optimizer::property::{Order, RequiredDist}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchOverWindow { pub base: PlanBase, - logical: generic::OverWindow, + core: generic::OverWindow, } impl BatchOverWindow { - pub fn new(logical: generic::OverWindow) -> Self { - assert!(logical.funcs_have_same_partition_and_order()); + pub fn new(core: generic::OverWindow) -> Self { + assert!(core.funcs_have_same_partition_and_order()); - let input = &logical.input; + let input = &core.input; let input_dist = input.distribution().clone(); let order = Order::new( - logical - .partition_key_indices() + core.partition_key_indices() .into_iter() .map(|idx| ColumnOrder::new(idx, OrderType::default())) - .chain(logical.order_key().iter().cloned()) + .chain(core.order_key().iter().cloned()) .collect(), ); - let base = PlanBase::new_batch_from_logical(&logical, input_dist, order); - BatchOverWindow { base, logical } + let base = PlanBase::new_batch_from_logical(&core, input_dist, order); + BatchOverWindow { base, core } } fn expected_input_order(&self) -> Order { @@ -56,17 +55,17 @@ impl BatchOverWindow { } } -impl_distill_by_unit!(BatchOverWindow, logical, "BatchOverWindow"); +impl_distill_by_unit!(BatchOverWindow, core, "BatchOverWindow"); impl PlanTreeNodeUnary for BatchOverWindow { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } } @@ -78,7 +77,7 @@ impl ToDistributedBatch for BatchOverWindow { &self.expected_input_order(), &RequiredDist::shard_by_key( self.input().schema().len(), - &self.logical.partition_key_indices(), + &self.core.partition_key_indices(), ), )?; Ok(self.clone_with_input(new_input).into()) @@ -98,13 +97,13 @@ impl ToBatchPb for BatchOverWindow { fn to_batch_prost_body(&self) -> NodeBody { NodeBody::SortOverWindow(SortOverWindowNode { calls: self - .logical + .core .window_functions() .iter() .map(PlanWindowFunction::to_protobuf) .collect(), partition_by: self - .logical + .core .partition_key_indices() .into_iter() .map(|idx| idx as _) diff --git a/src/frontend/src/optimizer/plan_node/batch_project.rs b/src/frontend/src/optimizer/plan_node/batch_project.rs index d3979b8aebdee..591d7d13caed8 100644 --- a/src/frontend/src/optimizer/plan_node/batch_project.rs +++ b/src/frontend/src/optimizer/plan_node/batch_project.rs @@ -31,46 +31,46 @@ use crate::utils::ColIndexMappingRewriteExt; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchProject { pub base: PlanBase, - logical: generic::Project, + core: generic::Project, } impl BatchProject { - pub fn new(logical: generic::Project) -> Self { - let distribution = logical + pub fn new(core: generic::Project) -> Self { + let distribution = core .i2o_col_mapping() - .rewrite_provided_distribution(logical.input.distribution()); - let order = logical + .rewrite_provided_distribution(core.input.distribution()); + let order = core .i2o_col_mapping() - .rewrite_provided_order(logical.input.order()); + .rewrite_provided_order(core.input.order()); - let base = PlanBase::new_batch_from_logical(&logical, distribution, order); - BatchProject { base, logical } + let base = PlanBase::new_batch_from_logical(&core, distribution, order); + BatchProject { base, core } } pub fn as_logical(&self) -> &generic::Project { - &self.logical + &self.core } pub fn exprs(&self) -> &Vec { - &self.logical.exprs + &self.core.exprs } } impl Distill for BatchProject { fn distill<'a>(&self) -> XmlNode<'a> { - childless_record("BatchProject", self.logical.fields_pretty(self.schema())) + childless_record("BatchProject", self.core.fields_pretty(self.schema())) } } impl PlanTreeNodeUnary for BatchProject { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } } @@ -86,7 +86,7 @@ impl ToDistributedBatch for BatchProject { impl ToBatchPb for BatchProject { fn to_batch_prost_body(&self) -> NodeBody { let select_list = self - .logical + .core .exprs .iter() .map(|expr| expr.to_expr_proto()) @@ -108,8 +108,8 @@ impl ExprRewritable for BatchProject { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); - Self::new(logical).into() + let mut core = self.core.clone(); + core.rewrite_exprs(r); + Self::new(core).into() } } diff --git a/src/frontend/src/optimizer/plan_node/batch_project_set.rs b/src/frontend/src/optimizer/plan_node/batch_project_set.rs index b86211aaaa211..331ca8e5235de 100644 --- a/src/frontend/src/optimizer/plan_node/batch_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/batch_project_set.rs @@ -29,35 +29,35 @@ use crate::utils::ColIndexMappingRewriteExt; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchProjectSet { pub base: PlanBase, - logical: generic::ProjectSet, + core: generic::ProjectSet, } impl BatchProjectSet { - pub fn new(logical: generic::ProjectSet) -> Self { - let distribution = logical + pub fn new(core: generic::ProjectSet) -> Self { + let distribution = core .i2o_col_mapping() - .rewrite_provided_distribution(logical.input.distribution()); + .rewrite_provided_distribution(core.input.distribution()); let base = PlanBase::new_batch_from_logical( - &logical, + &core, distribution, - logical.get_out_column_index_order(), + core.get_out_column_index_order(), ); - BatchProjectSet { base, logical } + BatchProjectSet { base, core } } } -impl_distill_by_unit!(BatchProjectSet, logical, "BatchProjectSet"); +impl_distill_by_unit!(BatchProjectSet, core, "BatchProjectSet"); impl PlanTreeNodeUnary for BatchProjectSet { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } } @@ -76,7 +76,7 @@ impl ToBatchPb for BatchProjectSet { fn to_batch_prost_body(&self) -> NodeBody { NodeBody::ProjectSet(ProjectSetNode { select_list: self - .logical + .core .select_list .iter() .map(|select_item| select_item.to_project_set_select_item_proto()) @@ -98,8 +98,8 @@ impl ExprRewritable for BatchProjectSet { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); - Self::new(logical).into() + let mut core = self.core.clone(); + core.rewrite_exprs(r); + Self::new(core).into() } } diff --git a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs index cfc557fe375c6..dc0e553caf308 100644 --- a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs @@ -35,25 +35,25 @@ use crate::optimizer::property::{Distribution, DistributionDisplay, Order}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchSeqScan { pub base: PlanBase, - logical: generic::Scan, + core: generic::Scan, scan_ranges: Vec, } impl BatchSeqScan { - fn new_inner(logical: generic::Scan, dist: Distribution, scan_ranges: Vec) -> Self { + fn new_inner(core: generic::Scan, dist: Distribution, scan_ranges: Vec) -> Self { let order = if scan_ranges.len() > 1 { Order::any() } else { - logical.get_out_column_index_order() + core.get_out_column_index_order() }; - let base = PlanBase::new_batch_from_logical(&logical, dist, order); + let base = PlanBase::new_batch_from_logical(&core, dist, order); { // validate scan_range scan_ranges.iter().for_each(|scan_range| { assert!(!scan_range.is_full_table_scan()); let scan_pk_prefix_len = scan_range.eq_conds.len(); - let order_len = logical.table_desc.order_column_indices().len(); + let order_len = core.table_desc.order_column_indices().len(); assert!( scan_pk_prefix_len < order_len || (scan_pk_prefix_len == order_len && is_full_range(&scan_range.range)), @@ -64,23 +64,23 @@ impl BatchSeqScan { Self { base, - logical, + core, scan_ranges, } } - pub fn new(logical: generic::Scan, scan_ranges: Vec) -> Self { + pub fn new(core: generic::Scan, scan_ranges: Vec) -> Self { // Use `Single` by default, will be updated later with `clone_with_dist`. - Self::new_inner(logical, Distribution::Single, scan_ranges) + Self::new_inner(core, Distribution::Single, scan_ranges) } fn clone_with_dist(&self) -> Self { Self::new_inner( - self.logical.clone(), - if self.logical.is_sys_table { + self.core.clone(), + if self.core.is_sys_table { Distribution::Single } else { - match self.logical.distribution_key() { + match self.core.distribution_key() { None => Distribution::SomeShard, Some(distribution_key) => { if distribution_key.is_empty() { @@ -97,7 +97,7 @@ impl BatchSeqScan { // inserted. Distribution::UpstreamHashShard( distribution_key, - self.logical.table_desc.table_id, + self.core.table_desc.table_id, ) } } @@ -109,8 +109,8 @@ impl BatchSeqScan { /// Get a reference to the batch seq scan's logical. #[must_use] - pub fn logical(&self) -> &generic::Scan { - &self.logical + pub fn core(&self) -> &generic::Scan { + &self.core } pub fn scan_ranges(&self) -> &[ScanRange] { @@ -119,8 +119,8 @@ impl BatchSeqScan { fn scan_ranges_as_strs(&self, verbose: bool) -> Vec { let order_names = match verbose { - true => self.logical.order_names_with_table_prefix(), - false => self.logical.order_names(), + true => self.core.order_names_with_table_prefix(), + false => self.core.order_names(), }; let mut range_strs = vec![]; @@ -182,8 +182,8 @@ impl Distill for BatchSeqScan { fn distill<'a>(&self) -> XmlNode<'a> { let verbose = self.base.ctx.is_explain_verbose(); let mut vec = Vec::with_capacity(4); - vec.push(("table", Pretty::from(self.logical.table_name.clone()))); - vec.push(("columns", self.logical.columns_pretty(verbose))); + vec.push(("table", Pretty::from(self.core.table_name.clone()))); + vec.push(("columns", self.core.columns_pretty(verbose))); if !self.scan_ranges.is_empty() { let range_strs = self.scan_ranges_as_strs(verbose); @@ -214,22 +214,22 @@ impl ToDistributedBatch for BatchSeqScan { impl ToBatchPb for BatchSeqScan { fn to_batch_prost_body(&self) -> NodeBody { let column_descs = self - .logical + .core .column_descs() .iter() .map(PbColumnDesc::from) .collect(); - if self.logical.is_sys_table { + if self.core.is_sys_table { NodeBody::SysRowSeqScan(SysRowSeqScanNode { - table_id: self.logical.table_desc.table_id.table_id, + table_id: self.core.table_desc.table_id.table_id, column_descs, }) } else { NodeBody::RowSeqScan(RowSeqScanNode { - table_desc: Some(self.logical.table_desc.to_protobuf()), + table_desc: Some(self.core.table_desc.to_protobuf()), column_ids: self - .logical + .core .output_column_ids() .iter() .map(ColumnId::get_id) @@ -239,7 +239,7 @@ impl ToBatchPb for BatchSeqScan { vnode_bitmap: None, ordered: !self.order().is_any(), chunk_size: self - .logical + .core .chunk_size .map(|chunk_size| ChunkSize { chunk_size }), }) @@ -249,18 +249,18 @@ impl ToBatchPb for BatchSeqScan { impl ToLocalBatch for BatchSeqScan { fn to_local(&self) -> Result { - let dist = if self.logical.is_sys_table { + let dist = if self.core.is_sys_table { Distribution::Single - } else if let Some(distribution_key) = self.logical.distribution_key() + } else if let Some(distribution_key) = self.core.distribution_key() && !distribution_key.is_empty() { - Distribution::UpstreamHashShard(distribution_key, self.logical.table_desc.table_id) + Distribution::UpstreamHashShard(distribution_key, self.core.table_desc.table_id) } else { // NOTE(kwannoel): This is a hack to force an exchange to always be inserted before // scan. Distribution::SomeShard }; - Ok(Self::new_inner(self.logical.clone(), dist, self.scan_ranges.clone()).into()) + Ok(Self::new_inner(self.core.clone(), dist, self.scan_ranges.clone()).into()) } } @@ -270,8 +270,8 @@ impl ExprRewritable for BatchSeqScan { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); - Self::new(logical, self.scan_ranges.clone()).into() + let mut core = self.core.clone(); + core.rewrite_exprs(r); + Self::new(core, self.scan_ranges.clone()).into() } } diff --git a/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs b/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs index b414779385200..ff184324a5fb9 100644 --- a/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs @@ -26,18 +26,18 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchSimpleAgg { pub base: PlanBase, - logical: generic::Agg, + core: generic::Agg, } impl BatchSimpleAgg { - pub fn new(logical: generic::Agg) -> Self { - let input_dist = logical.input.distribution().clone(); - let base = PlanBase::new_batch_from_logical(&logical, input_dist, Order::any()); - BatchSimpleAgg { base, logical } + pub fn new(core: generic::Agg) -> Self { + let input_dist = core.input.distribution().clone(); + let base = PlanBase::new_batch_from_logical(&core, input_dist, Order::any()); + BatchSimpleAgg { base, core } } pub fn agg_calls(&self) -> &[PlanAggCall] { - &self.logical.agg_calls + &self.core.agg_calls } fn two_phase_agg_enabled(&self) -> bool { @@ -46,24 +46,24 @@ impl BatchSimpleAgg { } pub(crate) fn can_two_phase_agg(&self) -> bool { - self.logical.can_two_phase_agg() && self.two_phase_agg_enabled() + self.core.can_two_phase_agg() && self.two_phase_agg_enabled() } } impl PlanTreeNodeUnary for BatchSimpleAgg { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { Self::new(generic::Agg { input, - ..self.logical.clone() + ..self.core.clone() }) } } impl_plan_tree_node_for_unary! { BatchSimpleAgg } -impl_distill_by_unit!(BatchSimpleAgg, logical, "BatchSimpleAgg"); +impl_distill_by_unit!(BatchSimpleAgg, core, "BatchSimpleAgg"); impl ToDistributedBatch for BatchSimpleAgg { fn to_distributed(&self) -> Result { @@ -83,7 +83,7 @@ impl ToDistributedBatch for BatchSimpleAgg { // insert total agg let total_agg_types = self - .logical + .core .agg_calls .iter() .enumerate() @@ -92,7 +92,7 @@ impl ToDistributedBatch for BatchSimpleAgg { }) .collect(); let total_agg_logical = - generic::Agg::new(total_agg_types, self.logical.group_key.clone(), exchange); + generic::Agg::new(total_agg_types, self.core.group_key.clone(), exchange); Ok(BatchSimpleAgg::new(total_agg_logical).into()) } else { let new_input = self @@ -134,8 +134,8 @@ impl ExprRewritable for BatchSimpleAgg { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); - Self::new(logical).into() + let mut core = self.core.clone(); + core.rewrite_exprs(r); + Self::new(core).into() } } diff --git a/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs b/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs index 241f1195352e3..00facef473a37 100644 --- a/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs +++ b/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs @@ -28,18 +28,18 @@ use crate::utils::{ColIndexMappingRewriteExt, IndexSet}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchSortAgg { pub base: PlanBase, - logical: generic::Agg, + core: generic::Agg, input_order: Order, } impl BatchSortAgg { - pub fn new(logical: generic::Agg) -> Self { - assert!(!logical.group_key.is_empty()); - assert!(logical.input_provides_order_on_group_keys()); + pub fn new(core: generic::Agg) -> Self { + assert!(!core.group_key.is_empty()); + assert!(core.input_provides_order_on_group_keys()); - let input = logical.input.clone(); + let input = core.input.clone(); let input_dist = input.distribution(); - let dist = logical + let dist = core .i2o_col_mapping() .rewrite_provided_distribution(input_dist); let input_order = Order { @@ -47,46 +47,44 @@ impl BatchSortAgg { .order() .column_orders .iter() - .filter(|o| logical.group_key.indices().any(|g_k| g_k == o.column_index)) + .filter(|o| core.group_key.indices().any(|g_k| g_k == o.column_index)) .cloned() .collect(), }; - let order = logical - .i2o_col_mapping() - .rewrite_provided_order(&input_order); + let order = core.i2o_col_mapping().rewrite_provided_order(&input_order); - let base = PlanBase::new_batch_from_logical(&logical, dist, order); + let base = PlanBase::new_batch_from_logical(&core, dist, order); BatchSortAgg { base, - logical, + core, input_order, } } pub fn agg_calls(&self) -> &[PlanAggCall] { - &self.logical.agg_calls + &self.core.agg_calls } pub fn group_key(&self) -> &IndexSet { - &self.logical.group_key + &self.core.group_key } } impl PlanTreeNodeUnary for BatchSortAgg { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } } impl_plan_tree_node_for_unary! { BatchSortAgg } -impl_distill_by_unit!(BatchSortAgg, logical, "BatchSortAgg"); +impl_distill_by_unit!(BatchSortAgg, core, "BatchSortAgg"); impl ToDistributedBatch for BatchSortAgg { fn to_distributed(&self) -> Result { @@ -136,7 +134,7 @@ impl ExprRewritable for BatchSortAgg { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut new_logical = self.logical.clone(); + let mut new_logical = self.core.clone(); new_logical.rewrite_exprs(r); Self::new(new_logical).into() } diff --git a/src/frontend/src/optimizer/plan_node/batch_source.rs b/src/frontend/src/optimizer/plan_node/batch_source.rs index 3adfbf670343a..8d43b4a7f6663 100644 --- a/src/frontend/src/optimizer/plan_node/batch_source.rs +++ b/src/frontend/src/optimizer/plan_node/batch_source.rs @@ -30,19 +30,19 @@ use crate::optimizer::property::{Distribution, Order}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchSource { pub base: PlanBase, - logical: generic::Source, + core: generic::Source, } impl BatchSource { - pub fn new(logical: generic::Source) -> Self { + pub fn new(core: generic::Source) -> Self { let base = PlanBase::new_batch_from_logical( - &logical, + &core, // Use `Single` by default, will be updated later with `clone_with_dist`. Distribution::Single, Order::any(), ); - Self { base, logical } + Self { base, core } } pub fn column_names(&self) -> Vec<&str> { @@ -50,11 +50,11 @@ impl BatchSource { } pub fn source_catalog(&self) -> Option> { - self.logical.catalog.clone() + self.core.catalog.clone() } pub fn kafka_timestamp_range_value(&self) -> (Option, Option) { - self.logical.kafka_timestamp_range_value() + self.core.kafka_timestamp_range_value() } pub fn clone_with_dist(&self) -> Self { @@ -62,7 +62,7 @@ impl BatchSource { base.dist = Distribution::SomeShard; Self { base, - logical: self.logical.clone(), + core: self.core.clone(), } } } @@ -100,7 +100,7 @@ impl ToBatchPb for BatchSource { source_id: source_catalog.id, info: Some(source_catalog.info.clone()), columns: self - .logical + .core .column_catalog .iter() .map(|c| c.to_protobuf()) diff --git a/src/frontend/src/optimizer/plan_node/batch_topn.rs b/src/frontend/src/optimizer/plan_node/batch_topn.rs index b8b5ba710e468..e5f44bd2ef0e2 100644 --- a/src/frontend/src/optimizer/plan_node/batch_topn.rs +++ b/src/frontend/src/optimizer/plan_node/batch_topn.rs @@ -29,38 +29,34 @@ use crate::optimizer::property::{Order, RequiredDist}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchTopN { pub base: PlanBase, - logical: generic::TopN, + core: generic::TopN, } impl BatchTopN { - pub fn new(logical: generic::TopN) -> Self { - assert!(logical.group_key.is_empty()); + pub fn new(core: generic::TopN) -> Self { + assert!(core.group_key.is_empty()); let base = PlanBase::new_batch_from_logical( - &logical, - logical.input.distribution().clone(), + &core, + core.input.distribution().clone(), // BatchTopN outputs data in the order of specified order - logical.order.clone(), + core.order.clone(), ); - BatchTopN { base, logical } + BatchTopN { base, core } } fn two_phase_topn(&self, input: PlanRef) -> Result { let new_limit = TopNLimit::new( - self.logical.limit_attr.limit() + self.logical.offset, - self.logical.limit_attr.with_ties(), + self.core.limit_attr.limit() + self.core.offset, + self.core.limit_attr.with_ties(), ); let new_offset = 0; - let partial_input: PlanRef = if input.order().satisfies(&self.logical.order) { + let partial_input: PlanRef = if input.order().satisfies(&self.core.order) { let logical_partial_limit = generic::Limit::new(input, new_limit.limit(), new_offset); let batch_partial_limit = BatchLimit::new(logical_partial_limit); batch_partial_limit.into() } else { - let logical_partial_topn = generic::TopN::without_group( - input, - new_limit, - new_offset, - self.logical.order.clone(), - ); + let logical_partial_topn = + generic::TopN::without_group(input, new_limit, new_offset, self.core.order.clone()); let batch_partial_topn = Self::new(logical_partial_topn); batch_partial_topn.into() }; @@ -78,17 +74,17 @@ impl BatchTopN { } } -impl_distill_by_unit!(BatchTopN, logical, "BatchTopN"); +impl_distill_by_unit!(BatchTopN, core, "BatchTopN"); impl PlanTreeNodeUnary for BatchTopN { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } } @@ -102,12 +98,12 @@ impl ToDistributedBatch for BatchTopN { impl ToBatchPb for BatchTopN { fn to_batch_prost_body(&self) -> NodeBody { - let column_orders = self.logical.order.to_protobuf(); + let column_orders = self.core.order.to_protobuf(); NodeBody::TopN(TopNNode { - limit: self.logical.limit_attr.limit(), - offset: self.logical.offset, + limit: self.core.limit_attr.limit(), + offset: self.core.offset, column_orders, - with_ties: self.logical.limit_attr.with_ties(), + with_ties: self.core.limit_attr.with_ties(), }) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_union.rs b/src/frontend/src/optimizer/plan_node/batch_union.rs index 1626d32db2cc8..31b4a541dfe4a 100644 --- a/src/frontend/src/optimizer/plan_node/batch_union.rs +++ b/src/frontend/src/optimizer/plan_node/batch_union.rs @@ -25,12 +25,12 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchUnion { pub base: PlanBase, - logical: generic::Union, + core: generic::Union, } impl BatchUnion { - pub fn new(logical: generic::Union) -> Self { - let dist = if logical + pub fn new(core: generic::Union) -> Self { + let dist = if core .inputs .iter() .all(|input| *input.distribution() == Distribution::Single) @@ -40,21 +40,21 @@ impl BatchUnion { Distribution::SomeShard }; - let base = PlanBase::new_batch_from_logical(&logical, dist, Order::any()); - BatchUnion { base, logical } + let base = PlanBase::new_batch_from_logical(&core, dist, Order::any()); + BatchUnion { base, core } } } -impl_distill_by_unit!(BatchUnion, logical, "BatchUnion"); +impl_distill_by_unit!(BatchUnion, core, "BatchUnion"); impl PlanTreeNode for BatchUnion { fn inputs(&self) -> smallvec::SmallVec<[crate::optimizer::PlanRef; 2]> { - smallvec::SmallVec::from_vec(self.logical.inputs.clone()) + smallvec::SmallVec::from_vec(self.core.inputs.clone()) } fn clone_with_inputs(&self, inputs: &[crate::optimizer::PlanRef]) -> PlanRef { // For batch query, we don't need to clone `source_col`, so just use new. - let mut new = self.logical.clone(); + let mut new = self.core.clone(); new.inputs = inputs.to_vec(); Self::new(new).into() } diff --git a/src/frontend/src/optimizer/plan_node/batch_update.rs b/src/frontend/src/optimizer/plan_node/batch_update.rs index 19bb60b9aa1d8..feebedeb07aaf 100644 --- a/src/frontend/src/optimizer/plan_node/batch_update.rs +++ b/src/frontend/src/optimizer/plan_node/batch_update.rs @@ -30,32 +30,32 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchUpdate { pub base: PlanBase, - pub logical: generic::Update, + pub core: generic::Update, } impl BatchUpdate { - pub fn new(logical: generic::Update, schema: Schema) -> Self { - assert_eq!(logical.input.distribution(), &Distribution::Single); - let ctx = logical.input.ctx(); + pub fn new(core: generic::Update, schema: Schema) -> Self { + assert_eq!(core.input.distribution(), &Distribution::Single); + let ctx = core.input.ctx(); let base = PlanBase::new_batch(ctx, schema, Distribution::Single, Order::any()); - Self { base, logical } + Self { base, core } } } impl PlanTreeNodeUnary for BatchUpdate { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical, self.schema().clone()) + let mut core = self.core.clone(); + core.input = input; + Self::new(core, self.schema().clone()) } } impl_plan_tree_node_for_unary! { BatchUpdate } -impl_distill_by_unit!(BatchUpdate, logical, "BatchUpdate"); +impl_distill_by_unit!(BatchUpdate, core, "BatchUpdate"); impl ToDistributedBatch for BatchUpdate { fn to_distributed(&self) -> Result { @@ -67,24 +67,19 @@ impl ToDistributedBatch for BatchUpdate { impl ToBatchPb for BatchUpdate { fn to_batch_prost_body(&self) -> NodeBody { - let exprs = self - .logical - .exprs - .iter() - .map(|x| x.to_expr_proto()) - .collect(); + let exprs = self.core.exprs.iter().map(|x| x.to_expr_proto()).collect(); let update_column_indices = self - .logical + .core .update_column_indices .iter() .map(|i| *i as _) .collect_vec(); NodeBody::Update(UpdateNode { exprs, - table_id: self.logical.table_id.table_id(), - table_version_id: self.logical.table_version_id, - returning: self.logical.returning, + table_id: self.core.table_id.table_id(), + table_version_id: self.core.table_version_id, + returning: self.core.returning, update_column_indices, }) } @@ -104,8 +99,8 @@ impl ExprRewritable for BatchUpdate { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); - Self::new(logical, self.schema().clone()).into() + let mut core = self.core.clone(); + core.rewrite_exprs(r); + Self::new(core, self.schema().clone()).into() } } diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index cffe453879bde..0ad9b828ead4b 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -57,9 +57,9 @@ impl LogicalAgg { /// Should only be used iff input is distributed. Input must be converted to stream form. fn gen_stateless_two_phase_streaming_agg_plan(&self, stream_input: PlanRef) -> Result { debug_assert!(self.group_key().is_empty()); - let mut logical = self.core.clone(); - logical.input = stream_input; - let local_agg = StreamStatelessSimpleAgg::new(logical); + let mut core = self.core.clone(); + core.input = stream_input; + let local_agg = StreamStatelessSimpleAgg::new(core); let exchange = RequiredDist::single().enforce_if_not_satisfies(local_agg.into(), &Order::any())?; let global_agg = new_stream_simple_agg(Agg::new( @@ -165,19 +165,19 @@ impl LogicalAgg { } fn gen_single_plan(&self, stream_input: PlanRef) -> Result { - let mut logical = self.core.clone(); + let mut core = self.core.clone(); let input = RequiredDist::single().enforce_if_not_satisfies(stream_input, &Order::any())?; - logical.input = input; - Ok(new_stream_simple_agg(logical).into()) + core.input = input; + Ok(new_stream_simple_agg(core).into()) } fn gen_shuffle_plan(&self, stream_input: PlanRef) -> Result { let input = RequiredDist::shard_by_key(stream_input.schema().len(), &self.group_key().to_vec()) .enforce_if_not_satisfies(stream_input, &Order::any())?; - let mut logical = self.core.clone(); - logical.input = input; - Ok(new_stream_hash_agg(logical, None).into()) + let mut core = self.core.clone(); + core.input = input; + Ok(new_stream_hash_agg(core, None).into()) } /// Generates distributed stream plan. @@ -1125,13 +1125,13 @@ fn find_or_append_row_count(mut logical: Agg) -> (Agg, usize) (logical, row_count_idx) } -fn new_stream_simple_agg(logical: Agg) -> StreamSimpleAgg { - let (logical, row_count_idx) = find_or_append_row_count(logical); +fn new_stream_simple_agg(core: Agg) -> StreamSimpleAgg { + let (logical, row_count_idx) = find_or_append_row_count(core); StreamSimpleAgg::new(logical, row_count_idx) } -fn new_stream_hash_agg(logical: Agg, vnode_col_idx: Option) -> StreamHashAgg { - let (logical, row_count_idx) = find_or_append_row_count(logical); +fn new_stream_hash_agg(core: Agg, vnode_col_idx: Option) -> StreamHashAgg { + let (logical, row_count_idx) = find_or_append_row_count(core); StreamHashAgg::new(logical, vnode_col_idx, row_count_idx) } diff --git a/src/frontend/src/optimizer/plan_node/logical_insert.rs b/src/frontend/src/optimizer/plan_node/logical_insert.rs index 482c034302a38..f44000b502223 100644 --- a/src/frontend/src/optimizer/plan_node/logical_insert.rs +++ b/src/frontend/src/optimizer/plan_node/logical_insert.rs @@ -142,9 +142,9 @@ impl PredicatePushdown for LogicalInsert { impl ToBatch for LogicalInsert { fn to_batch(&self) -> Result { let new_input = self.input().to_batch()?; - let mut logical = self.core.clone(); - logical.input = new_input; - Ok(BatchInsert::new(logical).into()) + let mut core = self.core.clone(); + core.input = new_input; + Ok(BatchInsert::new(core).into()) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_over_window.rs b/src/frontend/src/optimizer/plan_node/logical_over_window.rs index 5efe0dac01602..b1796ddc62752 100644 --- a/src/frontend/src/optimizer/plan_node/logical_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_over_window.rs @@ -834,9 +834,9 @@ impl ToStream for LogicalOverWindow { .enforce_if_not_satisfies(stream_input, &Order::any())?; let sort = StreamEowcSort::new(sort_input, order_key_index); - let mut logical = self.core.clone(); - logical.input = sort.into(); - Ok(StreamEowcOverWindow::new(logical).into()) + let mut core = self.core.clone(); + core.input = sort.into(); + Ok(StreamEowcOverWindow::new(core).into()) } else { // General (Emit-On-Update) case @@ -865,9 +865,9 @@ impl ToStream for LogicalOverWindow { let new_input = RequiredDist::shard_by_key(stream_input.schema().len(), &partition_key_indices) .enforce_if_not_satisfies(stream_input, &Order::any())?; - let mut logical = self.core.clone(); - logical.input = new_input; - Ok(StreamOverWindow::new(logical).into()) + let mut core = self.core.clone(); + core.input = new_input; + Ok(StreamOverWindow::new(core).into()) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index caf04021755f8..1d37da9eaa40f 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -211,7 +211,7 @@ impl LogicalSource { false, FixedBitSet::with_capacity(logical_source.column_catalog.len()), ), - logical: logical_source, + core: logical_source, } .into(); new_s3_plan = RequiredDist::shard_by_key(3, &[0]) diff --git a/src/frontend/src/optimizer/plan_node/logical_topn.rs b/src/frontend/src/optimizer/plan_node/logical_topn.rs index f29ef16aa48ef..39d97a56fe3a6 100644 --- a/src/frontend/src/optimizer/plan_node/logical_topn.rs +++ b/src/frontend/src/optimizer/plan_node/logical_topn.rs @@ -130,9 +130,9 @@ impl LogicalTopN { fn gen_single_stream_top_n_plan(&self, stream_input: PlanRef) -> Result { let input = RequiredDist::single().enforce_if_not_satisfies(stream_input, &Order::any())?; - let mut logical = self.core.clone(); - logical.input = input; - Ok(StreamTopN::new(logical).into()) + let mut core = self.core.clone(); + core.input = input; + Ok(StreamTopN::new(core).into()) } fn gen_vnode_two_phase_stream_top_n_plan( @@ -336,9 +336,9 @@ impl ToStream for LogicalTopN { let input = self.input().to_stream(ctx)?; let input = RequiredDist::hash_shard(self.group_key()) .enforce_if_not_satisfies(input, &Order::any())?; - let mut logical = self.core.clone(); - logical.input = input; - StreamGroupTopN::new(logical, None).into() + let mut core = self.core.clone(); + core.input = input; + StreamGroupTopN::new(core, None).into() } else { self.gen_dist_stream_top_n_plan(self.input().to_stream(ctx)?)? }) diff --git a/src/frontend/src/optimizer/plan_node/stream_dedup.rs b/src/frontend/src/optimizer/plan_node/stream_dedup.rs index 847616629355e..6e96d0eab0e93 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dedup.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dedup.rs @@ -28,27 +28,27 @@ use crate::TableCatalog; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamDedup { pub base: PlanBase, - logical: generic::Dedup, + core: generic::Dedup, } impl StreamDedup { - pub fn new(logical: generic::Dedup) -> Self { - let input = logical.input.clone(); + pub fn new(core: generic::Dedup) -> Self { + let input = core.input.clone(); // A dedup operator must be append-only. assert!(input.append_only()); let base = PlanBase::new_stream_with_logical( - &logical, + &core, input.distribution().clone(), true, input.emit_on_window_close(), input.watermark_columns().clone(), ); - StreamDedup { base, logical } + StreamDedup { base, core } } pub fn infer_internal_table_catalog(&self) -> TableCatalog { - let schema = self.logical.schema(); + let schema = self.core.schema(); let mut builder = TableCatalogBuilder::new(self.base.ctx().with_options().internal_table_subset()); @@ -56,7 +56,7 @@ impl StreamDedup { builder.add_column(field); }); - self.logical.dedup_cols.iter().for_each(|idx| { + self.core.dedup_cols.iter().for_each(|idx| { builder.add_order_column(*idx, OrderType::ascending()); }); @@ -70,17 +70,17 @@ impl StreamDedup { } // assert!(self.base.append_only()); -impl_distill_by_unit!(StreamDedup, logical, "StreamAppendOnlyDedup"); +impl_distill_by_unit!(StreamDedup, core, "StreamAppendOnlyDedup"); impl PlanTreeNodeUnary for StreamDedup { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } } @@ -94,7 +94,7 @@ impl StreamNode for StreamDedup { PbNodeBody::AppendOnlyDedup(DedupNode { state_table: Some(table_catalog.to_internal_table_prost()), dedup_column_indices: self - .logical + .core .dedup_cols .iter() .map(|idx| *idx as _) diff --git a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs index 580c3563bfa09..db9e6ac296bbf 100644 --- a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs @@ -35,7 +35,7 @@ use crate::utils::ColIndexMappingRewriteExt; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamDeltaJoin { pub base: PlanBase, - logical: generic::Join, + core: generic::Join, /// The join condition must be equivalent to `logical.on`, but separated into equal and /// non-equal parts to facilitate execution later @@ -43,10 +43,10 @@ pub struct StreamDeltaJoin { } impl StreamDeltaJoin { - pub fn new(logical: generic::Join, eq_join_predicate: EqJoinPredicate) -> Self { + pub fn new(core: generic::Join, eq_join_predicate: EqJoinPredicate) -> Self { // Inner join won't change the append-only behavior of the stream. The rest might. - let append_only = match logical.join_type { - JoinType::Inner => logical.left.append_only() && logical.right.append_only(), + let append_only = match core.join_type { + JoinType::Inner => core.left.append_only() && core.right.append_only(), _ => todo!("delta join only supports inner join for now"), }; if eq_join_predicate.has_non_eq() { @@ -57,18 +57,18 @@ impl StreamDeltaJoin { let dist = Distribution::SomeShard; let watermark_columns = { - let from_left = logical + let from_left = core .l2i_col_mapping() - .rewrite_bitset(logical.left.watermark_columns()); - let from_right = logical + .rewrite_bitset(core.left.watermark_columns()); + let from_right = core .r2i_col_mapping() - .rewrite_bitset(logical.right.watermark_columns()); + .rewrite_bitset(core.right.watermark_columns()); let watermark_columns = from_left.bitand(&from_right); - logical.i2o_col_mapping().rewrite_bitset(&watermark_columns) + core.i2o_col_mapping().rewrite_bitset(&watermark_columns) }; // TODO: derive from input let base = PlanBase::new_stream_with_logical( - &logical, + &core, dist, append_only, false, // TODO(rc): derive EOWC property from input @@ -77,7 +77,7 @@ impl StreamDeltaJoin { Self { base, - logical, + core, eq_join_predicate, } } @@ -92,9 +92,9 @@ impl Distill for StreamDeltaJoin { fn distill<'a>(&self) -> XmlNode<'a> { let verbose = self.base.ctx.is_explain_verbose(); let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 }); - vec.push(("type", Pretty::debug(&self.logical.join_type))); + vec.push(("type", Pretty::debug(&self.core.join_type))); - let concat_schema = self.logical.concat_schema(); + let concat_schema = self.core.concat_schema(); vec.push(( "predicate", Pretty::debug(&EqJoinPredicateDisplay { @@ -104,7 +104,7 @@ impl Distill for StreamDeltaJoin { )); if verbose { - let data = IndicesDisplay::from_join(&self.logical, &concat_schema); + let data = IndicesDisplay::from_join(&self.core, &concat_schema); vec.push(("output", data)); } @@ -114,18 +114,18 @@ impl Distill for StreamDeltaJoin { impl PlanTreeNodeBinary for StreamDeltaJoin { fn left(&self) -> PlanRef { - self.logical.left.clone() + self.core.left.clone() } fn right(&self) -> PlanRef { - self.logical.right.clone() + self.core.right.clone() } fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.left = left; - logical.right = right; - Self::new(logical, self.eq_join_predicate.clone()) + let mut core = self.core.clone(); + core.left = left; + core.right = right; + Self::new(core, self.eq_join_predicate.clone()) } } @@ -137,13 +137,13 @@ impl StreamNode for StreamDeltaJoin { let right = self.right(); let left_table = if let Some(stream_table_scan) = left.as_stream_table_scan() { - stream_table_scan.logical() + stream_table_scan.core() } else { unreachable!(); }; let left_table_desc = &*left_table.table_desc; let right_table = if let Some(stream_table_scan) = right.as_stream_table_scan() { - stream_table_scan.logical() + stream_table_scan.core() } else { unreachable!(); }; @@ -153,7 +153,7 @@ impl StreamNode for StreamDeltaJoin { // don't need an intermediate representation. let eq_join_predicate = &self.eq_join_predicate; NodeBody::DeltaIndexJoin(DeltaIndexJoinNode { - join_type: self.logical.join_type as i32, + join_type: self.core.join_type as i32, left_key: eq_join_predicate .left_eq_indexes() .iter() @@ -192,12 +192,7 @@ impl StreamNode for StreamDeltaJoin { .collect(), table_desc: Some(right_table_desc.to_protobuf()), }), - output_indices: self - .logical - .output_indices - .iter() - .map(|&x| x as u32) - .collect(), + output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), }) } } @@ -208,8 +203,8 @@ impl ExprRewritable for StreamDeltaJoin { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); - Self::new(logical, self.eq_join_predicate.rewrite_exprs(r)).into() + let mut core = self.core.clone(); + core.rewrite_exprs(r); + Self::new(core, self.eq_join_predicate.rewrite_exprs(r)).into() } } diff --git a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs index 797091338915e..9418af8e4a364 100644 --- a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs @@ -27,41 +27,41 @@ use crate::TableCatalog; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamEowcOverWindow { pub base: PlanBase, - logical: generic::OverWindow, + core: generic::OverWindow, } impl StreamEowcOverWindow { - pub fn new(logical: generic::OverWindow) -> Self { - assert!(logical.funcs_have_same_partition_and_order()); + pub fn new(core: generic::OverWindow) -> Self { + assert!(core.funcs_have_same_partition_and_order()); - let input = &logical.input; + let input = &core.input; assert!(input.append_only()); assert!(input.emit_on_window_close()); // Should order by a single watermark column. - let order_key = &logical.window_functions[0].order_by; + let order_key = &core.window_functions[0].order_by; assert_eq!(order_key.len(), 1); assert_eq!(order_key[0].order_type, OrderType::ascending()); let order_key_idx = order_key[0].column_index; - let input_watermark_cols = logical.input.watermark_columns(); + let input_watermark_cols = core.input.watermark_columns(); assert!(input_watermark_cols.contains(order_key_idx)); // `EowcOverWindowExecutor` cannot produce any watermark columns, because there may be some // ancient rows in some rarely updated partitions that are emitted at the end of time. - let watermark_columns = FixedBitSet::with_capacity(logical.output_len()); + let watermark_columns = FixedBitSet::with_capacity(core.output_len()); let base = PlanBase::new_stream_with_logical( - &logical, + &core, input.distribution().clone(), true, true, watermark_columns, ); - StreamEowcOverWindow { base, logical } + StreamEowcOverWindow { base, core } } fn window_functions(&self) -> &[PlanWindowFunction] { - &self.logical.window_functions + &self.core.window_functions } fn partition_key_indices(&self) -> Vec { @@ -79,7 +79,7 @@ impl StreamEowcOverWindow { fn infer_state_table(&self) -> TableCatalog { // The EOWC over window state table has the same schema as the input. - let in_fields = self.logical.input.schema().fields(); + let in_fields = self.core.input.schema().fields(); let mut tbl_builder = TableCatalogBuilder::new(self.ctx().with_options().internal_table_subset()); for field in in_fields { @@ -100,29 +100,29 @@ impl StreamEowcOverWindow { tbl_builder.add_order_column(order_key_index, OrderType::ascending()); order_cols.insert(order_key_index); } - for idx in self.logical.input.expect_stream_key() { + for idx in self.core.input.expect_stream_key() { if !order_cols.contains(idx) { tbl_builder.add_order_column(*idx, OrderType::ascending()); order_cols.insert(*idx); } } - let in_dist_key = self.logical.input.distribution().dist_column_indices(); + let in_dist_key = self.core.input.distribution().dist_column_indices(); tbl_builder.build(in_dist_key.to_vec(), read_prefix_len_hint) } } -impl_distill_by_unit!(StreamEowcOverWindow, logical, "StreamEowcOverWindow"); +impl_distill_by_unit!(StreamEowcOverWindow, core, "StreamEowcOverWindow"); impl PlanTreeNodeUnary for StreamEowcOverWindow { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } } impl_plan_tree_node_for_unary! { StreamEowcOverWindow } diff --git a/src/frontend/src/optimizer/plan_node/stream_expand.rs b/src/frontend/src/optimizer/plan_node/stream_expand.rs index c7a59b1f847fe..e0f8852a19fb5 100644 --- a/src/frontend/src/optimizer/plan_node/stream_expand.rs +++ b/src/frontend/src/optimizer/plan_node/stream_expand.rs @@ -25,12 +25,12 @@ use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamExpand { pub base: PlanBase, - logical: generic::Expand, + core: generic::Expand, } impl StreamExpand { - pub fn new(logical: generic::Expand) -> Self { - let input = logical.input.clone(); + pub fn new(core: generic::Expand) -> Self { + let input = core.input.clone(); let dist = match input.distribution() { Distribution::Single => Distribution::Single, @@ -40,7 +40,7 @@ impl StreamExpand { Distribution::Broadcast => unreachable!(), }; - let mut watermark_columns = FixedBitSet::with_capacity(logical.output_len()); + let mut watermark_columns = FixedBitSet::with_capacity(core.output_len()); watermark_columns.extend( input .watermark_columns() @@ -49,34 +49,34 @@ impl StreamExpand { ); let base = PlanBase::new_stream_with_logical( - &logical, + &core, dist, input.append_only(), input.emit_on_window_close(), watermark_columns, ); - StreamExpand { base, logical } + StreamExpand { base, core } } pub fn column_subsets(&self) -> &[Vec] { - &self.logical.column_subsets + &self.core.column_subsets } } impl PlanTreeNodeUnary for StreamExpand { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } } impl_plan_tree_node_for_unary! { StreamExpand } -impl_distill_by_unit!(StreamExpand, logical, "StreamExpand"); +impl_distill_by_unit!(StreamExpand, core, "StreamExpand"); impl StreamNode for StreamExpand { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { diff --git a/src/frontend/src/optimizer/plan_node/stream_filter.rs b/src/frontend/src/optimizer/plan_node/stream_filter.rs index ed4d506b47aef..ff4d344607776 100644 --- a/src/frontend/src/optimizer/plan_node/stream_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_filter.rs @@ -26,43 +26,43 @@ use crate::utils::Condition; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamFilter { pub base: PlanBase, - logical: generic::Filter, + core: generic::Filter, } impl StreamFilter { - pub fn new(logical: generic::Filter) -> Self { - let input = logical.input.clone(); + pub fn new(core: generic::Filter) -> Self { + let input = core.input.clone(); let dist = input.distribution().clone(); // Filter executor won't change the append-only behavior of the stream. let base = PlanBase::new_stream_with_logical( - &logical, + &core, dist, input.append_only(), input.emit_on_window_close(), input.watermark_columns().clone(), ); - StreamFilter { base, logical } + StreamFilter { base, core } } pub fn predicate(&self) -> &Condition { - &self.logical.predicate + &self.core.predicate } } impl PlanTreeNodeUnary for StreamFilter { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } } impl_plan_tree_node_for_unary! { StreamFilter } -impl_distill_by_unit!(StreamFilter, logical, "StreamFilter"); +impl_distill_by_unit!(StreamFilter, core, "StreamFilter"); impl StreamNode for StreamFilter { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { @@ -78,8 +78,8 @@ impl ExprRewritable for StreamFilter { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); - Self::new(logical).into() + let mut core = self.core.clone(); + core.rewrite_exprs(r); + Self::new(core).into() } } diff --git a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs index 4a1e449a21ab6..190c05c0a5ba1 100644 --- a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs +++ b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs @@ -32,7 +32,7 @@ use crate::stream_fragmenter::BuildFragmentGraphState; pub struct StreamFsFetch { pub base: PlanBase, input: PlanRef, - logical: generic::Source, + core: generic::Source, } impl PlanTreeNodeUnary for StreamFsFetch { @@ -41,7 +41,7 @@ impl PlanTreeNodeUnary for StreamFsFetch { } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new(input, self.logical.clone()) + Self::new(input, self.core.clone()) } } impl_plan_tree_node_for_unary! { StreamFsFetch } @@ -59,12 +59,12 @@ impl StreamFsFetch { Self { base, input, - logical: source, + core: source, } } fn get_columns(&self) -> Vec<&str> { - self.logical + self.core .column_catalog .iter() .map(|column| column.name()) @@ -72,7 +72,7 @@ impl StreamFsFetch { } pub fn source_catalog(&self) -> Option> { - self.logical.catalog.clone() + self.core.catalog.clone() } } @@ -103,9 +103,9 @@ impl StreamNode for StreamFsFetch { .to_internal_table_prost(), ), info: Some(source_catalog.info.clone()), - row_id_index: self.logical.row_id_index.map(|index| index as _), + row_id_index: self.core.row_id_index.map(|index| index as _), columns: self - .logical + .core .column_catalog .iter() .map(|c| c.to_protobuf()) diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs index a6147164b8d45..14711d353f9d8 100644 --- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs @@ -27,24 +27,24 @@ use crate::PlanRef; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamGroupTopN { pub base: PlanBase, - logical: generic::TopN, + core: generic::TopN, /// an optional column index which is the vnode of each row computed by the input's consistent /// hash distribution vnode_col_idx: Option, } impl StreamGroupTopN { - pub fn new(logical: generic::TopN, vnode_col_idx: Option) -> Self { - assert!(!logical.group_key.is_empty()); - assert!(logical.limit_attr.limit() > 0); - let input = &logical.input; + pub fn new(core: generic::TopN, vnode_col_idx: Option) -> Self { + assert!(!core.group_key.is_empty()); + assert!(core.limit_attr.limit() > 0); + let input = &core.input; let schema = input.schema().clone(); let watermark_columns = if input.append_only() { input.watermark_columns().clone() } else { let mut watermark_columns = FixedBitSet::with_capacity(schema.len()); - for &idx in &logical.group_key { + for &idx in &core.group_key { if input.watermark_columns().contains(idx) { watermark_columns.insert(idx); } @@ -52,7 +52,7 @@ impl StreamGroupTopN { watermark_columns }; - let mut stream_key = logical + let mut stream_key = core .stream_key() .expect("logical node should have stream key here"); if let Some(vnode_col_idx) = vnode_col_idx && stream_key.len() > 1 { @@ -64,10 +64,10 @@ impl StreamGroupTopN { } let base = PlanBase::new_stream( - logical.ctx(), - logical.schema(), + core.ctx(), + core.schema(), Some(stream_key), - logical.functional_dependency(), + core.functional_dependency(), input.distribution().clone(), false, // TODO: https://github.com/risingwavelabs/risingwave/issues/8348 @@ -76,25 +76,25 @@ impl StreamGroupTopN { ); StreamGroupTopN { base, - logical, + core, vnode_col_idx, } } pub fn limit_attr(&self) -> TopNLimit { - self.logical.limit_attr + self.core.limit_attr } pub fn offset(&self) -> u64 { - self.logical.offset + self.core.offset } pub fn topn_order(&self) -> &Order { - &self.logical.order + &self.core.order } pub fn group_key(&self) -> &[usize] { - &self.logical.group_key + &self.core.group_key } } @@ -104,7 +104,7 @@ impl StreamNode for StreamGroupTopN { let input = self.input(); let table = self - .logical + .core .infer_internal_table_catalog( input.schema(), input.ctx(), @@ -134,7 +134,7 @@ impl Distill for StreamGroupTopN { let name = plan_node_name!("StreamGroupTopN", { "append_only", self.input().append_only() }, ); - let mut node = self.logical.distill_with_name(name); + let mut node = self.core.distill_with_name(name); if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) { node.fields.push(("output_watermarks".into(), ow)); } @@ -146,13 +146,13 @@ impl_plan_tree_node_for_unary! { StreamGroupTopN } impl PlanTreeNodeUnary for StreamGroupTopN { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical, self.vnode_col_idx) + let mut core = self.core.clone(); + core.input = input; + Self::new(core, self.vnode_col_idx) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs index 5f0cfc16a4171..25e1ac801f97c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs @@ -28,7 +28,7 @@ use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, IndexSet}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamHashAgg { pub base: PlanBase, - logical: generic::Agg, + core: generic::Agg, /// An optional column index which is the vnode of each row computed by the input's consistent /// hash distribution. @@ -46,38 +46,38 @@ pub struct StreamHashAgg { impl StreamHashAgg { pub fn new( - logical: generic::Agg, + core: generic::Agg, vnode_col_idx: Option, row_count_idx: usize, ) -> Self { - Self::new_with_eowc(logical, vnode_col_idx, row_count_idx, false) + Self::new_with_eowc(core, vnode_col_idx, row_count_idx, false) } pub fn new_with_eowc( - logical: generic::Agg, + core: generic::Agg, vnode_col_idx: Option, row_count_idx: usize, emit_on_window_close: bool, ) -> Self { - assert_eq!(logical.agg_calls[row_count_idx], PlanAggCall::count_star()); + assert_eq!(core.agg_calls[row_count_idx], PlanAggCall::count_star()); - let input = logical.input.clone(); + let input = core.input.clone(); let input_dist = input.distribution(); - let dist = logical + let dist = core .i2o_col_mapping() .rewrite_provided_distribution(input_dist); - let mut watermark_columns = FixedBitSet::with_capacity(logical.output_len()); + let mut watermark_columns = FixedBitSet::with_capacity(core.output_len()); let mut window_col_idx = None; - let mapping = logical.i2o_col_mapping(); + let mapping = core.i2o_col_mapping(); if emit_on_window_close { - let wtmk_group_key = logical.watermark_group_key(input.watermark_columns()); + let wtmk_group_key = core.watermark_group_key(input.watermark_columns()); assert!(wtmk_group_key.len() == 1); // checked in `to_eowc_version` window_col_idx = Some(wtmk_group_key[0]); // EOWC HashAgg only produce one watermark column, i.e. the window column watermark_columns.insert(mapping.map(wtmk_group_key[0])); } else { - for idx in logical.group_key.indices() { + for idx in core.group_key.indices() { if input.watermark_columns().contains(idx) { watermark_columns.insert(mapping.map(idx)); } @@ -86,7 +86,7 @@ impl StreamHashAgg { // Hash agg executor might change the append-only behavior of the stream. let base = PlanBase::new_stream_with_logical( - &logical, + &core, dist, emit_on_window_close, // in EOWC mode, we produce append only output emit_on_window_close, @@ -94,7 +94,7 @@ impl StreamHashAgg { ); StreamHashAgg { base, - logical, + core, vnode_col_idx, row_count_idx, emit_on_window_close, @@ -103,22 +103,22 @@ impl StreamHashAgg { } pub fn agg_calls(&self) -> &[PlanAggCall] { - &self.logical.agg_calls + &self.core.agg_calls } pub fn group_key(&self) -> &IndexSet { - &self.logical.group_key + &self.core.group_key } pub(crate) fn i2o_col_mapping(&self) -> ColIndexMapping { - self.logical.i2o_col_mapping() + self.core.i2o_col_mapping() } // TODO(rc): It'll be better to force creation of EOWC version through `new`, especially when we // optimize for 2-phase EOWC aggregation later. pub fn to_eowc_version(&self) -> Result { let input = self.input(); - let wtmk_group_key = self.logical.watermark_group_key(input.watermark_columns()); + let wtmk_group_key = self.core.watermark_group_key(input.watermark_columns()); if wtmk_group_key.is_empty() || wtmk_group_key.len() > 1 { return Err(ErrorCode::NotSupported( @@ -130,7 +130,7 @@ impl StreamHashAgg { } Ok(Self::new_with_eowc( - self.logical.clone(), + self.core.clone(), self.vnode_col_idx, self.row_count_idx, true, @@ -141,7 +141,7 @@ impl StreamHashAgg { impl Distill for StreamHashAgg { fn distill<'a>(&self) -> XmlNode<'a> { - let mut vec = self.logical.fields_pretty(); + let mut vec = self.core.fields_pretty(); if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) { vec.push(("output_watermarks", ow)); } @@ -158,13 +158,13 @@ impl Distill for StreamHashAgg { impl PlanTreeNodeUnary for StreamHashAgg { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { let logical = generic::Agg { input, - ..self.logical.clone() + ..self.core.clone() }; Self::new_with_eowc( logical, @@ -179,9 +179,9 @@ impl_plan_tree_node_for_unary! { StreamHashAgg } impl StreamNode for StreamHashAgg { fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody { use risingwave_pb::stream_plan::*; - let (intermediate_state_table, agg_states, distinct_dedup_tables) = self - .logical - .infer_tables(&self.base, self.vnode_col_idx, self.window_col_idx); + let (intermediate_state_table, agg_states, distinct_dedup_tables) = + self.core + .infer_tables(&self.base, self.vnode_col_idx, self.window_col_idx); PbNodeBody::HashAgg(HashAggNode { group_key: self.group_key().to_vec_as_u32(), @@ -225,10 +225,10 @@ impl ExprRewritable for StreamHashAgg { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); + let mut core = self.core.clone(); + core.rewrite_exprs(r); Self::new_with_eowc( - logical, + core, self.vnode_col_idx, self.row_count_idx, self.emit_on_window_close, diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs index 989de9f8757e2..0075b1730b4eb 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs @@ -38,7 +38,7 @@ use crate::utils::ColIndexMappingRewriteExt; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamHashJoin { pub base: PlanBase, - logical: generic::Join, + core: generic::Join, /// The join condition must be equivalent to `logical.on`, but separated into equal and /// non-equal parts to facilitate execution later @@ -63,18 +63,14 @@ pub struct StreamHashJoin { } impl StreamHashJoin { - pub fn new(logical: generic::Join, eq_join_predicate: EqJoinPredicate) -> Self { + pub fn new(core: generic::Join, eq_join_predicate: EqJoinPredicate) -> Self { // Inner join won't change the append-only behavior of the stream. The rest might. - let append_only = match logical.join_type { - JoinType::Inner => logical.left.append_only() && logical.right.append_only(), + let append_only = match core.join_type { + JoinType::Inner => core.left.append_only() && core.right.append_only(), _ => false, }; - let dist = Self::derive_dist( - logical.left.distribution(), - logical.right.distribution(), - &logical, - ); + let dist = Self::derive_dist(core.left.distribution(), core.right.distribution(), &core); let mut inequality_pairs = vec![]; let mut clean_left_state_conjunction_idx = None; @@ -83,8 +79,8 @@ impl StreamHashJoin { // Reorder `eq_join_predicate` by placing the watermark column at the beginning. let mut reorder_idx = vec![]; for (i, (left_key, right_key)) in eq_join_predicate.eq_indexes().iter().enumerate() { - if logical.left.watermark_columns().contains(*left_key) - && logical.right.watermark_columns().contains(*right_key) + if core.left.watermark_columns().contains(*left_key) + && core.right.watermark_columns().contains(*right_key) { reorder_idx.push(i); } @@ -92,14 +88,14 @@ impl StreamHashJoin { let eq_join_predicate = eq_join_predicate.reorder(&reorder_idx); let watermark_columns = { - let l2i = logical.l2i_col_mapping(); - let r2i = logical.r2i_col_mapping(); + let l2i = core.l2i_col_mapping(); + let r2i = core.r2i_col_mapping(); let mut equal_condition_clean_state = false; - let mut watermark_columns = FixedBitSet::with_capacity(logical.internal_column_num()); + let mut watermark_columns = FixedBitSet::with_capacity(core.internal_column_num()); for (left_key, right_key) in eq_join_predicate.eq_indexes() { - if logical.left.watermark_columns().contains(left_key) - && logical.right.watermark_columns().contains(right_key) + if core.left.watermark_columns().contains(left_key) + && core.right.watermark_columns().contains(right_key) { equal_condition_clean_state = true; if let Some(internal) = l2i.try_map(left_key) { @@ -121,20 +117,14 @@ impl StreamHashJoin { ) in original_inequality_pairs { let both_upstream_has_watermark = if key_required_larger < key_required_smaller { - logical - .left - .watermark_columns() - .contains(key_required_larger) - && logical + core.left.watermark_columns().contains(key_required_larger) + && core .right .watermark_columns() .contains(key_required_smaller - left_cols_num) } else { - logical - .left - .watermark_columns() - .contains(key_required_smaller) - && logical + core.left.watermark_columns().contains(key_required_smaller) + && core .right .watermark_columns() .contains(key_required_larger - left_cols_num) @@ -184,12 +174,12 @@ impl StreamHashJoin { )); } } - logical.i2o_col_mapping().rewrite_bitset(&watermark_columns) + core.i2o_col_mapping().rewrite_bitset(&watermark_columns) }; // TODO: derive from input let base = PlanBase::new_stream_with_logical( - &logical, + &core, dist, append_only, false, // TODO(rc): derive EOWC property from input @@ -198,7 +188,7 @@ impl StreamHashJoin { Self { base, - logical, + core, eq_join_predicate, inequality_pairs, is_append_only: append_only, @@ -209,7 +199,7 @@ impl StreamHashJoin { /// Get join type pub fn join_type(&self) -> JoinType { - self.logical.join_type + self.core.join_type } /// Get a reference to the batch hash join's eq join predicate. @@ -256,7 +246,7 @@ impl StreamHashJoin { /// Convert this hash join to a delta join plan pub fn into_delta_join(self) -> StreamDeltaJoin { - StreamDeltaJoin::new(self.logical, self.eq_join_predicate) + StreamDeltaJoin::new(self.core, self.eq_join_predicate) } pub fn derive_dist_key_in_join_key(&self) -> Vec { @@ -303,9 +293,9 @@ impl Distill for StreamHashJoin { ); let verbose = self.base.ctx.is_explain_verbose(); let mut vec = Vec::with_capacity(6); - vec.push(("type", Pretty::debug(&self.logical.join_type))); + vec.push(("type", Pretty::debug(&self.core.join_type))); - let concat_schema = self.logical.concat_schema(); + let concat_schema = self.core.concat_schema(); vec.push(( "predicate", Pretty::debug(&EqJoinPredicateDisplay { @@ -331,7 +321,7 @@ impl Distill for StreamHashJoin { } if verbose { - let data = IndicesDisplay::from_join(&self.logical, &concat_schema); + let data = IndicesDisplay::from_join(&self.core, &concat_schema); vec.push(("output", data)); } @@ -341,18 +331,18 @@ impl Distill for StreamHashJoin { impl PlanTreeNodeBinary for StreamHashJoin { fn left(&self) -> PlanRef { - self.logical.left.clone() + self.core.left.clone() } fn right(&self) -> PlanRef { - self.logical.right.clone() + self.core.right.clone() } fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.left = left; - logical.right = right; - Self::new(logical, self.eq_join_predicate.clone()) + let mut core = self.core.clone(); + core.left = left; + core.right = right; + Self::new(core, self.eq_join_predicate.clone()) } } @@ -402,7 +392,7 @@ impl StreamNode for StreamHashJoin { let null_safe_prost = self.eq_join_predicate.null_safes().into_iter().collect(); NodeBody::HashJoin(HashJoinNode { - join_type: self.logical.join_type as i32, + join_type: self.core.join_type as i32, left_key: left_jk_indices_prost, right_key: right_jk_indices_prost, null_safe: null_safe_prost, @@ -443,12 +433,7 @@ impl StreamNode for StreamHashJoin { right_degree_table: Some(right_degree_table.to_internal_table_prost()), left_deduped_input_pk_indices, right_deduped_input_pk_indices, - output_indices: self - .logical - .output_indices - .iter() - .map(|&x| x as u32) - .collect(), + output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), is_append_only: self.is_append_only, }) } @@ -460,8 +445,8 @@ impl ExprRewritable for StreamHashJoin { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); - Self::new(logical, self.eq_join_predicate.rewrite_exprs(r)).into() + let mut core = self.core.clone(); + core.rewrite_exprs(r); + Self::new(core, self.eq_join_predicate.rewrite_exprs(r)).into() } } diff --git a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs index 3780a6cda3f57..c68b1b307d470 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs @@ -27,37 +27,37 @@ use crate::utils::ColIndexMappingRewriteExt; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamHopWindow { pub base: PlanBase, - logical: generic::HopWindow, + core: generic::HopWindow, window_start_exprs: Vec, window_end_exprs: Vec, } impl StreamHopWindow { pub fn new( - logical: generic::HopWindow, + core: generic::HopWindow, window_start_exprs: Vec, window_end_exprs: Vec, ) -> Self { - let input = logical.input.clone(); - let i2o = logical.i2o_col_mapping(); + let input = core.input.clone(); + let i2o = core.i2o_col_mapping(); let dist = i2o.rewrite_provided_distribution(input.distribution()); let mut watermark_columns = input.watermark_columns().clone(); - watermark_columns.grow(logical.internal_column_num()); + watermark_columns.grow(core.internal_column_num()); - if watermark_columns.contains(logical.time_col.index) { + if watermark_columns.contains(core.time_col.index) { // Watermark on `time_col` indicates watermark on both `window_start` and `window_end`. - watermark_columns.insert(logical.internal_window_start_col_idx()); - watermark_columns.insert(logical.internal_window_end_col_idx()); + watermark_columns.insert(core.internal_window_start_col_idx()); + watermark_columns.insert(core.internal_window_end_col_idx()); } let watermark_columns = ColIndexMapping::with_remaining_columns( - &logical.output_indices, - logical.internal_column_num(), + &core.output_indices, + core.internal_column_num(), ) .rewrite_bitset(&watermark_columns); let base = PlanBase::new_stream_with_logical( - &logical, + &core, dist, input.append_only(), input.emit_on_window_close(), @@ -65,7 +65,7 @@ impl StreamHopWindow { ); Self { base, - logical, + core, window_start_exprs, window_end_exprs, } @@ -74,7 +74,7 @@ impl StreamHopWindow { impl Distill for StreamHopWindow { fn distill<'a>(&self) -> XmlNode<'a> { - let mut vec = self.logical.fields_pretty(); + let mut vec = self.core.fields_pretty(); if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) { vec.push(("output_watermarks", ow)); } @@ -84,14 +84,14 @@ impl Distill for StreamHopWindow { impl PlanTreeNodeUnary for StreamHopWindow { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; + let mut core = self.core.clone(); + core.input = input; Self::new( - logical, + core, self.window_start_exprs.clone(), self.window_end_exprs.clone(), ) @@ -103,15 +103,10 @@ impl_plan_tree_node_for_unary! {StreamHopWindow} impl StreamNode for StreamHopWindow { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { PbNodeBody::HopWindow(HopWindowNode { - time_col: self.logical.time_col.index() as _, - window_slide: Some(self.logical.window_slide.into()), - window_size: Some(self.logical.window_size.into()), - output_indices: self - .logical - .output_indices - .iter() - .map(|&x| x as u32) - .collect(), + time_col: self.core.time_col.index() as _, + window_slide: Some(self.core.window_slide.into()), + window_size: Some(self.core.window_size.into()), + output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), window_start_exprs: self .window_start_exprs .clone() @@ -135,7 +130,7 @@ impl ExprRewritable for StreamHopWindow { fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { Self::new( - self.logical.clone(), + self.core.clone(), self.window_start_exprs .clone() .into_iter() diff --git a/src/frontend/src/optimizer/plan_node/stream_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_over_window.rs index d3a89129b4b82..0d749f0c7b0e6 100644 --- a/src/frontend/src/optimizer/plan_node/stream_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_over_window.rs @@ -27,69 +27,69 @@ use crate::TableCatalog; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamOverWindow { pub base: PlanBase, - logical: generic::OverWindow, + core: generic::OverWindow, } impl StreamOverWindow { - pub fn new(logical: generic::OverWindow) -> Self { - assert!(logical.funcs_have_same_partition_and_order()); + pub fn new(core: generic::OverWindow) -> Self { + assert!(core.funcs_have_same_partition_and_order()); - let input = &logical.input; - let watermark_columns = FixedBitSet::with_capacity(logical.output_len()); + let input = &core.input; + let watermark_columns = FixedBitSet::with_capacity(core.output_len()); let base = PlanBase::new_stream_with_logical( - &logical, + &core, input.distribution().clone(), false, // general over window cannot be append-only false, watermark_columns, ); - StreamOverWindow { base, logical } + StreamOverWindow { base, core } } fn infer_state_table(&self) -> TableCatalog { let mut tbl_builder = TableCatalogBuilder::new(self.ctx().with_options().internal_table_subset()); - let out_schema = self.logical.schema(); + let out_schema = self.core.schema(); for field in out_schema.fields() { tbl_builder.add_column(field); } let mut order_cols = HashSet::new(); - for idx in self.logical.partition_key_indices() { + for idx in self.core.partition_key_indices() { if order_cols.insert(idx) { tbl_builder.add_order_column(idx, OrderType::ascending()); } } let read_prefix_len_hint = tbl_builder.get_current_pk_len(); - for o in self.logical.order_key() { + for o in self.core.order_key() { if order_cols.insert(o.column_index) { tbl_builder.add_order_column(o.column_index, o.order_type); } } - for &idx in self.logical.input.expect_stream_key() { + for &idx in self.core.input.expect_stream_key() { if order_cols.insert(idx) { tbl_builder.add_order_column(idx, OrderType::ascending()); } } - let in_dist_key = self.logical.input.distribution().dist_column_indices(); + let in_dist_key = self.core.input.distribution().dist_column_indices(); tbl_builder.build(in_dist_key.to_vec(), read_prefix_len_hint) } } -impl_distill_by_unit!(StreamOverWindow, logical, "StreamOverWindow"); +impl_distill_by_unit!(StreamOverWindow, core, "StreamOverWindow"); impl PlanTreeNodeUnary for StreamOverWindow { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } } impl_plan_tree_node_for_unary! { StreamOverWindow } @@ -99,19 +99,19 @@ impl StreamNode for StreamOverWindow { use risingwave_pb::stream_plan::*; let calls = self - .logical + .core .window_functions() .iter() .map(PlanWindowFunction::to_protobuf) .collect(); let partition_by = self - .logical + .core .partition_key_indices() .into_iter() .map(|idx| idx as _) .collect(); let order_by = self - .logical + .core .order_key() .iter() .map(ColumnOrder::to_protobuf) diff --git a/src/frontend/src/optimizer/plan_node/stream_project.rs b/src/frontend/src/optimizer/plan_node/stream_project.rs index 8e1b30eaafad2..8a7665881e0cf 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project.rs @@ -28,7 +28,7 @@ use crate::utils::ColIndexMappingRewriteExt; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamProject { pub base: PlanBase, - logical: generic::Project, + core: generic::Project, /// All the watermark derivations, (input_column_index, output_column_index). And the /// derivation expression is the project's expression itself. watermark_derivations: Vec<(usize, usize)>, @@ -39,7 +39,7 @@ pub struct StreamProject { impl Distill for StreamProject { fn distill<'a>(&self) -> XmlNode<'a> { let schema = self.schema(); - let mut vec = self.logical.fields_pretty(schema); + let mut vec = self.core.fields_pretty(schema); if let Some(display_output_watermarks) = watermark_pretty(&self.base.watermark_columns, schema) { @@ -50,16 +50,16 @@ impl Distill for StreamProject { } impl StreamProject { - pub fn new(logical: generic::Project) -> Self { - let input = logical.input.clone(); - let distribution = logical + pub fn new(core: generic::Project) -> Self { + let input = core.input.clone(); + let distribution = core .i2o_col_mapping() .rewrite_provided_distribution(input.distribution()); let mut watermark_derivations = vec![]; let mut nondecreasing_exprs = vec![]; - let mut watermark_columns = FixedBitSet::with_capacity(logical.exprs.len()); - for (expr_idx, expr) in logical.exprs.iter().enumerate() { + let mut watermark_columns = FixedBitSet::with_capacity(core.exprs.len()); + for (expr_idx, expr) in core.exprs.iter().enumerate() { match try_derive_watermark(expr) { WatermarkDerivation::Watermark(input_idx) => { if input.watermark_columns().contains(input_idx) { @@ -80,7 +80,7 @@ impl StreamProject { // Project executor won't change the append-only behavior of the stream, so it depends on // input's `append_only`. let base = PlanBase::new_stream_with_logical( - &logical, + &core, distribution, input.append_only(), input.emit_on_window_close(), @@ -88,30 +88,30 @@ impl StreamProject { ); StreamProject { base, - logical, + core, watermark_derivations, nondecreasing_exprs, } } pub fn as_logical(&self) -> &generic::Project { - &self.logical + &self.core } pub fn exprs(&self) -> &Vec { - &self.logical.exprs + &self.core.exprs } } impl PlanTreeNodeUnary for StreamProject { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } } impl_plan_tree_node_for_unary! {StreamProject} @@ -124,12 +124,7 @@ impl StreamNode for StreamProject { .map(|(i, o)| (*i as u32, *o as u32)) .unzip(); PbNodeBody::Project(ProjectNode { - select_list: self - .logical - .exprs - .iter() - .map(|x| x.to_expr_proto()) - .collect(), + select_list: self.core.exprs.iter().map(|x| x.to_expr_proto()).collect(), watermark_input_cols, watermark_output_cols, nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(), @@ -143,8 +138,8 @@ impl ExprRewritable for StreamProject { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); - Self::new(logical).into() + let mut core = self.core.clone(); + core.rewrite_exprs(r); + Self::new(core).into() } } diff --git a/src/frontend/src/optimizer/plan_node/stream_project_set.rs b/src/frontend/src/optimizer/plan_node/stream_project_set.rs index 97c4b70433cb1..cadd600f3c3b7 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project_set.rs @@ -26,7 +26,7 @@ use crate::utils::ColIndexMappingRewriteExt; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamProjectSet { pub base: PlanBase, - logical: generic::ProjectSet, + core: generic::ProjectSet, /// All the watermark derivations, (input_column_idx, expr_idx). And the /// derivation expression is the project_set's expression itself. watermark_derivations: Vec<(usize, usize)>, @@ -36,16 +36,16 @@ pub struct StreamProjectSet { } impl StreamProjectSet { - pub fn new(logical: generic::ProjectSet) -> Self { - let input = logical.input.clone(); - let distribution = logical + pub fn new(core: generic::ProjectSet) -> Self { + let input = core.input.clone(); + let distribution = core .i2o_col_mapping() .rewrite_provided_distribution(input.distribution()); let mut watermark_derivations = vec![]; let mut nondecreasing_exprs = vec![]; - let mut watermark_columns = FixedBitSet::with_capacity(logical.output_len()); - for (expr_idx, expr) in logical.select_list.iter().enumerate() { + let mut watermark_columns = FixedBitSet::with_capacity(core.output_len()); + for (expr_idx, expr) in core.select_list.iter().enumerate() { match try_derive_watermark(expr) { WatermarkDerivation::Watermark(input_idx) => { if input.watermark_columns().contains(input_idx) { @@ -67,7 +67,7 @@ impl StreamProjectSet { // ProjectSet executor won't change the append-only behavior of the stream, so it depends on // input's `append_only`. let base = PlanBase::new_stream_with_logical( - &logical, + &core, distribution, input.append_only(), input.emit_on_window_close(), @@ -75,24 +75,24 @@ impl StreamProjectSet { ); StreamProjectSet { base, - logical, + core, watermark_derivations, nondecreasing_exprs, } } } -impl_distill_by_unit!(StreamProjectSet, logical, "StreamProjectSet"); +impl_distill_by_unit!(StreamProjectSet, core, "StreamProjectSet"); impl_plan_tree_node_for_unary! { StreamProjectSet } impl PlanTreeNodeUnary for StreamProjectSet { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } } @@ -105,7 +105,7 @@ impl StreamNode for StreamProjectSet { .unzip(); PbNodeBody::ProjectSet(ProjectSetNode { select_list: self - .logical + .core .select_list .iter() .map(|select_item| select_item.to_project_set_select_item_proto()) @@ -123,8 +123,8 @@ impl ExprRewritable for StreamProjectSet { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); - Self::new(logical).into() + let mut core = self.core.clone(); + core.rewrite_exprs(r); + Self::new(core).into() } } diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs index 969527fa69702..8b406005f40a6 100644 --- a/src/frontend/src/optimizer/plan_node/stream_share.rs +++ b/src/frontend/src/optimizer/plan_node/stream_share.rs @@ -26,22 +26,22 @@ use crate::Explain; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamShare { pub base: PlanBase, - logical: generic::Share, + core: generic::Share, } impl StreamShare { - pub fn new(logical: generic::Share) -> Self { - let input = logical.input.borrow().0.clone(); + pub fn new(core: generic::Share) -> Self { + let input = core.input.borrow().0.clone(); let dist = input.distribution().clone(); // Filter executor won't change the append-only behavior of the stream. let base = PlanBase::new_stream_with_logical( - &logical, + &core, dist, input.append_only(), input.emit_on_window_close(), input.watermark_columns().clone(), ); - StreamShare { base, logical } + StreamShare { base, core } } } @@ -53,19 +53,19 @@ impl Distill for StreamShare { impl PlanTreeNodeUnary for StreamShare { fn input(&self) -> PlanRef { - self.logical.input.borrow().clone() + self.core.input.borrow().clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let logical = self.logical.clone(); - logical.replace_input(input); - Self::new(logical) + let core = self.core.clone(); + core.replace_input(input); + Self::new(core) } } impl StreamShare { pub fn replace_input(&self, plan: PlanRef) { - self.logical.replace_input(plan); + self.core.replace_input(plan); } } diff --git a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs index f0c0bab5fae77..59311dd22226c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs @@ -27,17 +27,17 @@ use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamSimpleAgg { pub base: PlanBase, - logical: generic::Agg, + core: generic::Agg, /// The index of `count(*)` in `agg_calls`. row_count_idx: usize, } impl StreamSimpleAgg { - pub fn new(logical: generic::Agg, row_count_idx: usize) -> Self { - assert_eq!(logical.agg_calls[row_count_idx], PlanAggCall::count_star()); + pub fn new(core: generic::Agg, row_count_idx: usize) -> Self { + assert_eq!(core.agg_calls[row_count_idx], PlanAggCall::count_star()); - let input = logical.input.clone(); + let input = core.input.clone(); let input_dist = input.distribution(); let dist = match input_dist { Distribution::Single => Distribution::Single, @@ -45,20 +45,19 @@ impl StreamSimpleAgg { }; // Empty because watermark column(s) must be in group key and simple agg have no group key. - let watermark_columns = FixedBitSet::with_capacity(logical.output_len()); + let watermark_columns = FixedBitSet::with_capacity(core.output_len()); // Simple agg executor might change the append-only behavior of the stream. - let base = - PlanBase::new_stream_with_logical(&logical, dist, false, false, watermark_columns); + let base = PlanBase::new_stream_with_logical(&core, dist, false, false, watermark_columns); StreamSimpleAgg { base, - logical, + core, row_count_idx, } } pub fn agg_calls(&self) -> &[PlanAggCall] { - &self.logical.agg_calls + &self.core.agg_calls } } @@ -67,19 +66,19 @@ impl Distill for StreamSimpleAgg { let name = plan_node_name!("StreamSimpleAgg", { "append_only", self.input().append_only() }, ); - childless_record(name, self.logical.fields_pretty()) + childless_record(name, self.core.fields_pretty()) } } impl PlanTreeNodeUnary for StreamSimpleAgg { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { let logical = generic::Agg { input, - ..self.logical.clone() + ..self.core.clone() }; Self::new(logical, self.row_count_idx) } @@ -90,7 +89,7 @@ impl StreamNode for StreamSimpleAgg { fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody { use risingwave_pb::stream_plan::*; let (intermediate_state_table, agg_states, distinct_dedup_tables) = - self.logical.infer_tables(&self.base, None, None); + self.core.infer_tables(&self.base, None, None); PbNodeBody::SimpleAgg(SimpleAggNode { agg_calls: self @@ -138,8 +137,8 @@ impl ExprRewritable for StreamSimpleAgg { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); - Self::new(logical, self.row_count_idx).into() + let mut core = self.core.clone(); + core.rewrite_exprs(r); + Self::new(core, self.row_count_idx).into() } } diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs index a870be1de5840..377e2704776bb 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source.rs @@ -32,23 +32,23 @@ use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamSource { pub base: PlanBase, - pub(crate) logical: generic::Source, + pub(crate) core: generic::Source, } impl StreamSource { - pub fn new(logical: generic::Source) -> Self { + pub fn new(core: generic::Source) -> Self { let base = PlanBase::new_stream_with_logical( - &logical, + &core, Distribution::SomeShard, - logical.catalog.as_ref().map_or(true, |s| s.append_only), + core.catalog.as_ref().map_or(true, |s| s.append_only), false, - FixedBitSet::with_capacity(logical.column_catalog.len()), + FixedBitSet::with_capacity(core.column_catalog.len()), ); - Self { base, logical } + Self { base, core } } pub fn source_catalog(&self) -> Option> { - self.logical.catalog.clone() + self.core.catalog.clone() } } @@ -79,9 +79,9 @@ impl StreamNode for StreamSource { .to_internal_table_prost(), ), info: Some(source_catalog.info.clone()), - row_id_index: self.logical.row_id_index.map(|index| index as _), + row_id_index: self.core.row_id_index.map(|index| index as _), columns: self - .logical + .core .column_catalog .iter() .map(|c| c.to_protobuf()) diff --git a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs index 296c58944fac0..0af7ebded94d9 100644 --- a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs @@ -32,52 +32,48 @@ use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamStatelessSimpleAgg { pub base: PlanBase, - logical: generic::Agg, + core: generic::Agg, } impl StreamStatelessSimpleAgg { - pub fn new(logical: generic::Agg) -> Self { - let input = logical.input.clone(); + pub fn new(core: generic::Agg) -> Self { + let input = core.input.clone(); let input_dist = input.distribution(); debug_assert!(input_dist.satisfies(&RequiredDist::AnyShard)); - let mut watermark_columns = FixedBitSet::with_capacity(logical.output_len()); + let mut watermark_columns = FixedBitSet::with_capacity(core.output_len()); // Watermark column(s) must be in group key. - for (idx, input_idx) in logical.group_key.indices().enumerate() { + for (idx, input_idx) in core.group_key.indices().enumerate() { if input.watermark_columns().contains(input_idx) { watermark_columns.insert(idx); } } let base = PlanBase::new_stream_with_logical( - &logical, + &core, input_dist.clone(), input.append_only(), input.emit_on_window_close(), watermark_columns, ); - StreamStatelessSimpleAgg { base, logical } + StreamStatelessSimpleAgg { base, core } } pub fn agg_calls(&self) -> &[PlanAggCall] { - &self.logical.agg_calls + &self.core.agg_calls } } -impl_distill_by_unit!( - StreamStatelessSimpleAgg, - logical, - "StreamStatelessSimpleAgg" -); +impl_distill_by_unit!(StreamStatelessSimpleAgg, core, "StreamStatelessSimpleAgg"); impl PlanTreeNodeUnary for StreamStatelessSimpleAgg { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } } impl_plan_tree_node_for_unary! { StreamStatelessSimpleAgg } @@ -112,8 +108,8 @@ impl ExprRewritable for StreamStatelessSimpleAgg { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); - Self::new(logical).into() + let mut core = self.core.clone(); + core.rewrite_exprs(r); + Self::new(core).into() } } diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index dfe1243e881db..907a41db28525 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -40,56 +40,53 @@ use crate::{Explain, TableCatalog}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamTableScan { pub base: PlanBase, - logical: generic::Scan, + core: generic::Scan, batch_plan_id: PlanNodeId, chain_type: ChainType, } impl StreamTableScan { - pub fn new(logical: generic::Scan) -> Self { - Self::new_with_chain_type(logical, ChainType::Backfill) + pub fn new(core: generic::Scan) -> Self { + Self::new_with_chain_type(core, ChainType::Backfill) } - pub fn new_with_chain_type(logical: generic::Scan, chain_type: ChainType) -> Self { - let batch_plan_id = logical.ctx.next_plan_node_id(); + pub fn new_with_chain_type(core: generic::Scan, chain_type: ChainType) -> Self { + let batch_plan_id = core.ctx.next_plan_node_id(); let distribution = { - match logical.distribution_key() { + match core.distribution_key() { Some(distribution_key) => { if distribution_key.is_empty() { Distribution::Single } else { // See also `BatchSeqScan::clone_with_dist`. - Distribution::UpstreamHashShard( - distribution_key, - logical.table_desc.table_id, - ) + Distribution::UpstreamHashShard(distribution_key, core.table_desc.table_id) } } None => Distribution::SomeShard, } }; let base = PlanBase::new_stream_with_logical( - &logical, + &core, distribution, - logical.table_desc.append_only, + core.table_desc.append_only, false, - logical.watermark_columns(), + core.watermark_columns(), ); Self { base, - logical, + core, batch_plan_id, chain_type, } } pub fn table_name(&self) -> &str { - &self.logical.table_name + &self.core.table_name } - pub fn logical(&self) -> &generic::Scan { - &self.logical + pub fn core(&self) -> &generic::Scan { + &self.core } pub fn to_index_scan( @@ -100,7 +97,7 @@ impl StreamTableScan { function_mapping: &HashMap, chain_type: ChainType, ) -> StreamTableScan { - let logical_index_scan = self.logical.to_index_scan( + let logical_index_scan = self.core.to_index_scan( index_name, index_table_desc, primary_to_secondary_mapping, @@ -153,7 +150,7 @@ impl StreamTableScan { ) -> TableCatalog { let properties = self.ctx().with_options().internal_table_subset(); let mut catalog_builder = TableCatalogBuilder::new(properties); - let upstream_schema = &self.logical.table_desc.columns; + let upstream_schema = &self.core.table_desc.columns; // We use vnode as primary key in state table. // If `Distribution::Single`, vnode will just be `VirtualNode::default()`. @@ -161,7 +158,7 @@ impl StreamTableScan { catalog_builder.add_order_column(0, OrderType::ascending()); // pk columns - for col_order in self.logical.primary_key() { + for col_order in self.core.primary_key() { let col = &upstream_schema[col_order.column_index]; catalog_builder.add_column(&Field::from(col)); } @@ -197,8 +194,8 @@ impl Distill for StreamTableScan { fn distill<'a>(&self) -> XmlNode<'a> { let verbose = self.base.ctx.is_explain_verbose(); let mut vec = Vec::with_capacity(4); - vec.push(("table", Pretty::from(self.logical.table_name.clone()))); - vec.push(("columns", self.logical.columns_pretty(verbose))); + vec.push(("table", Pretty::from(self.core.table_name.clone()))); + vec.push(("columns", self.core.columns_pretty(verbose))); if verbose { let pk = IndicesDisplay { @@ -242,9 +239,9 @@ impl StreamTableScan { // The required columns from the table (both scan and upstream). let upstream_column_ids = match self.chain_type { // For backfill, we additionally need the primary key columns. - ChainType::Backfill => self.logical.output_and_pk_column_ids(), + ChainType::Backfill => self.core.output_and_pk_column_ids(), ChainType::Chain | ChainType::Rearrange | ChainType::UpstreamOnly => { - self.logical.output_column_ids() + self.core.output_column_ids() } ChainType::ChainUnspecified => unreachable!(), } @@ -257,7 +254,7 @@ impl StreamTableScan { .iter() .map(|&id| { let col = self - .logical + .core .table_desc .columns .iter() @@ -268,7 +265,7 @@ impl StreamTableScan { .collect_vec(); let output_indices = self - .logical + .core .output_column_ids() .iter() .map(|i| { @@ -281,7 +278,7 @@ impl StreamTableScan { // TODO: snapshot read of upstream mview let batch_plan_node = BatchPlanNode { - table_desc: Some(self.logical.table_desc.to_protobuf()), + table_desc: Some(self.core.table_desc.to_protobuf()), column_ids: upstream_column_ids.clone(), }; @@ -311,13 +308,13 @@ impl StreamTableScan { }, ], node_body: Some(PbNodeBody::Chain(ChainNode { - table_id: self.logical.table_desc.table_id.table_id, + table_id: self.core.table_desc.table_id.table_id, chain_type: self.chain_type as i32, // The column indices need to be forwarded to the downstream output_indices, upstream_column_ids, // The table desc used by backfill executor - table_desc: Some(self.logical.table_desc.to_protobuf()), + table_desc: Some(self.core.table_desc.to_protobuf()), state_table: Some(catalog), rate_limit: self .base @@ -344,8 +341,8 @@ impl ExprRewritable for StreamTableScan { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); - Self::new_with_chain_type(logical, self.chain_type).into() + let mut core = self.core.clone(); + core.rewrite_exprs(r); + Self::new_with_chain_type(core, self.chain_type).into() } } diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs index eb883103fe511..2191ca322342d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs @@ -32,15 +32,15 @@ use crate::utils::ColIndexMappingRewriteExt; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamTemporalJoin { pub base: PlanBase, - logical: generic::Join, + core: generic::Join, eq_join_predicate: EqJoinPredicate, } impl StreamTemporalJoin { - pub fn new(logical: generic::Join, eq_join_predicate: EqJoinPredicate) -> Self { - assert!(logical.join_type == JoinType::Inner || logical.join_type == JoinType::LeftOuter); - assert!(logical.left.append_only()); - let right = logical.right.clone(); + pub fn new(core: generic::Join, eq_join_predicate: EqJoinPredicate) -> Self { + assert!(core.join_type == JoinType::Inner || core.join_type == JoinType::LeftOuter); + assert!(core.left.append_only()); + let right = core.right.clone(); let exchange: &StreamExchange = right .as_stream_exchange() .expect("should be a no shuffle stream exchange"); @@ -49,22 +49,20 @@ impl StreamTemporalJoin { let scan: &StreamTableScan = exchange_input .as_stream_table_scan() .expect("should be a stream table scan"); - assert!(scan.logical().for_system_time_as_of_proctime); + assert!(scan.core().for_system_time_as_of_proctime); - let l2o = logical - .l2i_col_mapping() - .composite(&logical.i2o_col_mapping()); - let dist = l2o.rewrite_provided_distribution(logical.left.distribution()); + let l2o = core.l2i_col_mapping().composite(&core.i2o_col_mapping()); + let dist = l2o.rewrite_provided_distribution(core.left.distribution()); // Use left side watermark directly. - let watermark_columns = logical.i2o_col_mapping().rewrite_bitset( - &logical + let watermark_columns = core.i2o_col_mapping().rewrite_bitset( + &core .l2i_col_mapping() - .rewrite_bitset(logical.left.watermark_columns()), + .rewrite_bitset(core.left.watermark_columns()), ); let base = PlanBase::new_stream_with_logical( - &logical, + &core, dist, true, false, // TODO(rc): derive EOWC property from input @@ -73,14 +71,14 @@ impl StreamTemporalJoin { Self { base, - logical, + core, eq_join_predicate, } } /// Get join type pub fn join_type(&self) -> JoinType { - self.logical.join_type + self.core.join_type } pub fn eq_join_predicate(&self) -> &EqJoinPredicate { @@ -92,9 +90,9 @@ impl Distill for StreamTemporalJoin { fn distill<'a>(&self) -> XmlNode<'a> { let verbose = self.base.ctx.is_explain_verbose(); let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 }); - vec.push(("type", Pretty::debug(&self.logical.join_type))); + vec.push(("type", Pretty::debug(&self.core.join_type))); - let concat_schema = self.logical.concat_schema(); + let concat_schema = self.core.concat_schema(); vec.push(( "predicate", Pretty::debug(&EqJoinPredicateDisplay { @@ -108,7 +106,7 @@ impl Distill for StreamTemporalJoin { } if verbose { - let data = IndicesDisplay::from_join(&self.logical, &concat_schema); + let data = IndicesDisplay::from_join(&self.core, &concat_schema); vec.push(("output", data)); } @@ -118,18 +116,18 @@ impl Distill for StreamTemporalJoin { impl PlanTreeNodeBinary for StreamTemporalJoin { fn left(&self) -> PlanRef { - self.logical.left.clone() + self.core.left.clone() } fn right(&self) -> PlanRef { - self.logical.right.clone() + self.core.right.clone() } fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.left = left; - logical.right = right; - Self::new(logical, self.eq_join_predicate.clone()) + let mut core = self.core.clone(); + core.left = left; + core.right = right; + Self::new(core, self.eq_join_predicate.clone()) } } @@ -155,7 +153,7 @@ impl StreamNode for StreamTemporalJoin { .expect("should be a stream table scan"); NodeBody::TemporalJoin(TemporalJoinNode { - join_type: self.logical.join_type as i32, + join_type: self.core.join_type as i32, left_key: left_jk_indices_prost, right_key: right_jk_indices_prost, null_safe: null_safe_prost, @@ -164,19 +162,9 @@ impl StreamNode for StreamTemporalJoin { .other_cond() .as_expr_unless_true() .map(|x| x.to_expr_proto()), - output_indices: self - .logical - .output_indices - .iter() - .map(|&x| x as u32) - .collect(), - table_desc: Some(scan.logical().table_desc.to_protobuf()), - table_output_indices: scan - .logical() - .output_col_idx - .iter() - .map(|&i| i as _) - .collect(), + output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), + table_desc: Some(scan.core().table_desc.to_protobuf()), + table_output_indices: scan.core().output_col_idx.iter().map(|&i| i as _).collect(), }) } } @@ -187,8 +175,8 @@ impl ExprRewritable for StreamTemporalJoin { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - let mut logical = self.logical.clone(); - logical.rewrite_exprs(r); - Self::new(logical, self.eq_join_predicate.rewrite_exprs(r)).into() + let mut core = self.core.clone(); + core.rewrite_exprs(r); + Self::new(core, self.eq_join_predicate.rewrite_exprs(r)).into() } } diff --git a/src/frontend/src/optimizer/plan_node/stream_topn.rs b/src/frontend/src/optimizer/plan_node/stream_topn.rs index 641b144242a92..e7a880fa7d757 100644 --- a/src/frontend/src/optimizer/plan_node/stream_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_topn.rs @@ -26,35 +26,34 @@ use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamTopN { pub base: PlanBase, - logical: generic::TopN, + core: generic::TopN, } impl StreamTopN { - pub fn new(logical: generic::TopN) -> Self { - assert!(logical.group_key.is_empty()); - assert!(logical.limit_attr.limit() > 0); - let input = &logical.input; + pub fn new(core: generic::TopN) -> Self { + assert!(core.group_key.is_empty()); + assert!(core.limit_attr.limit() > 0); + let input = &core.input; let dist = match input.distribution() { Distribution::Single => Distribution::Single, _ => panic!(), }; let watermark_columns = FixedBitSet::with_capacity(input.schema().len()); - let base = - PlanBase::new_stream_with_logical(&logical, dist, false, false, watermark_columns); - StreamTopN { base, logical } + let base = PlanBase::new_stream_with_logical(&core, dist, false, false, watermark_columns); + StreamTopN { base, core } } pub fn limit_attr(&self) -> TopNLimit { - self.logical.limit_attr + self.core.limit_attr } pub fn offset(&self) -> u64 { - self.logical.offset + self.core.offset } pub fn topn_order(&self) -> &Order { - &self.logical.order + &self.core.order } } @@ -63,19 +62,19 @@ impl Distill for StreamTopN { let name = plan_node_name!("StreamTopN", { "append_only", self.input().append_only() }, ); - self.logical.distill_with_name(name) + self.core.distill_with_name(name) } } impl PlanTreeNodeUnary for StreamTopN { fn input(&self) -> PlanRef { - self.logical.input.clone() + self.core.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - let mut logical = self.logical.clone(); - logical.input = input; - Self::new(logical) + let mut core = self.core.clone(); + core.input = input; + Self::new(core) } } @@ -91,7 +90,7 @@ impl StreamNode for StreamTopN { offset: self.offset(), with_ties: self.limit_attr().with_ties(), table: Some( - self.logical + self.core .infer_internal_table_catalog( input.schema(), input.ctx(), diff --git a/src/frontend/src/optimizer/plan_node/stream_union.rs b/src/frontend/src/optimizer/plan_node/stream_union.rs index 1d259115b5ced..8f6353d6be44c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_union.rs +++ b/src/frontend/src/optimizer/plan_node/stream_union.rs @@ -29,17 +29,17 @@ use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamUnion { pub base: PlanBase, - logical: generic::Union, + core: generic::Union, } impl StreamUnion { - pub fn new(logical: generic::Union) -> Self { - let inputs = &logical.inputs; + pub fn new(core: generic::Union) -> Self { + let inputs = &core.inputs; let dist = inputs[0].distribution().clone(); assert!(inputs.iter().all(|input| *input.distribution() == dist)); let watermark_columns = inputs.iter().fold( { - let mut bitset = FixedBitSet::with_capacity(logical.schema().len()); + let mut bitset = FixedBitSet::with_capacity(core.schema().len()); bitset.toggle_range(..); bitset }, @@ -47,19 +47,19 @@ impl StreamUnion { ); let base = PlanBase::new_stream_with_logical( - &logical, + &core, dist, inputs.iter().all(|x| x.append_only()), inputs.iter().all(|x| x.emit_on_window_close()), watermark_columns, ); - StreamUnion { base, logical } + StreamUnion { base, core } } } impl Distill for StreamUnion { fn distill<'a>(&self) -> XmlNode<'a> { - let mut vec = self.logical.fields_pretty(); + let mut vec = self.core.fields_pretty(); if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) { vec.push(("output_watermarks", ow)); } @@ -69,11 +69,11 @@ impl Distill for StreamUnion { impl PlanTreeNode for StreamUnion { fn inputs(&self) -> smallvec::SmallVec<[crate::optimizer::PlanRef; 2]> { - smallvec::SmallVec::from_vec(self.logical.inputs.clone()) + smallvec::SmallVec::from_vec(self.core.inputs.clone()) } fn clone_with_inputs(&self, inputs: &[crate::optimizer::PlanRef]) -> PlanRef { - let mut new = self.logical.clone(); + let mut new = self.core.clone(); new.inputs = inputs.to_vec(); Self::new(new).into() } diff --git a/src/frontend/src/optimizer/plan_visitor/relation_collector_visitor.rs b/src/frontend/src/optimizer/plan_visitor/relation_collector_visitor.rs index a4b7102906c69..770f099aab529 100644 --- a/src/frontend/src/optimizer/plan_visitor/relation_collector_visitor.rs +++ b/src/frontend/src/optimizer/plan_visitor/relation_collector_visitor.rs @@ -52,8 +52,8 @@ impl PlanVisitor for RelationCollectorVisitor { } fn visit_batch_seq_scan(&mut self, plan: &crate::optimizer::plan_node::BatchSeqScan) { - if !plan.logical().is_sys_table { - self.relations.insert(plan.logical().table_desc.table_id); + if !plan.core().is_sys_table { + self.relations.insert(plan.core().table_desc.table_id); } } @@ -64,7 +64,7 @@ impl PlanVisitor for RelationCollectorVisitor { } fn visit_stream_table_scan(&mut self, plan: &StreamTableScan) { - let logical = plan.logical(); + let logical = plan.core(); if !logical.is_sys_table { self.relations.insert(logical.table_desc.table_id); } diff --git a/src/frontend/src/optimizer/plan_visitor/sys_table_visitor.rs b/src/frontend/src/optimizer/plan_visitor/sys_table_visitor.rs index 35f4a7246e49a..dcbfb2d93d3f5 100644 --- a/src/frontend/src/optimizer/plan_visitor/sys_table_visitor.rs +++ b/src/frontend/src/optimizer/plan_visitor/sys_table_visitor.rs @@ -37,7 +37,7 @@ impl PlanVisitor for SysTableVisitor { } fn visit_batch_seq_scan(&mut self, batch_seq_scan: &BatchSeqScan) -> bool { - batch_seq_scan.logical().is_sys_table + batch_seq_scan.core().is_sys_table } fn visit_logical_scan(&mut self, logical_scan: &LogicalScan) -> bool { @@ -45,6 +45,6 @@ impl PlanVisitor for SysTableVisitor { } fn visit_stream_table_scan(&mut self, stream_table_scan: &StreamTableScan) -> bool { - stream_table_scan.logical().is_sys_table + stream_table_scan.core().is_sys_table } } diff --git a/src/frontend/src/optimizer/plan_visitor/temporal_join_validator.rs b/src/frontend/src/optimizer/plan_visitor/temporal_join_validator.rs index d7da70b7a030b..e80fcddb87324 100644 --- a/src/frontend/src/optimizer/plan_visitor/temporal_join_validator.rs +++ b/src/frontend/src/optimizer/plan_visitor/temporal_join_validator.rs @@ -39,11 +39,11 @@ impl PlanVisitor for TemporalJoinValidator { } fn visit_stream_table_scan(&mut self, stream_table_scan: &StreamTableScan) -> bool { - stream_table_scan.logical().for_system_time_as_of_proctime + stream_table_scan.core().for_system_time_as_of_proctime } fn visit_batch_seq_scan(&mut self, batch_seq_scan: &BatchSeqScan) -> bool { - batch_seq_scan.logical().for_system_time_as_of_proctime + batch_seq_scan.core().for_system_time_as_of_proctime } fn visit_logical_scan(&mut self, logical_scan: &LogicalScan) -> bool { diff --git a/src/frontend/src/optimizer/rule/index_delta_join_rule.rs b/src/frontend/src/optimizer/rule/index_delta_join_rule.rs index e9147ce0ec882..30435d635568b 100644 --- a/src/frontend/src/optimizer/rule/index_delta_join_rule.rs +++ b/src/frontend/src/optimizer/rule/index_delta_join_rule.rs @@ -54,7 +54,7 @@ impl Rule for IndexDeltaJoinRule { table_scan: &StreamTableScan, chain_type: ChainType, ) -> Option { - for index in &table_scan.logical().indexes { + for index in &table_scan.core().indexes { // Only full covering index can be used in delta join if !index.full_covering() { continue; @@ -68,7 +68,7 @@ impl Rule for IndexDeltaJoinRule { // keys here. let join_indices_ref_to_index_table = join_indices .iter() - .map(|&i| table_scan.logical().output_col_idx[i]) + .map(|&i| table_scan.core().output_col_idx[i]) .map(|x| *p2s_mapping.get(&x).unwrap()) .collect_vec(); @@ -103,7 +103,7 @@ impl Rule for IndexDeltaJoinRule { } // Primary table is also an index. - let primary_table = table_scan.logical(); + let primary_table = table_scan.core(); if let Some(primary_table_distribution_key) = primary_table.distribution_key() && primary_table_distribution_key == join_indices { @@ -123,7 +123,7 @@ impl Rule for IndexDeltaJoinRule { if chain_type != table_scan.chain_type() { Some( StreamTableScan::new_with_chain_type( - table_scan.logical().clone(), + table_scan.core().clone(), chain_type, ) .into(), diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 7fa512fcbb05a..4e16bc6cd0b21 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -918,11 +918,11 @@ impl BatchPlanFragmenter { } if let Some(scan_node) = node.as_batch_seq_scan() { - let name = scan_node.logical().table_name.to_owned(); - let info = if scan_node.logical().is_sys_table { + let name = scan_node.core().table_name.to_owned(); + let info = if scan_node.core().is_sys_table { TableScanInfo::system_table(name) } else { - let table_desc = &*scan_node.logical().table_desc; + let table_desc = &*scan_node.core().table_desc; let table_catalog = self .catalog_reader .read_guard() @@ -951,11 +951,11 @@ impl BatchPlanFragmenter { return None; } if let Some(insert) = node.as_batch_insert() { - Some(insert.logical.table_id) + Some(insert.core.table_id) } else if let Some(update) = node.as_batch_update() { - Some(update.logical.table_id) + Some(update.core.table_id) } else if let Some(delete) = node.as_batch_delete() { - Some(delete.logical.table_id) + Some(delete.core.table_id) } else { node.inputs() .into_iter()