From aa827ce2ff2dc0a8c03885701f6c1c7ed7bcc93e Mon Sep 17 00:00:00 2001 From: Bowen <36908971+BowenXiao1999@users.noreply.github.com> Date: Tue, 6 Sep 2022 15:56:40 +0800 Subject: [PATCH] refactor: always insert a exchange singleton on top dml (#4752) * refactor: always insert a exchange singleton on top dml * detect chunk empty error problem & update planner test * fix rebase Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- src/frontend/src/optimizer/mod.rs | 10 +++- .../tests/testdata/basic_query.yaml | 16 +++--- .../tests/testdata/index_selection.yaml | 18 ++++--- .../test_runner/tests/testdata/insert.yaml | 51 +++++++++++-------- .../test_runner/tests/testdata/update.yaml | 39 +++++++------- 5 files changed, 79 insertions(+), 55 deletions(-) diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index fb4721a3ad1ee..35480d715099b 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -35,7 +35,7 @@ use self::plan_node::{BatchProject, Convention, LogicalProject, StreamMaterializ use self::property::RequiredDist; use self::rule::*; use crate::catalog::TableId; -use crate::optimizer::plan_node::BatchExchange; +use crate::optimizer::plan_node::{BatchExchange, PlanNodeType}; use crate::optimizer::plan_visitor::{has_batch_exchange, has_logical_apply}; use crate::optimizer::property::Distribution; use crate::utils::Condition; @@ -325,6 +325,14 @@ impl PlanRoot { ctx.trace("To Batch Distributed Plan:".to_string()); ctx.trace(plan.explain_to_string().unwrap()); } + // Always insert a exchange singleton for batch dml. + // TODO: Support local dml and + if plan.node_type() == PlanNodeType::BatchUpdate + || plan.node_type() == PlanNodeType::BatchInsert + || plan.node_type() == PlanNodeType::BatchDelete + { + plan = BatchExchange::new(plan, Order::any(), Distribution::Single).into(); + } Ok(plan) } diff --git a/src/frontend/test_runner/tests/testdata/basic_query.yaml b/src/frontend/test_runner/tests/testdata/basic_query.yaml index 69500f157685b..fccac1fec9223 100644 --- a/src/frontend/test_runner/tests/testdata/basic_query.yaml +++ b/src/frontend/test_runner/tests/testdata/basic_query.yaml @@ -110,17 +110,19 @@ create table t (v1 int, v2 int); delete from t; batch_plan: | - BatchDelete { table: t } - BatchExchange { order: [], dist: Single } - BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) } + BatchExchange { order: [], dist: Single } + BatchDelete { table: t } + BatchExchange { order: [], dist: Single } + BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) } - sql: | create table t (v1 int, v2 int); delete from t where v1 = 1; batch_plan: | - BatchDelete { table: t } - BatchExchange { order: [], dist: Single } - BatchFilter { predicate: (t.v1 = 1:Int32) } - BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) } + BatchExchange { order: [], dist: Single } + BatchDelete { table: t } + BatchExchange { order: [], dist: Single } + BatchFilter { predicate: (t.v1 = 1:Int32) } + BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) } - sql: | select * from generate_series('2'::INT,'10'::INT,'2'::INT); batch_plan: | diff --git a/src/frontend/test_runner/tests/testdata/index_selection.yaml b/src/frontend/test_runner/tests/testdata/index_selection.yaml index f43af573cbc96..2c2cbe273433b 100644 --- a/src/frontend/test_runner/tests/testdata/index_selection.yaml +++ b/src/frontend/test_runner/tests/testdata/index_selection.yaml @@ -181,10 +181,11 @@ create index idx3 on t1(c); delete from t1 where b = 2; batch_plan: | - BatchDelete { table: t1 } - BatchExchange { order: [], dist: Single } - BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] } - BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2))], distribution: SomeShard } + BatchExchange { order: [], dist: Single } + BatchDelete { table: t1 } + BatchExchange { order: [], dist: Single } + BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] } + BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2))], distribution: SomeShard } batch_local_plan: | BatchDelete { table: t1 } BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] } @@ -197,10 +198,11 @@ create index idx3 on t1(c); update t1 set c = 3 where a = 1 and b = 2; batch_plan: | - BatchUpdate { table: t1, exprs: [$0, $1, 3:Int32::Int64, $3] } - BatchExchange { order: [], dist: Single } - BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] } - BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2)), idx2.a = Int32(1)], distribution: SomeShard } + BatchExchange { order: [], dist: Single } + BatchUpdate { table: t1, exprs: [$0, $1, 3:Int32::Int64, $3] } + BatchExchange { order: [], dist: Single } + BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] } + BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2)), idx2.a = Int32(1)], distribution: SomeShard } batch_local_plan: | BatchUpdate { table: t1, exprs: [$0, $1, 3:Int32::Int64, $3] } BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] } diff --git a/src/frontend/test_runner/tests/testdata/insert.yaml b/src/frontend/test_runner/tests/testdata/insert.yaml index d28da4560ac7e..b0033b6aeefef 100644 --- a/src/frontend/test_runner/tests/testdata/insert.yaml +++ b/src/frontend/test_runner/tests/testdata/insert.yaml @@ -4,15 +4,17 @@ create table t (v1 int, v2 int); insert into t values (22, 33), (44, 55); batch_plan: | - BatchInsert { table: t } - BatchValues { rows: [[22:Int32, 33:Int32], [44:Int32, 55:Int32]] } + BatchExchange { order: [], dist: Single } + BatchInsert { table: t } + BatchValues { rows: [[22:Int32, 33:Int32], [44:Int32, 55:Int32]] } - sql: | /* insert values on assign-castable types */ create table t (v1 real, v2 int); insert into t values (22.33, '33'), (44, 55.0); batch_plan: | - BatchInsert { table: t } - BatchValues { rows: [[22.33:Decimal::Float32, '33':Varchar::Int32], [44:Int32::Float32, 55.0:Decimal::Int32]] } + BatchExchange { order: [], dist: Single } + BatchInsert { table: t } + BatchValues { rows: [[22.33:Decimal::Float32, '33':Varchar::Int32], [44:Int32::Float32, 55.0:Decimal::Int32]] } - sql: | /* insert values on non-assign-castable types */ create table t (v1 real, v2 int); @@ -28,15 +30,17 @@ create table t(v1 int); insert into t values(NULL); batch_plan: | - BatchInsert { table: t } - BatchValues { rows: [[null:Int32]] } + BatchExchange { order: [], dist: Single } + BatchInsert { table: t } + BatchValues { rows: [[null:Int32]] } - sql: | /* insert values cast each expr rather than whole `VALUES` (compare with below) */ create table t (v1 time); insert into t values (timestamp '2020-01-01 01:02:03'), (time '03:04:05'); batch_plan: | - BatchInsert { table: t } - BatchValues { rows: [['2020-01-01 01:02:03':Varchar::Timestamp::Time], ['03:04:05':Varchar::Time]] } + BatchExchange { order: [], dist: Single } + BatchInsert { table: t } + BatchValues { rows: [['2020-01-01 01:02:03':Varchar::Timestamp::Time], ['03:04:05':Varchar::Time]] } - sql: | /* a `VALUES` without insert context may be invalid on its own (compare with above) */ create table t (v1 time); @@ -66,18 +70,20 @@ create table t (v1 time); insert into t select v1 from t; batch_plan: | - BatchInsert { table: t } - BatchExchange { order: [], dist: Single } - BatchScan { table: t, columns: [t.v1], distribution: SomeShard } + BatchExchange { order: [], dist: Single } + BatchInsert { table: t } + BatchExchange { order: [], dist: Single } + BatchScan { table: t, columns: [t.v1], distribution: SomeShard } - sql: | /* insert into select with cast */ create table t (v1 time, v2 int, v3 real); insert into t select timestamp '2020-01-01 01:02:03', 11, 4.5 from t; batch_plan: | - BatchInsert { table: t } - BatchExchange { order: [], dist: Single } - BatchProject { exprs: ['2020-01-01 01:02:03':Varchar::Timestamp::Time, 11:Int32, 4.5:Decimal::Float32] } - BatchScan { table: t, columns: [], distribution: SomeShard } + BatchExchange { order: [], dist: Single } + BatchInsert { table: t } + BatchExchange { order: [], dist: Single } + BatchProject { exprs: ['2020-01-01 01:02:03':Varchar::Timestamp::Time, 11:Int32, 4.5:Decimal::Float32] } + BatchScan { table: t, columns: [], distribution: SomeShard } - sql: | /* insert into select with cast error */ create table t (v1 timestamp, v2 real); @@ -96,10 +102,11 @@ create table t3 (e int, f int); insert into t1 select c, e from t2 join t3 on t2.d = t3.f batch_plan: | - BatchInsert { table: t1 } - BatchExchange { order: [], dist: Single } - BatchHashJoin { type: Inner, predicate: t2.d = t3.f, output: [t2.c, t3.e] } - BatchExchange { order: [], dist: HashShard(t2.d) } - BatchScan { table: t2, columns: [t2.c, t2.d], distribution: SomeShard } - BatchExchange { order: [], dist: HashShard(t3.f) } - BatchScan { table: t3, columns: [t3.e, t3.f], distribution: SomeShard } + BatchExchange { order: [], dist: Single } + BatchInsert { table: t1 } + BatchExchange { order: [], dist: Single } + BatchHashJoin { type: Inner, predicate: t2.d = t3.f, output: [t2.c, t3.e] } + BatchExchange { order: [], dist: HashShard(t2.d) } + BatchScan { table: t2, columns: [t2.c, t2.d], distribution: SomeShard } + BatchExchange { order: [], dist: HashShard(t3.f) } + BatchScan { table: t3, columns: [t3.e, t3.f], distribution: SomeShard } diff --git a/src/frontend/test_runner/tests/testdata/update.yaml b/src/frontend/test_runner/tests/testdata/update.yaml index c3687db95ab72..68a9101534082 100644 --- a/src/frontend/test_runner/tests/testdata/update.yaml +++ b/src/frontend/test_runner/tests/testdata/update.yaml @@ -3,9 +3,10 @@ create table t (v1 int, v2 int); update t set v1 = 0; batch_plan: | - BatchUpdate { table: t, exprs: [0:Int32, $1, $2] } - BatchExchange { order: [], dist: Single } - BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) } + BatchExchange { order: [], dist: Single } + BatchUpdate { table: t, exprs: [0:Int32, $1, $2] } + BatchExchange { order: [], dist: Single } + BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) } - sql: | create table t (v1 int, v2 int); update t set v1 = true; @@ -14,29 +15,33 @@ create table t (v1 int, v2 int); update t set v1 = v2 + 1; batch_plan: | - BatchUpdate { table: t, exprs: [($1 + 1:Int32), $1, $2] } - BatchExchange { order: [], dist: Single } - BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) } + BatchExchange { order: [], dist: Single } + BatchUpdate { table: t, exprs: [($1 + 1:Int32), $1, $2] } + BatchExchange { order: [], dist: Single } + BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) } - sql: | create table t (v1 int, v2 real); update t set v1 = v2; batch_plan: | - BatchUpdate { table: t, exprs: [$1::Int32, $1, $2] } - BatchExchange { order: [], dist: Single } - BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) } + BatchExchange { order: [], dist: Single } + BatchUpdate { table: t, exprs: [$1::Int32, $1, $2] } + BatchExchange { order: [], dist: Single } + BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) } - sql: | create table t (v1 int, v2 int); update t set v1 = v2 + 1 where v2 > 0; batch_plan: | - BatchUpdate { table: t, exprs: [($1 + 1:Int32), $1, $2] } - BatchExchange { order: [], dist: Single } - BatchFilter { predicate: (t.v2 > 0:Int32) } - BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) } + BatchExchange { order: [], dist: Single } + BatchUpdate { table: t, exprs: [($1 + 1:Int32), $1, $2] } + BatchExchange { order: [], dist: Single } + BatchFilter { predicate: (t.v2 > 0:Int32) } + BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) } - sql: | create table t (v1 int, v2 int); update t set (v1, v2) = (v2 + 1, v1 - 1) where v1 != v2; batch_plan: | - BatchUpdate { table: t, exprs: [($1 + 1:Int32), ($0 - 1:Int32), $2] } - BatchExchange { order: [], dist: Single } - BatchFilter { predicate: (t.v1 <> t.v2) } - BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) } + BatchExchange { order: [], dist: Single } + BatchUpdate { table: t, exprs: [($1 + 1:Int32), ($0 - 1:Int32), $2] } + BatchExchange { order: [], dist: Single } + BatchFilter { predicate: (t.v1 <> t.v2) } + BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }