Skip to content

Commit

Permalink
chore(optimizer): rename logical to core (#12975)
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page authored Oct 23, 2023
1 parent ebfafe5 commit abafae0
Show file tree
Hide file tree
Showing 53 changed files with 794 additions and 901 deletions.
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ fn exist_and_no_exchange_before(plan: &PlanRef, is_candidate: fn(&PlanRef) -> bo
fn require_additional_exchange_on_root_in_distributed_mode(plan: PlanRef) -> bool {
fn is_user_table(plan: &PlanRef) -> bool {
plan.as_batch_seq_scan()
.map(|node| !node.logical().is_sys_table)
.map(|node| !node.core().is_sys_table)
.unwrap_or(false)
}

Expand Down Expand Up @@ -649,7 +649,7 @@ fn require_additional_exchange_on_root_in_distributed_mode(plan: PlanRef) -> boo
fn require_additional_exchange_on_root_in_local_mode(plan: PlanRef) -> bool {
fn is_user_table(plan: &PlanRef) -> bool {
plan.as_batch_seq_scan()
.map(|node| !node.logical().is_sys_table)
.map(|node| !node.core().is_sys_table)
.unwrap_or(false)
}

Expand Down
24 changes: 12 additions & 12 deletions src/frontend/src/optimizer/plan_node/batch_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,35 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchDelete {
pub base: PlanBase,
pub logical: generic::Delete<PlanRef>,
pub core: generic::Delete<PlanRef>,
}

impl BatchDelete {
pub fn new(logical: generic::Delete<PlanRef>) -> Self {
assert_eq!(logical.input.distribution(), &Distribution::Single);
pub fn new(core: generic::Delete<PlanRef>) -> Self {
assert_eq!(core.input.distribution(), &Distribution::Single);
let base: PlanBase = PlanBase::new_batch_from_logical(
&logical,
logical.input.distribution().clone(),
&core,
core.input.distribution().clone(),
Order::any(),
);
Self { base, logical }
Self { base, core }
}
}

impl PlanTreeNodeUnary for BatchDelete {
fn input(&self) -> PlanRef {
self.logical.input.clone()
self.core.input.clone()
}

fn clone_with_input(&self, input: PlanRef) -> Self {
let mut core = self.logical.clone();
let mut core = self.core.clone();
core.input = input;
Self::new(core)
}
}

impl_plan_tree_node_for_unary! { BatchDelete }
impl_distill_by_unit!(BatchDelete, logical, "BatchDelete");
impl_distill_by_unit!(BatchDelete, core, "BatchDelete");

impl ToDistributedBatch for BatchDelete {
fn to_distributed(&self) -> Result<PlanRef> {
Expand All @@ -68,9 +68,9 @@ impl ToDistributedBatch for BatchDelete {
impl ToBatchPb for BatchDelete {
fn to_batch_prost_body(&self) -> NodeBody {
NodeBody::Delete(DeleteNode {
table_id: self.logical.table_id.table_id(),
table_version_id: self.logical.table_version_id,
returning: self.logical.returning,
table_id: self.core.table_id.table_id(),
table_version_id: self.core.table_version_id,
returning: self.core.returning,
})
}
}
Expand Down
22 changes: 11 additions & 11 deletions src/frontend/src/optimizer/plan_node/batch_expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,38 +29,38 @@ use crate::optimizer::PlanRef;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchExpand {
pub base: PlanBase,
logical: generic::Expand<PlanRef>,
core: generic::Expand<PlanRef>,
}

impl BatchExpand {
pub fn new(logical: generic::Expand<PlanRef>) -> Self {
let dist = match logical.input.distribution() {
pub fn new(core: generic::Expand<PlanRef>) -> Self {
let dist = match core.input.distribution() {
Distribution::Single => Distribution::Single,
Distribution::SomeShard
| Distribution::HashShard(_)
| Distribution::UpstreamHashShard(_, _) => Distribution::SomeShard,
Distribution::Broadcast => unreachable!(),
};
let base = PlanBase::new_batch_from_logical(&logical, dist, Order::any());
BatchExpand { base, logical }
let base = PlanBase::new_batch_from_logical(&core, dist, Order::any());
BatchExpand { base, core }
}

pub fn column_subsets(&self) -> &[Vec<usize>] {
&self.logical.column_subsets
&self.core.column_subsets
}
}

impl_distill_by_unit!(BatchExpand, logical, "BatchExpand");
impl_distill_by_unit!(BatchExpand, core, "BatchExpand");

impl PlanTreeNodeUnary for BatchExpand {
fn input(&self) -> PlanRef {
self.logical.input.clone()
self.core.input.clone()
}

fn clone_with_input(&self, input: PlanRef) -> Self {
let mut logical = self.logical.clone();
logical.input = input;
Self::new(logical)
let mut core = self.core.clone();
core.input = input;
Self::new(core)
}
}

Expand Down
32 changes: 16 additions & 16 deletions src/frontend/src/optimizer/plan_node/batch_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,35 +26,35 @@ use crate::utils::Condition;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchFilter {
pub base: PlanBase,
logical: generic::Filter<PlanRef>,
core: generic::Filter<PlanRef>,
}

impl BatchFilter {
pub fn new(logical: generic::Filter<PlanRef>) -> Self {
pub fn new(core: generic::Filter<PlanRef>) -> Self {
// TODO: derive from input
let base = PlanBase::new_batch_from_logical(
&logical,
logical.input.distribution().clone(),
logical.input.order().clone(),
&core,
core.input.distribution().clone(),
core.input.order().clone(),
);
BatchFilter { base, logical }
BatchFilter { base, core }
}

pub fn predicate(&self) -> &Condition {
&self.logical.predicate
&self.core.predicate
}
}
impl_distill_by_unit!(BatchFilter, logical, "BatchFilter");
impl_distill_by_unit!(BatchFilter, core, "BatchFilter");

impl PlanTreeNodeUnary for BatchFilter {
fn input(&self) -> PlanRef {
self.logical.input.clone()
self.core.input.clone()
}

fn clone_with_input(&self, input: PlanRef) -> Self {
let mut logical = self.logical.clone();
logical.input = input;
Self::new(logical)
let mut core = self.core.clone();
core.input = input;
Self::new(core)
}
}

Expand All @@ -70,7 +70,7 @@ impl ToDistributedBatch for BatchFilter {
impl ToBatchPb for BatchFilter {
fn to_batch_prost_body(&self) -> NodeBody {
NodeBody::Filter(FilterNode {
search_condition: Some(ExprImpl::from(self.logical.predicate.clone()).to_expr_proto()),
search_condition: Some(ExprImpl::from(self.core.predicate.clone()).to_expr_proto()),
})
}
}
Expand All @@ -88,8 +88,8 @@ impl ExprRewritable for BatchFilter {
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
let mut logical = self.logical.clone();
logical.rewrite_exprs(r);
Self::new(logical).into()
let mut core = self.core.clone();
core.rewrite_exprs(r);
Self::new(core).into()
}
}
32 changes: 16 additions & 16 deletions src/frontend/src/optimizer/plan_node/batch_group_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,36 @@ use crate::optimizer::property::{Order, RequiredDist};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchGroupTopN {
pub base: PlanBase,
logical: generic::TopN<PlanRef>,
core: generic::TopN<PlanRef>,
}

impl BatchGroupTopN {
pub fn new(logical: generic::TopN<PlanRef>) -> Self {
assert!(!logical.group_key.is_empty());
pub fn new(core: generic::TopN<PlanRef>) -> Self {
assert!(!core.group_key.is_empty());
let base = PlanBase::new_batch_from_logical(
&logical,
logical.input.distribution().clone(),
&core,
core.input.distribution().clone(),
Order::any(),
);
BatchGroupTopN { base, logical }
BatchGroupTopN { base, core }
}

fn group_key(&self) -> &[usize] {
&self.logical.group_key
&self.core.group_key
}
}

impl_distill_by_unit!(BatchGroupTopN, logical, "BatchGroupTopN");
impl_distill_by_unit!(BatchGroupTopN, core, "BatchGroupTopN");

impl PlanTreeNodeUnary for BatchGroupTopN {
fn input(&self) -> PlanRef {
self.logical.input.clone()
self.core.input.clone()
}

fn clone_with_input(&self, input: PlanRef) -> Self {
let mut logical = self.logical.clone();
logical.input = input;
Self::new(logical)
let mut core = self.core.clone();
core.input = input;
Self::new(core)
}
}

Expand All @@ -73,13 +73,13 @@ impl ToDistributedBatch for BatchGroupTopN {

impl ToBatchPb for BatchGroupTopN {
fn to_batch_prost_body(&self) -> NodeBody {
let column_orders = self.logical.order.to_protobuf();
let column_orders = self.core.order.to_protobuf();
NodeBody::GroupTopN(GroupTopNNode {
limit: self.logical.limit_attr.limit(),
offset: self.logical.offset,
limit: self.core.limit_attr.limit(),
offset: self.core.offset,
column_orders,
group_key: self.group_key().iter().map(|c| *c as u32).collect(),
with_ties: self.logical.limit_attr.with_ties(),
with_ties: self.core.limit_attr.with_ties(),
})
}
}
Expand Down
42 changes: 20 additions & 22 deletions src/frontend/src/optimizer/plan_node/batch_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,27 @@ use crate::utils::{ColIndexMappingRewriteExt, IndexSet};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchHashAgg {
pub base: PlanBase,
logical: generic::Agg<PlanRef>,
core: generic::Agg<PlanRef>,
}

impl BatchHashAgg {
pub fn new(logical: generic::Agg<PlanRef>) -> Self {
assert!(!logical.group_key.is_empty());
let input = logical.input.clone();
pub fn new(core: generic::Agg<PlanRef>) -> Self {
assert!(!core.group_key.is_empty());
let input = core.input.clone();
let input_dist = input.distribution();
let dist = logical
let dist = core
.i2o_col_mapping()
.rewrite_provided_distribution(input_dist);
let base = PlanBase::new_batch_from_logical(&logical, dist, Order::any());
BatchHashAgg { base, logical }
let base = PlanBase::new_batch_from_logical(&core, dist, Order::any());
BatchHashAgg { base, core }
}

pub fn agg_calls(&self) -> &[PlanAggCall] {
&self.logical.agg_calls
&self.core.agg_calls
}

pub fn group_key(&self) -> &IndexSet {
&self.logical.group_key
&self.core.group_key
}

fn to_two_phase_agg(&self, dist_input: PlanRef) -> Result<PlanRef> {
Expand All @@ -68,7 +68,7 @@ impl BatchHashAgg {

// insert total agg
let total_agg_types = self
.logical
.core
.agg_calls
.iter()
.enumerate()
Expand All @@ -95,29 +95,27 @@ impl BatchHashAgg {
}
}

impl_distill_by_unit!(BatchHashAgg, logical, "BatchHashAgg");
impl_distill_by_unit!(BatchHashAgg, core, "BatchHashAgg");

impl PlanTreeNodeUnary for BatchHashAgg {
fn input(&self) -> PlanRef {
self.logical.input.clone()
self.core.input.clone()
}

fn clone_with_input(&self, input: PlanRef) -> Self {
let mut logical = self.logical.clone();
logical.input = input;
Self::new(logical)
let mut core = self.core.clone();
core.input = input;
Self::new(core)
}
}

impl_plan_tree_node_for_unary! { BatchHashAgg }
impl ToDistributedBatch for BatchHashAgg {
fn to_distributed(&self) -> Result<PlanRef> {
if self.logical.must_try_two_phase_agg() {
if self.core.must_try_two_phase_agg() {
let input = self.input().to_distributed()?;
let input_dist = input.distribution();
if !self
.logical
.hash_agg_dist_satisfied_by_input_dist(input_dist)
if !self.core.hash_agg_dist_satisfied_by_input_dist(input_dist)
&& matches!(
input_dist,
Distribution::HashShard(_)
Expand Down Expand Up @@ -162,8 +160,8 @@ impl ExprRewritable for BatchHashAgg {
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
let mut logical = self.logical.clone();
logical.rewrite_exprs(r);
Self::new(logical).into()
let mut core = self.core.clone();
core.rewrite_exprs(r);
Self::new(core).into()
}
}
Loading

0 comments on commit abafae0

Please sign in to comment.