diff --git a/src/frontend/src/optimizer/plan_node/logical_over_window.rs b/src/frontend/src/optimizer/plan_node/logical_over_window.rs index fc70c5d0f6ea9..43d3b1c886805 100644 --- a/src/frontend/src/optimizer/plan_node/logical_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_over_window.rs @@ -23,9 +23,9 @@ use risingwave_expr::window_function::{Frame, FrameBound, WindowFuncKind}; use super::generic::{GenericPlanRef, OverWindow, PlanWindowFunction, ProjectBuilder}; use super::utils::impl_distill_by_unit; use super::{ - gen_filter_and_pushdown, BatchOverWindow, ColPrunable, ExprRewritable, Logical, LogicalProject, - PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamEowcOverWindow, StreamEowcSort, - StreamOverWindow, ToBatch, ToStream, + gen_filter_and_pushdown, BatchOverWindow, ColPrunable, ExprRewritable, Logical, LogicalFilter, + LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamEowcOverWindow, + StreamEowcSort, StreamOverWindow, ToBatch, ToStream, }; use crate::expr::{ Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef, WindowFunction, @@ -698,22 +698,19 @@ impl PredicatePushdown for LogicalOverWindow { predicate: Condition, ctx: &mut PredicatePushdownContext, ) -> PlanRef { - let in_schema_len = self.core.input.schema().len(); - let out_schema_len = self.schema().len(); + if !self.core.funcs_have_same_partition_and_order() { + // Window function calls with different PARTITION BY and ORDER BY clauses are not split yet. + return LogicalFilter::create(self.clone().into(), predicate); + } - let window_func_input_refs = self.window_functions().iter().flat_map(|func| { - func.args - .iter() - .map(|arg| arg.index) - .chain(func.order_by.iter().map(|o| o.column_index)) - }); - let mut over_window_related_cols: FixedBitSet = window_func_input_refs - .chain(in_schema_len..out_schema_len) + let all_out_cols: FixedBitSet = (0..self.schema().len()).collect(); + let mut remain_cols: FixedBitSet = all_out_cols + .difference(&self.partition_key_indices().into_iter().collect()) .collect(); - over_window_related_cols.grow(out_schema_len); + remain_cols.grow(self.schema().len()); - let (window_pred, other_pred) = predicate.split_disjoint(&over_window_related_cols); - gen_filter_and_pushdown(self, window_pred, other_pred, ctx) + let (remain_pred, pushed_pred) = predicate.split_disjoint(&remain_cols); + gen_filter_and_pushdown(self, remain_pred, pushed_pred, ctx) } }