Skip to content

Commit

Permalink
feat(optimizer): support correlated column in order by (#12341)
Browse files: browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Sep 15, 2023
1 parent 8a36ca3 commit c443197
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 3 deletions.
13 changes: 13 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/subquery_expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,16 @@
select x from t1 where y not in (select y from t2);
expected_outputs:
- logical_plan
- sql: |
create table t1 (a int);
create table t2 (b int);
SELECT *
FROM t1
WHERE EXISTS
(SELECT 1
FROM t2
GROUP BY a
ORDER BY a DESC LIMIT 90);
expected_outputs:
- logical_plan
- batch_plan
38 changes: 38 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/subquery_expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,41 @@
├─LogicalScan { table: t1, columns: [t1.x, t1.y, t1._row_id] }
└─LogicalProject { exprs: [t2.y] }
└─LogicalScan { table: t2, columns: [t2.x, t2.y, t2._row_id] }
- sql: |
create table t1 (a int);
create table t2 (b int);
SELECT *
FROM t1
WHERE EXISTS
(SELECT 1
FROM t2
GROUP BY a
ORDER BY a DESC LIMIT 90);
logical_plan: |-
LogicalProject { exprs: [t1.a] }
└─LogicalApply { type: LeftSemi, on: true, correlated_id: 1 }
├─LogicalScan { table: t1, columns: [t1.a, t1._row_id] }
└─LogicalProject { exprs: [1:Int32] }
└─LogicalTopN { order: [$expr2 DESC], limit: 90, offset: 0 }
└─LogicalProject { exprs: [1:Int32, CorrelatedInputRef { index: 0, correlated_id: 1 } as $expr2] }
└─LogicalAgg { group_key: [$expr1], aggs: [] }
└─LogicalProject { exprs: [CorrelatedInputRef { index: 0, correlated_id: 1 } as $expr1] }
└─LogicalScan { table: t2, columns: [t2.b, t2._row_id] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashJoin { type: LeftSemi, predicate: t1.a IS NOT DISTINCT FROM t1.a, output: all }
├─BatchExchange { order: [], dist: HashShard(t1.a) }
│ └─BatchScan { table: t1, columns: [t1.a], distribution: SomeShard }
└─BatchProject { exprs: [t1.a] }
└─BatchGroupTopN { order: [t1.a DESC], limit: 90, offset: 0, group_key: [t1.a] }
└─BatchExchange { order: [], dist: HashShard(t1.a) }
└─BatchProject { exprs: [t1.a, t1.a] }
└─BatchHashAgg { group_key: [t1.a], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(t1.a) }
└─BatchNestedLoopJoin { type: Inner, predicate: true, output: all }
├─BatchExchange { order: [], dist: Single }
│ └─BatchHashAgg { group_key: [t1.a], aggs: [] }
│ └─BatchExchange { order: [], dist: HashShard(t1.a) }
│ └─BatchScan { table: t1, columns: [t1.a], distribution: SomeShard }
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t2, columns: [], distribution: SomeShard }
14 changes: 11 additions & 3 deletions src/frontend/src/binder/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,17 @@ impl BoundQuery {
depth: Depth,
correlated_id: CorrelatedId,
) -> Vec<usize> {
// TODO: collect `correlated_input_ref` in `extra_order_exprs`.
self.body
.collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id)
let mut correlated_indices = vec![];

correlated_indices.extend(
self.body
.collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
);

correlated_indices.extend(self.extra_order_exprs.iter_mut().flat_map(|expr| {
expr.collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id)
}));
correlated_indices
}

/// Simple `VALUES` without other clauses.
Expand Down

0 comments on commit c443197

Please sign in to comment.