diff --git a/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml b/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml index 8b9126f18d641..acc07a17d3555 100644 --- a/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml @@ -134,3 +134,12 @@ )d on true; expected_outputs: - batch_plan +- name: lateral join with CTE + sql: | + create table t1(x int, y int); + create table t2(x int, y int); + select * from t1, lateral ( + with cte as (select * from t2 where t2.y = t1.y) select x from cte + ); + expected_outputs: + - batch_plan diff --git a/src/frontend/planner_test/tests/testdata/input/subquery_expr_correlated.yaml b/src/frontend/planner_test/tests/testdata/input/subquery_expr_correlated.yaml index 7c87713e85c20..7194a2806a2e8 100644 --- a/src/frontend/planner_test/tests/testdata/input/subquery_expr_correlated.yaml +++ b/src/frontend/planner_test/tests/testdata/input/subquery_expr_correlated.yaml @@ -584,3 +584,11 @@ from rawdata expected_outputs: - optimized_logical_plan_for_batch +- name: subquery with CTE + sql: | + create table t1(x int, y int); + create table t2(x int, y int); + select * from t1 where t1.x = ( with cte as (select * from t2 where t2.y = t1.y) select x from cte limit 1); + expected_outputs: + - batch_plan + - stream_plan \ No newline at end of file diff --git a/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml b/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml index 815890d6a73b8..2e3046c7a2e4b 100644 --- a/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml @@ -270,3 +270,18 @@ │ └─BatchValues { rows: [[1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz]] } └─BatchExchange { order: [], dist: HashShard(r.src_id) } └─BatchScan { table: r, columns: [r.ts, r.src_id, r.dev_id], distribution: SomeShard } +- name: lateral join with CTE + sql: | + create table t1(x int, y int); + create table t2(x int, y int); + select * from t1, lateral ( + with cte as (select * from t2 where t2.y = t1.y) select x from cte + ); + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchHashJoin { type: Inner, predicate: t1.y IS NOT DISTINCT FROM t2.y, output: [t1.x, t1.y, t2.x] } + ├─BatchExchange { order: [], dist: HashShard(t1.y) } + │ └─BatchScan { table: t1, columns: [t1.x, t1.y], distribution: SomeShard } + └─BatchExchange { order: [], dist: HashShard(t2.y) } + └─BatchFilter { predicate: IsNotNull(t2.y) } + └─BatchScan { table: t2, columns: [t2.x, t2.y], distribution: SomeShard } diff --git a/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml b/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml index 07399b433c0f6..13d363a6e4877 100644 --- a/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml +++ b/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml @@ -2193,3 +2193,29 @@ └─LogicalProjectSet { select_list: [$0, JsonbEach($0)] } └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] } └─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } } +- name: subquery with CTE + sql: | + create table t1(x int, y int); + create table t2(x int, y int); + select * from t1 where t1.x = ( with cte as (select * from t2 where t2.y = t1.y) select x from cte limit 1); + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchHashJoin { type: Inner, predicate: t1.y IS NOT DISTINCT FROM t2.y AND t1.x = t2.x, output: [t1.x, t1.y] } + ├─BatchExchange { order: [], dist: HashShard(t1.y) } + │ └─BatchScan { table: t1, columns: [t1.x, t1.y], distribution: SomeShard } + └─BatchGroupTopN { order: [t2.y ASC], limit: 1, offset: 0, group_key: [t2.y] } + └─BatchExchange { order: [], dist: HashShard(t2.y) } + └─BatchProject { exprs: [t2.y, t2.x] } + └─BatchFilter { predicate: IsNotNull(t2.y) } + └─BatchScan { table: t2, columns: [t2.x, t2.y], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [x, y, t1._row_id(hidden), t2.y(hidden)], stream_key: [t1._row_id, y, x], pk_columns: [t1._row_id, y, x], pk_conflict: NoCheck } + └─StreamExchange { dist: HashShard(t1.x, t1.y, t1._row_id) } + └─StreamHashJoin { type: Inner, predicate: t1.y IS NOT DISTINCT FROM t2.y AND t1.x = t2.x, output: [t1.x, t1.y, t1._row_id, t2.y] } + ├─StreamExchange { dist: HashShard(t1.y) } + │ └─StreamTableScan { table: t1, columns: [t1.x, t1.y, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } + └─StreamGroupTopN { order: [t2.y ASC], limit: 1, offset: 0, group_key: [t2.y] } + └─StreamExchange { dist: HashShard(t2.y) } + └─StreamProject { exprs: [t2.y, t2.x, t2._row_id] } + └─StreamFilter { predicate: IsNotNull(t2.y) } + └─StreamTableScan { table: t2, columns: [t2.x, t2.y, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) } diff --git a/src/frontend/src/binder/expr/column.rs b/src/frontend/src/binder/expr/column.rs index cf4f295e960a0..8a8ea6b87f7f5 100644 --- a/src/frontend/src/binder/expr/column.rs +++ b/src/frontend/src/binder/expr/column.rs @@ -151,14 +151,14 @@ impl Binder { } } - for (i, lateral_context) in lateral_contexts.iter().rev().enumerate() { + for (j, lateral_context) in lateral_contexts.iter().rev().enumerate() { if lateral_context.is_visible { let context = &lateral_context.context; if matches!(context.clause, Some(Clause::Insert)) { continue; } // correlated input ref from lateral context `depth` starts from 1. - let depth = i + 1; + let depth = i + j + 1; match context.get_column_binding_index(&table_name, &column_name) { Ok(index) => { let column = &context.columns[index]; diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index af4121e1b9056..861edf575f510 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -15,6 +15,7 @@ use std::collections::hash_map::Entry; use std::ops::Deref; +use either::Either; use itertools::{EitherOrBoth, Itertools}; use risingwave_common::bail; use risingwave_common::catalog::{Field, TableId, DEFAULT_SCHEMA_NAME}; @@ -134,6 +135,15 @@ impl Relation { with_ordinality: _, } => table_function .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id), + Relation::Share(share) => match &mut share.input { + BoundShareInput::Query(query) => match query { + Either::Left(query) => query + .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id), + Either::Right(_) => vec![], + }, + BoundShareInput::ChangeLog(change_log) => change_log + .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id), + }, _ => vec![], } }