
Commit

impl with ord
xxchan committed Sep 13, 2023
1 parent 1ca8237 commit dc5ba42
Showing 10 changed files with 358 additions and 41 deletions.
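
For context (not part of the diff; a sketch matching the tests below): WITH ORDINALITY, as in PostgreSQL, appends a bigint column that numbers the rows produced by a set-returning function, counting from 1. A minimal example:

    -- each element is paired with its 1-based position
    select * from unnest(array[0,1,2]) WITH ORDINALITY;
    -- returns rows (0,1), (1,2), (2,3)
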
35 changes: 35 additions & 0 deletions e2e_test/batch/basic/unnest.slt.part
@@ -83,3 +83,38 @@ select distinct unnest(array[1,1,2,3,1]) as x;
1
2
3

query II
select * from unnest(array[0,1,2]) with ordinality;
----
0 1
1 2
2 3

query IIII
select * from unnest(array[0,1,2]) with ordinality, unnest(array[3,4]) with ordinality as unnest_2;
----
0 1 3 1
0 1 4 2
1 2 3 1
1 2 4 2
2 3 3 1
2 3 4 2

statement ok
create table t(arr varchar[]);

statement ok
insert into t values (Array['a','b','c']), (Array['d','e']);

query TTI
select * from t cross join unnest(t.arr) WITH ORDINALITY AS x(elts, num);
----
{a,b,c} c 3
{a,b,c} b 2
{a,b,c} a 1
{d,e} e 2
{d,e} d 1

statement ok
drop table t;
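
A note on column aliases (illustrative; it mirrors the binder tests in with_ordinality.yaml below): an alias list names the element column first and the ordinality column second, so at most two aliases are accepted for unnest of a one-dimensional array:

    -- OK: aliases for the element and the ordinality column
    select * from unnest(array[1,2]) WITH ORDINALITY AS foo(a, ord);
    -- Bind error: table "foo" has 2 columns available but 3 column aliases specified
    select * from unnest(array[1,2]) WITH ORDINALITY AS foo(a, ord, bar);
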
24 changes: 24 additions & 0 deletions e2e_test/streaming/project_set.slt
@@ -107,3 +107,27 @@ with cte as (SELECT 1 as v1, unnest(array[1,2,3,4,5]) AS v2) select v1 from cte;
1
1
1

statement ok
create table t(arr varchar[]);

statement ok
create materialized view mv as select * from t cross join unnest(t.arr) WITH ORDINALITY AS x(elts, num);

statement ok
insert into t values (Array['a','b','c']), (Array['d','e']);

query TTI rowsort
select * from mv;
----
{a,b,c} c 3
{a,b,c} b 2
{a,b,c} a 1
{d,e} e 2
{d,e} d 1

statement ok
drop materialized view mv;

statement ok
drop table t;
43 changes: 43 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml
@@ -0,0 +1,43 @@
# constant FROM
- sql: |
select * from unnest(array[1,2,3]) WITH ORDINALITY;
expected_outputs:
- batch_plan
- stream_plan
# lateral join
- sql: |
create table t(x int, arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY;
expected_outputs:
- batch_plan
- stream_plan
- sql: |
create table t(x int, arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY as foo;
expected_outputs:
- batch_plan
- stream_plan
- sql: |
create table t(x int, arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY as foo(a);
expected_outputs:
- batch_plan
- stream_plan
- sql: |
create table t(x int, arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord);
expected_outputs:
- batch_plan
- stream_plan
- sql: |
create table t(x int, arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord,bar);
expected_outputs:
- binder_error
# multiple with ordinality
- sql: |
create table t(x int, arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY, unnest(arr) WITH ORDINALITY AS unnest_2(arr_2,ordinality_2);
expected_outputs:
- batch_plan
- stream_plan
162 changes: 162 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml
@@ -0,0 +1,162 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- sql: |
select * from unnest(array[1,2,3]) WITH ORDINALITY;
batch_plan: |-
BatchProject { exprs: [Unnest(ARRAY[1, 2, 3]:List(Int32)), (projected_row_id + 1:Int64) as $expr1] }
└─BatchProjectSet { select_list: [Unnest(ARRAY[1, 2, 3]:List(Int32))] }
└─BatchValues { rows: [[]] }
stream_plan: |-
StreamMaterialize { columns: [unnest, ordinality, _row_id(hidden), projected_row_id(hidden)], stream_key: [_row_id, projected_row_id], pk_columns: [_row_id, projected_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [Unnest(ARRAY[1, 2, 3]:List(Int32)), (projected_row_id + 1:Int64) as $expr1, _row_id, projected_row_id] }
└─StreamProjectSet { select_list: [Unnest(ARRAY[1, 2, 3]:List(Int32)), $0] }
└─StreamValues { rows: [[0:Int64]] }
- sql: |
create table t(x int, arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] }
└─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all }
├─BatchExchange { order: [], dist: HashShard(t.arr) }
│ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard }
└─BatchProjectSet { select_list: [$0, Unnest($0)] }
└─BatchHashAgg { group_key: [t.arr], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(t.arr) }
└─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [x, arr, unnest, ordinality, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck }
└─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] }
├─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProjectSet { select_list: [$0, Unnest($0)] }
└─StreamProject { exprs: [t.arr] }
└─StreamHashAgg { group_key: [t.arr], aggs: [count] }
└─StreamExchange { dist: HashShard(t.arr) }
└─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
create table t(x int, arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY as foo;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] }
└─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all }
├─BatchExchange { order: [], dist: HashShard(t.arr) }
│ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard }
└─BatchProjectSet { select_list: [$0, Unnest($0)] }
└─BatchHashAgg { group_key: [t.arr], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(t.arr) }
└─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [x, arr, foo, ordinality, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck }
└─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] }
├─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProjectSet { select_list: [$0, Unnest($0)] }
└─StreamProject { exprs: [t.arr] }
└─StreamHashAgg { group_key: [t.arr], aggs: [count] }
└─StreamExchange { dist: HashShard(t.arr) }
└─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
create table t(x int, arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY as foo(a);
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] }
└─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all }
├─BatchExchange { order: [], dist: HashShard(t.arr) }
│ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard }
└─BatchProjectSet { select_list: [$0, Unnest($0)] }
└─BatchHashAgg { group_key: [t.arr], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(t.arr) }
└─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [x, arr, a, ordinality, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck }
└─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] }
├─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProjectSet { select_list: [$0, Unnest($0)] }
└─StreamProject { exprs: [t.arr] }
└─StreamHashAgg { group_key: [t.arr], aggs: [count] }
└─StreamExchange { dist: HashShard(t.arr) }
└─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
create table t(x int, arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord);
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] }
└─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all }
├─BatchExchange { order: [], dist: HashShard(t.arr) }
│ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard }
└─BatchProjectSet { select_list: [$0, Unnest($0)] }
└─BatchHashAgg { group_key: [t.arr], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(t.arr) }
└─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [x, arr, a, ord, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck }
└─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] }
├─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProjectSet { select_list: [$0, Unnest($0)] }
└─StreamProject { exprs: [t.arr] }
└─StreamHashAgg { group_key: [t.arr], aggs: [count] }
└─StreamExchange { dist: HashShard(t.arr) }
└─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
create table t(x int, arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord,bar);
binder_error: 'Bind error: table "foo" has 2 columns available but 3 column aliases specified'
- sql: |
create table t(x int, arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY, unnest(arr) WITH ORDINALITY AS unnest_2(arr_2,ordinality_2);
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, Unnest($0), (projected_row_id + 1:Int64) as $expr2] }
└─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all }
├─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all }
│ ├─BatchExchange { order: [], dist: HashShard(t.arr) }
│ │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard }
│ └─BatchProjectSet { select_list: [$0, Unnest($0)] }
│ └─BatchHashAgg { group_key: [t.arr], aggs: [] }
│ └─BatchExchange { order: [], dist: HashShard(t.arr) }
│ └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
└─BatchProjectSet { select_list: [$0, Unnest($0)] }
└─BatchHashAgg { group_key: [t.arr], aggs: [] }
└─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.arr] }
├─BatchExchange { order: [], dist: HashShard(t.arr) }
│ └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
└─BatchProjectSet { select_list: [$0, Unnest($0)] }
└─BatchHashAgg { group_key: [t.arr], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(t.arr) }
└─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [x, arr, unnest, ordinality, arr_2, ordinality_2, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden), t.arr#1(hidden), projected_row_id#1(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr, t.arr#1, projected_row_id#1], pk_columns: [t._row_id, t.arr, projected_row_id, arr, t.arr#1, projected_row_id#1], pk_conflict: NoCheck }
└─StreamProject { exprs: [t.x, t.arr, Unnest($0), $expr1, Unnest($0), (projected_row_id + 1:Int64) as $expr2, t._row_id, t.arr, projected_row_id, t.arr, projected_row_id] }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, Unnest($0), $expr1, projected_row_id, t.arr, Unnest($0), t._row_id, t.arr, projected_row_id] }
├─StreamShare { id: 8 }
│ └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] }
│ └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] }
│ ├─StreamExchange { dist: HashShard(t.arr) }
│ │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
│ └─StreamProjectSet { select_list: [$0, Unnest($0)] }
│ └─StreamProject { exprs: [t.arr] }
│ └─StreamHashAgg { group_key: [t.arr], aggs: [count] }
│ └─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProjectSet { select_list: [$0, Unnest($0)] }
└─StreamProject { exprs: [t.arr] }
└─StreamHashAgg { group_key: [t.arr], aggs: [count] }
└─StreamShare { id: 8 }
└─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] }
├─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProjectSet { select_list: [$0, Unnest($0)] }
└─StreamProject { exprs: [t.arr] }
└─StreamHashAgg { group_key: [t.arr], aggs: [count] }
└─StreamExchange { dist: HashShard(t.arr) }
└─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
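
A reading of the plans above (explanatory note, not part of the generated file): ProjectSet already emits a hidden, 0-based projected_row_id for each row a set-returning function produces, so WITH ORDINALITY only costs an extra Project that computes (projected_row_id + 1) as the ordinality column; the lateral unnest itself is planned as before, by joining t back to the unnested distinct arrays on t.arr IS NOT DISTINCT FROM t.arr. For a concrete array the result is equivalent to numbering the elements by hand (a sketch assuming PostgreSQL-style 1-based array indexing and generate_series):

    -- same rows as: select * from unnest(array[10,20,30]) WITH ORDINALITY
    select (array[10,20,30])[i] as unnest, i as ordinality
    from generate_series(1, 3) as g(i);
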
4 changes: 2 additions & 2 deletions src/frontend/src/binder/relation/join.rs
@@ -59,7 +59,7 @@ impl Binder {

let is_lateral = match &right {
Relation::Subquery(subquery) if subquery.lateral => true,
Relation::TableFunction(_) => true,
Relation::TableFunction { .. } => true,
_ => false,
};

@@ -110,7 +110,7 @@ impl Binder {

let is_lateral = match &right {
Relation::Subquery(subquery) if subquery.lateral => true,
Relation::TableFunction(_) => true,
Relation::TableFunction { .. } => true,
_ => false,
};

5 changes: 4 additions & 1 deletion src/frontend/src/binder/relation/mod.rs
@@ -119,7 +119,10 @@
);
correlated_indices
}
Relation::TableFunction(table_function) => table_function
Relation::TableFunction {
expr: table_function,
with_ordinality: _,
} => table_function
.collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
_ => vec![],
}
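
The two Rust hunks above follow from changing Relation::TableFunction from a tuple variant to a struct variant that records the new flag. A minimal sketch of the implied shape (all types other than Relation are hypothetical stand-ins, not the real frontend definitions):

    // Hypothetical stand-in types, for illustration only.
    struct Subquery { lateral: bool }
    struct TableFunction;

    enum Relation {
        Subquery(Box<Subquery>),
        // Was: TableFunction(Box<TableFunction>). The struct variant also
        // carries whether WITH ORDINALITY was written in the FROM clause.
        TableFunction {
            expr: Box<TableFunction>,
            with_ordinality: bool,
        },
    }

    // join.rs: laterality does not depend on the flag, so the match sites
    // switch from TableFunction(_) to TableFunction { .. }.
    fn is_lateral(right: &Relation) -> bool {
        match right {
            Relation::Subquery(subquery) if subquery.lateral => true,
            Relation::TableFunction { .. } => true,
            _ => false,
        }
    }

    // mod.rs: correlated-index collection destructures the new variant and
    // ignores the flag: Relation::TableFunction { expr, with_ordinality: _ }.
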
(4 more changed files not shown.)
