Skip to content

Commit

Permalink
fix(optimizer): do not push temporal filter predicate passing the temporal join (#13549) (#13813)
Browse files Browse the repository at this point in the history

Co-authored-by: stonepage <[email protected]>
  • Loading branch information
github-actions[bot] and st1page authored Dec 5, 2023
1 parent 8a34407 commit 425c06f
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
create table t1 (ts timestamp with time zone);
select * from t1 where now() - interval '15 minutes' > ts;
expected_outputs:
- stream_error
- stream_error
- name: Temporal filter reorders now expressions correctly
sql: |
create table t1 (ts timestamp with time zone);
Expand Down Expand Up @@ -71,3 +71,10 @@
select * from t1 right join t2 on a = b AND tb < now() - interval '1 hour' and tb >= now() - interval '2 hour';
expected_outputs:
- stream_error
- name: Temporal filter after temporal join
sql: |
create table stream(id1 int, a1 int, b1 int, v1 timestamp with time zone) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, v1 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 where v1 > now();
expected_outputs:
- stream_plan
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,9 @@
├─LogicalScan { table: t1, columns: [t1.v1] }
└─LogicalProject { exprs: [t2.v2, t2.v3, ('2021-04-01 00:00:00+00:00':Timestamptz + t2.v3) as $expr1] }
└─LogicalScan { table: t2, columns: [t2.v2, t2.v3] }
stream_error: Conditions containing now must be of the form `input_expr cmp now() [+- const_expr]` or `now() [+- const_expr] cmp input_expr`, where `input_expr` references a column and contains no `now()`.
stream_error: |-
Not supported: optimizer has tried to separate the temporal predicate(with now() expression) from the on condition, but it still reminded in on join's condition. Considering move it into WHERE clause?
HINT: please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information
- name: now() in complex cmp expr pushed onto join ON clause results in dynamic filter
sql: |
create table t1(v1 timestamp with time zone);
Expand All @@ -291,7 +293,9 @@
├─LogicalScan { table: t1, columns: [t1.v1] }
└─LogicalProject { exprs: [t2.v2, t2.v3, ('2021-04-01 00:00:00+00:00':Timestamptz + t2.v3) as $expr1] }
└─LogicalScan { table: t2, columns: [t2.v2, t2.v3] }
stream_error: Conditions containing now must be of the form `input_expr cmp now() [+- const_expr]` or `now() [+- const_expr] cmp input_expr`, where `input_expr` references a column and contains no `now()`.
stream_error: |-
Not supported: optimizer has tried to separate the temporal predicate(with now() expression) from the on condition, but it still reminded in on join's condition. Considering move it into WHERE clause?
HINT: please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information
- name: now() does not get pushed to scan, but others do
sql: |
create table t1(v1 timestamp with time zone, v2 int);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,19 @@
stream_error: |-
Not supported: optimizer has tried to separate the temporal predicate(with now() expression) from the on condition, but it still reminded in on join's condition. Considering move it into WHERE clause?
HINT: please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information
- name: Temporal filter after temporal join
sql: |
create table stream(id1 int, a1 int, b1 int, v1 timestamp with time zone) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, v1 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 where v1 > now();
stream_plan: |-
StreamMaterialize { columns: [id1, a1, id2, v1, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck, watermark_columns: [v1] }
└─StreamProject { exprs: [stream.id1, stream.a1, version.id2, stream.v1, stream._row_id], output_watermarks: [stream.v1] }
└─StreamDynamicFilter { predicate: (stream.v1 > now), output_watermarks: [stream.v1], output: [stream.id1, stream.a1, stream.v1, version.id2, stream._row_id], cleaned_by_watermark: true }
├─StreamTemporalJoin { type: LeftOuter, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, stream.v1, version.id2, stream._row_id] }
│ ├─StreamExchange { dist: HashShard(stream.id1) }
│ │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.v1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
│ └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) }
│ └─StreamTableScan { table: version, columns: [version.id2], pk: [version.id2], dist: UpstreamHashShard(version.id2) }
└─StreamExchange { dist: Broadcast }
└─StreamNow { output: [now] }
40 changes: 29 additions & 11 deletions src/frontend/src/optimizer/plan_node/generic/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,26 +480,32 @@ pub fn push_down_into_join(
left_col_num: usize,
right_col_num: usize,
ty: JoinType,
push_temporal_predicate: bool,
) -> (Condition, Condition, Condition) {
let (left, right) = push_down_to_inputs(
predicate,
left_col_num,
right_col_num,
can_push_left_from_filter(ty),
can_push_right_from_filter(ty),
push_temporal_predicate,
);

let on = if can_push_on_from_filter(ty) {
let mut conjunctions = std::mem::take(&mut predicate.conjunctions);

// Do not push now on to the on, it will be pulled up into a filter instead.
let on = Condition {
conjunctions: conjunctions
.extract_if(|expr| expr.count_nows() == 0)
.collect(),
};
predicate.conjunctions = conjunctions;
on
if push_temporal_predicate {
Condition { conjunctions }
} else {
// Do not push now on to the on, it will be pulled up into a filter instead.
let on = Condition {
conjunctions: conjunctions
.extract_if(|expr| expr.count_nows() == 0)
.collect(),
};
predicate.conjunctions = conjunctions;
on
}
} else {
Condition::true_cond()
};
Expand All @@ -516,13 +522,15 @@ pub fn push_down_join_condition(
left_col_num: usize,
right_col_num: usize,
ty: JoinType,
push_temporal_predicate: bool,
) -> (Condition, Condition) {
push_down_to_inputs(
on_condition,
left_col_num,
right_col_num,
can_push_left_from_on(ty),
can_push_right_from_on(ty),
push_temporal_predicate,
)
}

Expand All @@ -536,11 +544,21 @@ fn push_down_to_inputs(
right_col_num: usize,
push_left: bool,
push_right: bool,
push_temporal_predicate: bool,
) -> (Condition, Condition) {
let conjunctions = std::mem::take(&mut predicate.conjunctions);
let mut conjunctions = std::mem::take(&mut predicate.conjunctions);
let (mut left, right, mut others) = if push_temporal_predicate {
Condition { conjunctions }.split(left_col_num, right_col_num)
} else {
let temporal_filter_cons = conjunctions
.extract_if(|e| e.count_nows() != 0)
.collect_vec();
let (left, right, mut others) =
Condition { conjunctions }.split(left_col_num, right_col_num);

let (mut left, right, mut others) =
Condition { conjunctions }.split(left_col_num, right_col_num);
others.conjunctions.extend(temporal_filter_cons);
(left, right, others)
};

if !push_left {
others.conjunctions.extend(left);
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,11 +327,11 @@ impl PredicatePushdown for LogicalApply {
let join_type = self.join_type();

let (left_from_filter, right_from_filter, on) =
push_down_into_join(&mut predicate, left_col_num, right_col_num, join_type);
push_down_into_join(&mut predicate, left_col_num, right_col_num, join_type, true);

let mut new_on = self.on.clone().and(on);
let (left_from_on, right_from_on) =
push_down_join_condition(&mut new_on, left_col_num, right_col_num, join_type);
push_down_join_condition(&mut new_on, left_col_num, right_col_num, join_type, true);

let left_predicate = left_from_filter.and(left_from_on);
let right_predicate = right_from_filter.and(right_from_on);
Expand Down
70 changes: 16 additions & 54 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,56 +188,6 @@ impl LogicalJoin {
self.output_indices() == &(0..self.internal_column_num()).collect_vec()
}

/// Try to split and push down `predicate` into a join's left child, right child, or ON clause.
///
/// The conjunctions are taken out of `predicate` and split (via `Condition::split`) into a
/// left part, a right part, and the remaining `others`. Each part is either returned to the
/// caller as "pushed" or folded back into `others`, depending on the `push_*` flags. Whatever
/// is left in `others` at the end is written back into `predicate`, so the pushed part is
/// removed from the original predicate.
///
/// * `push_left`: when `false`, the left part is kept in `predicate` and an always-true
///   condition is returned in its place.
/// * `push_right`: when `false`, the right part is kept in `predicate` and an always-true
///   condition is returned in its place. When `true`, the right part is rewritten so that its
///   `InputRef`s are shifted by `-left_col_num`.
/// * `push_on`: when `true`, every remaining conjunction with no `now()` (`count_nows() == 0`)
///   is returned as the ON condition. When `false`, an always-true condition is returned.
///
/// Returns `(left, right, on)`.
///
/// NOTE(review): presumably `Condition::split` classifies each conjunction by which side's
/// columns it references; its definition is not visible here, so confirm.
pub fn push_down(
predicate: &mut Condition,
left_col_num: usize,
right_col_num: usize,
push_left: bool,
push_right: bool,
push_on: bool,
) -> (Condition, Condition, Condition) {
// Take ownership of the conjunctions; `predicate` is left empty until it is refilled below.
let conjunctions = std::mem::take(&mut predicate.conjunctions);

let (mut left, right, mut others) =
Condition { conjunctions }.split(left_col_num, right_col_num);

// Not allowed to push to the left input: keep those conjunctions with the remainder.
if !push_left {
others.conjunctions.extend(left);
left = Condition::true_cond();
};

let right = if push_right {
// Shift the input references of the right part by `-left_col_num`.
let mut mapping = ColIndexMapping::with_shift_offset(
left_col_num + right_col_num,
-(left_col_num as isize),
);
right.rewrite_expr(&mut mapping)
} else {
// Not allowed to push to the right input: keep those conjunctions with the remainder.
others.conjunctions.extend(right);
Condition::true_cond()
};

let on = if push_on {
// Do not push conjunctions containing `now()` into the ON clause; they stay in
// `others` and will be pulled up into a filter instead.
Condition {
conjunctions: others
.conjunctions
.extract_if(|expr| expr.count_nows() == 0)
.collect(),
}
} else {
Condition::true_cond()
};

// Everything that was not pushed goes back into the caller's predicate.
predicate.conjunctions = others.conjunctions;

(left, right, on)
}

/// Try to simplify the outer join with the predicate on the top of the join
///
/// now it is just a naive implementation for comparison expression, we can give a more general
Expand Down Expand Up @@ -793,12 +743,24 @@ impl PredicatePushdown for LogicalJoin {
let right_col_num = self.right().schema().len();
let join_type = LogicalJoin::simplify_outer(&predicate, left_col_num, self.join_type());

let (left_from_filter, right_from_filter, on) =
push_down_into_join(&mut predicate, left_col_num, right_col_num, join_type);
let push_down_temporal_predicate = !self.should_be_temporal_join();

let (left_from_filter, right_from_filter, on) = push_down_into_join(
&mut predicate,
left_col_num,
right_col_num,
join_type,
push_down_temporal_predicate,
);

let mut new_on = self.on().clone().and(on);
let (left_from_on, right_from_on) =
push_down_join_condition(&mut new_on, left_col_num, right_col_num, join_type);
let (left_from_on, right_from_on) = push_down_join_condition(
&mut new_on,
left_col_num,
right_col_num,
join_type,
push_down_temporal_predicate,
);

let left_predicate = left_from_filter.and(left_from_on);
let right_predicate = right_from_filter.and(right_from_on);
Expand Down

0 comments on commit 425c06f

Please sign in to comment.