Skip to content

Commit

Permalink
feat(planner): support dml subquery (#12995)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Oct 23, 2023
1 parent 3881de2 commit ebfafe5
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 6 deletions.
17 changes: 16 additions & 1 deletion src/frontend/planner_test/tests/testdata/input/update.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,19 @@
create table t(v1 int as v2-1, v2 int, v3 int as v2+1, primary key (v3));
update t set v2 = 3;
expected_outputs:
- binder_error
- binder_error
- name: update subquery
sql: |
create table t (a int, b int);
update t set a = 777 where b not in (select a from t);
expected_outputs:
- logical_plan
- batch_plan

- name: delete subquery
sql: |
create table t (a int, b int);
delete from t where a not in (select b from t);
expected_outputs:
- logical_plan
- batch_plan
38 changes: 38 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/update.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,41 @@
create table t(v1 int as v2-1, v2 int, v3 int as v2+1, primary key (v3));
update t set v2 = 3;
binder_error: 'Bind error: update modifying the column referenced by generated columns that are part of the primary key is not allowed'
- name: update subquery
sql: |
create table t (a int, b int);
update t set a = 777 where b not in (select a from t);
logical_plan: |-
LogicalUpdate { table: t, exprs: [777:Int32, $1, $2] }
└─LogicalApply { type: LeftAnti, on: (t.b = t.a), correlated_id: 1 }
├─LogicalScan { table: t, columns: [t.a, t.b, t._row_id] }
└─LogicalProject { exprs: [t.a] }
└─LogicalScan { table: t, columns: [t.a, t.b, t._row_id] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchUpdate { table: t, exprs: [777:Int32, $1, $2] }
└─BatchExchange { order: [], dist: Single }
└─BatchHashJoin { type: LeftAnti, predicate: t.b = t.a, output: all }
├─BatchExchange { order: [], dist: HashShard(t.b) }
│ └─BatchScan { table: t, columns: [t.a, t.b, t._row_id], distribution: UpstreamHashShard(t._row_id) }
└─BatchExchange { order: [], dist: HashShard(t.a) }
└─BatchScan { table: t, columns: [t.a], distribution: SomeShard }
- name: delete subquery
sql: |
create table t (a int, b int);
delete from t where a not in (select b from t);
logical_plan: |-
LogicalDelete { table: t }
└─LogicalApply { type: LeftAnti, on: (t.a = t.b), correlated_id: 1 }
├─LogicalScan { table: t, columns: [t.a, t.b, t._row_id] }
└─LogicalProject { exprs: [t.b] }
└─LogicalScan { table: t, columns: [t.a, t.b, t._row_id] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchDelete { table: t }
└─BatchExchange { order: [], dist: Single }
└─BatchHashJoin { type: LeftAnti, predicate: t.a = t.b, output: all }
├─BatchExchange { order: [], dist: HashShard(t.a) }
│ └─BatchScan { table: t, columns: [t.a, t.b, t._row_id], distribution: UpstreamHashShard(t._row_id) }
└─BatchExchange { order: [], dist: HashShard(t.b) }
└─BatchScan { table: t, columns: [t.b], distribution: SomeShard }
4 changes: 2 additions & 2 deletions src/frontend/src/planner/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ use risingwave_common::error::Result;

use super::Planner;
use crate::binder::BoundDelete;
use crate::optimizer::plan_node::{generic, LogicalDelete, LogicalFilter, LogicalProject};
use crate::optimizer::plan_node::{generic, LogicalDelete, LogicalProject};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{PlanRef, PlanRoot};

impl Planner {
pub(super) fn plan_delete(&mut self, delete: BoundDelete) -> Result<PlanRoot> {
let scan = self.plan_base_table(&delete.table)?;
let input = if let Some(expr) = delete.selection {
LogicalFilter::create_with_expr(scan, expr)
self.plan_where(scan, expr)?
} else {
scan
};
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/planner/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,11 @@ impl Planner {
/// `LeftSemi/LeftAnti` [`LogicalApply`]
/// For other subqueries, we plan it as `LeftOuter` [`LogicalApply`] using
/// [`Self::substitute_subqueries`].
fn plan_where(&mut self, mut input: PlanRef, where_clause: ExprImpl) -> Result<PlanRef> {
pub(super) fn plan_where(
&mut self,
mut input: PlanRef,
where_clause: ExprImpl,
) -> Result<PlanRef> {
if !where_clause.has_subquery() {
return Ok(LogicalFilter::create_with_expr(input, where_clause));
}
Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/planner/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::error::Result;

use super::select::LogicalFilter;
use super::Planner;
use crate::binder::BoundUpdate;
use crate::optimizer::plan_node::{generic, LogicalProject, LogicalUpdate};
Expand All @@ -27,7 +26,7 @@ impl Planner {
pub(super) fn plan_update(&mut self, update: BoundUpdate) -> Result<PlanRoot> {
let scan = self.plan_base_table(&update.table)?;
let input = if let Some(expr) = update.selection {
LogicalFilter::create_with_expr(scan, expr)
self.plan_where(scan, expr)?
} else {
scan
};
Expand Down

0 comments on commit ebfafe5

Please sign in to comment.