diff --git a/src/frontend/planner_test/tests/testdata/input/subquery_expr_correlated.yaml b/src/frontend/planner_test/tests/testdata/input/subquery_expr_correlated.yaml index c51307cd2468f..6f16abeb813a3 100644 --- a/src/frontend/planner_test/tests/testdata/input/subquery_expr_correlated.yaml +++ b/src/frontend/planner_test/tests/testdata/input/subquery_expr_correlated.yaml @@ -438,6 +438,13 @@ create table t2 (v2 int, k2 int); select * from t1 where v1 in ( select v2 from t2 where k2 = k1 limit 1); expected_outputs: - - batch_plan - - stream_plan - + - batch_plan + - stream_plan +- name: test correlated input ref predicate and share operator + sql: | + create table t (a int, b int, c int); + create table dl(c1 int, c2 int); + create table di(d1 int, d2 int); + select (select 1 from t, di where t.a = dl.c1 and t.b = di.d1 limit 1) name, (select 1 from t, di where t.a = dl.c2 and t.c = di.d2 limit 1) name2 from dl; + expected_outputs: + - optimized_logical_plan_for_stream diff --git a/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml b/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml index 7aa042f7a119e..f21af89d1c6db 100644 --- a/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml +++ b/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml @@ -947,3 +947,27 @@ └─StreamProject { exprs: [t2.k2, t2.v2, t2._row_id] } └─StreamFilter { predicate: IsNotNull(t2.k2) } └─StreamTableScan { table: t2, columns: [t2.v2, t2.k2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } +- name: test correlated input ref predicate and share operator + sql: | + create table t (a int, b int, c int); + create table dl(c1 int, c2 int); + create table di(d1 int, d2 int); + select (select 1 from t, di where t.a = dl.c1 and t.b = di.d1 limit 1) name, (select 1 from t, di where t.a = dl.c2 and t.c = di.d2 limit 1) name2 from dl; + optimized_logical_plan_for_stream: |- + LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(dl.c2, t.a), output: [1:Int32, 1:Int32] } + ├─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(dl.c1, t.a), output: [dl.c2, 1:Int32] } + │ ├─LogicalScan { table: dl, columns: [dl.c1, dl.c2] } + │ └─LogicalTopN { order: [t.a ASC], limit: 1, offset: 0, group_key: [t.a] } + │ └─LogicalProject { exprs: [t.a, 1:Int32] } + │ └─LogicalFilter { predicate: (t.b = di.d1) AND IsNotNull(t.a) } + │ └─LogicalShare { id: 2 } + │ └─LogicalJoin { type: Inner, on: ((t.b = di.d1) OR (t.c = di.d2)), output: all } + │ ├─LogicalScan { table: t, columns: [t.a, t.b, t.c], predicate: IsNotNull(t.a) } + │ └─LogicalScan { table: di, columns: [di.d1, di.d2] } + └─LogicalTopN { order: [t.a ASC], limit: 1, offset: 0, group_key: [t.a] } + └─LogicalProject { exprs: [t.a, 1:Int32] } + └─LogicalFilter { predicate: (t.c = di.d2) AND IsNotNull(t.a) } + └─LogicalShare { id: 2 } + └─LogicalJoin { type: Inner, on: ((t.b = di.d1) OR (t.c = di.d2)), output: all } + ├─LogicalScan { table: t, columns: [t.a, t.b, t.c], predicate: IsNotNull(t.a) } + └─LogicalScan { table: di, columns: [di.d1, di.d2] } diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index a0be5132c92ac..949e2fd2210d9 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -423,7 +423,15 @@ impl PlanRef { .map(|mut c| Condition { conjunctions: c .conjunctions - .extract_if(|e| e.count_nows() == 0 && e.is_pure()) + .extract_if(|e| { + // If predicates contain now, impure or correlated input ref, don't push through share operator. + // The predicate with now() function is regarded as a temporal filter predicate, which will be transformed to a temporal filter operator and can not do the OR operation with other predicates. + let mut finder = ExprCorrelatedIdFinder::default(); + finder.visit_expr(e); + e.count_nows() == 0 + && e.is_pure() + && !finder.has_correlated_input_ref() + }) .collect(), }) .reduce(|a, b| a.or(b)) @@ -923,9 +931,10 @@ pub use stream_union::StreamUnion; pub use stream_values::StreamValues; pub use stream_watermark_filter::StreamWatermarkFilter; -use crate::expr::{ExprImpl, ExprRewriter, InputRef, Literal}; +use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef, Literal}; use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::plan_rewriter::PlanCloner; +use crate::optimizer::plan_visitor::ExprCorrelatedIdFinder; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::{ColIndexMapping, Condition, DynEq, DynHash, Endo, Layer, Visit};