Skip to content

Commit

Permalink
feat(dyn-filter): derive condition_always_relax from column monoton…
Browse files Browse the repository at this point in the history
…icity (#17704)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Jul 18, 2024
1 parent 6e2c82f commit 4321a81
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 27 deletions.
25 changes: 4 additions & 21 deletions src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ use super::stream::prelude::*;
use super::utils::{
childless_record, column_names_pretty, plan_node_name, watermark_pretty, Distill,
};
use super::{generic, ExprRewritable, PlanTreeNodeUnary};
use super::{generic, ExprRewritable};
use crate::expr::Expr;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::{PlanBase, PlanTreeNodeBinary, StreamNode};
use crate::optimizer::property::{Distribution, MonotonicityMap};
use crate::optimizer::property::MonotonicityMap;
use crate::optimizer::PlanRef;
use crate::stream_fragmenter::BuildFragmentGraphState;

Expand All @@ -41,25 +41,8 @@ pub struct StreamDynamicFilter {

impl StreamDynamicFilter {
pub fn new(core: DynamicFilter<PlanRef>) -> Self {
// TODO(st1page): here we just check if RHS
// is a `StreamNow`. It will be generalized to more cases
// by introducing monotonically increasing property of the node in https://github.com/risingwavelabs/risingwave/pull/13984.
let right_monotonically_increasing = {
if let Some(e) = core.right().as_stream_exchange()
&& *e.distribution() == Distribution::Broadcast
{
if e.input().as_stream_now().is_some() {
true
} else if let Some(proj) = e.input().as_stream_project() {
proj.input().as_stream_now().is_some()
} else {
false
}
} else {
false
}
};
let condition_always_relax = right_monotonically_increasing
let right_non_decreasing = core.right().columns_monotonicity()[0].is_non_decreasing();
let condition_always_relax = right_non_decreasing
&& matches!(
core.comparator(),
ExprType::LessThan | ExprType::LessThanOrEqual
Expand Down
19 changes: 13 additions & 6 deletions src/frontend/src/optimizer/plan_node/stream_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use super::stream::prelude::*;
use super::utils::{childless_record, plan_node_name, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::property::{Distribution, DistributionDisplay, MonotonicityMap};
use crate::optimizer::property::{
Distribution, DistributionDisplay, MonotonicityMap, RequiredDist,
};
use crate::stream_fragmenter::BuildFragmentGraphState;

/// `StreamExchange` imposes a particular distribution on its input
Expand All @@ -34,17 +36,23 @@ pub struct StreamExchange {

impl StreamExchange {
pub fn new(input: PlanRef, dist: Distribution) -> Self {
// Dispatch executor won't change the append-only behavior of the stream.
let columns_monotonicity = if input.distribution().satisfies(&RequiredDist::single()) {
// If the input is a singleton, the monotonicity will be preserved during shuffle
// since we use ordered channel/buffer when exchanging data.
input.columns_monotonicity().clone()
} else {
MonotonicityMap::new()
};
let base = PlanBase::new_stream(
input.ctx(),
input.schema().clone(),
input.stream_key().map(|v| v.to_vec()),
input.functional_dependency().clone(),
dist,
input.append_only(),
input.append_only(), // append-only property won't change
input.emit_on_window_close(),
input.watermark_columns().clone(),
MonotonicityMap::new(), // we lost monotonicity information when shuffling
columns_monotonicity,
);
StreamExchange {
base,
Expand All @@ -55,14 +63,13 @@ impl StreamExchange {

pub fn new_no_shuffle(input: PlanRef) -> Self {
let ctx = input.ctx();
// Dispatch executor won't change the append-only behavior of the stream.
let base = PlanBase::new_stream(
ctx,
input.schema().clone(),
input.stream_key().map(|v| v.to_vec()),
input.functional_dependency().clone(),
input.distribution().clone(),
input.append_only(),
input.append_only(), // append-only property won't change
input.emit_on_window_close(),
input.watermark_columns().clone(),
input.columns_monotonicity().clone(),
Expand Down

0 comments on commit 4321a81

Please sign in to comment.