diff --git a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs index f32bd63753d2..1d01650d68a0 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs @@ -23,11 +23,11 @@ use super::stream::prelude::*; use super::utils::{ childless_record, column_names_pretty, plan_node_name, watermark_pretty, Distill, }; -use super::{generic, ExprRewritable, PlanTreeNodeUnary}; +use super::{generic, ExprRewritable}; use crate::expr::Expr; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::{PlanBase, PlanTreeNodeBinary, StreamNode}; -use crate::optimizer::property::{Distribution, MonotonicityMap}; +use crate::optimizer::property::MonotonicityMap; use crate::optimizer::PlanRef; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -41,25 +41,8 @@ pub struct StreamDynamicFilter { impl StreamDynamicFilter { pub fn new(core: DynamicFilter) -> Self { - // TODO(st1page): here we just check if RHS - // is a `StreamNow`. It will be generalized to more cases - // by introducing monotonically increasing property of the node in https://github.com/risingwavelabs/risingwave/pull/13984. - let right_monotonically_increasing = { - if let Some(e) = core.right().as_stream_exchange() - && *e.distribution() == Distribution::Broadcast - { - if e.input().as_stream_now().is_some() { - true - } else if let Some(proj) = e.input().as_stream_project() { - proj.input().as_stream_now().is_some() - } else { - false - } - } else { - false - } - }; - let condition_always_relax = right_monotonically_increasing + let right_non_decreasing = core.right().columns_monotonicity()[0].is_non_decreasing(); + let condition_always_relax = right_non_decreasing && matches!( core.comparator(), ExprType::LessThan | ExprType::LessThanOrEqual diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs index 802f2e3d227c..878e34d577b3 100644 --- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -20,7 +20,9 @@ use super::stream::prelude::*; use super::utils::{childless_record, plan_node_name, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; -use crate::optimizer::property::{Distribution, DistributionDisplay, MonotonicityMap}; +use crate::optimizer::property::{ + Distribution, DistributionDisplay, MonotonicityMap, RequiredDist, +}; use crate::stream_fragmenter::BuildFragmentGraphState; /// `StreamExchange` imposes a particular distribution on its input @@ -34,17 +36,23 @@ pub struct StreamExchange { impl StreamExchange { pub fn new(input: PlanRef, dist: Distribution) -> Self { - // Dispatch executor won't change the append-only behavior of the stream. + let columns_monotonicity = if input.distribution().satisfies(&RequiredDist::single()) { + // If the input is a singleton, the monotonicity will be preserved during shuffle + // since we use ordered channel/buffer when exchanging data. + input.columns_monotonicity().clone() + } else { + MonotonicityMap::new() + }; let base = PlanBase::new_stream( input.ctx(), input.schema().clone(), input.stream_key().map(|v| v.to_vec()), input.functional_dependency().clone(), dist, - input.append_only(), + input.append_only(), // append-only property won't change input.emit_on_window_close(), input.watermark_columns().clone(), - MonotonicityMap::new(), // we lost monotonicity information when shuffling + columns_monotonicity, ); StreamExchange { base, @@ -55,14 +63,13 @@ impl StreamExchange { pub fn new_no_shuffle(input: PlanRef) -> Self { let ctx = input.ctx(); - // Dispatch executor won't change the append-only behavior of the stream. let base = PlanBase::new_stream( ctx, input.schema().clone(), input.stream_key().map(|v| v.to_vec()), input.functional_dependency().clone(), input.distribution().clone(), - input.append_only(), + input.append_only(), // append-only property won't change input.emit_on_window_close(), input.watermark_columns().clone(), input.columns_monotonicity().clone(),