Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: always insert a exchange singleton on top dml #4752

Merged
merged 4 commits into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use self::plan_node::{BatchProject, Convention, LogicalProject, StreamMaterializ
use self::property::RequiredDist;
use self::rule::*;
use crate::catalog::TableId;
use crate::optimizer::plan_node::BatchExchange;
use crate::optimizer::plan_node::{BatchExchange, PlanNodeType};
use crate::optimizer::plan_visitor::{has_batch_exchange, has_logical_apply};
use crate::optimizer::property::Distribution;
use crate::utils::Condition;
Expand Down Expand Up @@ -325,6 +325,14 @@ impl PlanRoot {
ctx.trace("To Batch Distributed Plan:".to_string());
ctx.trace(plan.explain_to_string().unwrap());
}
// Always insert a exchange singleton for batch dml.
// TODO: Support local dml and
if plan.node_type() == PlanNodeType::BatchUpdate
|| plan.node_type() == PlanNodeType::BatchInsert
|| plan.node_type() == PlanNodeType::BatchDelete
{
plan = BatchExchange::new(plan, Order::any(), Distribution::Single).into();
}

Ok(plan)
}
Expand Down
16 changes: 9 additions & 7 deletions src/frontend/test_runner/tests/testdata/basic_query.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,19 @@
create table t (v1 int, v2 int);
delete from t;
batch_plan: |
BatchDelete { table: t }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
BatchExchange { order: [], dist: Single }
BatchDelete { table: t }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
- sql: |
create table t (v1 int, v2 int);
delete from t where v1 = 1;
batch_plan: |
BatchDelete { table: t }
BatchExchange { order: [], dist: Single }
BatchFilter { predicate: (t.v1 = 1:Int32) }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
BatchExchange { order: [], dist: Single }
BatchDelete { table: t }
BatchExchange { order: [], dist: Single }
BatchFilter { predicate: (t.v1 = 1:Int32) }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
- sql: |
select * from generate_series('2'::INT,'10'::INT,'2'::INT);
batch_plan: |
Expand Down
18 changes: 10 additions & 8 deletions src/frontend/test_runner/tests/testdata/index_selection.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,11 @@
create index idx3 on t1(c);
delete from t1 where b = 2;
batch_plan: |
BatchDelete { table: t1 }
BatchExchange { order: [], dist: Single }
BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] }
BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2))], distribution: SomeShard }
BatchExchange { order: [], dist: Single }
BatchDelete { table: t1 }
BatchExchange { order: [], dist: Single }
BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] }
BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2))], distribution: SomeShard }
batch_local_plan: |
BatchDelete { table: t1 }
BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] }
Expand All @@ -197,10 +198,11 @@
create index idx3 on t1(c);
update t1 set c = 3 where a = 1 and b = 2;
batch_plan: |
BatchUpdate { table: t1, exprs: [$0, $1, 3:Int32::Int64, $3] }
BatchExchange { order: [], dist: Single }
BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] }
BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2)), idx2.a = Int32(1)], distribution: SomeShard }
BatchExchange { order: [], dist: Single }
BatchUpdate { table: t1, exprs: [$0, $1, 3:Int32::Int64, $3] }
BatchExchange { order: [], dist: Single }
BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] }
BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2)), idx2.a = Int32(1)], distribution: SomeShard }
batch_local_plan: |
BatchUpdate { table: t1, exprs: [$0, $1, 3:Int32::Int64, $3] }
BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] }
Expand Down
51 changes: 29 additions & 22 deletions src/frontend/test_runner/tests/testdata/insert.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
create table t (v1 int, v2 int);
insert into t values (22, 33), (44, 55);
batch_plan: |
BatchInsert { table: t }
BatchValues { rows: [[22:Int32, 33:Int32], [44:Int32, 55:Int32]] }
BatchExchange { order: [], dist: Single }
BatchInsert { table: t }
BatchValues { rows: [[22:Int32, 33:Int32], [44:Int32, 55:Int32]] }
- sql: |
/* insert values on assign-castable types */
create table t (v1 real, v2 int);
insert into t values (22.33, '33'), (44, 55.0);
batch_plan: |
BatchInsert { table: t }
BatchValues { rows: [[22.33:Decimal::Float32, '33':Varchar::Int32], [44:Int32::Float32, 55.0:Decimal::Int32]] }
BatchExchange { order: [], dist: Single }
BatchInsert { table: t }
BatchValues { rows: [[22.33:Decimal::Float32, '33':Varchar::Int32], [44:Int32::Float32, 55.0:Decimal::Int32]] }
- sql: |
/* insert values on non-assign-castable types */
create table t (v1 real, v2 int);
Expand All @@ -28,15 +30,17 @@
create table t(v1 int);
insert into t values(NULL);
batch_plan: |
BatchInsert { table: t }
BatchValues { rows: [[null:Int32]] }
BatchExchange { order: [], dist: Single }
BatchInsert { table: t }
BatchValues { rows: [[null:Int32]] }
- sql: |
/* insert values cast each expr rather than whole `VALUES` (compare with below) */
create table t (v1 time);
insert into t values (timestamp '2020-01-01 01:02:03'), (time '03:04:05');
batch_plan: |
BatchInsert { table: t }
BatchValues { rows: [['2020-01-01 01:02:03':Varchar::Timestamp::Time], ['03:04:05':Varchar::Time]] }
BatchExchange { order: [], dist: Single }
BatchInsert { table: t }
BatchValues { rows: [['2020-01-01 01:02:03':Varchar::Timestamp::Time], ['03:04:05':Varchar::Time]] }
- sql: |
/* a `VALUES` without insert context may be invalid on its own (compare with above) */
create table t (v1 time);
Expand Down Expand Up @@ -66,18 +70,20 @@
create table t (v1 time);
insert into t select v1 from t;
batch_plan: |
BatchInsert { table: t }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1], distribution: SomeShard }
BatchExchange { order: [], dist: Single }
BatchInsert { table: t }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1], distribution: SomeShard }
- sql: |
/* insert into select with cast */
create table t (v1 time, v2 int, v3 real);
insert into t select timestamp '2020-01-01 01:02:03', 11, 4.5 from t;
batch_plan: |
BatchInsert { table: t }
BatchExchange { order: [], dist: Single }
BatchProject { exprs: ['2020-01-01 01:02:03':Varchar::Timestamp::Time, 11:Int32, 4.5:Decimal::Float32] }
BatchScan { table: t, columns: [], distribution: SomeShard }
BatchExchange { order: [], dist: Single }
BatchInsert { table: t }
BatchExchange { order: [], dist: Single }
BatchProject { exprs: ['2020-01-01 01:02:03':Varchar::Timestamp::Time, 11:Int32, 4.5:Decimal::Float32] }
BatchScan { table: t, columns: [], distribution: SomeShard }
- sql: |
/* insert into select with cast error */
create table t (v1 timestamp, v2 real);
Expand All @@ -96,10 +102,11 @@
create table t3 (e int, f int);
insert into t1 select c, e from t2 join t3 on t2.d = t3.f
batch_plan: |
BatchInsert { table: t1 }
BatchExchange { order: [], dist: Single }
BatchHashJoin { type: Inner, predicate: t2.d = t3.f, output: [t2.c, t3.e] }
BatchExchange { order: [], dist: HashShard(t2.d) }
BatchScan { table: t2, columns: [t2.c, t2.d], distribution: SomeShard }
BatchExchange { order: [], dist: HashShard(t3.f) }
BatchScan { table: t3, columns: [t3.e, t3.f], distribution: SomeShard }
BatchExchange { order: [], dist: Single }
BatchInsert { table: t1 }
BatchExchange { order: [], dist: Single }
BatchHashJoin { type: Inner, predicate: t2.d = t3.f, output: [t2.c, t3.e] }
BatchExchange { order: [], dist: HashShard(t2.d) }
BatchScan { table: t2, columns: [t2.c, t2.d], distribution: SomeShard }
BatchExchange { order: [], dist: HashShard(t3.f) }
BatchScan { table: t3, columns: [t3.e, t3.f], distribution: SomeShard }
39 changes: 22 additions & 17 deletions src/frontend/test_runner/tests/testdata/update.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
create table t (v1 int, v2 int);
update t set v1 = 0;
batch_plan: |
BatchUpdate { table: t, exprs: [0:Int32, $1, $2] }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
BatchExchange { order: [], dist: Single }
BatchUpdate { table: t, exprs: [0:Int32, $1, $2] }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
- sql: |
create table t (v1 int, v2 int);
update t set v1 = true;
Expand All @@ -14,29 +15,33 @@
create table t (v1 int, v2 int);
update t set v1 = v2 + 1;
batch_plan: |
BatchUpdate { table: t, exprs: [($1 + 1:Int32), $1, $2] }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
BatchExchange { order: [], dist: Single }
BatchUpdate { table: t, exprs: [($1 + 1:Int32), $1, $2] }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
- sql: |
create table t (v1 int, v2 real);
update t set v1 = v2;
batch_plan: |
BatchUpdate { table: t, exprs: [$1::Int32, $1, $2] }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
BatchExchange { order: [], dist: Single }
BatchUpdate { table: t, exprs: [$1::Int32, $1, $2] }
BatchExchange { order: [], dist: Single }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
- sql: |
create table t (v1 int, v2 int);
update t set v1 = v2 + 1 where v2 > 0;
batch_plan: |
BatchUpdate { table: t, exprs: [($1 + 1:Int32), $1, $2] }
BatchExchange { order: [], dist: Single }
BatchFilter { predicate: (t.v2 > 0:Int32) }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
BatchExchange { order: [], dist: Single }
BatchUpdate { table: t, exprs: [($1 + 1:Int32), $1, $2] }
BatchExchange { order: [], dist: Single }
BatchFilter { predicate: (t.v2 > 0:Int32) }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
- sql: |
create table t (v1 int, v2 int);
update t set (v1, v2) = (v2 + 1, v1 - 1) where v1 != v2;
batch_plan: |
BatchUpdate { table: t, exprs: [($1 + 1:Int32), ($0 - 1:Int32), $2] }
BatchExchange { order: [], dist: Single }
BatchFilter { predicate: (t.v1 <> t.v2) }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }
BatchExchange { order: [], dist: Single }
BatchUpdate { table: t, exprs: [($1 + 1:Int32), ($0 - 1:Int32), $2] }
BatchExchange { order: [], dist: Single }
BatchFilter { predicate: (t.v1 <> t.v2) }
BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) }