Skip to content

Commit

Permalink
fix: only pushdown partition by cols
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Nov 28, 2023
1 parent 9a78a14 commit 30f1fd6
Showing 1 changed file with 13 additions and 16 deletions.
29 changes: 13 additions & 16 deletions src/frontend/src/optimizer/plan_node/logical_over_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ use risingwave_expr::window_function::{Frame, FrameBound, WindowFuncKind};
use super::generic::{GenericPlanRef, OverWindow, PlanWindowFunction, ProjectBuilder};
use super::utils::impl_distill_by_unit;
use super::{
gen_filter_and_pushdown, BatchOverWindow, ColPrunable, ExprRewritable, Logical, LogicalProject,
PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamEowcOverWindow, StreamEowcSort,
StreamOverWindow, ToBatch, ToStream,
gen_filter_and_pushdown, BatchOverWindow, ColPrunable, ExprRewritable, Logical, LogicalFilter,
LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamEowcOverWindow,
StreamEowcSort, StreamOverWindow, ToBatch, ToStream,
};
use crate::expr::{
Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef, WindowFunction,
Expand Down Expand Up @@ -698,22 +698,19 @@ impl PredicatePushdown for LogicalOverWindow {
predicate: Condition,
ctx: &mut PredicatePushdownContext,
) -> PlanRef {
let in_schema_len = self.core.input.schema().len();
let out_schema_len = self.schema().len();
if !self.core.funcs_have_same_partition_and_order() {
// Window function calls with different PARTITION BY and ORDER BY clauses are not split yet.
return LogicalFilter::create(self.clone().into(), predicate);
}

let window_func_input_refs = self.window_functions().iter().flat_map(|func| {
func.args
.iter()
.map(|arg| arg.index)
.chain(func.order_by.iter().map(|o| o.column_index))
});
let mut over_window_related_cols: FixedBitSet = window_func_input_refs
.chain(in_schema_len..out_schema_len)
let all_out_cols: FixedBitSet = (0..self.schema().len()).collect();
let mut remain_cols: FixedBitSet = all_out_cols
.difference(&self.partition_key_indices().into_iter().collect())
.collect();
over_window_related_cols.grow(out_schema_len);
remain_cols.grow(self.schema().len());

let (window_pred, other_pred) = predicate.split_disjoint(&over_window_related_cols);
gen_filter_and_pushdown(self, window_pred, other_pred, ctx)
let (remain_pred, pushed_pred) = predicate.split_disjoint(&remain_cols);
gen_filter_and_pushdown(self, remain_pred, pushed_pred, ctx)
}
}

Expand Down

0 comments on commit 30f1fd6

Please sign in to comment.