diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index d720e961607e..fa03bb9f1912 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -274,44 +274,43 @@ impl ScalarExpr { } .fail() }; - if let Self::CallBinary { + + let Self::CallBinary { mut func, mut expr1, mut expr2, } = self.clone() - { - // TODO: support simple transform like `now() + a < b` to `now() < b - a` + else { + return unsupported_err("Not a binary expression"); + }; - let expr1_is_now = - *expr1 == ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now); - let expr2_is_now = - *expr2 == ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now); + // TODO: support simple transform like `now() + a < b` to `now() < b - a` - if !(expr1_is_now ^ expr2_is_now) { - return unsupported_err("None of the sides of the comparison is `now()`"); - } + let expr1_is_now = *expr1 == ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now); + let expr2_is_now = *expr2 == ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now); - if expr2_is_now { - std::mem::swap(&mut expr1, &mut expr2); - func = BinaryFunc::reverse_compare(&func)?; - } + if !(expr1_is_now ^ expr2_is_now) { + return unsupported_err("None of the sides of the comparison is `now()`"); + } - let step = |expr: ScalarExpr| expr.call_unary(UnaryFunc::StepTimestamp); - match func { - // now == expr2 -> now <= expr2 && now < expr2 + 1 - BinaryFunc::Eq => Ok((Some(*expr2.clone()), Some(step(*expr2)))), - // now < expr2 -> now < expr2 - BinaryFunc::Lt => Ok((None, Some(*expr2))), - // now <= expr2 -> now < expr2 + 1 - BinaryFunc::Lte => Ok((None, Some(step(*expr2)))), - // now > expr2 -> now >= expr2 + 1 - BinaryFunc::Gt => Ok((Some(step(*expr2)), None)), - // now >= expr2 -> now >= expr2 - BinaryFunc::Gte => Ok((Some(*expr2), None)), - _ => unreachable!("Already checked"), - } - } else { - unsupported_err("None of the sides of the comparison is `now()`") + if expr2_is_now { + std::mem::swap(&mut expr1, &mut expr2); + func = BinaryFunc::reverse_compare(&func)?; + } + + let step = |expr: ScalarExpr| expr.call_unary(UnaryFunc::StepTimestamp); + match func { + // now == expr2 -> now <= expr2 && now < expr2 + 1 + BinaryFunc::Eq => Ok((Some(*expr2.clone()), Some(step(*expr2)))), + // now < expr2 -> now < expr2 + BinaryFunc::Lt => Ok((None, Some(*expr2))), + // now <= expr2 -> now < expr2 + 1 + BinaryFunc::Lte => Ok((None, Some(step(*expr2)))), + // now > expr2 -> now >= expr2 + 1 + BinaryFunc::Gt => Ok((Some(step(*expr2)), None)), + // now >= expr2 -> now >= expr2 + BinaryFunc::Gte => Ok((Some(*expr2), None)), + _ => unreachable!("Already checked"), } } }