Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(optimizer): add columns_monotonicity field for PlanNode #17600

Merged
merged 6 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::catalog::ColumnId;
use crate::error::Result;
use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;
use crate::optimizer::property::{FunctionalDependencySet, MonotonicityMap};
use crate::WithOptions;

/// [`CdcScan`] reads rows of a table from an external upstream database
Expand Down Expand Up @@ -125,6 +125,10 @@ impl CdcScan {
FixedBitSet::with_capacity(self.get_table_columns().len())
}

pub fn columns_monotonicity(&self) -> MonotonicityMap {
MonotonicityMap::new()
}

pub(crate) fn column_names_with_table_prefix(&self) -> Vec<String> {
self.output_col_idx
.iter()
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/logical_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::optimizer::plan_node::{
ToStreamContext,
};
use crate::optimizer::property::Distribution::HashShard;
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::optimizer::property::{Distribution, MonotonicityMap, Order, RequiredDist};
use crate::utils::{ColIndexMapping, Condition, IndexRewriter};

/// `LogicalSource` returns contents of a table or other equivalent object
Expand Down Expand Up @@ -229,6 +229,7 @@ impl LogicalSource {
true, // `list` will keep listing all objects, it must be append-only
false,
FixedBitSet::with_capacity(logical_source.column_catalog.len()),
MonotonicityMap::new(),
),
core: logical_source,
}
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use self::batch::BatchPlanRef;
use self::generic::{GenericPlanRef, PhysicalPlanRef};
use self::stream::StreamPlanRef;
use self::utils::Distill;
use super::property::{Distribution, FunctionalDependencySet, Order};
use super::property::{Distribution, FunctionalDependencySet, MonotonicityMap, Order};
use crate::error::{ErrorCode, Result};
use crate::optimizer::ExpressionSimplifyRewriter;
use crate::session::current::notice_to_user;
Expand Down Expand Up @@ -609,6 +609,10 @@ impl StreamPlanRef for PlanRef {
fn watermark_columns(&self) -> &FixedBitSet {
self.plan_base().watermark_columns()
}

fn columns_monotonicity(&self) -> &MonotonicityMap {
self.plan_base().columns_monotonicity()
}
}

/// Allow access to all fields defined in [`BatchPlanRef`] for the type-erased plan node.
Expand Down
18 changes: 18 additions & 0 deletions src/frontend/src/optimizer/plan_node/plan_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub struct StreamExtra {
/// The watermark column indices of the `PlanNode`'s output. There could be watermark output from
/// this stream operator.
watermark_columns: FixedBitSet,
/// The monotonicity of columns in the output.
columns_monotonicity: MonotonicityMap,
}

impl GetPhysicalCommon for StreamExtra {
Expand Down Expand Up @@ -168,6 +170,10 @@ impl stream::StreamPlanRef for PlanBase<Stream> {
fn watermark_columns(&self) -> &FixedBitSet {
&self.extra.watermark_columns
}

fn columns_monotonicity(&self) -> &MonotonicityMap {
&self.extra.columns_monotonicity
}
}

impl batch::BatchPlanRef for PlanBase<Batch> {
Expand Down Expand Up @@ -222,6 +228,7 @@ impl PlanBase<Stream> {
append_only: bool,
emit_on_window_close: bool,
watermark_columns: FixedBitSet,
columns_monotonicity: MonotonicityMap,
) -> Self {
let id = ctx.next_plan_node_id();
assert_eq!(watermark_columns.len(), schema.len());
Expand All @@ -236,6 +243,7 @@ impl PlanBase<Stream> {
append_only,
emit_on_window_close,
watermark_columns,
columns_monotonicity,
},
}
}
Expand All @@ -246,6 +254,7 @@ impl PlanBase<Stream> {
append_only: bool,
emit_on_window_close: bool,
watermark_columns: FixedBitSet,
columns_monotonicity: MonotonicityMap,
) -> Self {
Self::new_stream(
core.ctx(),
Expand All @@ -256,6 +265,7 @@ impl PlanBase<Stream> {
append_only,
emit_on_window_close,
watermark_columns,
columns_monotonicity,
)
}
}
Expand Down Expand Up @@ -383,6 +393,10 @@ impl<'a> PlanBaseRef<'a> {
dispatch_plan_base!(self, [Stream], StreamPlanRef::watermark_columns)
}

pub(super) fn columns_monotonicity(self) -> &'a MonotonicityMap {
dispatch_plan_base!(self, [Stream], StreamPlanRef::columns_monotonicity)
}

pub(super) fn order(self) -> &'a Order {
dispatch_plan_base!(self, [Batch], BatchPlanRef::order)
}
Expand Down Expand Up @@ -428,6 +442,10 @@ impl StreamPlanRef for PlanBaseRef<'_> {
fn watermark_columns(&self) -> &FixedBitSet {
(*self).watermark_columns()
}

fn columns_monotonicity(&self) -> &MonotonicityMap {
(*self).columns_monotonicity()
}
}

impl BatchPlanRef for PlanBaseRef<'_> {
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use fixedbitset::FixedBitSet;

use super::generic::PhysicalPlanRef;
use crate::optimizer::property::MonotonicityMap;

/// A subtrait of [`PhysicalPlanRef`] for stream plans.
///
Expand All @@ -29,6 +30,7 @@ pub trait StreamPlanRef: PhysicalPlanRef {
fn append_only(&self) -> bool;
fn emit_on_window_close(&self) -> bool;
fn watermark_columns(&self) -> &FixedBitSet;
fn columns_monotonicity(&self) -> &MonotonicityMap;
}

/// Prelude for stream plan nodes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl StreamCdcTableScan {
core.append_only(),
false,
core.watermark_columns(),
core.columns_monotonicity(),
);
Self { base, core }
}
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_changelog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use super::stream::prelude::PhysicalPlanRef;
use super::stream::StreamPlanRef;
use super::utils::impl_distill_by_unit;
use super::{generic, ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode};
use crate::optimizer::property::MonotonicityMap;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;

Expand Down Expand Up @@ -48,6 +49,7 @@ impl StreamChangeLog {
true,
input.emit_on_window_close(),
watermark_columns,
MonotonicityMap::new(), // TODO: derive monotonicity
);
StreamChangeLog { base, core }
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ impl StreamDedup {
true,
input.emit_on_window_close(),
input.watermark_columns().clone(),
input.columns_monotonicity().clone(),
);
StreamDedup { base, core }
}
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::expr::{Expr, ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::IndicesDisplay;
use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay, TryToStreamPb};
use crate::optimizer::property::Distribution;
use crate::optimizer::property::{Distribution, MonotonicityMap};
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::ColIndexMappingRewriteExt;
Expand Down Expand Up @@ -76,6 +76,7 @@ impl StreamDeltaJoin {
append_only,
false, // TODO(rc): derive EOWC property from input
watermark_columns,
MonotonicityMap::new(), // TODO: derive monotonicity
);

Self {
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use super::stream::prelude::*;
use super::utils::{childless_record, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::property::MonotonicityMap;
use crate::stream_fragmenter::BuildFragmentGraphState;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand All @@ -41,6 +42,7 @@ impl StreamDml {
append_only,
false, // TODO(rc): decide EOWC property
FixedBitSet::with_capacity(input.schema().len()), // no watermark if dml is allowed
MonotonicityMap::new(), // TODO: derive monotonicity
);

Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use super::{generic, ExprRewritable, PlanTreeNodeUnary};
use crate::expr::Expr;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::{PlanBase, PlanTreeNodeBinary, StreamNode};
use crate::optimizer::property::Distribution;
use crate::optimizer::property::{Distribution, MonotonicityMap};
use crate::optimizer::PlanRef;
use crate::stream_fragmenter::BuildFragmentGraphState;

Expand Down Expand Up @@ -77,6 +77,7 @@ impl StreamDynamicFilter {
out_append_only,
false, // TODO(rc): decide EOWC property
Self::derive_watermark_columns(&core),
MonotonicityMap::new(), // TODO: derive monotonicity
);
let cleaned_by_watermark = Self::cleaned_by_watermark(&core);
Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use super::stream::prelude::*;
use super::utils::{impl_distill_by_unit, TableCatalogBuilder};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::property::MonotonicityMap;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::TableCatalog;

Expand Down Expand Up @@ -58,6 +59,8 @@ impl StreamEowcOverWindow {
true,
true,
watermark_columns,
// we cannot derive monotonicity for any column for the same reason as watermark columns
MonotonicityMap::new(),
);
StreamEowcOverWindow { base, core }
}
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use super::stream::prelude::*;
use super::utils::{childless_record, plan_node_name, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::property::{Distribution, DistributionDisplay};
use crate::optimizer::property::{Distribution, DistributionDisplay, MonotonicityMap};
use crate::stream_fragmenter::BuildFragmentGraphState;

/// `StreamExchange` imposes a particular distribution on its input
Expand All @@ -44,6 +44,7 @@ impl StreamExchange {
input.append_only(),
input.emit_on_window_close(),
input.watermark_columns().clone(),
MonotonicityMap::new(), // we lost monotonicity information when shuffling
);
StreamExchange {
base,
Expand All @@ -64,6 +65,7 @@ impl StreamExchange {
input.append_only(),
input.emit_on_window_close(),
input.watermark_columns().clone(),
input.columns_monotonicity().clone(),
);
StreamExchange {
base,
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use super::stream::prelude::*;
use super::utils::impl_distill_by_unit;
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::property::Distribution;
use crate::optimizer::property::{Distribution, MonotonicityMap};
use crate::stream_fragmenter::BuildFragmentGraphState;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -52,6 +52,7 @@ impl StreamExpand {
input.append_only(),
input.emit_on_window_close(),
watermark_columns,
MonotonicityMap::new(),
);
StreamExpand { base, core }
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl StreamFilter {
input.append_only(),
input.emit_on_window_close(),
input.watermark_columns().clone(),
input.columns_monotonicity().clone(),
);
StreamFilter { base, core }
}
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::catalog::source_catalog::SourceCatalog;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::{childless_record, Distill};
use crate::optimizer::plan_node::{generic, ExprRewritable, StreamNode};
use crate::optimizer::property::Distribution;
use crate::optimizer::property::{Distribution, MonotonicityMap};
use crate::stream_fragmenter::BuildFragmentGraphState;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -55,6 +55,7 @@ impl StreamFsFetch {
source.catalog.as_ref().map_or(true, |s| s.append_only),
false,
FixedBitSet::with_capacity(source.column_catalog.len()),
MonotonicityMap::new(), // TODO: derive monotonicity
);

Self {
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_group_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use super::utils::{plan_node_name, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::GenericPlanNode;
use crate::optimizer::property::Order;
use crate::optimizer::property::{MonotonicityMap, Order};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;

Expand Down Expand Up @@ -79,6 +79,7 @@ impl StreamGroupTopN {
// TODO: https://github.com/risingwavelabs/risingwave/issues/8348
false,
watermark_columns,
MonotonicityMap::new(), // TODO: derive monotonicity
);
StreamGroupTopN {
base,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::error::{ErrorCode, Result};
use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::property::MonotonicityMap;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, IndexSet};

Expand Down Expand Up @@ -93,6 +94,7 @@ impl StreamHashAgg {
emit_on_window_close, // in EOWC mode, we produce append only output
emit_on_window_close,
watermark_columns,
MonotonicityMap::new(), // TODO: derive monotonicity
);
StreamHashAgg {
base,
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::expr::{Expr, ExprDisplay, ExprRewriter, ExprVisitor, InequalityInputP
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::IndicesDisplay;
use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay};
use crate::optimizer::property::Distribution;
use crate::optimizer::property::{Distribution, MonotonicityMap};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::ColIndexMappingRewriteExt;

Expand Down Expand Up @@ -196,6 +196,7 @@ impl StreamHashJoin {
append_only,
false, // TODO(rc): derive EOWC property from input
watermark_columns,
MonotonicityMap::new(), // TODO: derive monotonicity
);

Self {
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use super::utils::{childless_record, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::property::MonotonicityMap;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::ColIndexMappingRewriteExt;

Expand Down Expand Up @@ -62,6 +63,7 @@ impl StreamHopWindow {
input.append_only(),
input.emit_on_window_close(),
internal2output.rewrite_bitset(&watermark_columns),
MonotonicityMap::new(), /* hop window start/end jumps, so monotonicity is not propagated */
);
Self {
base,
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl StreamMaterialize {
input.append_only(),
input.emit_on_window_close(),
input.watermark_columns().clone(),
input.columns_monotonicity().clone(),
);
Self { base, input, table }
}
Expand Down
Loading
Loading