Skip to content

Commit

Permalink
refactor(optimizer): clean up dead code for optimizer v2 (#12973)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Oct 20, 2023
1 parent 37d0a79 commit fa66cbd
Show file tree
Hide file tree
Showing 26 changed files with 94 additions and 1,602 deletions.
1 change: 0 additions & 1 deletion src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ use self::plan_visitor::{has_batch_exchange, CardinalityVisitor};
use self::property::{Cardinality, RequiredDist};
use self::rule::*;
use crate::catalog::table_catalog::{TableType, TableVersion};
use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::{
BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive,
};
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
use super::generic::GenericPlanRef;
use crate::optimizer::property::Order;

/// A subtrait of [`GenericPlanRef`] for batch plans.
///
/// Due to the lack of refactoring, all plan nodes currently implement this trait
/// through [`super::PlanBase`]. One may still use this trait as a bound for
/// expecting a batch plan, in contrast to [`GenericPlanRef`].
pub trait BatchPlanRef: GenericPlanRef {
fn order(&self) -> &Order;
}
78 changes: 76 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::EitherOrBoth;
use risingwave_common::catalog::Schema;
use itertools::{EitherOrBoth, Itertools};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::plan_common::JoinType;

use super::{EqJoinPredicate, GenericPlanNode, GenericPlanRef};
use crate::expr::ExprRewriter;
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::plan_node::stream;
use crate::optimizer::plan_node::utils::TableCatalogBuilder;
use crate::optimizer::property::FunctionalDependencySet;
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition};
use crate::TableCatalog;

/// [`Join`] combines two relations according to some condition.
///
Expand Down Expand Up @@ -65,6 +70,75 @@ impl<PlanRef> Join<PlanRef> {
}
}

impl<PlanRef: stream::StreamPlanRef> Join<PlanRef> {
/// Return stream hash join internal table catalog and degree table catalog.
pub fn infer_internal_and_degree_table_catalog(
input: &PlanRef,
join_key_indices: Vec<usize>,
dk_indices_in_jk: Vec<usize>,
) -> (TableCatalog, TableCatalog, Vec<usize>) {
let schema = input.schema();

let internal_table_dist_keys = dk_indices_in_jk
.iter()
.map(|idx| join_key_indices[*idx])
.collect_vec();

let degree_table_dist_keys = dk_indices_in_jk.clone();

// The pk of hash join internal and degree table should be join_key + input_pk.
let join_key_len = join_key_indices.len();
let mut pk_indices = join_key_indices;

// dedup the pk in dist key..
let mut deduped_input_pk_indices = vec![];
for input_pk_idx in input.stream_key().unwrap() {
if !pk_indices.contains(input_pk_idx)
&& !deduped_input_pk_indices.contains(input_pk_idx)
{
deduped_input_pk_indices.push(*input_pk_idx);
}
}

pk_indices.extend(deduped_input_pk_indices.clone());

// Build internal table
let mut internal_table_catalog_builder =
TableCatalogBuilder::new(input.ctx().with_options().internal_table_subset());
let internal_columns_fields = schema.fields().to_vec();

internal_columns_fields.iter().for_each(|field| {
internal_table_catalog_builder.add_column(field);
});
pk_indices.iter().for_each(|idx| {
internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending())
});

// Build degree table.
let mut degree_table_catalog_builder =
TableCatalogBuilder::new(input.ctx().with_options().internal_table_subset());

let degree_column_field = Field::with_name(DataType::Int64, "_degree");

pk_indices.iter().enumerate().for_each(|(order_idx, idx)| {
degree_table_catalog_builder.add_column(&internal_columns_fields[*idx]);
degree_table_catalog_builder.add_order_column(order_idx, OrderType::ascending());
});
degree_table_catalog_builder.add_column(&degree_column_field);
degree_table_catalog_builder
.set_value_indices(vec![degree_table_catalog_builder.columns().len() - 1]);

internal_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk.clone());
degree_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk);

(
internal_table_catalog_builder.build(internal_table_dist_keys, join_key_len),
degree_table_catalog_builder.build(degree_table_dist_keys, join_key_len),
deduped_input_pk_indices,
)
}
}

impl<PlanRef: GenericPlanRef> GenericPlanNode for Join<PlanRef> {
fn schema(&self) -> Schema {
let left_schema = self.left.schema();
Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use crate::expr::{
WindowFunction,
};
use crate::optimizer::plan_node::generic::GenericPlanNode;
use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::{
gen_filter_and_pushdown, BatchSortAgg, ColumnPruningContext, LogicalDedup, LogicalProject,
PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use super::{
};
use crate::expr::{CollectInputRef, Expr, ExprImpl, ExprRewriter, ExprType, InputRef};
use crate::optimizer::plan_node::generic::DynamicFilter;
use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::utils::IndicesDisplay;
use crate::optimizer::plan_node::{
BatchHashJoin, BatchLookupJoin, BatchNestedLoopJoin, ColumnPruningContext, EqJoinPredicate,
Expand Down
3 changes: 0 additions & 3 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,8 +617,6 @@ impl dyn PlanNode {
}

mod plan_base;
#[macro_use]
mod plan_tree_node_v2;
pub use plan_base::*;
#[macro_use]
mod plan_tree_node;
Expand All @@ -641,7 +639,6 @@ pub use merge_eq_nodes::*;
pub mod batch;
pub mod generic;
pub mod stream;
pub mod stream_derive;

pub use generic::{PlanAggCall, PlanAggCallDisplay};

Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/plan_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,13 @@ impl stream::StreamPlanRef for PlanBase {
self.emit_on_window_close
}
}

impl batch::BatchPlanRef for PlanBase {
fn order(&self) -> &Order {
&self.order
}
}

impl PlanBase {
pub fn new_logical(
ctx: OptimizerContextRef,
Expand Down
41 changes: 0 additions & 41 deletions src/frontend/src/optimizer/plan_node/plan_tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,6 @@ macro_rules! impl_plan_tree_node_for_leaf {
self.clone().into()
}
}

impl crate::optimizer::plan_node::plan_tree_node_v2::PlanTreeNodeV2 for $leaf_node_type {
type PlanRef = crate::optimizer::PlanRef;

fn inputs(&self) -> smallvec::SmallVec<[crate::optimizer::PlanRef; 2]> {
smallvec::smallvec![]
}

fn clone_with_inputs(&self, mut inputs: impl Iterator<Item = Self::PlanRef>) -> Self {
assert!(inputs.next().is_none(), "expect exactly no input");
self.clone()
}
}
};
}

Expand All @@ -141,20 +128,6 @@ macro_rules! impl_plan_tree_node_for_unary {
self.clone_with_input(inputs[0].clone()).into()
}
}

impl crate::optimizer::plan_node::plan_tree_node_v2::PlanTreeNodeV2 for $unary_node_type {
type PlanRef = crate::optimizer::PlanRef;

fn inputs(&self) -> smallvec::SmallVec<[crate::optimizer::PlanRef; 2]> {
smallvec::smallvec![self.input()]
}

fn clone_with_inputs(&self, mut inputs: impl Iterator<Item = Self::PlanRef>) -> Self {
let input = inputs.next().expect("expect exactly 1 input");
assert!(inputs.next().is_none(), "expect exactly 1 input");
self.clone_with_input(input).into()
}
}
};
}

Expand All @@ -174,19 +147,5 @@ macro_rules! impl_plan_tree_node_for_binary {
.into()
}
}
impl crate::optimizer::plan_node::plan_tree_node_v2::PlanTreeNodeV2 for $binary_node_type {
type PlanRef = crate::optimizer::PlanRef;

fn inputs(&self) -> smallvec::SmallVec<[crate::optimizer::PlanRef; 2]> {
smallvec::smallvec![self.left(), self.right()]
}

fn clone_with_inputs(&self, mut inputs: impl Iterator<Item = Self::PlanRef>) -> Self {
let left = inputs.next().expect("expect exactly 2 input");
let right = inputs.next().expect("expect exactly 2 input");
assert!(inputs.next().is_none(), "expect exactly 2 input");
self.clone_with_left_right(left, right).into()
}
}
};
}
124 changes: 0 additions & 124 deletions src/frontend/src/optimizer/plan_node/plan_tree_node_v2.rs

This file was deleted.

Loading

0 comments on commit fa66cbd

Please sign in to comment.