Skip to content

Commit

Permalink
do not involve watermark in Monotonicity
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Jul 9, 2024
1 parent d5ea13c commit 6b9feea
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 30 deletions.
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl StreamProject {
watermark_columns.insert(expr_idx);
}
}
Inherent(Increasing) => {
Inherent(NonDecreasing) => {
nondecreasing_exprs.push(expr_idx);
watermark_columns.insert(expr_idx);
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl StreamProjectSet {
watermark_columns.insert(expr_idx + 1);
}
}
Inherent(Increasing) => {
Inherent(NonDecreasing) => {
nondecreasing_exprs.push(expr_idx);
watermark_columns.insert(expr_idx + 1);
}
Expand Down
36 changes: 9 additions & 27 deletions src/frontend/src/optimizer/property/monotonicity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,40 +42,22 @@ impl MonotonicityDerivation {
}
}

/// Represents the monotonicity of a column.
/// Represents the monotonicity of a column. `NULL`s are considered largest when analyzing monotonicity.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
pub enum Monotonicity {
/// The column is constant. In the future, we may derive watermark messages for this monotonicity.
Constant,
/// The column is strictly non-decreasing. For this monotonicity, a watermark definition should be automatically
/// derived. Basically it is similar to `WATERMARK FOR col AS col`, except that the watermark messages are supposed
/// to be produced by `Project/ProjectSet` instead of `WatermarkFilter`.
Increasing,
/// The column is GENERALLY non-decreasing, but NOT STRICTLY. The general-non-decreasing property is normally defined
/// by user in DDL queries, and should be enforced by `WatermarkFilter`. Watermark messages should be produced by
/// `WatermarkFilter` and forwarded to downstream with necessary transformation in other operators like `Project/ProjectSet`.
IncreasingByWatermark,
/// The column is strictly non-increasing.
Decreasing,
/// The monotonicity of the column is unknown, meaning that we have to do everything conservatively for this column.
NonDecreasing,
NonIncreasing,
Unknown,
}

impl Monotonicity {
pub fn has_watermark(self) -> bool {
matches!(
self,
Monotonicity::Increasing | Monotonicity::IncreasingByWatermark
)
}

pub fn inverse(self) -> Self {
use Monotonicity::*;
match self {
Constant => Constant,
Increasing => Decreasing,
IncreasingByWatermark => Unknown,
Decreasing => Increasing,
NonDecreasing => NonIncreasing,
NonIncreasing => NonDecreasing,
Unknown => Unknown,
}
}
Expand All @@ -101,7 +83,7 @@ impl MonotonicityAnalyzer {
// recursion base
ExprImpl::InputRef(inner) => FollowingInput(inner.index()),
ExprImpl::Literal(_) => Inherent(Constant),
ExprImpl::Now(_) => Inherent(Increasing),
ExprImpl::Now(_) => Inherent(NonDecreasing),
ExprImpl::UserDefinedFunction(_) => Inherent(Unknown),

// recursively visit children
Expand Down Expand Up @@ -166,8 +148,8 @@ impl MonotonicityAnalyzer {
ExprType::Unspecified => unreachable!(),
ExprType::Add => match self.visit_binary_op(func_call.inputs()) {
(Inherent(Constant), any) | (any, Inherent(Constant)) => any,
(Inherent(Increasing), Inherent(Increasing)) => Inherent(Increasing),
(Inherent(Decreasing), Inherent(Decreasing)) => Inherent(Decreasing),
(Inherent(NonDecreasing), Inherent(NonDecreasing)) => Inherent(NonDecreasing),
(Inherent(NonIncreasing), Inherent(NonIncreasing)) => Inherent(NonIncreasing),
_ => Inherent(Unknown),
},
ExprType::Subtract => match self.visit_binary_op(func_call.inputs()) {
Expand Down Expand Up @@ -278,7 +260,7 @@ impl MonotonicityAnalyzer {
// TODO: do we need derive watermark when every case can derive a common watermark?
Inherent(Unknown)
}
ExprType::Proctime => Inherent(Increasing),
ExprType::Proctime => Inherent(NonDecreasing),
_ => Inherent(Unknown),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Rule for FilterWithNowToJoinRule {
if let Some((input_expr, cmp, now_expr)) = expr.as_now_comparison_cond() {
// ensure that this expression is increasing
use monotonicity_variants::*;
if matches!(analyze_monotonicity(&now_expr), Inherent(Increasing)) {
if matches!(analyze_monotonicity(&now_expr), Inherent(NonDecreasing)) {
now_filters.push(
FunctionCall::new(cmp, vec![input_expr, now_expr])
.unwrap()
Expand Down

0 comments on commit 6b9feea

Please sign in to comment.