Skip to content

Commit

Permalink
refactor: always insert an exchange singleton on top of DML (#4752)
Browse files Browse the repository at this point in the history
* refactor: always insert an exchange singleton on top of DML

* detect the empty-chunk error problem & update planner tests

* fix rebase

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
BowenXiao1999 and mergify[bot] authored Sep 6, 2022
1 parent 36f3b58 commit aa827ce
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 55 deletions.
10 changes: 9 additions & 1 deletion src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use self::plan_node::{BatchProject, Convention, LogicalProject, StreamMaterializ
use self::property::RequiredDist;
use self::rule::*;
use crate::catalog::TableId;
use crate::optimizer::plan_node::BatchExchange;
use crate::optimizer::plan_node::{BatchExchange, PlanNodeType};
use crate::optimizer::plan_visitor::{has_batch_exchange, has_logical_apply};
use crate::optimizer::property::Distribution;
use crate::utils::Condition;
Expand Down Expand Up @@ -325,6 +325,14 @@ impl PlanRoot {
ctx.trace("To Batch Distributed Plan:".to_string());
ctx.trace(plan.explain_to_string().unwrap());
}
// Always insert a exchange singleton for batch dml.
// TODO: Support local dml and
if plan.node_type() == PlanNodeType::BatchUpdate
|| plan.node_type() == PlanNodeType::BatchInsert
|| plan.node_type() == PlanNodeType::BatchDelete
{
plan = BatchExchange::new(plan, Order::any(), Distribution::Single).into();
}

Ok(plan)
}
Expand Down
16 changes: 9 additions & 7 deletions src/frontend/test_runner/tests/testdata/basic_query.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,19 @@
create table t (v1 int, v2 int);
delete from t;
batch_plan: |
BatchDelete { table: t }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
BatchExchange { order: [], dist: Single }
BatchDelete { table: t }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
- sql: |
create table t (v1 int, v2 int);
delete from t where v1 = 1;
batch_plan: |
BatchDelete { table: t }
BatchExchange { order: [], dist: Single }
BatchFilter { predicate: (t.v1 = 1:Int32) }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
BatchExchange { order: [], dist: Single }
BatchDelete { table: t }
BatchExchange { order: [], dist: Single }
BatchFilter { predicate: (t.v1 = 1:Int32) }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
- sql: |
select * from generate_series('2'::INT,'10'::INT,'2'::INT);
batch_plan: |
Expand Down
18 changes: 10 additions & 8 deletions src/frontend/test_runner/tests/testdata/index_selection.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,11 @@
create index idx3 on t1(c);
delete from t1 where b = 2;
batch_plan: |
BatchDelete { table: t1 }
BatchExchange { order: [], dist: Single }
BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] }
BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2))], distribution: SomeShard }
BatchExchange { order: [], dist: Single }
BatchDelete { table: t1 }
BatchExchange { order: [], dist: Single }
BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] }
BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2))], distribution: SomeShard }
batch_local_plan: |
BatchDelete { table: t1 }
BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] }
Expand All @@ -197,10 +198,11 @@
create index idx3 on t1(c);
update t1 set c = 3 where a = 1 and b = 2;
batch_plan: |
BatchUpdate { table: t1, exprs: [$0, $1, 3:Int32::Int64, $3] }
BatchExchange { order: [], dist: Single }
BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] }
BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2)), idx2.a = Int32(1)], distribution: SomeShard }
BatchExchange { order: [], dist: Single }
BatchUpdate { table: t1, exprs: [$0, $1, 3:Int32::Int64, $3] }
BatchExchange { order: [], dist: Single }
BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] }
BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2)), idx2.a = Int32(1)], distribution: SomeShard }
batch_local_plan: |
BatchUpdate { table: t1, exprs: [$0, $1, 3:Int32::Int64, $3] }
BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] }
Expand Down
51 changes: 29 additions & 22 deletions src/frontend/test_runner/tests/testdata/insert.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
create table t (v1 int, v2 int);
insert into t values (22, 33), (44, 55);
batch_plan: |
BatchInsert { table: t }
BatchValues { rows: [[22:Int32, 33:Int32], [44:Int32, 55:Int32]] }
BatchExchange { order: [], dist: Single }
BatchInsert { table: t }
BatchValues { rows: [[22:Int32, 33:Int32], [44:Int32, 55:Int32]] }
- sql: |
/* insert values on assign-castable types */
create table t (v1 real, v2 int);
insert into t values (22.33, '33'), (44, 55.0);
batch_plan: |
BatchInsert { table: t }
BatchValues { rows: [[22.33:Decimal::Float32, '33':Varchar::Int32], [44:Int32::Float32, 55.0:Decimal::Int32]] }
BatchExchange { order: [], dist: Single }
BatchInsert { table: t }
BatchValues { rows: [[22.33:Decimal::Float32, '33':Varchar::Int32], [44:Int32::Float32, 55.0:Decimal::Int32]] }
- sql: |
/* insert values on non-assign-castable types */
create table t (v1 real, v2 int);
Expand All @@ -28,15 +30,17 @@
create table t(v1 int);
insert into t values(NULL);
batch_plan: |
BatchInsert { table: t }
BatchValues { rows: [[null:Int32]] }
BatchExchange { order: [], dist: Single }
BatchInsert { table: t }
BatchValues { rows: [[null:Int32]] }
- sql: |
/* insert values cast each expr rather than whole `VALUES` (compare with below) */
create table t (v1 time);
insert into t values (timestamp '2020-01-01 01:02:03'), (time '03:04:05');
batch_plan: |
BatchInsert { table: t }
BatchValues { rows: [['2020-01-01 01:02:03':Varchar::Timestamp::Time], ['03:04:05':Varchar::Time]] }
BatchExchange { order: [], dist: Single }
BatchInsert { table: t }
BatchValues { rows: [['2020-01-01 01:02:03':Varchar::Timestamp::Time], ['03:04:05':Varchar::Time]] }
- sql: |
/* a `VALUES` without insert context may be invalid on its own (compare with above) */
create table t (v1 time);
Expand Down Expand Up @@ -66,18 +70,20 @@
create table t (v1 time);
insert into t select v1 from t;
batch_plan: |
BatchInsert { table: t }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1], distribution: SomeShard }
BatchExchange { order: [], dist: Single }
BatchInsert { table: t }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1], distribution: SomeShard }
- sql: |
/* insert into select with cast */
create table t (v1 time, v2 int, v3 real);
insert into t select timestamp '2020-01-01 01:02:03', 11, 4.5 from t;
batch_plan: |
BatchInsert { table: t }
BatchExchange { order: [], dist: Single }
BatchProject { exprs: ['2020-01-01 01:02:03':Varchar::Timestamp::Time, 11:Int32, 4.5:Decimal::Float32] }
BatchScan { table: t, columns: [], distribution: SomeShard }
BatchExchange { order: [], dist: Single }
BatchInsert { table: t }
BatchExchange { order: [], dist: Single }
BatchProject { exprs: ['2020-01-01 01:02:03':Varchar::Timestamp::Time, 11:Int32, 4.5:Decimal::Float32] }
BatchScan { table: t, columns: [], distribution: SomeShard }
- sql: |
/* insert into select with cast error */
create table t (v1 timestamp, v2 real);
Expand All @@ -96,10 +102,11 @@
create table t3 (e int, f int);
insert into t1 select c, e from t2 join t3 on t2.d = t3.f
batch_plan: |
BatchInsert { table: t1 }
BatchExchange { order: [], dist: Single }
BatchHashJoin { type: Inner, predicate: t2.d = t3.f, output: [t2.c, t3.e] }
BatchExchange { order: [], dist: HashShard(t2.d) }
BatchScan { table: t2, columns: [t2.c, t2.d], distribution: SomeShard }
BatchExchange { order: [], dist: HashShard(t3.f) }
BatchScan { table: t3, columns: [t3.e, t3.f], distribution: SomeShard }
BatchExchange { order: [], dist: Single }
BatchInsert { table: t1 }
BatchExchange { order: [], dist: Single }
BatchHashJoin { type: Inner, predicate: t2.d = t3.f, output: [t2.c, t3.e] }
BatchExchange { order: [], dist: HashShard(t2.d) }
BatchScan { table: t2, columns: [t2.c, t2.d], distribution: SomeShard }
BatchExchange { order: [], dist: HashShard(t3.f) }
BatchScan { table: t3, columns: [t3.e, t3.f], distribution: SomeShard }
39 changes: 22 additions & 17 deletions src/frontend/test_runner/tests/testdata/update.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
create table t (v1 int, v2 int);
update t set v1 = 0;
batch_plan: |
BatchUpdate { table: t, exprs: [0:Int32, $1, $2] }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
BatchExchange { order: [], dist: Single }
BatchUpdate { table: t, exprs: [0:Int32, $1, $2] }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
- sql: |
create table t (v1 int, v2 int);
update t set v1 = true;
Expand All @@ -14,29 +15,33 @@
create table t (v1 int, v2 int);
update t set v1 = v2 + 1;
batch_plan: |
BatchUpdate { table: t, exprs: [($1 + 1:Int32), $1, $2] }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
BatchExchange { order: [], dist: Single }
BatchUpdate { table: t, exprs: [($1 + 1:Int32), $1, $2] }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
- sql: |
create table t (v1 int, v2 real);
update t set v1 = v2;
batch_plan: |
BatchUpdate { table: t, exprs: [$1::Int32, $1, $2] }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
BatchExchange { order: [], dist: Single }
BatchUpdate { table: t, exprs: [$1::Int32, $1, $2] }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
- sql: |
create table t (v1 int, v2 int);
update t set v1 = v2 + 1 where v2 > 0;
batch_plan: |
BatchUpdate { table: t, exprs: [($1 + 1:Int32), $1, $2] }
BatchExchange { order: [], dist: Single }
BatchFilter { predicate: (t.v2 > 0:Int32) }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
BatchExchange { order: [], dist: Single }
BatchUpdate { table: t, exprs: [($1 + 1:Int32), $1, $2] }
BatchExchange { order: [], dist: Single }
BatchFilter { predicate: (t.v2 > 0:Int32) }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
- sql: |
create table t (v1 int, v2 int);
update t set (v1, v2) = (v2 + 1, v1 - 1) where v1 != v2;
batch_plan: |
BatchUpdate { table: t, exprs: [($1 + 1:Int32), ($0 - 1:Int32), $2] }
BatchExchange { order: [], dist: Single }
BatchFilter { predicate: (t.v1 <> t.v2) }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
BatchExchange { order: [], dist: Single }
BatchUpdate { table: t, exprs: [($1 + 1:Int32), ($0 - 1:Int32), $2] }
BatchExchange { order: [], dist: Single }
BatchFilter { predicate: (t.v1 <> t.v2) }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }

0 comments on commit aa827ce

Please sign in to comment.