From dc5ba420eb6ffced08e9b4503390d2ce8671daa4 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 13 Sep 2023 17:07:15 +0800 Subject: [PATCH] impl with ord --- e2e_test/batch/basic/unnest.slt.part | 35 ++++ e2e_test/streaming/project_set.slt | 24 +++ .../tests/testdata/input/with_ordinality.yaml | 43 +++++ .../testdata/output/with_ordinality.yaml | 162 ++++++++++++++++++ src/frontend/src/binder/relation/join.rs | 4 +- src/frontend/src/binder/relation/mod.rs | 5 +- .../src/binder/relation/table_function.rs | 39 +++-- .../plan_node/logical_table_function.rs | 23 ++- .../table_function_to_project_set_rule.rs | 46 +++-- src/frontend/src/planner/relation.rs | 18 +- 10 files changed, 358 insertions(+), 41 deletions(-) create mode 100644 src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml create mode 100644 src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml diff --git a/e2e_test/batch/basic/unnest.slt.part b/e2e_test/batch/basic/unnest.slt.part index efcf9981e65c9..04474a2b6bffe 100644 --- a/e2e_test/batch/basic/unnest.slt.part +++ b/e2e_test/batch/basic/unnest.slt.part @@ -83,3 +83,38 @@ select distinct unnest(array[1,1,2,3,1]) as x; 1 2 3 + +query I +select * from unnest(array[0,1,2]) with ordinality; +---- +0 1 +1 2 +2 3 + +query I +select * from unnest(array[0,1,2]) with ordinality, unnest(array[3,4]) with ordinality as unnest_2; +---- +0 1 3 1 +0 1 4 2 +1 2 3 1 +1 2 4 2 +2 3 3 1 +2 3 4 2 + +statement ok +create table t(arr varchar[]); + +statement ok +insert into t values (Array['a','b', 'c']), (Array['d','e']); + +query I +select * from t cross join unnest(t.arr) WITH ORDINALITY AS x(elts, num); +---- +{a,b,c} c 3 +{a,b,c} b 2 +{a,b,c} a 1 +{d,e} e 2 +{d,e} d 1 + +statement ok +drop table t; diff --git a/e2e_test/streaming/project_set.slt b/e2e_test/streaming/project_set.slt index 959c75ebebefc..a5e69e4f49309 100644 --- a/e2e_test/streaming/project_set.slt +++ b/e2e_test/streaming/project_set.slt @@ -107,3 +107,27 @@ with cte as (SELECT 1 as v1, unnest(array[1,2,3,4,5]) AS v2) select v1 from cte; 1 1 1 + +statement ok +create table t(arr varchar[]); + +statement ok +create materialized view mv as select * from t cross join unnest(t.arr) WITH ORDINALITY AS x(elts, num); + +statement ok +insert into t values (Array['a','b', 'c']), (Array['d','e']); + +query I rowsort +select * from mv; +---- +{a,b,c} c 3 +{a,b,c} b 2 +{a,b,c} a 1 +{d,e} e 2 +{d,e} d 1 + +statement ok +drop materialized view mv; + +statement ok +drop table t; diff --git a/src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml b/src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml new file mode 100644 index 0000000000000..fa99f2087f7ad --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml @@ -0,0 +1,43 @@ +# constant FROM +- sql: | + select * from unnest(array[1,2,3]) WITH ORDINALITY; + expected_outputs: + - batch_plan + - stream_plan +# lateral join +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY; + expected_outputs: + - batch_plan + - stream_plan +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo; + expected_outputs: + - batch_plan + - stream_plan +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo(a); + expected_outputs: + - batch_plan + - stream_plan +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as 
foo(a,ord); + expected_outputs: + - batch_plan + - stream_plan +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord,bar); + expected_outputs: + - binder_error +# multiple with ordinality +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY, unnest(arr) WITH ORDINALITY AS unnest_2(arr_2,ordinality_2); + expected_outputs: + - batch_plan + - stream_plan diff --git a/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml b/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml new file mode 100644 index 0000000000000..72558dad9be6d --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml @@ -0,0 +1,162 @@ +# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. +- sql: | + select * from unnest(array[1,2,3]) WITH ORDINALITY; + batch_plan: |- + BatchProject { exprs: [Unnest(ARRAY[1, 2, 3]:List(Int32)), (projected_row_id + 1:Int64) as $expr1] } + └─BatchProjectSet { select_list: [Unnest(ARRAY[1, 2, 3]:List(Int32))] } + └─BatchValues { rows: [[]] } + stream_plan: |- + StreamMaterialize { columns: [unnest, ordinality, _row_id(hidden), projected_row_id(hidden)], stream_key: [_row_id, projected_row_id], pk_columns: [_row_id, projected_row_id], pk_conflict: NoCheck } + └─StreamProject { exprs: [Unnest(ARRAY[1, 2, 3]:List(Int32)), (projected_row_id + 1:Int64) as $expr1, _row_id, projected_row_id] } + └─StreamProjectSet { select_list: [Unnest(ARRAY[1, 2, 3]:List(Int32)), $0] } + └─StreamValues { rows: [[0:Int64]] } +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY; + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0, Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchExchange { order: [], dist: HashShard(t.arr) } + └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [x, arr, unnest, ordinality, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + ├─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamExchange { dist: HashShard(t.arr) } + └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo; + batch_plan: |- + BatchExchange { order: [], dist: 
Single } + └─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0, Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchExchange { order: [], dist: HashShard(t.arr) } + └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [x, arr, foo, ordinality, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + ├─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamExchange { dist: HashShard(t.arr) } + └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo(a); + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0, Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchExchange { order: [], dist: HashShard(t.arr) } + └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [x, arr, a, ordinality, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + ├─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamExchange { dist: HashShard(t.arr) } + └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord); + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.x, t.arr, 
Unnest($0), (projected_row_id + 1:Int64) as $expr1] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0, Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchExchange { order: [], dist: HashShard(t.arr) } + └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [x, arr, a, ord, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + ├─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamExchange { dist: HashShard(t.arr) } + └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord,bar); + binder_error: 'Bind error: table "foo" has 2 columns available but 3 column aliases specified' +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY, unnest(arr) WITH ORDINALITY AS unnest_2(arr_2,ordinality_2); + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, Unnest($0), (projected_row_id + 1:Int64) as $expr2] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + ├─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + │ ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard } + │ └─BatchProjectSet { select_list: [$0, Unnest($0)] } + │ └─BatchHashAgg { group_key: [t.arr], aggs: [] } + │ └─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0, Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.arr] } + ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0, Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchExchange { order: [], dist: HashShard(t.arr) } + └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [x, arr, unnest, ordinality, arr_2, ordinality_2, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden), t.arr#1(hidden), projected_row_id#1(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr, t.arr#1, projected_row_id#1], pk_columns: [t._row_id, t.arr, 
projected_row_id, arr, t.arr#1, projected_row_id#1], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), $expr1, Unnest($0), (projected_row_id + 1:Int64) as $expr2, t._row_id, t.arr, projected_row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, Unnest($0), $expr1, projected_row_id, t.arr, Unnest($0), t._row_id, t.arr, projected_row_id] } + ├─StreamShare { id: 8 } + │ └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + │ └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + │ ├─StreamExchange { dist: HashShard(t.arr) } + │ │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + │ └─StreamProjectSet { select_list: [$0, Unnest($0)] } + │ └─StreamProject { exprs: [t.arr] } + │ └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + │ └─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamShare { id: 8 } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + ├─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamExchange { dist: HashShard(t.arr) } + └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } diff --git a/src/frontend/src/binder/relation/join.rs b/src/frontend/src/binder/relation/join.rs index fed759d6eb83b..eb4ce96f9ab3f 100644 --- a/src/frontend/src/binder/relation/join.rs +++ b/src/frontend/src/binder/relation/join.rs @@ -59,7 +59,7 @@ impl Binder { let is_lateral = match &right { Relation::Subquery(subquery) if subquery.lateral => true, - Relation::TableFunction(_) => true, + Relation::TableFunction { .. } => true, _ => false, }; @@ -110,7 +110,7 @@ impl Binder { let is_lateral = match &right { Relation::Subquery(subquery) if subquery.lateral => true, - Relation::TableFunction(_) => true, + Relation::TableFunction { .. 
} => true,
             _ => false,
         };
 
diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs
index cafb0e99c612f..4ee4337c2f048 100644
--- a/src/frontend/src/binder/relation/mod.rs
+++ b/src/frontend/src/binder/relation/mod.rs
@@ -119,7 +119,10 @@ impl Relation {
                 );
                 correlated_indices
             }
-            Relation::TableFunction(table_function) => table_function
+            Relation::TableFunction {
+                expr: table_function,
+                with_ordinality: _,
+            } => table_function
                 .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
             _ => vec![],
         }
diff --git a/src/frontend/src/binder/relation/table_function.rs b/src/frontend/src/binder/relation/table_function.rs
index 33d4b31ff34d0..988ea0561a860 100644
--- a/src/frontend/src/binder/relation/table_function.rs
+++ b/src/frontend/src/binder/relation/table_function.rs
@@ -47,17 +47,17 @@ impl Binder {
         let func_name = &name.0[0].real_value();
         // internal/system table functions
         {
-            if with_ordinality {
-                return Err(ErrorCode::NotImplemented(
-                    format!(
-                        "WITH ORDINALITY for internal/system table function {}",
-                        func_name
-                    ),
-                    None.into(),
-                )
-                .into());
-            }
             if func_name.eq_ignore_ascii_case(RW_INTERNAL_TABLE_FUNCTION_NAME) {
+                if with_ordinality {
+                    return Err(ErrorCode::NotImplemented(
+                        format!(
+                            "WITH ORDINALITY for internal/system table function {}",
+                            func_name
+                        ),
+                        None.into(),
+                    )
+                    .into());
+                }
                 return self.bind_internal_table(args, alias);
             }
             if func_name.eq_ignore_ascii_case(PG_GET_KEYWORDS_FUNC_NAME)
@@ -65,6 +65,16 @@
                     format!("{}.{}", PG_CATALOG_SCHEMA_NAME, PG_GET_KEYWORDS_FUNC_NAME).as_str(),
                 )
             {
+                if with_ordinality {
+                    return Err(ErrorCode::NotImplemented(
+                        format!(
+                            "WITH ORDINALITY for internal/system table function {}",
+                            func_name
+                        ),
+                        None.into(),
+                    )
+                    .into());
+                }
                 return self.bind_relation_by_name_inner(
                     Some(PG_CATALOG_SCHEMA_NAME),
                     PG_KEYWORDS_TABLE_NAME,
@@ -115,12 +125,14 @@
         self.pop_context()?;
         let func = func?;
 
-        let columns = if let DataType::Struct(s) = func.return_type() {
+        // The `bool` in each tuple indicates whether the field is hidden.
+        let mut columns = if let DataType::Struct(s) = func.return_type() {
             // If the table function returns a struct, it will be flattened into multiple columns.
             let schema = Schema::from(&s);
             schema.fields.into_iter().map(|f| (false, f)).collect_vec()
         } else {
-            // If there is an table alias, we should use the alias as the table function's
+            // If there is a table alias (and the function doesn't return a struct),
+            // we should use the alias as the table function's
             // column name. If column aliases are also provided, they
             // are handled in bind_table_to_context.
             //
@@ -138,6 +150,9 @@
             };
             vec![(false, Field::with_name(func.return_type(), col_name))]
         };
+        if with_ordinality {
+            columns.push((false, Field::with_name(DataType::Int64, "ordinality")));
+        }
 
         self.bind_table_to_context(columns, func_name.clone(), alias)?;
 
diff --git a/src/frontend/src/optimizer/plan_node/logical_table_function.rs b/src/frontend/src/optimizer/plan_node/logical_table_function.rs
index 4b62d0d8436cc..b54411a36faa9 100644
--- a/src/frontend/src/optimizer/plan_node/logical_table_function.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_table_function.rs
@@ -31,17 +31,24 @@ use crate::optimizer::plan_node::{
 use crate::optimizer::property::FunctionalDependencySet;
 use crate::utils::{ColIndexMapping, Condition};
 
-/// `LogicalGenerateSeries` implements Hop Table Function.
+/// `LogicalTableFunction` is a scalar/table function used as a relation (in the `FROM` clause).
+///
+/// If the function returns a struct, it will be flattened into multiple columns.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct LogicalTableFunction {
     pub base: PlanBase,
     pub table_function: TableFunction,
+    pub with_ordinality: bool,
 }
 
 impl LogicalTableFunction {
     /// Create a [`LogicalTableFunction`] node. Used internally by optimizer.
-    pub fn new(table_function: TableFunction, ctx: OptimizerContextRef) -> Self {
-        let schema = if let DataType::Struct(s) = table_function.return_type() {
+    pub fn new(
+        table_function: TableFunction,
+        with_ordinality: bool,
+        ctx: OptimizerContextRef,
+    ) -> Self {
+        let mut schema = if let DataType::Struct(s) = table_function.return_type() {
             // If the function returns a struct, it will be flattened into multiple columns.
             Schema::from(&s)
         } else {
@@ -52,11 +59,17 @@ impl LogicalTableFunction {
                 )],
             }
         };
+        if with_ordinality {
+            schema
+                .fields
+                .push(Field::with_name(DataType::Int64, "ordinality"));
+        }
         let functional_dependency = FunctionalDependencySet::new(schema.len());
         let base = PlanBase::new_logical(ctx, schema, vec![], functional_dependency);
         Self {
             base,
             table_function,
+            with_ordinality,
         }
     }
 
@@ -111,6 +124,10 @@ impl PredicatePushdown for LogicalTableFunction {
 
 impl ToBatch for LogicalTableFunction {
     fn to_batch(&self) -> Result<PlanRef> {
+        // TODO: this is also unreachable now, as the table function is always converted to a `ProjectSet`; maybe we can remove it.
+        if self.with_ordinality {
+            todo!()
+        }
         Ok(BatchTableFunction::new(self.clone()).into())
     }
 }
diff --git a/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs b/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs
index 095e08664e1c4..1c198769e1751 100644
--- a/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs
+++ b/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs
@@ -19,11 +19,11 @@ use risingwave_common::types::DataType;
 use super::{BoxedRule, Rule};
 use crate::expr::{Expr, ExprImpl, ExprType, FunctionCall, InputRef};
 use crate::optimizer::plan_node::{
-    LogicalProject, LogicalProjectSet, LogicalTableFunction, LogicalValues,
+    LogicalProject, LogicalProjectSet, LogicalTableFunction, LogicalValues, PlanTreeNodeUnary,
 };
 use crate::optimizer::PlanRef;
 
-/// Transform a table function into a project set
+/// Transform a `TableFunction` (used in the `FROM` clause) into a `ProjectSet` so that it can be unnested later if it contains a `CorrelatedInputRef`.
 ///
 /// Before:
 ///
@@ -54,11 +54,11 @@ impl Rule for TableFunctionToProjectSetRule {
             logical_table_function.base.ctx.clone(),
         );
         let logical_project_set = LogicalProjectSet::create(logical_values, vec![table_function]);
-        // We need a project to align schema type because `LogicalProjectSet` has a hidden column
-        // `projected_row_id` and table function could return multiple columns, while project set
-        // return only one column with struct type.
+        // We need a project to align the schema types, because
+        // 1. `LogicalProjectSet` has a hidden column `projected_row_id` (the 0-th column), and
+        // 2. when the function returns a struct type, `TableFunction` flattens it into multiple columns, while `ProjectSet` still returns a single struct column.
         let table_function_col_idx = 1;
-        if let DataType::Struct(st) = table_function_return_type.clone() {
+        let logical_project = if let DataType::Struct(st) = table_function_return_type.clone() {
             let exprs = st
                 .types()
                 .enumerate()
                 .map(|(i, data_type)| {
                     let field_access = FunctionCall::new_unchecked(
                         ExprType::Field,
                         vec![
-                            ExprImpl::InputRef(
-                                InputRef::new(
-                                    table_function_col_idx,
-                                    table_function_return_type.clone(),
-                                )
-                                .into(),
-                            ),
+                            InputRef::new(
+                                table_function_col_idx,
+                                table_function_return_type.clone(),
+                            )
+                            .into(),
                             ExprImpl::literal_int(i as i32),
                         ],
                         data_type.clone(),
@@ -80,13 +78,27 @@
                     ExprImpl::FunctionCall(field_access.into())
                 })
                 .collect_vec();
-            let logical_project = LogicalProject::new(logical_project_set, exprs);
-            Some(logical_project.into())
+            LogicalProject::new(logical_project_set, exprs)
         } else {
-            let logical_project = LogicalProject::with_out_col_idx(
+            LogicalProject::with_out_col_idx(
                 logical_project_set,
                 std::iter::once(table_function_col_idx),
-            );
+            )
+        };
+
+        if logical_table_function.with_ordinality {
+            let projected_row_id = InputRef::new(0, DataType::Int64).into();
+            let ordinality = FunctionCall::new(
+                ExprType::Add,
+                vec![projected_row_id, ExprImpl::literal_bigint(1)],
+            )
+            .unwrap() // i64 + i64 is ok
+            .into();
+            let mut exprs = logical_project.exprs().clone();
+            exprs.push(ordinality);
+            let logical_project = LogicalProject::new(logical_project.input(), exprs);
+            Some(logical_project.into())
+        } else {
             Some(logical_project.into())
         }
     }
diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs
index 52db1a3f33e53..0476f093090f9 100644
--- a/src/frontend/src/planner/relation.rs
+++ b/src/frontend/src/planner/relation.rs
@@ -160,13 +160,19 @@
     ) -> Result<PlanRef> {
         // TODO: maybe we can unify LogicalTableFunction with LogicalValues
         match table_function {
-            ExprImpl::TableFunction(tf) => Ok(LogicalTableFunction::new(*tf, self.ctx()).into()),
+            ExprImpl::TableFunction(tf) => {
+                Ok(LogicalTableFunction::new(*tf, with_ordinality, self.ctx()).into())
+            }
             expr => {
-                let schema = Schema {
-                    // TODO: should be named
-                    fields: vec![Field::unnamed(expr.return_type())],
-                };
-                Ok(LogicalValues::create(vec![vec![expr]], schema, self.ctx()))
+                if with_ordinality {
+                    todo!()
+                } else {
+                    let schema = Schema {
+                        // TODO: should be named
+                        fields: vec![Field::unnamed(expr.return_type())],
+                    };
+                    Ok(LogicalValues::create(vec![vec![expr]], schema, self.ctx()))
+                }
            }
         }
     }
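
--
A minimal end-to-end sketch of what this patch wires up, for quick reference. The
query and its output are copied from the new unnest.slt.part test above, and the
plan shape is abridged from with_ordinality.yaml: the "ordinality" column is just
ProjectSet's hidden projected_row_id plus one, computed by a Project on top.

    select * from unnest(array[0,1,2]) with ordinality;
    ----
    0 1
    1 2
    2 3

    -- Planned (batch side, types elided) roughly as:
    --   BatchProject { exprs: [Unnest(...), (projected_row_id + 1) as ordinality] }
    --   └─BatchProjectSet { select_list: [Unnest(...)] }
    --     └─BatchValues { rows: [[]] }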