diff --git a/e2e_test/batch/basic/unnest.slt.part b/e2e_test/batch/basic/unnest.slt.part index efcf9981e65c9..0807637c88e2d 100644 --- a/e2e_test/batch/basic/unnest.slt.part +++ b/e2e_test/batch/basic/unnest.slt.part @@ -83,3 +83,38 @@ select distinct unnest(array[1,1,2,3,1]) as x; 1 2 3 + +query I +select * from unnest(array[0,1,2]) with ordinality; +---- +0 1 +1 2 +2 3 + +query I +select * from unnest(array[0,1,2]) with ordinality, unnest(array[3,4]) with ordinality as unnest_2; +---- +0 1 3 1 +0 1 4 2 +1 2 3 1 +1 2 4 2 +2 3 3 1 +2 3 4 2 + +statement ok +create table t(arr varchar[]); + +statement ok +insert into t values (Array['a','b', 'c']), (Array['d','e']); + +query I rowsort +select * from t cross join unnest(t.arr) WITH ORDINALITY AS x(elts, num); +---- +{a,b,c} a 1 +{a,b,c} b 2 +{a,b,c} c 3 +{d,e} d 1 +{d,e} e 2 + +statement ok +drop table t; diff --git a/e2e_test/streaming/project_set.slt b/e2e_test/streaming/project_set.slt index 959c75ebebefc..f360663067e3e 100644 --- a/e2e_test/streaming/project_set.slt +++ b/e2e_test/streaming/project_set.slt @@ -107,3 +107,38 @@ with cte as (SELECT 1 as v1, unnest(array[1,2,3,4,5]) AS v2) select v1 from cte; 1 1 1 + +statement ok +create table t(arr varchar[]); + +statement ok +create materialized view mv as select * from t cross join unnest(t.arr) WITH ORDINALITY AS x(elts, num); + +statement ok +insert into t values (Array['a','b', 'c']), (Array['d','e']); + +query I rowsort +select * from mv; +---- +{a,b,c} a 1 +{a,b,c} b 2 +{a,b,c} c 3 +{d,e} d 1 +{d,e} e 2 + +statement ok +update t set arr = Array['a', 'c'] where arr = Array['a','b', 'c']; + +query I rowsort +select * from mv; +---- +{a,c} a 1 +{a,c} c 2 +{d,e} d 1 +{d,e} e 2 + +statement ok +drop materialized view mv; + +statement ok +drop table t; diff --git a/src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml b/src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml new file mode 100644 index 0000000000000..08dfafa2c7310 --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml @@ -0,0 +1,64 @@ +# constant FROM +- sql: | + select * from unnest(array[1,2,3]) WITH ORDINALITY; + expected_outputs: + - batch_plan + - stream_plan +# lateral join +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY; + expected_outputs: + - batch_plan + - stream_plan +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo; + expected_outputs: + - batch_plan + - stream_plan +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo(a); + expected_outputs: + - batch_plan + - stream_plan +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord); + expected_outputs: + - batch_plan + - stream_plan +- name: use alias columns explicitlity + sql: | + create table t(x int , arr int[]); + select x, arr, a, ord from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord); + expected_outputs: + - batch_plan + - stream_plan +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord,bar); + expected_outputs: + - binder_error +# multiple with ordinality +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY, unnest(arr) WITH ORDINALITY AS unnest_2(arr_2,ordinality_2); + expected_outputs: + - batch_plan + - stream_plan +# constant FROM (scalar function) +- sql: | + select * from abs(1) WITH ORDINALITY; + expected_outputs: + - batch_plan + - stream_plan +# lateral join (scalar function) +# FIXME: currently this panics due to CorrelatedInputRef in Values https://github.com/risingwavelabs/risingwave/issues/12231 +- sql: | + create table t(x int , arr int[]); + select * from t, abs(x) WITH ORDINALITY; + expected_outputs: + - batch_plan + - stream_error diff --git a/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml b/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml new file mode 100644 index 0000000000000..208eec58ecdba --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml @@ -0,0 +1,210 @@ +# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. +- sql: | + select * from unnest(array[1,2,3]) WITH ORDINALITY; + batch_plan: |- + BatchProject { exprs: [Unnest(ARRAY[1, 2, 3]:List(Int32)), (projected_row_id + 1:Int64) as $expr1] } + └─BatchProjectSet { select_list: [Unnest(ARRAY[1, 2, 3]:List(Int32))] } + └─BatchValues { rows: [[]] } + stream_plan: |- + StreamMaterialize { columns: [unnest, ordinality, _row_id(hidden), projected_row_id(hidden)], stream_key: [_row_id, projected_row_id], pk_columns: [_row_id, projected_row_id], pk_conflict: NoCheck } + └─StreamProject { exprs: [Unnest(ARRAY[1, 2, 3]:List(Int32)), (projected_row_id + 1:Int64) as $expr1, _row_id, projected_row_id] } + └─StreamProjectSet { select_list: [Unnest(ARRAY[1, 2, 3]:List(Int32)), $0] } + └─StreamValues { rows: [[0:Int64]] } +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY; + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0, Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchExchange { order: [], dist: HashShard(t.arr) } + └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [x, arr, unnest, ordinality, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + ├─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamExchange { dist: HashShard(t.arr) } + └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo; + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0, Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchExchange { order: [], dist: HashShard(t.arr) } + └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [x, arr, foo, ordinality, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + ├─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamExchange { dist: HashShard(t.arr) } + └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo(a); + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0, Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchExchange { order: [], dist: HashShard(t.arr) } + └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [x, arr, a, ordinality, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + ├─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamExchange { dist: HashShard(t.arr) } + └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord); + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0, Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchExchange { order: [], dist: HashShard(t.arr) } + └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [x, arr, a, ord, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + ├─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamExchange { dist: HashShard(t.arr) } + └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- name: use alias columns explicitlity + sql: | + create table t(x int , arr int[]); + select x, arr, a, ord from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord); + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0, Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchExchange { order: [], dist: HashShard(t.arr) } + └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [x, arr, a, ord, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + ├─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamExchange { dist: HashShard(t.arr) } + └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord,bar); + binder_error: 'Bind error: table "foo" has 2 columns available but 3 column aliases specified' +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY, unnest(arr) WITH ORDINALITY AS unnest_2(arr_2,ordinality_2); + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, Unnest($0), (projected_row_id + 1:Int64) as $expr2] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + ├─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + │ ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard } + │ └─BatchProjectSet { select_list: [$0, Unnest($0)] } + │ └─BatchHashAgg { group_key: [t.arr], aggs: [] } + │ └─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0, Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.arr] } + ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0, Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchExchange { order: [], dist: HashShard(t.arr) } + └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [x, arr, unnest, ordinality, arr_2, ordinality_2, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden), t.arr#1(hidden), projected_row_id#1(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr, t.arr#1, projected_row_id#1], pk_columns: [t._row_id, t.arr, projected_row_id, arr, t.arr#1, projected_row_id#1], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), $expr1, Unnest($0), (projected_row_id + 1:Int64) as $expr2, t._row_id, t.arr, projected_row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, Unnest($0), $expr1, projected_row_id, t.arr, Unnest($0), t._row_id, t.arr, projected_row_id] } + ├─StreamShare { id: 8 } + │ └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + │ └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + │ ├─StreamExchange { dist: HashShard(t.arr) } + │ │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + │ └─StreamProjectSet { select_list: [$0, Unnest($0)] } + │ └─StreamProject { exprs: [t.arr] } + │ └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + │ └─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamShare { id: 8 } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + ├─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamExchange { dist: HashShard(t.arr) } + └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- sql: | + select * from abs(1) WITH ORDINALITY; + batch_plan: 'BatchValues { rows: [[1:Int32, 1:Int64]] }' + stream_plan: |- + StreamMaterialize { columns: [abs, ordinality, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } + └─StreamValues { rows: [[Abs(1:Int32), 1:Int64, 0:Int64]] } +- sql: | + create table t(x int , arr int[]); + select * from t, abs(x) WITH ORDINALITY; + batch_plan: |- + BatchNestedLoopJoin { type: Inner, predicate: true, output: all } + ├─BatchExchange { order: [], dist: Single } + │ └─BatchHashJoin { type: Inner, predicate: t.x IS NOT DISTINCT FROM t.x, output: [t.x, t.arr] } + │ ├─BatchExchange { order: [], dist: HashShard(t.x) } + │ │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard } + │ └─BatchHashAgg { group_key: [t.x], aggs: [] } + │ └─BatchExchange { order: [], dist: HashShard(t.x) } + │ └─BatchScan { table: t, columns: [t.x], distribution: SomeShard } + └─BatchValues { rows: [[Abs(CorrelatedInputRef { index: 0, correlated_id: 1 }), 1:Int64]] } + stream_error: |- + Not supported: streaming nested-loop join + HINT: The non-equal join in the query requires a nested-loop join executor, which could be very expensive to run. Consider rewriting the query to use dynamic filter as a substitute if possible. + See also: https://github.com/risingwavelabs/rfcs/blob/main/rfcs/0033-dynamic-filter.md diff --git a/src/frontend/src/binder/relation/join.rs b/src/frontend/src/binder/relation/join.rs index fed759d6eb83b..eb4ce96f9ab3f 100644 --- a/src/frontend/src/binder/relation/join.rs +++ b/src/frontend/src/binder/relation/join.rs @@ -59,7 +59,7 @@ impl Binder { let is_lateral = match &right { Relation::Subquery(subquery) if subquery.lateral => true, - Relation::TableFunction(_) => true, + Relation::TableFunction { .. } => true, _ => false, }; @@ -110,7 +110,7 @@ impl Binder { let is_lateral = match &right { Relation::Subquery(subquery) if subquery.lateral => true, - Relation::TableFunction(_) => true, + Relation::TableFunction { .. } => true, _ => false, }; diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index 2df943362e3f9..4ee4337c2f048 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -55,7 +55,11 @@ pub enum Relation { Join(Box), Apply(Box), WindowTableFunction(Box), - TableFunction(ExprImpl), + /// Table function or scalar function. + TableFunction { + expr: ExprImpl, + with_ordinality: bool, + }, Watermark(Box), Share(Box), } @@ -69,7 +73,9 @@ impl RewriteExprsRecursive for Relation { Relation::WindowTableFunction(inner) => inner.rewrite_exprs_recursive(rewriter), Relation::Watermark(inner) => inner.rewrite_exprs_recursive(rewriter), Relation::Share(inner) => inner.rewrite_exprs_recursive(rewriter), - Relation::TableFunction(inner) => *inner = rewriter.rewrite_expr(inner.take()), + Relation::TableFunction { expr: inner, .. } => { + *inner = rewriter.rewrite_expr(inner.take()) + } _ => {} } } @@ -113,7 +119,10 @@ impl Relation { ); correlated_indices } - Relation::TableFunction(table_function) => table_function + Relation::TableFunction { + expr: table_function, + with_ordinality: _, + } => table_function .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id), _ => vec![], } @@ -437,9 +446,14 @@ impl Binder { alias, for_system_time_as_of_proctime, } => self.bind_relation_by_name(name, alias, for_system_time_as_of_proctime), - TableFactor::TableFunction { name, alias, args } => { + TableFactor::TableFunction { + name, + alias, + args, + with_ordinality, + } => { self.try_mark_lateral_as_visible(); - let result = self.bind_table_function(name, alias, args); + let result = self.bind_table_function(name, alias, args, with_ordinality); self.try_mark_lateral_as_invisible(); result } diff --git a/src/frontend/src/binder/relation/table_function.rs b/src/frontend/src/binder/relation/table_function.rs index 7441473316e33..988ea0561a860 100644 --- a/src/frontend/src/binder/relation/table_function.rs +++ b/src/frontend/src/binder/relation/table_function.rs @@ -18,6 +18,7 @@ use itertools::Itertools; use risingwave_common::catalog::{ Field, Schema, PG_CATALOG_SCHEMA_NAME, RW_INTERNAL_TABLE_FUNCTION_NAME, }; +use risingwave_common::error::ErrorCode; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{Function, FunctionArg, ObjectName, TableAlias}; @@ -34,16 +35,29 @@ impl Binder { /// /// Besides [`crate::expr::TableFunction`] expr, it can also be other things like window table /// functions, or scalar functions. + /// + /// `with_ordinality` is only supported for the `TableFunction` case now. pub(super) fn bind_table_function( &mut self, name: ObjectName, alias: Option, args: Vec, + with_ordinality: bool, ) -> Result { let func_name = &name.0[0].real_value(); // internal/system table functions { if func_name.eq_ignore_ascii_case(RW_INTERNAL_TABLE_FUNCTION_NAME) { + if with_ordinality { + return Err(ErrorCode::NotImplemented( + format!( + "WITH ORDINALITY for internal/system table function {}", + func_name + ), + None.into(), + ) + .into()); + } return self.bind_internal_table(args, alias); } if func_name.eq_ignore_ascii_case(PG_GET_KEYWORDS_FUNC_NAME) @@ -51,6 +65,16 @@ impl Binder { format!("{}.{}", PG_CATALOG_SCHEMA_NAME, PG_GET_KEYWORDS_FUNC_NAME).as_str(), ) { + if with_ordinality { + return Err(ErrorCode::NotImplemented( + format!( + "WITH ORDINALITY for internal/system table function {}", + func_name + ), + None.into(), + ) + .into()); + } return self.bind_relation_by_name_inner( Some(PG_CATALOG_SCHEMA_NAME), PG_KEYWORDS_TABLE_NAME, @@ -61,12 +85,25 @@ impl Binder { } // window table functions (tumble/hop) if let Ok(kind) = WindowTableFunctionKind::from_str(func_name) { + if with_ordinality { + return Err(ErrorCode::InvalidInputSyntax(format!( + "WITH ORDINALITY for window table function {}", + func_name + )) + .into()); + } return Ok(Relation::WindowTableFunction(Box::new( self.bind_window_table_function(alias, kind, args)?, ))); } // watermark if is_watermark_func(func_name) { + if with_ordinality { + return Err(ErrorCode::InvalidInputSyntax( + "WITH ORDINALITY for watermark".to_string(), + ) + .into()); + } return Ok(Relation::Watermark(Box::new( self.bind_watermark(alias, args)?, ))); @@ -88,13 +125,14 @@ impl Binder { self.pop_context()?; let func = func?; - let columns = if let DataType::Struct(s) = func.return_type() { - // If the table function returns a struct, it's fields can be accessed just - // like a table's columns. + // bool indicates if the field is hidden + let mut columns = if let DataType::Struct(s) = func.return_type() { + // If the table function returns a struct, it will be flattened into multiple columns. let schema = Schema::from(&s); schema.fields.into_iter().map(|f| (false, f)).collect_vec() } else { - // If there is an table alias, we should use the alias as the table function's + // If there is an table alias (and it doesn't return a struct), + // we should use the alias as the table function's // column name. If column aliases are also provided, they // are handled in bind_table_to_context. // @@ -112,9 +150,15 @@ impl Binder { }; vec![(false, Field::with_name(func.return_type(), col_name))] }; + if with_ordinality { + columns.push((false, Field::with_name(DataType::Int64, "ordinality"))); + } self.bind_table_to_context(columns, func_name.clone(), alias)?; - Ok(Relation::TableFunction(func)) + Ok(Relation::TableFunction { + expr: func, + with_ordinality, + }) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_table_function.rs b/src/frontend/src/optimizer/plan_node/logical_table_function.rs index ee60a624be3ba..c42a51aeb9024 100644 --- a/src/frontend/src/optimizer/plan_node/logical_table_function.rs +++ b/src/frontend/src/optimizer/plan_node/logical_table_function.rs @@ -14,7 +14,7 @@ use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::error::Result; use risingwave_common::types::DataType; use super::utils::{childless_record, Distill}; @@ -25,23 +25,30 @@ use super::{ use crate::expr::{Expr, ExprRewriter, TableFunction}; use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::plan_node::{ - BatchTableFunction, ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, - ToStreamContext, + ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; use crate::optimizer::property::FunctionalDependencySet; use crate::utils::{ColIndexMapping, Condition}; -/// `LogicalGenerateSeries` implements Hop Table Function. +/// `LogicalTableFunction` is a scalar/table function used as a relation (in the `FROM` clause). +/// +/// If the function returns a struct, it will be flattened into multiple columns. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct LogicalTableFunction { pub base: PlanBase, pub table_function: TableFunction, + pub with_ordinality: bool, } impl LogicalTableFunction { /// Create a [`LogicalTableFunction`] node. Used internally by optimizer. - pub fn new(table_function: TableFunction, ctx: OptimizerContextRef) -> Self { - let schema = if let DataType::Struct(s) = table_function.return_type() { + pub fn new( + table_function: TableFunction, + with_ordinality: bool, + ctx: OptimizerContextRef, + ) -> Self { + let mut schema = if let DataType::Struct(s) = table_function.return_type() { + // If the function returns a struct, it will be flattened into multiple columns. Schema::from(&s) } else { Schema { @@ -51,11 +58,17 @@ impl LogicalTableFunction { )], } }; + if with_ordinality { + schema + .fields + .push(Field::with_name(DataType::Int64, "ordinality")); + } let functional_dependency = FunctionalDependencySet::new(schema.len()); let base = PlanBase::new_logical(ctx, schema, vec![], functional_dependency); Self { base, table_function, + with_ordinality, } } @@ -110,26 +123,19 @@ impl PredicatePushdown for LogicalTableFunction { impl ToBatch for LogicalTableFunction { fn to_batch(&self) -> Result { - Ok(BatchTableFunction::new(self.clone()).into()) + unreachable!("TableFunction should be converted to ProjectSet") } } impl ToStream for LogicalTableFunction { fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { - Err( - ErrorCode::NotImplemented("LogicalTableFunction::to_stream".to_string(), None.into()) - .into(), - ) + unreachable!("TableFunction should be converted to ProjectSet") } fn logical_rewrite_for_stream( &self, _ctx: &mut RewriteStreamContext, ) -> Result<(PlanRef, ColIndexMapping)> { - Err(ErrorCode::NotImplemented( - "LogicalTableFunction::logical_rewrite_for_stream".to_string(), - None.into(), - ) - .into()) + unreachable!("TableFunction should be converted to ProjectSet") } } diff --git a/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs b/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs index 095e08664e1c4..5a6f1187fdd02 100644 --- a/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs +++ b/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs @@ -19,11 +19,11 @@ use risingwave_common::types::DataType; use super::{BoxedRule, Rule}; use crate::expr::{Expr, ExprImpl, ExprType, FunctionCall, InputRef}; use crate::optimizer::plan_node::{ - LogicalProject, LogicalProjectSet, LogicalTableFunction, LogicalValues, + LogicalProject, LogicalProjectSet, LogicalTableFunction, LogicalValues, PlanTreeNodeUnary, }; use crate::optimizer::PlanRef; -/// Transform a table function into a project set +/// Transform a `TableFunction` (used in FROM clause) into a `ProjectSet` so that it can be unnested later if it contains `CorrelatedInputRef`. /// /// Before: /// @@ -54,11 +54,11 @@ impl Rule for TableFunctionToProjectSetRule { logical_table_function.base.ctx.clone(), ); let logical_project_set = LogicalProjectSet::create(logical_values, vec![table_function]); - // We need a project to align schema type because `LogicalProjectSet` has a hidden column - // `projected_row_id` and table function could return multiple columns, while project set - // return only one column with struct type. + // We need a project to align schema type because + // 1. `LogicalProjectSet` has a hidden column `projected_row_id` (0-th col) + // 2. When the function returns a struct type, TableFunction will return flatten it into multiple columns, while ProjectSet still returns a single column. let table_function_col_idx = 1; - if let DataType::Struct(st) = table_function_return_type.clone() { + let logical_project = if let DataType::Struct(st) = table_function_return_type.clone() { let exprs = st .types() .enumerate() @@ -66,13 +66,11 @@ impl Rule for TableFunctionToProjectSetRule { let field_access = FunctionCall::new_unchecked( ExprType::Field, vec![ - ExprImpl::InputRef( - InputRef::new( - table_function_col_idx, - table_function_return_type.clone(), - ) - .into(), - ), + InputRef::new( + table_function_col_idx, + table_function_return_type.clone(), + ) + .into(), ExprImpl::literal_int(i as i32), ], data_type.clone(), @@ -80,13 +78,27 @@ impl Rule for TableFunctionToProjectSetRule { ExprImpl::FunctionCall(field_access.into()) }) .collect_vec(); - let logical_project = LogicalProject::new(logical_project_set, exprs); - Some(logical_project.into()) + LogicalProject::new(logical_project_set, exprs) } else { - let logical_project = LogicalProject::with_out_col_idx( + LogicalProject::with_out_col_idx( logical_project_set, std::iter::once(table_function_col_idx), - ); + ) + }; + + if logical_table_function.with_ordinality { + let projected_row_id = InputRef::new(0, DataType::Int64).into(); + let ordinality = FunctionCall::new( + ExprType::Add, + vec![projected_row_id, ExprImpl::literal_bigint(1)], + ) + .unwrap() // i64 + i64 is ok + .into(); + let mut exprs = logical_project.exprs().clone(); + exprs.push(ordinality); + let logical_project = LogicalProject::new(logical_project.input(), exprs); + Some(logical_project.into()) + } else { Some(logical_project.into()) } } diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index db4bb0233f077..f4085f6ffa42e 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -46,7 +46,10 @@ impl Planner { Relation::Apply(join) => self.plan_apply(*join), Relation::WindowTableFunction(tf) => self.plan_window_table_function(*tf), Relation::Source(s) => self.plan_source(*s), - Relation::TableFunction(tf) => self.plan_table_function(tf), + Relation::TableFunction { + expr: tf, + with_ordinality, + } => self.plan_table_function(tf, with_ordinality), Relation::Watermark(tf) => self.plan_watermark(*tf), Relation::Share(share) => self.plan_share(*share), } @@ -150,16 +153,33 @@ impl Planner { } } - pub(super) fn plan_table_function(&mut self, table_function: ExprImpl) -> Result { + pub(super) fn plan_table_function( + &mut self, + table_function: ExprImpl, + with_ordinality: bool, + ) -> Result { // TODO: maybe we can unify LogicalTableFunction with LogicalValues match table_function { - ExprImpl::TableFunction(tf) => Ok(LogicalTableFunction::new(*tf, self.ctx()).into()), + ExprImpl::TableFunction(tf) => { + Ok(LogicalTableFunction::new(*tf, with_ordinality, self.ctx()).into()) + } expr => { - let schema = Schema { + let mut schema = Schema { // TODO: should be named fields: vec![Field::unnamed(expr.return_type())], }; - Ok(LogicalValues::create(vec![vec![expr]], schema, self.ctx())) + if with_ordinality { + schema + .fields + .push(Field::with_name(DataType::Int64, "ordinality")); + Ok(LogicalValues::create( + vec![vec![expr, ExprImpl::literal_bigint(1)]], + schema, + self.ctx(), + )) + } else { + Ok(LogicalValues::create(vec![vec![expr]], schema, self.ctx())) + } } } } diff --git a/src/sqlparser/src/ast/query.rs b/src/sqlparser/src/ast/query.rs index cc703e5b81a38..f018b853f3330 100644 --- a/src/sqlparser/src/ast/query.rs +++ b/src/sqlparser/src/ast/query.rs @@ -387,11 +387,14 @@ pub enum TableFactor { subquery: Box, alias: Option, }, - /// `[ AS ]` + /// `(args)[ AS ]` + /// + /// Note that scalar functions can also be used in this way. TableFunction { name: ObjectName, alias: Option, args: Vec, + with_ordinality: bool, }, /// Represents a parenthesized table factor. The SQL spec only allows a /// join expression (`(foo bar [ baz ... ])`) to be nested, @@ -433,8 +436,16 @@ impl fmt::Display for TableFactor { } Ok(()) } - TableFactor::TableFunction { name, alias, args } => { + TableFactor::TableFunction { + name, + alias, + args, + with_ordinality, + } => { write!(f, "{}({})", name, display_comma_separated(args))?; + if *with_ordinality { + write!(f, " WITH ORDINALITY")?; + } if let Some(alias) = alias { write!(f, " AS {}", alias)?; } diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index 283a0a409512c..5c2fedb0ea547 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -350,6 +350,7 @@ define_keywords!( OPTION, OR, ORDER, + ORDINALITY, OTHERS, OUT, OUTER, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 4302a4ea12ab1..7cb8a099436c9 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -4275,8 +4275,15 @@ impl Parser { if !order_by.is_empty() { return parser_err!("Table-valued functions do not support ORDER BY clauses"); } + let with_ordinality = self.parse_keywords(&[Keyword::WITH, Keyword::ORDINALITY]); + let alias = self.parse_optional_table_alias(keywords::RESERVED_FOR_TABLE_ALIAS)?; - Ok(TableFactor::TableFunction { name, alias, args }) + Ok(TableFactor::TableFunction { + name, + alias, + args, + with_ordinality, + }) } else { let for_system_time_as_of_proctime = self.parse_for_system_time_as_of_proctime()?; let alias = self.parse_optional_table_alias(keywords::RESERVED_FOR_TABLE_ALIAS)?; diff --git a/src/sqlparser/tests/testdata/select.yaml b/src/sqlparser/tests/testdata/select.yaml index bbbb5a72bbdab..6aed3d2a4dc4c 100644 --- a/src/sqlparser/tests/testdata/select.yaml +++ b/src/sqlparser/tests/testdata/select.yaml @@ -29,10 +29,10 @@ formatted_sql: SELECT (CAST(ROW(1, 2, 3) AS foo)).v1.* - input: SELECT * FROM generate_series('2'::INT,'10'::INT,'2'::INT) formatted_sql: SELECT * FROM generate_series(CAST('2' AS INT), CAST('10' AS INT), CAST('2' AS INT)) - formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [Wildcard(None)], from: [TableWithJoins { relation: TableFunction { name: ObjectName([Ident { value: "generate_series", quote_style: None }]), alias: None, args: [Unnamed(Expr(Cast { expr: Value(SingleQuotedString("2")), data_type: Int })), Unnamed(Expr(Cast { expr: Value(SingleQuotedString("10")), data_type: Int })), Unnamed(Expr(Cast { expr: Value(SingleQuotedString("2")), data_type: Int }))] }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' + formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [Wildcard(None)], from: [TableWithJoins { relation: TableFunction { name: ObjectName([Ident { value: "generate_series", quote_style: None }]), alias: None, args: [Unnamed(Expr(Cast { expr: Value(SingleQuotedString("2")), data_type: Int })), Unnamed(Expr(Cast { expr: Value(SingleQuotedString("10")), data_type: Int })), Unnamed(Expr(Cast { expr: Value(SingleQuotedString("2")), data_type: Int }))], with_ordinality: false }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: SELECT * FROM unnest(Array[1,2,3]); formatted_sql: SELECT * FROM unnest(ARRAY[1, 2, 3]) - formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [Wildcard(None)], from: [TableWithJoins { relation: TableFunction { name: ObjectName([Ident { value: "unnest", quote_style: None }]), alias: None, args: [Unnamed(Expr(Array(Array { elem: [Value(Number("1")), Value(Number("2")), Value(Number("3"))], named: true })))] }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' + formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [Wildcard(None)], from: [TableWithJoins { relation: TableFunction { name: ObjectName([Ident { value: "unnest", quote_style: None }]), alias: None, args: [Unnamed(Expr(Array(Array { elem: [Value(Number("1")), Value(Number("2")), Value(Number("3"))], named: true })))], with_ordinality: false }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: SELECT id, fname, lname FROM customer WHERE salary <> 'Not Provided' AND salary <> '' formatted_sql: SELECT id, fname, lname FROM customer WHERE salary <> 'Not Provided' AND salary <> '' - input: SELECT id FROM customer WHERE NOT salary = '' @@ -102,7 +102,7 @@ formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Identifier(Ident { value: "id1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "id2", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a2", quote_style: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "stream", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "S", quote_style: None }, columns: [] }), for_system_time_as_of_proctime: false }, joins: [Join { relation: Table { name: ObjectName([Ident { value: "version", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "V", quote_style: None }, columns: [] }), for_system_time_as_of_proctime: true }, join_operator: Inner(On(BinaryOp { left: Identifier(Ident { value: "id1", quote_style: None }), op: Eq, right: Identifier(Ident { value: "id2", quote_style: None }) })) }] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: select percentile_cont(0.3) within group (order by x desc) from unnest(array[1,2,4,5,10]) as x formatted_sql: SELECT percentile_cont(0.3) FROM unnest(ARRAY[1, 2, 4, 5, 10]) AS x - formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { name: ObjectName([Ident { value: "percentile_cont", quote_style: None }]), args: [Unnamed(Expr(Value(Number("0.3"))))], over: None, distinct: false, order_by: [], filter: None, within_group: Some(OrderByExpr { expr: Identifier(Ident { value: "x", quote_style: None }), asc: Some(false), nulls_first: None }) }))], from: [TableWithJoins { relation: TableFunction { name: ObjectName([Ident { value: "unnest", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "x", quote_style: None }, columns: [] }), args: [Unnamed(Expr(Array(Array { elem: [Value(Number("1")), Value(Number("2")), Value(Number("4")), Value(Number("5")), Value(Number("10"))], named: true })))] }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' + formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { name: ObjectName([Ident { value: "percentile_cont", quote_style: None }]), args: [Unnamed(Expr(Value(Number("0.3"))))], over: None, distinct: false, order_by: [], filter: None, within_group: Some(OrderByExpr { expr: Identifier(Ident { value: "x", quote_style: None }), asc: Some(false), nulls_first: None }) }))], from: [TableWithJoins { relation: TableFunction { name: ObjectName([Ident { value: "unnest", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "x", quote_style: None }, columns: [] }), args: [Unnamed(Expr(Array(Array { elem: [Value(Number("1")), Value(Number("2")), Value(Number("4")), Value(Number("5")), Value(Number("10"))], named: true })))], with_ordinality: false }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: select percentile_cont(0.3) within group (order by x, y desc) from t error_msg: 'sql parser error: only one arg in order by is expected here' - input: select 'apple' ~~ 'app%' diff --git a/src/tests/sqlsmith/src/sql_gen/time_window.rs b/src/tests/sqlsmith/src/sql_gen/time_window.rs index d5fd7c8936b22..053af729ee526 100644 --- a/src/tests/sqlsmith/src/sql_gen/time_window.rs +++ b/src/tests/sqlsmith/src/sql_gen/time_window.rs @@ -46,7 +46,7 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { let time_col = time_cols.choose(&mut self.rng).unwrap(); let time_col = Expr::Identifier(time_col.name.as_str().into()); let args = create_args(vec![name, time_col, size]); - let relation = create_tvf("tumble", alias, args); + let relation = create_tvf("tumble", alias, args, false); let table = Table::new(table_name, schema.clone()); @@ -72,7 +72,7 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { let time_col = Expr::Identifier(time_col.name.as_str().into()); let args = create_args(vec![name, time_col, slide, size]); - let relation = create_tvf("hop", alias, args); + let relation = create_tvf("hop", alias, args, false); let table = Table::new(table_name, schema.clone()); @@ -120,11 +120,17 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { } /// Create a table view function. -fn create_tvf(name: &str, alias: TableAlias, args: Vec) -> TableFactor { +fn create_tvf( + name: &str, + alias: TableAlias, + args: Vec, + with_ordinality: bool, +) -> TableFactor { TableFactor::TableFunction { name: ObjectName(vec![name.into()]), alias: Some(alias), args, + with_ordinality, } }