Skip to content

Commit

Permalink
fix(optimizer): forbid correlated input ref predicate push down through share (#13569)
Browse files Browse the repository at this point in the history

Co-authored-by: stonepage <[email protected]>
  • Loading branch information
chenzl25 and st1page authored Nov 22, 2023
1 parent 9e1a147 commit 545b89d
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,13 @@
create table t2 (v2 int, k2 int);
select * from t1 where v1 in ( select v2 from t2 where k2 = k1 limit 1);
expected_outputs:
- batch_plan
- stream_plan

- batch_plan
- stream_plan
# Regression case for #13569: two scalar subqueries both join `t` with `di`, and each
# carries a predicate correlated with the outer table `dl` (t.a = dl.c1 / t.a = dl.c2).
# Only the optimized logical plan for streaming is checked.
- name: test correlated input ref predicate and share operator
sql: |
create table t (a int, b int, c int);
create table dl(c1 int, c2 int);
create table di(d1 int, d2 int);
select (select 1 from t, di where t.a = dl.c1 and t.b = di.d1 limit 1) name, (select 1 from t, di where t.a = dl.c2 and t.c = di.d2 limit 1) name2 from dl;
expected_outputs:
- optimized_logical_plan_for_stream
Original file line number Diff line number Diff line change
Expand Up @@ -947,3 +947,27 @@
└─StreamProject { exprs: [t2.k2, t2.v2, t2._row_id] }
└─StreamFilter { predicate: IsNotNull(t2.k2) }
└─StreamTableScan { table: t2, columns: [t2.v2, t2.k2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
- name: test correlated input ref predicate and share operator
sql: |
create table t (a int, b int, c int);
create table dl(c1 int, c2 int);
create table di(d1 int, d2 int);
select (select 1 from t, di where t.a = dl.c1 and t.b = di.d1 limit 1) name, (select 1 from t, di where t.a = dl.c2 and t.c = di.d2 limit 1) name2 from dl;
optimized_logical_plan_for_stream: |-
LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(dl.c2, t.a), output: [1:Int32, 1:Int32] }
├─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(dl.c1, t.a), output: [dl.c2, 1:Int32] }
│ ├─LogicalScan { table: dl, columns: [dl.c1, dl.c2] }
│ └─LogicalTopN { order: [t.a ASC], limit: 1, offset: 0, group_key: [t.a] }
│ └─LogicalProject { exprs: [t.a, 1:Int32] }
│ └─LogicalFilter { predicate: (t.b = di.d1) AND IsNotNull(t.a) }
│ └─LogicalShare { id: 2 }
│ └─LogicalJoin { type: Inner, on: ((t.b = di.d1) OR (t.c = di.d2)), output: all }
│ ├─LogicalScan { table: t, columns: [t.a, t.b, t.c], predicate: IsNotNull(t.a) }
│ └─LogicalScan { table: di, columns: [di.d1, di.d2] }
└─LogicalTopN { order: [t.a ASC], limit: 1, offset: 0, group_key: [t.a] }
└─LogicalProject { exprs: [t.a, 1:Int32] }
└─LogicalFilter { predicate: (t.c = di.d2) AND IsNotNull(t.a) }
└─LogicalShare { id: 2 }
└─LogicalJoin { type: Inner, on: ((t.b = di.d1) OR (t.c = di.d2)), output: all }
├─LogicalScan { table: t, columns: [t.a, t.b, t.c], predicate: IsNotNull(t.a) }
└─LogicalScan { table: di, columns: [di.d1, di.d2] }
13 changes: 11 additions & 2 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,15 @@ impl PlanRef {
.map(|mut c| Condition {
conjunctions: c
.conjunctions
.extract_if(|e| e.count_nows() == 0 && e.is_pure())
.extract_if(|e| {
// If a predicate contains now(), is impure, or contains a correlated input ref, don't push it through the share operator.
// A predicate with the now() function is regarded as a temporal filter predicate, which will be transformed into a temporal filter operator and cannot be combined with other predicates using OR.
let mut finder = ExprCorrelatedIdFinder::default();
finder.visit_expr(e);
e.count_nows() == 0
&& e.is_pure()
&& !finder.has_correlated_input_ref()
})
.collect(),
})
.reduce(|a, b| a.or(b))
Expand Down Expand Up @@ -923,9 +931,10 @@ pub use stream_union::StreamUnion;
pub use stream_values::StreamValues;
pub use stream_watermark_filter::StreamWatermarkFilter;

use crate::expr::{ExprImpl, ExprRewriter, InputRef, Literal};
use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef, Literal};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::plan_rewriter::PlanCloner;
use crate::optimizer::plan_visitor::ExprCorrelatedIdFinder;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::{ColIndexMapping, Condition, DynEq, DynHash, Endo, Layer, Visit};

Expand Down

0 comments on commit 545b89d

Please sign in to comment.