Commit
feat(frontend): change index default distributed by columns (#11865)
chenzl25 authored and Li0k committed Sep 15, 2023
1 parent b072774 commit bdf3a7b
Showing 7 changed files with 40 additions and 48 deletions.
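In short: when CREATE INDEX is given no explicit DISTRIBUTED BY clause, the frontend now distributes the index by its first index column instead of by all index columns. A minimal SQL sketch of the behavior change, based on the updated e2e test below:

create table t3 (v1 int, v2 int, v3 int);
create index idx1 on t3 (v1, v2);
show indexes from t3;
-- before this commit the index reported: distributed by(v1, v2)
-- after this commit it reports:          distributed by(v1)

A shorter distribution key lets more queries align with the index's distribution (for example, an equality filter on v1 alone), which is what the planner-test diffs below exercise.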
e2e_test/ddl/show.slt (4 changes: 2 additions & 2 deletions)
@@ -28,7 +28,7 @@ create index idx1 on t3 (v1,v2);
query TTTTT
show indexes from t3;
----
-idx1 t3 v1 ASC, v2 ASC v3 v1, v2
+idx1 t3 v1 ASC, v2 ASC v3 v1

query TT
describe t3;
@@ -37,7 +37,7 @@ v1 integer
v2 integer
v3 integer
primary key _row_id
-idx1 index(v1 ASC, v2 ASC) include(v3) distributed by(v1, v2)
+idx1 index(v1 ASC, v2 ASC) include(v3) distributed by(v1)

query TT
show create index idx1;
(planner test output file, name not shown)
@@ -19,7 +19,7 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchLookupJoin { type: Inner, predicate: t.a = idx2.c AND t.b = idx2.d, output: all }
-└─BatchExchange { order: [], dist: UpstreamHashShard(t.a, t.b) }
+└─BatchExchange { order: [], dist: UpstreamHashShard(t.a) }
└─BatchScan { table: t, columns: [t.a, t.b], distribution: SomeShard }
- sql: |
create table t (a int, b int);
@@ -40,7 +40,7 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchLookupJoin { type: Inner, predicate: t.a = idx.c AND t.b = idx.d, output: all }
-└─BatchExchange { order: [], dist: UpstreamHashShard(t.a, t.b) }
+└─BatchExchange { order: [], dist: UpstreamHashShard(t.a) }
└─BatchScan { table: t, columns: [t.a, t.b], distribution: SomeShard }
- name: test index join prefix lookup
sql: |
(planner test output file, name not shown)
@@ -351,30 +351,22 @@
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [max(ak1k2.v)] }
└─BatchSortAgg { group_key: [ak1k2.k1], aggs: [max(ak1k2.v)] }
-└─BatchExchange { order: [ak1k2.k1 ASC], dist: HashShard(ak1k2.k1) }
-└─BatchScan { table: ak1k2, columns: [ak1k2.k1, ak1k2.v], distribution: SomeShard }
+└─BatchScan { table: ak1k2, columns: [ak1k2.k1, ak1k2.v], distribution: UpstreamHashShard(ak1k2.k1) }
stream_plan: |-
StreamMaterialize { columns: [max_v, ak1k2.k1(hidden)], stream_key: [ak1k2.k1], pk_columns: [ak1k2.k1], pk_conflict: NoCheck }
└─StreamProject { exprs: [max(ak1k2.v), ak1k2.k1] }
└─StreamHashAgg { group_key: [ak1k2.k1], aggs: [max(ak1k2.v), count] }
-└─StreamExchange { dist: HashShard(ak1k2.k1) }
-└─StreamTableScan { table: ak1k2, columns: [ak1k2.k1, ak1k2.v, ak1k2.k2, ak1k2.a._row_id], pk: [ak1k2.a._row_id], dist: UpstreamHashShard(ak1k2.k1, ak1k2.k2) }
+└─StreamTableScan { table: ak1k2, columns: [ak1k2.k1, ak1k2.v, ak1k2.k2, ak1k2.a._row_id], pk: [ak1k2.a._row_id], dist: UpstreamHashShard(ak1k2.k1) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [max_v, ak1k2.k1(hidden)], stream_key: [ak1k2.k1], pk_columns: [ak1k2.k1], pk_conflict: NoCheck }
├── materialized table: 4294967294
└── StreamProject { exprs: [max(ak1k2.v), ak1k2.k1] }
-└── StreamHashAgg { group_key: [ak1k2.k1], aggs: [max(ak1k2.v), count] }
-├── result table: 1
-├── state tables: [ 0 ]
-├── distinct tables: []
-└── StreamExchange Hash([0]) from 1
-Fragment 1
-Chain { table: ak1k2, columns: [ak1k2.k1, ak1k2.v, ak1k2.k2, ak1k2.a._row_id], pk: [ak1k2.a._row_id], dist: UpstreamHashShard(ak1k2.k1, ak1k2.k2) }
-├── state table: 2
-├── Upstream
-└── BatchPlanNode
+└── StreamHashAgg { group_key: [ak1k2.k1], aggs: [max(ak1k2.v), count] } { result table: 1, state tables: [ 0 ], distinct tables: [] }
+└── Chain { table: ak1k2, columns: [ak1k2.k1, ak1k2.v, ak1k2.k2, ak1k2.a._row_id], pk: [ak1k2.a._row_id], dist: UpstreamHashShard(ak1k2.k1) }
+├── state table: 2
+├── Upstream
+└── BatchPlanNode
Table 0
├── columns: [ ak1k2_k1, ak1k2_v, ak1k2_a__row_id ]
@@ -423,7 +415,7 @@
└─StreamProject { exprs: [max(ak1k2.v), ak1k2.k2] }
└─StreamHashAgg { group_key: [ak1k2.k2], aggs: [max(ak1k2.v), count] }
└─StreamExchange { dist: HashShard(ak1k2.k2) }
-└─StreamTableScan { table: ak1k2, columns: [ak1k2.k2, ak1k2.v, ak1k2.k1, ak1k2.a._row_id], pk: [ak1k2.a._row_id], dist: UpstreamHashShard(ak1k2.k1, ak1k2.k2) }
+└─StreamTableScan { table: ak1k2, columns: [ak1k2.k2, ak1k2.v, ak1k2.k1, ak1k2.a._row_id], pk: [ak1k2.a._row_id], dist: UpstreamHashShard(ak1k2.k1) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [max_v, ak1k2.k2(hidden)], stream_key: [ak1k2.k2], pk_columns: [ak1k2.k2], pk_conflict: NoCheck }
@@ -436,7 +428,7 @@
└── StreamExchange Hash([0]) from 1
Fragment 1
-Chain { table: ak1k2, columns: [ak1k2.k2, ak1k2.v, ak1k2.k1, ak1k2.a._row_id], pk: [ak1k2.a._row_id], dist: UpstreamHashShard(ak1k2.k1, ak1k2.k2) }
+Chain { table: ak1k2, columns: [ak1k2.k2, ak1k2.v, ak1k2.k1, ak1k2.a._row_id], pk: [ak1k2.a._row_id], dist: UpstreamHashShard(ak1k2.k1) }
├── state table: 2
├── Upstream
└── BatchPlanNode
@@ -481,19 +473,19 @@
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [sum(ak1k2.v)] }
└─BatchSortAgg { group_key: [ak1k2.k1, ak1k2.k2], aggs: [sum(ak1k2.v)] }
-└─BatchScan { table: ak1k2, columns: [ak1k2.k1, ak1k2.k2, ak1k2.v], distribution: UpstreamHashShard(ak1k2.k1, ak1k2.k2) }
+└─BatchScan { table: ak1k2, columns: [ak1k2.k1, ak1k2.k2, ak1k2.v], distribution: UpstreamHashShard(ak1k2.k1) }
stream_plan: |-
StreamMaterialize { columns: [sum_v, ak1k2.k1(hidden), ak1k2.k2(hidden)], stream_key: [ak1k2.k1, ak1k2.k2], pk_columns: [ak1k2.k1, ak1k2.k2], pk_conflict: NoCheck }
└─StreamProject { exprs: [sum(ak1k2.v), ak1k2.k1, ak1k2.k2] }
└─StreamHashAgg { group_key: [ak1k2.k1, ak1k2.k2], aggs: [sum(ak1k2.v), count] }
-└─StreamTableScan { table: ak1k2, columns: [ak1k2.k1, ak1k2.k2, ak1k2.v, ak1k2.a._row_id], pk: [ak1k2.a._row_id], dist: UpstreamHashShard(ak1k2.k1, ak1k2.k2) }
+└─StreamTableScan { table: ak1k2, columns: [ak1k2.k1, ak1k2.k2, ak1k2.v, ak1k2.a._row_id], pk: [ak1k2.a._row_id], dist: UpstreamHashShard(ak1k2.k1) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [sum_v, ak1k2.k1(hidden), ak1k2.k2(hidden)], stream_key: [ak1k2.k1, ak1k2.k2], pk_columns: [ak1k2.k1, ak1k2.k2], pk_conflict: NoCheck }
├── materialized table: 4294967294
└── StreamProject { exprs: [sum(ak1k2.v), ak1k2.k1, ak1k2.k2] }
└── StreamHashAgg { group_key: [ak1k2.k1, ak1k2.k2], aggs: [sum(ak1k2.v), count] } { result table: 0, state tables: [], distinct tables: [] }
-└── Chain { table: ak1k2, columns: [ak1k2.k1, ak1k2.k2, ak1k2.v, ak1k2.a._row_id], pk: [ak1k2.a._row_id], dist: UpstreamHashShard(ak1k2.k1, ak1k2.k2) }
+└── Chain { table: ak1k2, columns: [ak1k2.k1, ak1k2.k2, ak1k2.v, ak1k2.a._row_id], pk: [ak1k2.a._row_id], dist: UpstreamHashShard(ak1k2.k1) }
├── state table: 1
├── Upstream
└── BatchPlanNode
@@ -502,7 +494,7 @@
├── columns: [ ak1k2_k1, ak1k2_k2, sum(ak1k2_v), count ]
├── primary key: [ $0 ASC, $1 ASC ]
├── value indices: [ 2, 3 ]
-├── distribution key: [ 0, 1 ]
+├── distribution key: [ 0 ]
└── read pk prefix len hint: 2
Table 1
@@ -517,7 +509,7 @@
├── columns: [ sum_v, ak1k2.k1, ak1k2.k2 ]
├── primary key: [ $1 ASC, $2 ASC ]
├── value indices: [ 0, 1, 2 ]
-├── distribution key: [ 1, 2 ]
+├── distribution key: [ 1 ]
└── read pk prefix len hint: 2
- id: aggk1k2_from_Ak1
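The aggregation plans above show the payoff: once ak1k2 is distributed by k1 alone, an aggregation grouped by k1 is already partitioned correctly, so the batch and stream exchanges disappear. The DDL and query behind these plans fall outside the hunk; the following is a hedged reconstruction inferred from the plans (table and column definitions are assumptions):

create table a (k1 int, k2 int, v int);
create index ak1k2 on a (k1, k2) include (v);
-- grouping on the leading index column: with distribution key (k1),
-- the aggregation consumes the index scan directly, with no exchange
select max(v) from a group by k1;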
(planner test output file, name not shown)
@@ -5,51 +5,51 @@
select * from t1 where a = 1
batch_plan: |-
BatchExchange { order: [], dist: Single }
-└─BatchScan { table: idx1, columns: [idx1.a, idx1.b, idx1.c], scan_ranges: [idx1.a = Int32(1)], distribution: UpstreamHashShard(idx1.a, idx1.b) }
+└─BatchScan { table: idx1, columns: [idx1.a, idx1.b, idx1.c], scan_ranges: [idx1.a = Int32(1)], distribution: UpstreamHashShard(idx1.a) }
- sql: |
/* Use index if it provides required order */
create table t1 (a int, b int, c int);
create index idx1 on t1(a, b) include(c);
select * from t1 order by a, b
batch_plan: |-
BatchExchange { order: [idx1.a ASC, idx1.b ASC], dist: Single }
-└─BatchScan { table: idx1, columns: [idx1.a, idx1.b, idx1.c], distribution: UpstreamHashShard(idx1.a, idx1.b) }
+└─BatchScan { table: idx1, columns: [idx1.a, idx1.b, idx1.c], distribution: UpstreamHashShard(idx1.a) }
- sql: |
create table t1 (a int, b numeric, c bigint);
create index idx1 on t1(a, b) include(c);
select * from t1 where a = 1 or a = 2
batch_plan: |-
BatchExchange { order: [], dist: Single }
-└─BatchScan { table: idx1, columns: [idx1.a, idx1.b, idx1.c], scan_ranges: [idx1.a = Int32(1), idx1.a = Int32(2)], distribution: UpstreamHashShard(idx1.a, idx1.b) }
+└─BatchScan { table: idx1, columns: [idx1.a, idx1.b, idx1.c], scan_ranges: [idx1.a = Int32(1), idx1.a = Int32(2)], distribution: UpstreamHashShard(idx1.a) }
- sql: |
create table t1 (a int, b numeric, c bigint);
create index idx1 on t1(a, b) include(c);
select * from t1 where a in (1,2,3)
batch_plan: |-
BatchExchange { order: [], dist: Single }
-└─BatchScan { table: idx1, columns: [idx1.a, idx1.b, idx1.c], scan_ranges: [idx1.a = Int32(1), idx1.a = Int32(2), idx1.a = Int32(3)], distribution: UpstreamHashShard(idx1.a, idx1.b) }
+└─BatchScan { table: idx1, columns: [idx1.a, idx1.b, idx1.c], scan_ranges: [idx1.a = Int32(1), idx1.a = Int32(2), idx1.a = Int32(3)], distribution: UpstreamHashShard(idx1.a) }
- sql: |
create table t1 (a int, b numeric, c bigint);
create index idx1 on t1(a, b) include(c);
select * from t1 where a between 1 and 8
batch_plan: |-
BatchExchange { order: [], dist: Single }
-└─BatchScan { table: idx1, columns: [idx1.a, idx1.b, idx1.c], scan_ranges: [idx1.a = Int32(1), idx1.a = Int32(2), idx1.a = Int32(3), idx1.a = Int32(4), idx1.a = Int32(5), idx1.a = Int32(6), idx1.a = Int32(7), idx1.a = Int32(8)], distribution: UpstreamHashShard(idx1.a, idx1.b) }
+└─BatchScan { table: idx1, columns: [idx1.a, idx1.b, idx1.c], scan_ranges: [idx1.a = Int32(1), idx1.a = Int32(2), idx1.a = Int32(3), idx1.a = Int32(4), idx1.a = Int32(5), idx1.a = Int32(6), idx1.a = Int32(7), idx1.a = Int32(8)], distribution: UpstreamHashShard(idx1.a) }
- sql: |
create table t1 (a int, b numeric, c bigint);
create index idx1 on t1(a, b) include(c);
select * from t1 where a = 1 and b = 1
batch_plan: |-
BatchExchange { order: [], dist: Single }
-└─BatchScan { table: idx1, columns: [idx1.a, idx1.b, idx1.c], scan_ranges: [idx1.a = Int32(1) AND idx1.b = Decimal(Normalized(1))], distribution: UpstreamHashShard(idx1.a, idx1.b) }
+└─BatchScan { table: idx1, columns: [idx1.a, idx1.b, idx1.c], scan_ranges: [idx1.a = Int32(1) AND idx1.b = Decimal(Normalized(1))], distribution: UpstreamHashShard(idx1.a) }
- sql: |
create table t1 (a int, b numeric, c bigint);
create index idx1 on t1(a, b) include(c);
create index idx2 on t1(b, a) include(c);
select * from t1 where b = 1
batch_plan: |-
BatchExchange { order: [], dist: Single }
-└─BatchScan { table: idx2, columns: [idx2.a, idx2.b, idx2.c], scan_ranges: [idx2.b = Decimal(Normalized(1))], distribution: UpstreamHashShard(idx2.b, idx2.a) }
+└─BatchScan { table: idx2, columns: [idx2.a, idx2.b, idx2.c], scan_ranges: [idx2.b = Decimal(Normalized(1))], distribution: UpstreamHashShard(idx2.b) }
- sql: |
create table t1 (a int, b numeric, c bigint);
create index idx1 on t1(a, b) include(a, b);
@@ -78,7 +78,7 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchFilter { predicate: In(idx1.b, 2:Decimal, 3:Decimal) }
-└─BatchScan { table: idx1, columns: [idx1.a, idx1.b], scan_ranges: [idx1.a = Int32(1), idx1.a = Int32(2)], distribution: UpstreamHashShard(idx1.a, idx1.b) }
+└─BatchScan { table: idx1, columns: [idx1.a, idx1.b], scan_ranges: [idx1.a = Int32(1), idx1.a = Int32(2)], distribution: UpstreamHashShard(idx1.a) }
- sql: |
create table t1 (a int, b numeric, c bigint);
create index idx1 on t1(a, b) include(c);
@@ -117,7 +117,7 @@
select * from t1 where a = 1 and b = 2;
batch_plan: |-
BatchExchange { order: [], dist: Single }
-└─BatchScan { table: idx2, columns: [idx2.a, idx2.b, idx2.c], scan_ranges: [idx2.b = Decimal(Normalized(2)) AND idx2.a = Int32(1)], distribution: UpstreamHashShard(idx2.b, idx2.a) }
+└─BatchScan { table: idx2, columns: [idx2.a, idx2.b, idx2.c], scan_ranges: [idx2.b = Decimal(Normalized(2)) AND idx2.a = Int32(1)], distribution: UpstreamHashShard(idx2.b) }
- sql: |
create table t1 (a int, b numeric, c bigint);
create index idx1 on t1(a) include (b, c);
@@ -126,7 +126,7 @@
select * from t1 where b = 2;
batch_plan: |-
BatchExchange { order: [], dist: Single }
-└─BatchScan { table: idx2, columns: [idx2.a, idx2.b, idx2.c], scan_ranges: [idx2.b = Decimal(Normalized(2))], distribution: UpstreamHashShard(idx2.b, idx2.a) }
+└─BatchScan { table: idx2, columns: [idx2.a, idx2.b, idx2.c], scan_ranges: [idx2.b = Decimal(Normalized(2))], distribution: UpstreamHashShard(idx2.b) }
- sql: |
create table t1 (a int, b numeric, c bigint);
create index idx1 on t1(a) include(a);
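Every index-selection plan above now reports UpstreamHashShard over just the leading index column. In practice that means a point filter on that column identifies the target partition by itself; a minimal sketch mirroring the tests above:

create table t1 (a int, b int, c int);
create index idx1 on t1 (a, b) include (c);
-- the scan range idx1.a = 1 matches the new single-column
-- distribution key, so the lookup is routed by hash(a) alone
select * from t1 where a = 1;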
(planner test output file, name not shown)
@@ -174,10 +174,10 @@
stream_plan: |-
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck }
└─StreamTemporalJoin { type: LeftOuter, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] }
-├─StreamExchange { dist: HashShard(stream.a1, stream.b1) }
+├─StreamExchange { dist: HashShard(stream.a1) }
│ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
-└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2, idx2.b2) }
-└─StreamTableScan { table: idx2, columns: [idx2.a2, idx2.b2, idx2.id2], pk: [idx2.id2], dist: UpstreamHashShard(idx2.a2, idx2.b2) }
+└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2) }
+└─StreamTableScan { table: idx2, columns: [idx2.a2, idx2.b2, idx2.id2], pk: [idx2.id2], dist: UpstreamHashShard(idx2.a2) }
- name: temporal join with an index (index column size = 1)
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
@@ -213,10 +213,10 @@
stream_plan: |-
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck }
└─StreamTemporalJoin { type: LeftOuter, predicate: stream.a1 = idx.a2 AND stream.b1 = idx.b2, output: [stream.id1, stream.a1, idx.id2, idx.a2, stream._row_id, stream.b1] }
-├─StreamExchange { dist: HashShard(stream.a1, stream.b1) }
+├─StreamExchange { dist: HashShard(stream.a1) }
│ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
-└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx.a2, idx.b2) }
-└─StreamTableScan { table: idx, columns: [idx.id2, idx.a2, idx.b2], pk: [idx.id2], dist: UpstreamHashShard(idx.a2, idx.b2) }
+└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx.a2) }
+└─StreamTableScan { table: idx, columns: [idx.id2, idx.a2, idx.b2], pk: [idx.id2], dist: UpstreamHashShard(idx.a2) }
- name: index selection for temporal join (with two indexes) and should choose the index with a longer prefix..
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
@@ -227,10 +227,10 @@
stream_plan: |-
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck }
└─StreamTemporalJoin { type: LeftOuter, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] }
-├─StreamExchange { dist: HashShard(stream.a1, stream.b1) }
+├─StreamExchange { dist: HashShard(stream.a1) }
│ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
-└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2, idx2.b2) }
-└─StreamTableScan { table: idx2, columns: [idx2.id2, idx2.a2, idx2.b2], pk: [idx2.id2], dist: UpstreamHashShard(idx2.a2, idx2.b2) }
+└─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2) }
+└─StreamTableScan { table: idx2, columns: [idx2.id2, idx2.a2, idx2.b2], pk: [idx2.id2], dist: UpstreamHashShard(idx2.a2) }
- name: index selection for temporal join (with three indexes) and should choose primary table.
sql: |
create table stream(id1 int, a1 int, b1 int, c1 int) APPEND ONLY;
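In the temporal-join plans, the lookup side keeps its no-shuffle exchange, so the probe side is now hashed by the first equality column only (stream.a1). The versioned table's DDL and the full queries are elided from these hunks; the sketch below is an assumed shape inferred from the plans (the table name version and its schema are assumptions):

create table stream (id1 int, a1 int, b1 int) APPEND ONLY;
create table version (id2 int, a2 int, b2 int, primary key (id2));
create index idx2 on version (a2, b2);
-- the optimizer resolves the join through idx2 and exchanges the
-- stream side by hash(a1) to match the index's new distribution key
select id1, a1, id2, a2
from stream
left join version for system_time as of proctime()
on a1 = a2 and b1 = b2;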
src/frontend/src/handler/create_index.rs (4 changes: 2 additions & 2 deletions)
@@ -186,10 +186,10 @@ pub(crate) fn gen_create_index_plan(
index_table_name.clone(),
&index_columns_ordered_expr,
&include_columns_expr,
-// We use the whole index columns as distributed key by default if users
+// We use the first index column as distributed key by default if users
// haven't specified the distributed by columns.
if distributed_columns_expr.is_empty() {
-index_columns_ordered_expr.len()
+1
} else {
distributed_columns_expr.len()
},
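The frontend change itself is the two-line swap above: with no DISTRIBUTED BY clause, the prefix of index columns used as the distribution key shrinks from all of them to 1. Users who want the old behavior can still request it explicitly, assuming RisingWave's CREATE INDEX ... DISTRIBUTED BY syntax (the clause the distributed_columns_expr branch above handles):

create table t1 (a int, b int, c int);
-- opt out of the new default by naming the distribution columns explicitly
create index idx1 on t1 (a, b) include (c) distributed by (a, b);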
src/frontend/src/handler/describe.rs (2 changes: 1 addition & 1 deletion)
@@ -208,7 +208,7 @@ mod tests {
"v3".into() => "integer".into(),
"v4".into() => "integer".into(),
"primary key".into() => "v3".into(),
"idx1".into() => "index(v1 DESC, v2 ASC, v3 ASC) include(v4) distributed by(v1, v2)".into(),
"idx1".into() => "index(v1 DESC, v2 ASC, v3 ASC) include(v4) distributed by(v1)".into(),
};

assert_eq!(columns, expected_columns);
