refactor(plan node): move plannode generic (risingwavelabs#6633)
* move filt

* dyn filter

* tmp

* clippy

* clippy

* add license

* refactor

Co-authored-by: ice1000 <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
3 people authored Nov 29, 2022
1 parent cc47a61 commit 1b4cfdb
Showing 16 changed files with 1,197 additions and 954 deletions.

Large diffs are not rendered by default.

77 changes: 77 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/dynamic_filter.rs
@@ -0,0 +1,77 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::util::sort_util::OrderType;

use crate::optimizer::plan_node::stream;
use crate::optimizer::plan_node::utils::TableCatalogBuilder;
use crate::utils::Condition;
use crate::TableCatalog;

#[derive(Clone, Debug)]
pub struct DynamicFilter<PlanRef> {
/// The predicate (formed with exactly one of `<`, `<=`, `>`, `>=`)
pub predicate: Condition,
// dist_key_l: Distribution,
pub left_index: usize,
pub left: PlanRef,
pub right: PlanRef,
}

pub fn infer_left_internal_table_catalog(
me: &impl stream::StreamPlanRef,
left_key_index: usize,
) -> TableCatalog {
let schema = me.schema();

let dist_keys = me.distribution().dist_column_indices().to_vec();

// The pk of the dynamic filter's internal table should be left_key + input_pk.
let mut pk_indices = vec![left_key_index];
// TODO(yuhao): dedup the dist key and pk.
pk_indices.extend(me.logical_pk());

let mut internal_table_catalog_builder =
TableCatalogBuilder::new(me.ctx().inner().with_options.internal_table_subset());

schema.fields().iter().for_each(|field| {
internal_table_catalog_builder.add_column(field);
});

pk_indices.iter().for_each(|idx| {
internal_table_catalog_builder.add_order_column(*idx, OrderType::Ascending)
});

internal_table_catalog_builder.build(dist_keys)
}

pub fn infer_right_internal_table_catalog(input: &impl stream::StreamPlanRef) -> TableCatalog {
let schema = input.schema();

// We require that the right table has distribution `Single`
assert_eq!(
input.distribution().dist_column_indices().to_vec(),
Vec::<usize>::new()
);

let mut internal_table_catalog_builder =
TableCatalogBuilder::new(input.ctx().inner().with_options.internal_table_subset());

schema.fields().iter().for_each(|field| {
internal_table_catalog_builder.add_column(field);
});

// No distribution keys
internal_table_catalog_builder.build(vec![])
}
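To make the key layout above concrete, here is a minimal standalone Rust sketch of how the left internal table's primary key is composed: the left join key first, then the input's logical pk, with the dedup still left as a TODO as in the source. The helper name and the `main` driver are hypothetical and do not use the RisingWave crates.

// Hypothetical, self-contained illustration of the pk composition in
// `infer_left_internal_table_catalog`; not part of this diff.
fn compose_left_pk(left_key_index: usize, input_logical_pk: &[usize]) -> Vec<usize> {
    // The left join key leads the pk, followed by the input's logical pk.
    let mut pk_indices = vec![left_key_index];
    // Duplicates between the key and the pk are not removed here,
    // mirroring the TODO in the source.
    pk_indices.extend_from_slice(input_logical_pk);
    pk_indices
}

fn main() {
    // e.g. left key on column 2, input pk on columns [0, 1]
    assert_eq!(compose_left_pk(2, &[0, 1]), vec![2, 0, 1]);
}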
76 changes: 76 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/expand.rs
@@ -0,0 +1,76 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use risingwave_common::catalog::{Field, FieldDisplay, Schema};
use risingwave_common::types::DataType;

use super::{GenericPlanNode, GenericPlanRef};
use crate::session::OptimizerContextRef;

/// [`Expand`] expands one input row into multiple rows according to `column_subsets`, and also
/// keeps the original columns of the input. It can be used to implement distinct aggregation and
/// grouping sets.
///
/// This is the schema of `Expand`:
/// | expanded columns (i.e. some columns are set to NULL) | original columns of input | flag |.
///
/// Aggregates use the expanded columns as their arguments and the original columns for their
/// filters. `flag` is used to distinguish between the different `subset`s in `column_subsets`.
#[derive(Debug, Clone)]
pub struct Expand<PlanRef> {
// Each `subset` in `column_subsets` specifies the columns to keep for that expanded copy of
// the row; all other columns are filled with NULL.
pub column_subsets: Vec<Vec<usize>>,
pub input: PlanRef,
}

impl<PlanRef: GenericPlanRef> GenericPlanNode for Expand<PlanRef> {
fn schema(&self) -> Schema {
let mut fields = self.input.schema().clone().into_fields();
fields.extend(fields.clone());
fields.push(Field::with_name(DataType::Int64, "flag"));
Schema::new(fields)
}

fn logical_pk(&self) -> Option<Vec<usize>> {
let input_schema_len = self.input.schema().len();
let mut pk_indices = self
.input
.logical_pk()
.iter()
.map(|&pk| pk + input_schema_len)
.collect_vec();
// The last column should be the flag.
pk_indices.push(input_schema_len * 2);
Some(pk_indices)
}

fn ctx(&self) -> OptimizerContextRef {
self.input.ctx()
}
}

impl<PlanRef: GenericPlanRef> Expand<PlanRef> {
pub fn column_subsets_display(&self) -> Vec<Vec<FieldDisplay<'_>>> {
self.column_subsets
.iter()
.map(|subset| {
subset
.iter()
.map(|&i| FieldDisplay(self.input.schema().fields.get(i).unwrap()))
.collect_vec()
})
.collect_vec()
}
}
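As a sanity check on the schema and pk logic above, here is a minimal standalone sketch of the output layout `| expanded columns | original columns | flag |`. The helper names are hypothetical and no RisingWave types are involved.

// Hypothetical illustration of `Expand`'s output width and logical pk; not part of this diff.
fn expand_output_width(input_width: usize) -> usize {
    // expanded columns + original columns + the trailing `flag` column
    input_width * 2 + 1
}

fn expand_logical_pk(input_pk: &[usize], input_width: usize) -> Vec<usize> {
    // The input pk is referenced in the "original columns" section, which starts at
    // offset `input_width`; the flag column (index input_width * 2) is appended last.
    let mut pk: Vec<usize> = input_pk.iter().map(|&i| i + input_width).collect();
    pk.push(input_width * 2);
    pk
}

fn main() {
    // A 3-column input with pk on column 0 yields 7 output columns
    // and pk [3, 6] (the shifted pk column plus the flag).
    assert_eq!(expand_output_width(3), 7);
    assert_eq!(expand_logical_pk(&[0], 3), vec![3, 6]);
}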
43 changes: 43 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/filter.rs
@@ -0,0 +1,43 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::Schema;

use super::{GenericPlanNode, GenericPlanRef};
use crate::session::OptimizerContextRef;
use crate::utils::Condition;

/// [`Filter`] iterates over its input and returns elements for which `predicate` evaluates to
/// true, filtering out the others.
///
/// If the predicate evaluates to NULL for a row, it is treated the same as false and the row is
/// filtered out.
#[derive(Debug, Clone)]
pub struct Filter<PlanRef> {
pub predicate: Condition,
pub input: PlanRef,
}

impl<PlanRef: GenericPlanRef> GenericPlanNode for Filter<PlanRef> {
fn schema(&self) -> Schema {
self.input.schema().clone()
}

fn logical_pk(&self) -> Option<Vec<usize>> {
Some(self.input.logical_pk().to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
self.input.ctx()
}
}
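Filter is the simplest of these generic cores, which makes it a convenient place to illustrate the pattern this refactor appears to move toward: a single generic struct, parameterized over the plan reference type, that concrete plan-node wrappers can embed. A toy, self-contained sketch with hypothetical stand-in types (not the real crate API):

// Toy illustration of sharing one generic core across plan-node flavors;
// `Condition`, `LogicalRef`, and `StreamRef` are hypothetical stand-ins.
#[derive(Debug, Clone)]
struct Condition(String);

#[derive(Debug, Clone)]
struct Filter<PlanRef> {
    predicate: Condition,
    input: PlanRef,
}

// Two different "plan reference" flavors reuse the same generic core.
type LogicalRef = &'static str;
type StreamRef = u64;

fn main() {
    let logical: Filter<LogicalRef> = Filter {
        predicate: Condition("v1 > 1".into()),
        input: "LogicalScan",
    };
    let stream: Filter<StreamRef> = Filter {
        predicate: Condition("v1 > 1".into()),
        input: 42, // e.g. an id into some stream plan arena
    };
    println!("{logical:?}\n{stream:?}");
}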
145 changes: 145 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/hop_window.rs
@@ -0,0 +1,145 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use itertools::Itertools;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::{DataType, IntervalUnit};

use super::super::utils::IndicesDisplay;
use super::{GenericPlanNode, GenericPlanRef};
use crate::expr::{InputRef, InputRefDisplay};
use crate::session::OptimizerContextRef;

/// [`HopWindow`] implements the hop table function.
#[derive(Debug, Clone)]
pub struct HopWindow<PlanRef> {
pub input: PlanRef,
pub time_col: InputRef,
pub window_slide: IntervalUnit,
pub window_size: IntervalUnit,
pub output_indices: Vec<usize>,
}

impl<PlanRef: GenericPlanRef> GenericPlanNode for HopWindow<PlanRef> {
fn schema(&self) -> Schema {
let output_type = DataType::window_of(&self.time_col.data_type).unwrap();
let original_schema: Schema = self
.input
.schema()
.clone()
.into_fields()
.into_iter()
.chain([
Field::with_name(output_type.clone(), "window_start"),
Field::with_name(output_type, "window_end"),
])
.collect();
self.output_indices
.iter()
.map(|&idx| original_schema[idx].clone())
.collect()
}

fn logical_pk(&self) -> Option<Vec<usize>> {
let window_start_index = self
.output_indices
.iter()
.position(|&idx| idx == self.input.schema().len());
let window_end_index = self
.output_indices
.iter()
.position(|&idx| idx == self.input.schema().len() + 1);
if window_start_index.is_none() && window_end_index.is_none() {
None
} else {
let mut pk = self
.input
.logical_pk()
.iter()
.filter_map(|&pk_idx| self.output_indices.iter().position(|&idx| idx == pk_idx))
.collect_vec();
if let Some(start_idx) = window_start_index {
pk.push(start_idx);
};
if let Some(end_idx) = window_end_index {
pk.push(end_idx);
};
Some(pk)
}
}

fn ctx(&self) -> OptimizerContextRef {
self.input.ctx()
}
}

impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
pub fn into_parts(self) -> (PlanRef, InputRef, IntervalUnit, IntervalUnit, Vec<usize>) {
(
self.input,
self.time_col,
self.window_slide,
self.window_size,
self.output_indices,
)
}

pub fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result {
let output_type = DataType::window_of(&self.time_col.data_type).unwrap();
write!(
f,
"{} {{ time_col: {}, slide: {}, size: {}, output: {} }}",
name,
format_args!(
"{}",
InputRefDisplay {
input_ref: &self.time_col,
input_schema: self.input.schema()
}
),
self.window_slide,
self.window_size,
if self
.output_indices
.iter()
.copied()
// Behavior is the same as `LogicalHopWindow::internal_column_num`
.eq(0..(self.input.schema().len() + 2))
{
"all".to_string()
} else {
let original_schema: Schema = self
.input
.schema()
.clone()
.into_fields()
.into_iter()
.chain([
Field::with_name(output_type.clone(), "window_start"),
Field::with_name(output_type, "window_end"),
])
.collect();
format!(
"{:?}",
&IndicesDisplay {
indices: &self.output_indices,
input_schema: &original_schema,
}
)
},
)
}
}
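The trickiest part of `logical_pk` above is the index bookkeeping: the internal schema is the input columns followed by `window_start` and `window_end`, and `output_indices` selects and permutes from that schema. Below is a small standalone sketch of just that lookup, with a hypothetical helper and no RisingWave types.

// Hypothetical illustration of locating `window_start`/`window_end`
// inside `output_indices`; not part of this diff.
fn window_positions(input_len: usize, output_indices: &[usize]) -> (Option<usize>, Option<usize>) {
    // In the internal schema, `window_start` sits at index `input_len`
    // and `window_end` at `input_len + 1`.
    let start = output_indices.iter().position(|&idx| idx == input_len);
    let end = output_indices.iter().position(|&idx| idx == input_len + 1);
    (start, end)
}

fn main() {
    // 3 input columns; the output keeps column 0, then window_start, then window_end.
    assert_eq!(window_positions(3, &[0, 3, 4]), (Some(1), Some(2)));
    // If neither window column is kept, `HopWindow::logical_pk` returns None.
    assert_eq!(window_positions(3, &[0, 1]), (None, None));
}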