From ebfafe571f72cc22a3eceb5674ca2d3cc9caa42c Mon Sep 17 00:00:00 2001 From: Dylan Date: Mon, 23 Oct 2023 13:39:14 +0800 Subject: [PATCH] feat(planner): support dml subquery (#12995) --- .../tests/testdata/input/update.yaml | 17 ++++++++- .../tests/testdata/output/update.yaml | 38 +++++++++++++++++++ src/frontend/src/planner/delete.rs | 4 +- src/frontend/src/planner/select.rs | 6 ++- src/frontend/src/planner/update.rs | 3 +- 5 files changed, 62 insertions(+), 6 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/input/update.yaml b/src/frontend/planner_test/tests/testdata/input/update.yaml index a63e5192073e6..9487b396d924a 100644 --- a/src/frontend/planner_test/tests/testdata/input/update.yaml +++ b/src/frontend/planner_test/tests/testdata/input/update.yaml @@ -75,4 +75,19 @@ create table t(v1 int as v2-1, v2 int, v3 int as v2+1, primary key (v3)); update t set v2 = 3; expected_outputs: - - binder_error \ No newline at end of file + - binder_error +- name: update subquery + sql: | + create table t (a int, b int); + update t set a = 777 where b not in (select a from t); + expected_outputs: + - logical_plan + - batch_plan + +- name: delete subquery + sql: | + create table t (a int, b int); + delete from t where a not in (select b from t); + expected_outputs: + - logical_plan + - batch_plan diff --git a/src/frontend/planner_test/tests/testdata/output/update.yaml b/src/frontend/planner_test/tests/testdata/output/update.yaml index f3a27a3d2e514..3db7ac3501018 100644 --- a/src/frontend/planner_test/tests/testdata/output/update.yaml +++ b/src/frontend/planner_test/tests/testdata/output/update.yaml @@ -116,3 +116,41 @@ create table t(v1 int as v2-1, v2 int, v3 int as v2+1, primary key (v3)); update t set v2 = 3; binder_error: 'Bind error: update modifying the column referenced by generated columns that are part of the primary key is not allowed' +- name: update subquery + sql: | + create table t (a int, b int); + update t set a = 777 where b not in (select a from t); + logical_plan: |- + LogicalUpdate { table: t, exprs: [777:Int32, $1, $2] } + └─LogicalApply { type: LeftAnti, on: (t.b = t.a), correlated_id: 1 } + ├─LogicalScan { table: t, columns: [t.a, t.b, t._row_id] } + └─LogicalProject { exprs: [t.a] } + └─LogicalScan { table: t, columns: [t.a, t.b, t._row_id] } + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchUpdate { table: t, exprs: [777:Int32, $1, $2] } + └─BatchExchange { order: [], dist: Single } + └─BatchHashJoin { type: LeftAnti, predicate: t.b = t.a, output: all } + ├─BatchExchange { order: [], dist: HashShard(t.b) } + │ └─BatchScan { table: t, columns: [t.a, t.b, t._row_id], distribution: UpstreamHashShard(t._row_id) } + └─BatchExchange { order: [], dist: HashShard(t.a) } + └─BatchScan { table: t, columns: [t.a], distribution: SomeShard } +- name: delete subquery + sql: | + create table t (a int, b int); + delete from t where a not in (select b from t); + logical_plan: |- + LogicalDelete { table: t } + └─LogicalApply { type: LeftAnti, on: (t.a = t.b), correlated_id: 1 } + ├─LogicalScan { table: t, columns: [t.a, t.b, t._row_id] } + └─LogicalProject { exprs: [t.b] } + └─LogicalScan { table: t, columns: [t.a, t.b, t._row_id] } + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchDelete { table: t } + └─BatchExchange { order: [], dist: Single } + └─BatchHashJoin { type: LeftAnti, predicate: t.a = t.b, output: all } + ├─BatchExchange { order: [], dist: HashShard(t.a) } + │ └─BatchScan { table: t, columns: [t.a, t.b, t._row_id], distribution: UpstreamHashShard(t._row_id) } + └─BatchExchange { order: [], dist: HashShard(t.b) } + └─BatchScan { table: t, columns: [t.b], distribution: SomeShard } diff --git a/src/frontend/src/planner/delete.rs b/src/frontend/src/planner/delete.rs index c6d779ed26603..00bb7fd59ae9a 100644 --- a/src/frontend/src/planner/delete.rs +++ b/src/frontend/src/planner/delete.rs @@ -17,7 +17,7 @@ use risingwave_common::error::Result; use super::Planner; use crate::binder::BoundDelete; -use crate::optimizer::plan_node::{generic, LogicalDelete, LogicalFilter, LogicalProject}; +use crate::optimizer::plan_node::{generic, LogicalDelete, LogicalProject}; use crate::optimizer::property::{Order, RequiredDist}; use crate::optimizer::{PlanRef, PlanRoot}; @@ -25,7 +25,7 @@ impl Planner { pub(super) fn plan_delete(&mut self, delete: BoundDelete) -> Result { let scan = self.plan_base_table(&delete.table)?; let input = if let Some(expr) = delete.selection { - LogicalFilter::create_with_expr(scan, expr) + self.plan_where(scan, expr)? } else { scan }; diff --git a/src/frontend/src/planner/select.rs b/src/frontend/src/planner/select.rs index c21538534ac24..96b32680309df 100644 --- a/src/frontend/src/planner/select.rs +++ b/src/frontend/src/planner/select.rs @@ -216,7 +216,11 @@ impl Planner { /// `LeftSemi/LeftAnti` [`LogicalApply`] /// For other subqueries, we plan it as `LeftOuter` [`LogicalApply`] using /// [`Self::substitute_subqueries`]. - fn plan_where(&mut self, mut input: PlanRef, where_clause: ExprImpl) -> Result { + pub(super) fn plan_where( + &mut self, + mut input: PlanRef, + where_clause: ExprImpl, + ) -> Result { if !where_clause.has_subquery() { return Ok(LogicalFilter::create_with_expr(input, where_clause)); } diff --git a/src/frontend/src/planner/update.rs b/src/frontend/src/planner/update.rs index d04ffc4132391..f73f5354436fd 100644 --- a/src/frontend/src/planner/update.rs +++ b/src/frontend/src/planner/update.rs @@ -16,7 +16,6 @@ use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::error::Result; -use super::select::LogicalFilter; use super::Planner; use crate::binder::BoundUpdate; use crate::optimizer::plan_node::{generic, LogicalProject, LogicalUpdate}; @@ -27,7 +26,7 @@ impl Planner { pub(super) fn plan_update(&mut self, update: BoundUpdate) -> Result { let scan = self.plan_base_table(&update.table)?; let input = if let Some(expr) = update.selection { - LogicalFilter::create_with_expr(scan, expr) + self.plan_where(scan, expr)? } else { scan };