Skip to content

Commit

Permalink
fix(optimizer): fix lateral subquery with cte (#19162) (#19202)
Browse files Browse the repository at this point in the history
Co-authored-by: Dylan <[email protected]>
  • Loading branch information
github-actions[bot] and chenzl25 authored Nov 1, 2024
1 parent 3e96f7c commit 4f78be9
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,12 @@
)d on true;
expected_outputs:
- batch_plan
- name: lateral join with CTE
sql: |
create table t1(x int, y int);
create table t2(x int, y int);
select * from t1, lateral (
with cte as (select * from t2 where t2.y = t1.y) select x from cte
);
expected_outputs:
- batch_plan
Original file line number Diff line number Diff line change
Expand Up @@ -584,3 +584,11 @@
from rawdata
expected_outputs:
- optimized_logical_plan_for_batch
- name: subquery with CTE
sql: |
create table t1(x int, y int);
create table t2(x int, y int);
select * from t1 where t1.x = ( with cte as (select * from t2 where t2.y = t1.y) select x from cte limit 1);
expected_outputs:
- batch_plan
- stream_plan
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,18 @@
│ └─BatchValues { rows: [[1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz]] }
└─BatchExchange { order: [], dist: HashShard(r.src_id) }
└─BatchScan { table: r, columns: [r.ts, r.src_id, r.dev_id], distribution: SomeShard }
- name: lateral join with CTE
sql: |
create table t1(x int, y int);
create table t2(x int, y int);
select * from t1, lateral (
with cte as (select * from t2 where t2.y = t1.y) select x from cte
);
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashJoin { type: Inner, predicate: t1.y IS NOT DISTINCT FROM t2.y, output: [t1.x, t1.y, t2.x] }
├─BatchExchange { order: [], dist: HashShard(t1.y) }
│ └─BatchScan { table: t1, columns: [t1.x, t1.y], distribution: SomeShard }
└─BatchExchange { order: [], dist: HashShard(t2.y) }
└─BatchFilter { predicate: IsNotNull(t2.y) }
└─BatchScan { table: t2, columns: [t2.x, t2.y], distribution: SomeShard }
Original file line number Diff line number Diff line change
Expand Up @@ -2193,3 +2193,29 @@
└─LogicalProjectSet { select_list: [$0, JsonbEach($0)] }
└─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] }
└─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } }
- name: subquery with CTE
sql: |
create table t1(x int, y int);
create table t2(x int, y int);
select * from t1 where t1.x = ( with cte as (select * from t2 where t2.y = t1.y) select x from cte limit 1);
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashJoin { type: Inner, predicate: t1.y IS NOT DISTINCT FROM t2.y AND t1.x = t2.x, output: [t1.x, t1.y] }
├─BatchExchange { order: [], dist: HashShard(t1.y) }
│ └─BatchScan { table: t1, columns: [t1.x, t1.y], distribution: SomeShard }
└─BatchGroupTopN { order: [t2.y ASC], limit: 1, offset: 0, group_key: [t2.y] }
└─BatchExchange { order: [], dist: HashShard(t2.y) }
└─BatchProject { exprs: [t2.y, t2.x] }
└─BatchFilter { predicate: IsNotNull(t2.y) }
└─BatchScan { table: t2, columns: [t2.x, t2.y], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [x, y, t1._row_id(hidden), t2.y(hidden)], stream_key: [t1._row_id, y, x], pk_columns: [t1._row_id, y, x], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(t1.x, t1.y, t1._row_id) }
└─StreamHashJoin { type: Inner, predicate: t1.y IS NOT DISTINCT FROM t2.y AND t1.x = t2.x, output: [t1.x, t1.y, t1._row_id, t2.y] }
├─StreamExchange { dist: HashShard(t1.y) }
│ └─StreamTableScan { table: t1, columns: [t1.x, t1.y, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamGroupTopN { order: [t2.y ASC], limit: 1, offset: 0, group_key: [t2.y] }
└─StreamExchange { dist: HashShard(t2.y) }
└─StreamProject { exprs: [t2.y, t2.x, t2._row_id] }
└─StreamFilter { predicate: IsNotNull(t2.y) }
└─StreamTableScan { table: t2, columns: [t2.x, t2.y, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) }
4 changes: 2 additions & 2 deletions src/frontend/src/binder/expr/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,14 @@ impl Binder {
}
}

for (i, lateral_context) in lateral_contexts.iter().rev().enumerate() {
for (j, lateral_context) in lateral_contexts.iter().rev().enumerate() {
if lateral_context.is_visible {
let context = &lateral_context.context;
if matches!(context.clause, Some(Clause::Insert)) {
continue;
}
// correlated input ref from lateral context `depth` starts from 1.
let depth = i + 1;
let depth = i + j + 1;
match context.get_column_binding_index(&table_name, &column_name) {
Ok(index) => {
let column = &context.columns[index];
Expand Down
10 changes: 10 additions & 0 deletions src/frontend/src/binder/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::hash_map::Entry;
use std::ops::Deref;

use either::Either;
use itertools::{EitherOrBoth, Itertools};
use risingwave_common::bail;
use risingwave_common::catalog::{Field, TableId, DEFAULT_SCHEMA_NAME};
Expand Down Expand Up @@ -134,6 +135,15 @@ impl Relation {
with_ordinality: _,
} => table_function
.collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
Relation::Share(share) => match &mut share.input {
BoundShareInput::Query(query) => match query {
Either::Left(query) => query
.collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
Either::Right(_) => vec![],
},
BoundShareInput::ChangeLog(change_log) => change_log
.collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
},
_ => vec![],
}
}
Expand Down

0 comments on commit 4f78be9

Please sign in to comment.