Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): implement WITH ORDINALITY clause #12273

Merged
merged 10 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions e2e_test/batch/basic/unnest.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,38 @@ select distinct unnest(array[1,1,2,3,1]) as x;
1
2
3

query II
select * from unnest(array[0,1,2]) with ordinality;
----
0 1
1 2
2 3

query IIII
select * from unnest(array[0,1,2]) with ordinality, unnest(array[3,4]) with ordinality as unnest_2;
----
0 1 3 1
0 1 4 2
1 2 3 1
1 2 4 2
2 3 3 1
2 3 4 2

statement ok
create table t(arr varchar[]);

statement ok
insert into t values (Array['a','b', 'c']), (Array['d','e']);

query TTI
select * from t cross join unnest(t.arr) WITH ORDINALITY AS x(elts, num);
----
{a,b,c} c 3
{a,b,c} b 2
{a,b,c} a 1
{d,e} e 2
{d,e} d 1
xxchan marked this conversation as resolved.
Show resolved Hide resolved

statement ok
drop table t;
24 changes: 24 additions & 0 deletions e2e_test/streaming/project_set.slt
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,27 @@ with cte as (SELECT 1 as v1, unnest(array[1,2,3,4,5]) AS v2) select v1 from cte;
1
1
1

statement ok
create table t(arr varchar[]);

statement ok
create materialized view mv as select * from t cross join unnest(t.arr) WITH ORDINALITY AS x(elts, num);

statement ok
insert into t values (Array['a','b', 'c']), (Array['d','e']);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we update Array['a','b', 'c'] to Array['a', 'c']?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. Actually, I'm not sure what your intention is 🤔


query TTI rowsort
select * from mv;
----
{a,b,c} a 1
{a,b,c} b 2
{a,b,c} c 3
{d,e} d 1
{d,e} e 2

statement ok
drop materialized view mv;

statement ok
drop table t;
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# constant FROM
- sql: |
select * from unnest(array[1,2,3]) WITH ORDINALITY;
expected_outputs:
- batch_plan
- stream_plan
# lateral join
- sql: |
create table t(x int , arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY;
expected_outputs:
- batch_plan
- stream_plan
- sql: |
create table t(x int , arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY as foo;
expected_outputs:
- batch_plan
- stream_plan
- sql: |
create table t(x int , arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY as foo(a);
expected_outputs:
- batch_plan
- stream_plan
- sql: |
create table t(x int , arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord);
expected_outputs:
- batch_plan
- stream_plan
- sql: |
create table t(x int , arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord,bar);
expected_outputs:
- binder_error
# multiple with ordinality
- sql: |
create table t(x int , arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY, unnest(arr) WITH ORDINALITY AS unnest_2(arr_2,ordinality_2);
expected_outputs:
- batch_plan
- stream_plan
162 changes: 162 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- sql: |
select * from unnest(array[1,2,3]) WITH ORDINALITY;
batch_plan: |-
BatchProject { exprs: [Unnest(ARRAY[1, 2, 3]:List(Int32)), (projected_row_id + 1:Int64) as $expr1] }
└─BatchProjectSet { select_list: [Unnest(ARRAY[1, 2, 3]:List(Int32))] }
└─BatchValues { rows: [[]] }
stream_plan: |-
StreamMaterialize { columns: [unnest, ordinality, _row_id(hidden), projected_row_id(hidden)], stream_key: [_row_id, projected_row_id], pk_columns: [_row_id, projected_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [Unnest(ARRAY[1, 2, 3]:List(Int32)), (projected_row_id + 1:Int64) as $expr1, _row_id, projected_row_id] }
└─StreamProjectSet { select_list: [Unnest(ARRAY[1, 2, 3]:List(Int32)), $0] }
└─StreamValues { rows: [[0:Int64]] }
- sql: |
create table t(x int , arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] }
└─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all }
├─BatchExchange { order: [], dist: HashShard(t.arr) }
│ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard }
└─BatchProjectSet { select_list: [$0, Unnest($0)] }
└─BatchHashAgg { group_key: [t.arr], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(t.arr) }
└─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [x, arr, unnest, ordinality, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck }
└─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] }
├─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProjectSet { select_list: [$0, Unnest($0)] }
└─StreamProject { exprs: [t.arr] }
└─StreamHashAgg { group_key: [t.arr], aggs: [count] }
└─StreamExchange { dist: HashShard(t.arr) }
└─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
create table t(x int , arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY as foo;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] }
└─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all }
├─BatchExchange { order: [], dist: HashShard(t.arr) }
│ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard }
└─BatchProjectSet { select_list: [$0, Unnest($0)] }
└─BatchHashAgg { group_key: [t.arr], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(t.arr) }
└─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [x, arr, foo, ordinality, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck }
└─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] }
├─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProjectSet { select_list: [$0, Unnest($0)] }
└─StreamProject { exprs: [t.arr] }
└─StreamHashAgg { group_key: [t.arr], aggs: [count] }
└─StreamExchange { dist: HashShard(t.arr) }
└─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
create table t(x int , arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY as foo(a);
xxchan marked this conversation as resolved.
Show resolved Hide resolved
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] }
└─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all }
├─BatchExchange { order: [], dist: HashShard(t.arr) }
│ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard }
└─BatchProjectSet { select_list: [$0, Unnest($0)] }
└─BatchHashAgg { group_key: [t.arr], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(t.arr) }
└─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [x, arr, a, ordinality, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck }
└─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] }
├─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProjectSet { select_list: [$0, Unnest($0)] }
└─StreamProject { exprs: [t.arr] }
└─StreamHashAgg { group_key: [t.arr], aggs: [count] }
└─StreamExchange { dist: HashShard(t.arr) }
└─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
create table t(x int , arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord);
xxchan marked this conversation as resolved.
Show resolved Hide resolved
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] }
└─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all }
├─BatchExchange { order: [], dist: HashShard(t.arr) }
│ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard }
└─BatchProjectSet { select_list: [$0, Unnest($0)] }
└─BatchHashAgg { group_key: [t.arr], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(t.arr) }
└─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [x, arr, a, ord, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck }
└─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] }
├─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProjectSet { select_list: [$0, Unnest($0)] }
└─StreamProject { exprs: [t.arr] }
└─StreamHashAgg { group_key: [t.arr], aggs: [count] }
└─StreamExchange { dist: HashShard(t.arr) }
└─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
create table t(x int , arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord,bar);
binder_error: 'Bind error: table "foo" has 2 columns available but 3 column aliases specified'
- sql: |
create table t(x int , arr int[]);
select * from t cross join unnest(arr) WITH ORDINALITY, unnest(arr) WITH ORDINALITY AS unnest_2(arr_2,ordinality_2);
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, Unnest($0), (projected_row_id + 1:Int64) as $expr2] }
└─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all }
├─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all }
│ ├─BatchExchange { order: [], dist: HashShard(t.arr) }
│ │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard }
│ └─BatchProjectSet { select_list: [$0, Unnest($0)] }
│ └─BatchHashAgg { group_key: [t.arr], aggs: [] }
│ └─BatchExchange { order: [], dist: HashShard(t.arr) }
│ └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
└─BatchProjectSet { select_list: [$0, Unnest($0)] }
└─BatchHashAgg { group_key: [t.arr], aggs: [] }
└─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.arr] }
├─BatchExchange { order: [], dist: HashShard(t.arr) }
│ └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
└─BatchProjectSet { select_list: [$0, Unnest($0)] }
└─BatchHashAgg { group_key: [t.arr], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(t.arr) }
└─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [x, arr, unnest, ordinality, arr_2, ordinality_2, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden), t.arr#1(hidden), projected_row_id#1(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr, t.arr#1, projected_row_id#1], pk_columns: [t._row_id, t.arr, projected_row_id, arr, t.arr#1, projected_row_id#1], pk_conflict: NoCheck }
└─StreamProject { exprs: [t.x, t.arr, Unnest($0), $expr1, Unnest($0), (projected_row_id + 1:Int64) as $expr2, t._row_id, t.arr, projected_row_id, t.arr, projected_row_id] }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, Unnest($0), $expr1, projected_row_id, t.arr, Unnest($0), t._row_id, t.arr, projected_row_id] }
├─StreamShare { id: 8 }
│ └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] }
│ └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] }
│ ├─StreamExchange { dist: HashShard(t.arr) }
│ │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
│ └─StreamProjectSet { select_list: [$0, Unnest($0)] }
│ └─StreamProject { exprs: [t.arr] }
│ └─StreamHashAgg { group_key: [t.arr], aggs: [count] }
│ └─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProjectSet { select_list: [$0, Unnest($0)] }
└─StreamProject { exprs: [t.arr] }
└─StreamHashAgg { group_key: [t.arr], aggs: [count] }
└─StreamShare { id: 8 }
└─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] }
├─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProjectSet { select_list: [$0, Unnest($0)] }
└─StreamProject { exprs: [t.arr] }
└─StreamHashAgg { group_key: [t.arr], aggs: [count] }
└─StreamExchange { dist: HashShard(t.arr) }
└─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
4 changes: 2 additions & 2 deletions src/frontend/src/binder/relation/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Binder {

let is_lateral = match &right {
Relation::Subquery(subquery) if subquery.lateral => true,
Relation::TableFunction(_) => true,
Relation::TableFunction { .. } => true,
_ => false,
};

Expand Down Expand Up @@ -110,7 +110,7 @@ impl Binder {

let is_lateral = match &right {
Relation::Subquery(subquery) if subquery.lateral => true,
Relation::TableFunction(_) => true,
Relation::TableFunction { .. } => true,
_ => false,
};

Expand Down
24 changes: 19 additions & 5 deletions src/frontend/src/binder/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ pub enum Relation {
Join(Box<BoundJoin>),
Apply(Box<BoundJoin>),
WindowTableFunction(Box<BoundWindowTableFunction>),
TableFunction(ExprImpl),
/// Table function or scalar function.
TableFunction {
expr: ExprImpl,
with_ordinality: bool,
},
Watermark(Box<BoundWatermark>),
Share(Box<BoundShare>),
}
Expand All @@ -69,7 +73,9 @@ impl RewriteExprsRecursive for Relation {
Relation::WindowTableFunction(inner) => inner.rewrite_exprs_recursive(rewriter),
Relation::Watermark(inner) => inner.rewrite_exprs_recursive(rewriter),
Relation::Share(inner) => inner.rewrite_exprs_recursive(rewriter),
Relation::TableFunction(inner) => *inner = rewriter.rewrite_expr(inner.take()),
Relation::TableFunction { expr: inner, .. } => {
*inner = rewriter.rewrite_expr(inner.take())
}
_ => {}
}
}
Expand Down Expand Up @@ -113,7 +119,10 @@ impl Relation {
);
correlated_indices
}
Relation::TableFunction(table_function) => table_function
Relation::TableFunction {
expr: table_function,
with_ordinality: _,
} => table_function
.collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
_ => vec![],
}
Expand Down Expand Up @@ -437,9 +446,14 @@ impl Binder {
alias,
for_system_time_as_of_proctime,
} => self.bind_relation_by_name(name, alias, for_system_time_as_of_proctime),
TableFactor::TableFunction { name, alias, args } => {
TableFactor::TableFunction {
name,
alias,
args,
with_ordinality,
} => {
self.try_mark_lateral_as_visible();
let result = self.bind_table_function(name, alias, args);
let result = self.bind_table_function(name, alias, args, with_ordinality);
self.try_mark_lateral_as_invisible();
result
}
Expand Down
Loading
Loading