From e7c3cd860bf5be0541c44266bbb9268324b16085 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 25 Jul 2023 09:54:41 +0200 Subject: [PATCH 1/9] tmp --- src/frontend/src/binder/relation/mod.rs | 19 +++++++--- .../src/binder/relation/table_function.rs | 35 +++++++++++++++++-- .../src/catalog/system_catalog/mod.rs | 1 + .../catalog/system_catalog/pg_catalog/mod.rs | 5 +++ .../plan_node/logical_table_function.rs | 1 + src/frontend/src/planner/relation.rs | 11 ++++-- src/sqlparser/src/ast/query.rs | 10 ++++-- src/sqlparser/src/keywords.rs | 1 + src/sqlparser/src/parser.rs | 9 ++++- src/tests/sqlsmith/src/sql_gen/time_window.rs | 7 ++-- 10 files changed, 83 insertions(+), 16 deletions(-) diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index c92b00ea82819..e3bb124687cb8 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -54,7 +54,11 @@ pub enum Relation { Subquery(Box<BoundSubquery>), Join(Box<BoundJoin>), WindowTableFunction(Box<BoundWindowTableFunction>), - TableFunction(ExprImpl), + /// Table function or scalar function. + TableFunction { + expr: ExprImpl, + with_ordinality: bool, + }, Watermark(Box<BoundWatermark>), Share(Box<BoundShare>), } @@ -67,7 +71,9 @@ impl RewriteExprsRecursive for Relation { Relation::WindowTableFunction(inner) => inner.rewrite_exprs_recursive(rewriter), Relation::Watermark(inner) => inner.rewrite_exprs_recursive(rewriter), Relation::Share(inner) => inner.rewrite_exprs_recursive(rewriter), - Relation::TableFunction(inner) => *inner = rewriter.rewrite_expr(inner.take()), + Relation::TableFunction { expr: inner, .. } => { + *inner = rewriter.rewrite_expr(inner.take()) + } _ => {} } } @@ -433,9 +439,12 @@ impl Binder { alias, for_system_time_as_of_proctime, } => self.bind_relation_by_name(name, alias, for_system_time_as_of_proctime), - TableFactor::TableFunction { name, alias, args } => { - self.bind_table_function(name, alias, args) - } + TableFactor::TableFunction { + name, + alias, + args, + with_ordinality, + } => self.bind_table_function(name, alias, args, with_ordinality), TableFactor::Derived { lateral, subquery, diff --git a/src/frontend/src/binder/relation/table_function.rs b/src/frontend/src/binder/relation/table_function.rs index 1be11687fb1c8..20983a343d1d1 100644 --- a/src/frontend/src/binder/relation/table_function.rs +++ b/src/frontend/src/binder/relation/table_function.rs @@ -18,6 +18,7 @@ use itertools::Itertools; use risingwave_common::catalog::{ Field, Schema, PG_CATALOG_SCHEMA_NAME, RW_INTERNAL_TABLE_FUNCTION_NAME, }; +use risingwave_common::error::ErrorCode; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{Function, FunctionArg, ObjectName, TableAlias}; @@ -34,15 +35,28 @@ impl Binder { /// /// Besides [`TableFunction`] expr, it can also be other things like window table functions, or /// scalar functions. + /// + /// `with_ordinality` is only supported for the `TableFunction` case now.
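+ /// + /// For example (mirroring the e2e test added later in this series), `select * from unnest(array[0,1,2]) with ordinality;` returns the rows `(0, 1)`, `(1, 2)` and `(2, 3)`, where the appended `Int64` column is the 1-based ordinality.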
pub(super) fn bind_table_function( &mut self, name: ObjectName, alias: Option<TableAlias>, args: Vec<FunctionArg>, + with_ordinality: bool, ) -> Result<Relation> { let func_name = &name.0[0].real_value(); // internal/system table functions { + if with_ordinality { + return Err(ErrorCode::NotImplemented( + format!( + "WITH ORDINALITY for internal/system table function {}", + func_name + ), + None.into(), + ) + .into()); + } if func_name.eq_ignore_ascii_case(RW_INTERNAL_TABLE_FUNCTION_NAME) { return self.bind_internal_table(args, alias); } @@ -61,12 +75,25 @@ impl Binder { } // window table functions (tumble/hop) if let Ok(kind) = WindowTableFunctionKind::from_str(func_name) { + if with_ordinality { + return Err(ErrorCode::InvalidInputSyntax(format!( + "WITH ORDINALITY for window table function {}", + func_name + )) + .into()); + } return Ok(Relation::WindowTableFunction(Box::new( self.bind_window_table_function(alias, kind, args)?, ))); } // watermark if is_watermark_func(func_name) { + if with_ordinality { + return Err(ErrorCode::InvalidInputSyntax( + "WITH ORDINALITY for watermark".to_string(), + ) + .into()); + } return Ok(Relation::Watermark(Box::new( self.bind_watermark(alias, args)?, ))); } @@ -86,8 +113,7 @@ self.context.clause = clause; let columns = if let DataType::Struct(s) = func.return_type() { - // If the table function returns a struct, it's fields can be accessed just - // like a table's columns. + // If the table function returns a struct, it will be flattened into multiple columns. let schema = Schema::from(&s); schema.fields.into_iter().map(|f| (false, f)).collect_vec() } else { @@ -112,6 +138,9 @@ impl Binder { self.bind_table_to_context(columns, func_name.clone(), alias)?; - Ok(Relation::TableFunction(func)) + Ok(Relation::TableFunction { + expr: func, + with_ordinality, + }) } } diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index f04d7d09d5648..738cb67b9bcd2 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -262,6 +262,7 @@ prepare_sys_catalog! { { PG_CATALOG, PG_COLLATION, vec![0], read_collation_info }, { PG_CATALOG, PG_AM, vec![0], read_am_info }, { PG_CATALOG, PG_OPERATOR, vec![0], read_operator_info }, + // { PG_CATALOG, PG_AGGREGATE, vec![0], read_aggregate_info }, { PG_CATALOG, PG_VIEWS, vec![0, 1], read_views_info }, { PG_CATALOG, PG_ATTRIBUTE, vec![0, 4], read_pg_attribute }, { PG_CATALOG, PG_DATABASE, vec![0], read_database_info }, diff --git a/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs index 7d1c446ec94a0..518c69ba573cd 100644 --- a/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs @@ -194,6 +194,11 @@ impl SysCatalogReaderImpl { Ok(vec![]) } + pub(super) fn read_aggregate_info(&self) -> Result<Vec<OwnedRow>> { + Ok(vec![]) + } + + // FIXME(noel): Tracked by pub(super) fn read_am_info(&self) -> Result<Vec<OwnedRow>> { Ok(vec![]) diff --git a/src/frontend/src/optimizer/plan_node/logical_table_function.rs b/src/frontend/src/optimizer/plan_node/logical_table_function.rs index eb712a1e24244..a01945b08de2d 100644 --- a/src/frontend/src/optimizer/plan_node/logical_table_function.rs +++ b/src/frontend/src/optimizer/plan_node/logical_table_function.rs @@ -42,6 +42,7 @@ impl LogicalTableFunction { /// Create a [`LogicalTableFunction`] node. Used internally by optimizer.
pub fn new(table_function: TableFunction, ctx: OptimizerContextRef) -> Self { let schema = if let DataType::Struct(s) = table_function.return_type() { + // If the function returns a struct, it will be flattened into multiple columns. Schema::from(&s) } else { Schema { diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index ad3a6279d76ae..14c9a809235ad 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -43,7 +43,10 @@ impl Planner { Relation::Join(join) => self.plan_join(*join), Relation::WindowTableFunction(tf) => self.plan_window_table_function(*tf), Relation::Source(s) => self.plan_source(*s), - Relation::TableFunction(tf) => self.plan_table_function(tf), + Relation::TableFunction { + expr: tf, + with_ordinality, + } => self.plan_table_function(tf, with_ordinality), Relation::Watermark(tf) => self.plan_watermark(*tf), Relation::Share(share) => self.plan_share(*share), } @@ -116,7 +119,11 @@ impl Planner { } } - pub(super) fn plan_table_function(&mut self, table_function: ExprImpl) -> Result<PlanRef> { + pub(super) fn plan_table_function( + &mut self, + table_function: ExprImpl, + with_ordinality: bool, + ) -> Result<PlanRef> { // TODO: maybe we can unify LogicalTableFunction with LogicalValues match table_function { ExprImpl::TableFunction(tf) => Ok(LogicalTableFunction::new(*tf, self.ctx()).into()), diff --git a/src/sqlparser/src/ast/query.rs b/src/sqlparser/src/ast/query.rs index cc703e5b81a38..55cdfec80cd22 100644 --- a/src/sqlparser/src/ast/query.rs +++ b/src/sqlparser/src/ast/query.rs @@ -387,11 +387,14 @@ pub enum TableFactor { subquery: Box<Query>, alias: Option<TableAlias>, }, - /// `<name>[ AS <alias> ]` + /// `<name>(args)[ AS <alias> ]` + /// + /// Note that scalar functions can also be used in this way. TableFunction { name: ObjectName, alias: Option<TableAlias>, args: Vec<FunctionArg>, + with_ordinality: bool, }, /// Represents a parenthesized table factor. The SQL spec only allows a /// join expression (`(foo <JOIN> bar [ <JOIN> baz ...
])`) to be nested, @@ -433,8 +436,11 @@ impl fmt::Display for TableFactor { } Ok(()) } - TableFactor::TableFunction { name, alias, args } => { + TableFactor::TableFunction { name, alias, args, with_ordinality } => { write!(f, "{}({})", name, display_comma_separated(args))?; + if *with_ordinality { + write!(f, " WITH ORDINALITY")?; + } if let Some(alias) = alias { write!(f, " AS {}", alias)?; } diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index abdddbecbd694..dec6d013c12cd 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -345,6 +345,7 @@ define_keywords!( OPTION, OR, ORDER, + ORDINALITY, OTHERS, OUT, OUTER, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index b4f7dbfb0f43b..438de4b73c545 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -4076,8 +4076,15 @@ impl Parser { if !order_by.is_empty() { return parser_err!("Table-valued functions do not support ORDER BY clauses"); } + let with_ordinality = self.parse_keywords(&[Keyword::WITH, Keyword::ORDINALITY]); + let alias = self.parse_optional_table_alias(keywords::RESERVED_FOR_TABLE_ALIAS)?; - Ok(TableFactor::TableFunction { name, alias, args }) + Ok(TableFactor::TableFunction { + name, + alias, + args, + with_ordinality, + }) } else { let for_system_time_as_of_proctime = self.parse_for_system_time_as_of_proctime()?; let alias = self.parse_optional_table_alias(keywords::RESERVED_FOR_TABLE_ALIAS)?; diff --git a/src/tests/sqlsmith/src/sql_gen/time_window.rs b/src/tests/sqlsmith/src/sql_gen/time_window.rs index 58b1344ec44e4..8b687cdca7bc8 100644 --- a/src/tests/sqlsmith/src/sql_gen/time_window.rs +++ b/src/tests/sqlsmith/src/sql_gen/time_window.rs @@ -46,7 +46,7 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { let time_col = time_cols.choose(&mut self.rng).unwrap(); let time_col = Expr::Identifier(time_col.name.as_str().into()); let args = create_args(vec![name, time_col, size]); - let relation = create_tvf("tumble", alias, args); + let relation = create_tvf("tumble", alias, args, false); let table = Table::new(table_name, schema.clone()); @@ -72,7 +72,7 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { let time_col = Expr::Identifier(time_col.name.as_str().into()); let args = create_args(vec![name, time_col, slide, size]); - let relation = create_tvf("hop", alias, args); + let relation = create_tvf("hop", alias, args, false); let table = Table::new(table_name, schema.clone()); @@ -120,11 +120,12 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { } /// Create a table view function.
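+/// +/// For example, the generators above call `create_tvf("tumble", alias, args, false)`, which the `Display` impl renders as `tumble(...) AS alias`; the sqlsmith-generated time-window TVFs never set `with_ordinality`.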
-fn create_tvf(name: &str, alias: TableAlias, args: Vec<FunctionArg>) -> TableFactor { +fn create_tvf(name: &str, alias: TableAlias, args: Vec<FunctionArg>, with_ordinality: bool) -> TableFactor { TableFactor::TableFunction { name: ObjectName(vec![name.into()]), alias: Some(alias), args, + with_ordinality, } } From dc5ba420eb6ffced08e9b4503390d2ce8671daa4 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 13 Sep 2023 17:07:15 +0800 Subject: [PATCH 2/9] impl with ord --- e2e_test/batch/basic/unnest.slt.part | 35 ++++ e2e_test/streaming/project_set.slt | 24 +++ .../tests/testdata/input/with_ordinality.yaml | 43 +++++ .../testdata/output/with_ordinality.yaml | 162 ++++++++++++++++++ src/frontend/src/binder/relation/join.rs | 4 +- src/frontend/src/binder/relation/mod.rs | 5 +- .../src/binder/relation/table_function.rs | 39 +++-- .../plan_node/logical_table_function.rs | 23 ++- .../table_function_to_project_set_rule.rs | 46 +++-- src/frontend/src/planner/relation.rs | 18 +- 10 files changed, 358 insertions(+), 41 deletions(-) create mode 100644 src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml create mode 100644 src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml diff --git a/e2e_test/batch/basic/unnest.slt.part b/e2e_test/batch/basic/unnest.slt.part index efcf9981e65c9..04474a2b6bffe 100644 --- a/e2e_test/batch/basic/unnest.slt.part +++ b/e2e_test/batch/basic/unnest.slt.part @@ -83,3 +83,38 @@ select distinct unnest(array[1,1,2,3,1]) as x; 1 2 3 + +query I +select * from unnest(array[0,1,2]) with ordinality; +---- +0 1 +1 2 +2 3 + +query I +select * from unnest(array[0,1,2]) with ordinality, unnest(array[3,4]) with ordinality as unnest_2; +---- +0 1 3 1 +0 1 4 2 +1 2 3 1 +1 2 4 2 +2 3 3 1 +2 3 4 2 + +statement ok +create table t(arr varchar[]); + +statement ok +insert into t values (Array['a','b', 'c']), (Array['d','e']); + +query I +select * from t cross join unnest(t.arr) WITH ORDINALITY AS x(elts, num); +---- +{a,b,c} c 3 +{a,b,c} b 2 +{a,b,c} a 1 +{d,e} e 2 +{d,e} d 1 + +statement ok +drop table t; diff --git a/e2e_test/streaming/project_set.slt b/e2e_test/streaming/project_set.slt index 959c75ebebefc..a5e69e4f49309 100644 --- a/e2e_test/streaming/project_set.slt +++ b/e2e_test/streaming/project_set.slt @@ -107,3 +107,27 @@ with cte as (SELECT 1 as v1, unnest(array[1,2,3,4,5]) AS v2) select v1 from cte; 1 1 1 + +statement ok +create table t(arr varchar[]); + +statement ok +create materialized view mv as select * from t cross join unnest(t.arr) WITH ORDINALITY AS x(elts, num); + +statement ok +insert into t values (Array['a','b', 'c']), (Array['d','e']); + +query I rowsort +select * from mv; +---- +{a,b,c} c 3 +{a,b,c} b 2 +{a,b,c} a 1 +{d,e} e 2 +{d,e} d 1 + +statement ok +drop materialized view mv; + +statement ok +drop table t; diff --git a/src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml b/src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml new file mode 100644 index 0000000000000..fa99f2087f7ad --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml @@ -0,0 +1,43 @@ +# constant FROM +- sql: | + select * from unnest(array[1,2,3]) WITH ORDINALITY; + expected_outputs: + - batch_plan + - stream_plan +# lateral join +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY; + expected_outputs: + - batch_plan + - stream_plan +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo; + expected_outputs: + - batch_plan + -
stream_plan +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo(a); + expected_outputs: + - batch_plan + - stream_plan +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord); + expected_outputs: + - batch_plan + - stream_plan +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord,bar); + expected_outputs: + - binder_error +# multiple with ordinality +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY, unnest(arr) WITH ORDINALITY AS unnest_2(arr_2,ordinality_2); + expected_outputs: + - batch_plan + - stream_plan diff --git a/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml b/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml new file mode 100644 index 0000000000000..72558dad9be6d --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml @@ -0,0 +1,162 @@ +# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. +- sql: | + select * from unnest(array[1,2,3]) WITH ORDINALITY; + batch_plan: |- + BatchProject { exprs: [Unnest(ARRAY[1, 2, 3]:List(Int32)), (projected_row_id + 1:Int64) as $expr1] } + └─BatchProjectSet { select_list: [Unnest(ARRAY[1, 2, 3]:List(Int32))] } + └─BatchValues { rows: [[]] } + stream_plan: |- + StreamMaterialize { columns: [unnest, ordinality, _row_id(hidden), projected_row_id(hidden)], stream_key: [_row_id, projected_row_id], pk_columns: [_row_id, projected_row_id], pk_conflict: NoCheck } + └─StreamProject { exprs: [Unnest(ARRAY[1, 2, 3]:List(Int32)), (projected_row_id + 1:Int64) as $expr1, _row_id, projected_row_id] } + └─StreamProjectSet { select_list: [Unnest(ARRAY[1, 2, 3]:List(Int32)), $0] } + └─StreamValues { rows: [[0:Int64]] } +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY; + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0, Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchExchange { order: [], dist: HashShard(t.arr) } + └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [x, arr, unnest, ordinality, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + ├─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamExchange { dist: 
HashShard(t.arr) } + └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo; + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0, Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchExchange { order: [], dist: HashShard(t.arr) } + └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [x, arr, foo, ordinality, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + ├─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamExchange { dist: HashShard(t.arr) } + └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo(a); + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0, Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchExchange { order: [], dist: HashShard(t.arr) } + └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [x, arr, a, ordinality, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + ├─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamExchange { dist: HashShard(t.arr) } + └─StreamTableScan { table: t, columns: 
[t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord); + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0, Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchExchange { order: [], dist: HashShard(t.arr) } + └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [x, arr, a, ord, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + ├─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamExchange { dist: HashShard(t.arr) } + └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord,bar); + binder_error: 'Bind error: table "foo" has 2 columns available but 3 column aliases specified' +- sql: | + create table t(x int , arr int[]); + select * from t cross join unnest(arr) WITH ORDINALITY, unnest(arr) WITH ORDINALITY AS unnest_2(arr_2,ordinality_2); + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, Unnest($0), (projected_row_id + 1:Int64) as $expr2] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + ├─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + │ ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard } + │ └─BatchProjectSet { select_list: [$0, Unnest($0)] } + │ └─BatchHashAgg { group_key: [t.arr], aggs: [] } + │ └─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0, Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.arr] } + ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0, Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchExchange { order: [], dist: HashShard(t.arr) } + └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + stream_plan: |- + 
StreamMaterialize { columns: [x, arr, unnest, ordinality, arr_2, ordinality_2, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden), t.arr#1(hidden), projected_row_id#1(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr, t.arr#1, projected_row_id#1], pk_columns: [t._row_id, t.arr, projected_row_id, arr, t.arr#1, projected_row_id#1], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), $expr1, Unnest($0), (projected_row_id + 1:Int64) as $expr2, t._row_id, t.arr, projected_row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, Unnest($0), $expr1, projected_row_id, t.arr, Unnest($0), t._row_id, t.arr, projected_row_id] } + ├─StreamShare { id: 8 } + │ └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + │ └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + │ ├─StreamExchange { dist: HashShard(t.arr) } + │ │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + │ └─StreamProjectSet { select_list: [$0, Unnest($0)] } + │ └─StreamProject { exprs: [t.arr] } + │ └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + │ └─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamShare { id: 8 } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + ├─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamExchange { dist: HashShard(t.arr) } + └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } diff --git a/src/frontend/src/binder/relation/join.rs b/src/frontend/src/binder/relation/join.rs index fed759d6eb83b..eb4ce96f9ab3f 100644 --- a/src/frontend/src/binder/relation/join.rs +++ b/src/frontend/src/binder/relation/join.rs @@ -59,7 +59,7 @@ impl Binder { let is_lateral = match &right { Relation::Subquery(subquery) if subquery.lateral => true, - Relation::TableFunction(_) => true, + Relation::TableFunction { .. } => true, _ => false, }; @@ -110,7 +110,7 @@ impl Binder { let is_lateral = match &right { Relation::Subquery(subquery) if subquery.lateral => true, - Relation::TableFunction(_) => true, + Relation::TableFunction { .. 
} => true, _ => false, }; diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index cafb0e99c612f..4ee4337c2f048 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -119,7 +119,10 @@ impl Relation { ); correlated_indices } - Relation::TableFunction(table_function) => table_function + Relation::TableFunction { + expr: table_function, + with_ordinality: _, + } => table_function .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id), _ => vec![], } diff --git a/src/frontend/src/binder/relation/table_function.rs b/src/frontend/src/binder/relation/table_function.rs index 33d4b31ff34d0..988ea0561a860 100644 --- a/src/frontend/src/binder/relation/table_function.rs +++ b/src/frontend/src/binder/relation/table_function.rs @@ -47,17 +47,17 @@ impl Binder { let func_name = &name.0[0].real_value(); // internal/system table functions { - if with_ordinality { - return Err(ErrorCode::NotImplemented( - format!( - "WITH ORDINALITY for internal/system table function {}", - func_name - ), - None.into(), - ) - .into()); - } if func_name.eq_ignore_ascii_case(RW_INTERNAL_TABLE_FUNCTION_NAME) { + if with_ordinality { + return Err(ErrorCode::NotImplemented( + format!( + "WITH ORDINALITY for internal/system table function {}", + func_name + ), + None.into(), + ) + .into()); + } return self.bind_internal_table(args, alias); } if func_name.eq_ignore_ascii_case(PG_GET_KEYWORDS_FUNC_NAME) @@ -65,6 +65,16 @@ format!("{}.{}", PG_CATALOG_SCHEMA_NAME, PG_GET_KEYWORDS_FUNC_NAME).as_str(), ) { + if with_ordinality { + return Err(ErrorCode::NotImplemented( + format!( + "WITH ORDINALITY for internal/system table function {}", + func_name + ), + None.into(), + ) + .into()); + } return self.bind_relation_by_name_inner( Some(PG_CATALOG_SCHEMA_NAME), PG_KEYWORDS_TABLE_NAME, @@ -115,12 +125,14 @@ self.pop_context()?; let func = func?; - let columns = if let DataType::Struct(s) = func.return_type() { + // bool indicates if the field is hidden + let mut columns = if let DataType::Struct(s) = func.return_type() { // If the table function returns a struct, it will be flattened into multiple columns. let schema = Schema::from(&s); schema.fields.into_iter().map(|f| (false, f)).collect_vec() } else { - // If there is an table alias, we should use the alias as the table function's + // If there is a table alias (and it doesn't return a struct), + // we should use the alias as the table function's // column name. If column aliases are also provided, they // are handled in bind_table_to_context. // @@ -138,6 +150,9 @@ }; vec![(false, Field::with_name(func.return_type(), col_name))] }; + if with_ordinality { + columns.push((false, Field::with_name(DataType::Int64, "ordinality"))); + } self.bind_table_to_context(columns, func_name.clone(), alias)?; diff --git a/src/frontend/src/optimizer/plan_node/logical_table_function.rs b/src/frontend/src/optimizer/plan_node/logical_table_function.rs index 4b62d0d8436cc..b54411a36faa9 100644 --- a/src/frontend/src/optimizer/plan_node/logical_table_function.rs +++ b/src/frontend/src/optimizer/plan_node/logical_table_function.rs @@ -31,17 +31,24 @@ use crate::optimizer::plan_node::{ use crate::optimizer::property::FunctionalDependencySet; use crate::utils::{ColIndexMapping, Condition}; -/// `LogicalGenerateSeries` implements Hop Table Function. +/// `LogicalTableFunction` is a scalar/table function used as a relation (in the `FROM` clause).
+/// +/// If the function returns a struct, it will be flattened into multiple columns. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct LogicalTableFunction { pub base: PlanBase, pub table_function: TableFunction, + pub with_ordinality: bool, } impl LogicalTableFunction { /// Create a [`LogicalTableFunction`] node. Used internally by optimizer. - pub fn new(table_function: TableFunction, ctx: OptimizerContextRef) -> Self { - let schema = if let DataType::Struct(s) = table_function.return_type() { + pub fn new( + table_function: TableFunction, + with_ordinality: bool, + ctx: OptimizerContextRef, + ) -> Self { + let mut schema = if let DataType::Struct(s) = table_function.return_type() { // If the function returns a struct, it will be flattened into multiple columns. Schema::from(&s) } else { @@ -52,11 +59,17 @@ impl LogicalTableFunction { )], } }; + if with_ordinality { + schema + .fields + .push(Field::with_name(DataType::Int64, "ordinality")); + } let functional_dependency = FunctionalDependencySet::new(schema.len()); let base = PlanBase::new_logical(ctx, schema, vec![], functional_dependency); Self { base, table_function, + with_ordinality, } } @@ -111,6 +124,10 @@ impl PredicatePushdown for LogicalTableFunction { impl ToBatch for LogicalTableFunction { fn to_batch(&self) -> Result<PlanRef> { + // TODO: actually this is also unreachable now, as it's always converted to ProjectSet. Maybe we can remove it. + if self.with_ordinality { + todo!() + } Ok(BatchTableFunction::new(self.clone()).into()) } } diff --git a/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs b/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs index 095e08664e1c4..1c198769e1751 100644 --- a/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs +++ b/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs @@ -19,11 +19,11 @@ use risingwave_common::types::DataType; use super::{BoxedRule, Rule}; use crate::expr::{Expr, ExprImpl, ExprType, FunctionCall, InputRef}; use crate::optimizer::plan_node::{ - LogicalProject, LogicalProjectSet, LogicalTableFunction, LogicalValues, + LogicalProject, LogicalProjectSet, LogicalTableFunction, LogicalValues, PlanTreeNodeUnary, }; use crate::optimizer::PlanRef; -/// Transform a table function into a project set +/// Transform a TableFunction (used in FROM clause) into a ProjectSet so that it can be unnested later if it contains CorrelatedInputRef. /// /// Before: /// @@ -54,11 +54,11 @@ impl Rule for TableFunctionToProjectSetRule { logical_table_function.base.ctx.clone(), ); let logical_project_set = LogicalProjectSet::create(logical_values, vec![table_function]); - // We need a project to align schema type because `LogicalProjectSet` has a hidden column - // `projected_row_id` and table function could return multiple columns, while project set - // return only one column with struct type. + // We need a project to align schema type because + // 1. `LogicalProjectSet` has a hidden column `projected_row_id` (0-th col) + // 2. When the function returns a struct type, TableFunction will flatten it into multiple columns, while ProjectSet still returns a single column.
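+ // In other words, the `ProjectSet` output layout here is `[projected_row_id, <table function output>]`, which is why the table function's column is read from index 1 below.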
let table_function_col_idx = 1; - if let DataType::Struct(st) = table_function_return_type.clone() { + let logical_project = if let DataType::Struct(st) = table_function_return_type.clone() { let exprs = st .types() .enumerate() let field_access = FunctionCall::new_unchecked( ExprType::Field, vec![ - ExprImpl::InputRef( - InputRef::new( - table_function_col_idx, - table_function_return_type.clone(), - ) - .into(), - ), + InputRef::new( + table_function_col_idx, + table_function_return_type.clone(), + ) + .into(), ExprImpl::literal_int(i as i32), ], data_type.clone(), ExprImpl::FunctionCall(field_access.into()) }) .collect_vec(); - let logical_project = LogicalProject::new(logical_project_set, exprs); - Some(logical_project.into()) + LogicalProject::new(logical_project_set, exprs) } else { - let logical_project = LogicalProject::with_out_col_idx( + LogicalProject::with_out_col_idx( logical_project_set, std::iter::once(table_function_col_idx), - ); + ) + }; + + if logical_table_function.with_ordinality { + let projected_row_id = InputRef::new(0, DataType::Int64).into(); + let ordinality = FunctionCall::new( + ExprType::Add, + vec![projected_row_id, ExprImpl::literal_bigint(1)], + ) + .unwrap() // i64 + i64 is ok + .into(); + let mut exprs = logical_project.exprs().clone(); + exprs.push(ordinality); + let logical_project = LogicalProject::new(logical_project.input(), exprs); + Some(logical_project.into()) + } else { Some(logical_project.into()) } } diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 52db1a3f33e53..0476f093090f9 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -160,13 +160,19 @@ impl Planner { ) -> Result<PlanRef> { // TODO: maybe we can unify LogicalTableFunction with LogicalValues match table_function { - ExprImpl::TableFunction(tf) => Ok(LogicalTableFunction::new(*tf, self.ctx()).into()), + ExprImpl::TableFunction(tf) => { + Ok(LogicalTableFunction::new(*tf, with_ordinality, self.ctx()).into()) + } expr => { + if with_ordinality { + todo!() + } else { + let schema = Schema { + // TODO: should be named + fields: vec![Field::unnamed(expr.return_type())], + }; + Ok(LogicalValues::create(vec![vec![expr]], schema, self.ctx())) + } } } } From a3d4dc083a6cfd06920b99b1745a5ab44eb67ccd Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 13 Sep 2023 17:19:00 +0800 Subject: [PATCH 3/9] clippy --- .../src/optimizer/rule/table_function_to_project_set_rule.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs b/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs index 1c198769e1751..5a6f1187fdd02 100644 --- a/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs +++ b/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs @@ -23,7 +23,7 @@ use crate::optimizer::plan_node::{ }; use crate::optimizer::PlanRef; -/// Transform a TableFunction (used in FROM clause) into a ProjectSet so that it can be unnested later if it contains CorrelatedInputRef.
+/// Transform a `TableFunction` (used in FROM clause) into a `ProjectSet` so that it can be unnested later if it contains `CorrelatedInputRef`. /// /// Before: /// From 5619e2c2649c493372674676aa50f0d2aec6d8e8 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 13 Sep 2023 17:20:15 +0800 Subject: [PATCH 4/9] fix --- e2e_test/streaming/project_set.slt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/e2e_test/streaming/project_set.slt b/e2e_test/streaming/project_set.slt index a5e69e4f49309..5b0577996aa33 100644 --- a/e2e_test/streaming/project_set.slt +++ b/e2e_test/streaming/project_set.slt @@ -120,11 +120,11 @@ insert into t values (Array['a','b', 'c']), (Array['d','e']); query I rowsort select * from mv; ---- -{a,b,c} c 3 -{a,b,c} b 2 {a,b,c} a 1 -{d,e} e 2 +{a,b,c} b 2 +{a,b,c} c 3 {d,e} d 1 +{d,e} e 2 statement ok drop materialized view mv; From 84893d44ab963ec0810db659f908af1b4fce0c9f Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 14 Sep 2023 10:46:07 +0800 Subject: [PATCH 5/9] update parser test --- src/sqlparser/tests/testdata/select.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sqlparser/tests/testdata/select.yaml b/src/sqlparser/tests/testdata/select.yaml index bbbb5a72bbdab..6aed3d2a4dc4c 100644 --- a/src/sqlparser/tests/testdata/select.yaml +++ b/src/sqlparser/tests/testdata/select.yaml @@ -29,10 +29,10 @@ formatted_sql: SELECT (CAST(ROW(1, 2, 3) AS foo)).v1.* - input: SELECT * FROM generate_series('2'::INT,'10'::INT,'2'::INT) formatted_sql: SELECT * FROM generate_series(CAST('2' AS INT), CAST('10' AS INT), CAST('2' AS INT)) - formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [Wildcard(None)], from: [TableWithJoins { relation: TableFunction { name: ObjectName([Ident { value: "generate_series", quote_style: None }]), alias: None, args: [Unnamed(Expr(Cast { expr: Value(SingleQuotedString("2")), data_type: Int })), Unnamed(Expr(Cast { expr: Value(SingleQuotedString("10")), data_type: Int })), Unnamed(Expr(Cast { expr: Value(SingleQuotedString("2")), data_type: Int }))] }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' + formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [Wildcard(None)], from: [TableWithJoins { relation: TableFunction { name: ObjectName([Ident { value: "generate_series", quote_style: None }]), alias: None, args: [Unnamed(Expr(Cast { expr: Value(SingleQuotedString("2")), data_type: Int })), Unnamed(Expr(Cast { expr: Value(SingleQuotedString("10")), data_type: Int })), Unnamed(Expr(Cast { expr: Value(SingleQuotedString("2")), data_type: Int }))], with_ordinality: false }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: SELECT * FROM unnest(Array[1,2,3]); formatted_sql: SELECT * FROM unnest(ARRAY[1, 2, 3]) - formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [Wildcard(None)], from: [TableWithJoins { relation: TableFunction { name: ObjectName([Ident { value: "unnest", quote_style: None }]), alias: None, args: [Unnamed(Expr(Array(Array { elem: [Value(Number("1")), Value(Number("2")), Value(Number("3"))], named: true })))] }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' + formatted_ast: 'Query(Query { with: None, 
body: Select(Select { distinct: All, projection: [Wildcard(None)], from: [TableWithJoins { relation: TableFunction { name: ObjectName([Ident { value: "unnest", quote_style: None }]), alias: None, args: [Unnamed(Expr(Array(Array { elem: [Value(Number("1")), Value(Number("2")), Value(Number("3"))], named: true })))], with_ordinality: false }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: SELECT id, fname, lname FROM customer WHERE salary <> 'Not Provided' AND salary <> '' formatted_sql: SELECT id, fname, lname FROM customer WHERE salary <> 'Not Provided' AND salary <> '' - input: SELECT id FROM customer WHERE NOT salary = '' @@ -102,7 +102,7 @@ formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Identifier(Ident { value: "id1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "id2", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a2", quote_style: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "stream", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "S", quote_style: None }, columns: [] }), for_system_time_as_of_proctime: false }, joins: [Join { relation: Table { name: ObjectName([Ident { value: "version", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "V", quote_style: None }, columns: [] }), for_system_time_as_of_proctime: true }, join_operator: Inner(On(BinaryOp { left: Identifier(Ident { value: "id1", quote_style: None }), op: Eq, right: Identifier(Ident { value: "id2", quote_style: None }) })) }] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: select percentile_cont(0.3) within group (order by x desc) from unnest(array[1,2,4,5,10]) as x formatted_sql: SELECT percentile_cont(0.3) FROM unnest(ARRAY[1, 2, 4, 5, 10]) AS x - formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { name: ObjectName([Ident { value: "percentile_cont", quote_style: None }]), args: [Unnamed(Expr(Value(Number("0.3"))))], over: None, distinct: false, order_by: [], filter: None, within_group: Some(OrderByExpr { expr: Identifier(Ident { value: "x", quote_style: None }), asc: Some(false), nulls_first: None }) }))], from: [TableWithJoins { relation: TableFunction { name: ObjectName([Ident { value: "unnest", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "x", quote_style: None }, columns: [] }), args: [Unnamed(Expr(Array(Array { elem: [Value(Number("1")), Value(Number("2")), Value(Number("4")), Value(Number("5")), Value(Number("10"))], named: true })))] }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' + formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { name: ObjectName([Ident { value: "percentile_cont", quote_style: None }]), args: [Unnamed(Expr(Value(Number("0.3"))))], over: None, distinct: false, order_by: [], filter: None, within_group: Some(OrderByExpr { expr: Identifier(Ident { value: "x", quote_style: None }), asc: Some(false), nulls_first: None }) }))], from: [TableWithJoins { relation: TableFunction { name: ObjectName([Ident { value: "unnest", 
quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "x", quote_style: None }, columns: [] }), args: [Unnamed(Expr(Array(Array { elem: [Value(Number("1")), Value(Number("2")), Value(Number("4")), Value(Number("5")), Value(Number("10"))], named: true })))], with_ordinality: false }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: select percentile_cont(0.3) within group (order by x, y desc) from t error_msg: 'sql parser error: only one arg in order by is expected here' - input: select 'apple' ~~ 'app%' From 228fc6e3715cf30e34977c747e55b9bf092f3caa Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 14 Sep 2023 11:01:23 +0800 Subject: [PATCH 6/9] update todo --- .../tests/testdata/input/with_ordinality.yaml | 14 +++++++++++ .../testdata/output/with_ordinality.yaml | 23 +++++++++++++++++++ .../plan_node/logical_table_function.rs | 22 ++++-------------- src/frontend/src/planner/relation.rs | 17 ++++++++++---- 4 files changed, 54 insertions(+), 22 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml b/src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml index fa99f2087f7ad..4c21c47e0d9b3 100644 --- a/src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml +++ b/src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml @@ -41,3 +41,17 @@ expected_outputs: - batch_plan - stream_plan +# constant FROM (scalar function) +- sql: | + select * from abs(1) WITH ORDINALITY; + expected_outputs: + - batch_plan + - stream_plan +# lateral join (scalar function) +# FIXME: currently this panics due to CorrelatedInputRef in Values https://github.com/risingwavelabs/risingwave/issues/12231 +- sql: | + create table t(x int , arr int[]); + select * from t, abs(x) WITH ORDINALITY; + expected_outputs: + - batch_plan + - stream_error diff --git a/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml b/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml index 72558dad9be6d..892cf136e248e 100644 --- a/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml +++ b/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml @@ -160,3 +160,26 @@ └─StreamHashAgg { group_key: [t.arr], aggs: [count] } └─StreamExchange { dist: HashShard(t.arr) } └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- sql: | + select * from abs(1) WITH ORDINALITY; + batch_plan: 'BatchValues { rows: [[1:Int32, 1:Int64]] }' + stream_plan: |- + StreamMaterialize { columns: [abs, ordinality, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } + └─StreamValues { rows: [[Abs(1:Int32), 1:Int64, 0:Int64]] } +- sql: | + create table t(x int , arr int[]); + select * from t, abs(x) WITH ORDINALITY; + batch_plan: |- + BatchNestedLoopJoin { type: Inner, predicate: true, output: all } + ├─BatchExchange { order: [], dist: Single } + │ └─BatchHashJoin { type: Inner, predicate: t.x IS NOT DISTINCT FROM t.x, output: [t.x, t.arr] } + │ ├─BatchExchange { order: [], dist: HashShard(t.x) } + │ │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard } + │ └─BatchHashAgg { group_key: [t.x], aggs: [] } + │ └─BatchExchange { order: [], dist: HashShard(t.x) } + │ └─BatchScan { table: t, columns: [t.x], distribution: SomeShard } + └─BatchValues { rows: [[Abs(CorrelatedInputRef { index: 0, correlated_id: 1 }), 1:Int64]] } + 
stream_error: |- + Not supported: streaming nested-loop join + HINT: The non-equal join in the query requires a nested-loop join executor, which could be very expensive to run. Consider rewriting the query to use dynamic filter as a substitute if possible. + See also: https://github.com/risingwavelabs/rfcs/blob/main/rfcs/0033-dynamic-filter.md diff --git a/src/frontend/src/optimizer/plan_node/logical_table_function.rs b/src/frontend/src/optimizer/plan_node/logical_table_function.rs index b54411a36faa9..c42a51aeb9024 100644 --- a/src/frontend/src/optimizer/plan_node/logical_table_function.rs +++ b/src/frontend/src/optimizer/plan_node/logical_table_function.rs @@ -14,7 +14,7 @@ use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::error::Result; use risingwave_common::types::DataType; @@ -25,8 +25,7 @@ use super::{ use crate::expr::{Expr, ExprRewriter, TableFunction}; use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::plan_node::{ - BatchTableFunction, ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, - ToStreamContext, + ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; use crate::optimizer::property::FunctionalDependencySet; use crate::utils::{ColIndexMapping, Condition}; @@ -124,30 +123,19 @@ impl PredicatePushdown for LogicalTableFunction { impl ToBatch for LogicalTableFunction { fn to_batch(&self) -> Result<PlanRef> { - // TODO: actually this is also unreachable now, as it's always converted to ProjectSet. Maybe we can remove it. - if self.with_ordinality { - todo!() - } - Ok(BatchTableFunction::new(self.clone()).into()) + unreachable!("TableFunction should be converted to ProjectSet") } } impl ToStream for LogicalTableFunction { fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> { - Err( - ErrorCode::NotImplemented("LogicalTableFunction::to_stream".to_string(), None.into()) - .into(), - ) + unreachable!("TableFunction should be converted to ProjectSet") } fn logical_rewrite_for_stream( &self, _ctx: &mut RewriteStreamContext, ) -> Result<(PlanRef, ColIndexMapping)> { - Err(ErrorCode::NotImplemented( - "LogicalTableFunction::logical_rewrite_for_stream".to_string(), - None.into(), - ) - .into()) + unreachable!("TableFunction should be converted to ProjectSet") } } diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 0476f093090f9..f4085f6ffa42e 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -164,13 +164,20 @@ impl Planner { Ok(LogicalTableFunction::new(*tf, with_ordinality, self.ctx()).into()) } expr => { + let mut schema = Schema { + // TODO: should be named + fields: vec![Field::unnamed(expr.return_type())], + }; if with_ordinality { - todo!() + schema + .fields + .push(Field::with_name(DataType::Int64, "ordinality")); + Ok(LogicalValues::create( + vec![vec![expr, ExprImpl::literal_bigint(1)]], + schema, + self.ctx(), + )) } else { - let schema = Schema { - // TODO: should be named - fields: vec![Field::unnamed(expr.return_type())], - }; Ok(LogicalValues::create(vec![vec![expr]], schema, self.ctx())) } } From 2aac7b0295b8eee76f7c727f97514dc82cc9e32b Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 14 Sep 2023 11:05:02 +0800 Subject: [PATCH 7/9] add update --- e2e_test/streaming/project_set.slt | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git
a/e2e_test/streaming/project_set.slt b/e2e_test/streaming/project_set.slt index 5b0577996aa33..f360663067e3e 100644 --- a/e2e_test/streaming/project_set.slt +++ b/e2e_test/streaming/project_set.slt @@ -126,6 +126,17 @@ select * from mv; {d,e} d 1 {d,e} e 2 +statement ok +update t set arr = Array['a', 'c'] where arr = Array['a','b', 'c']; + +query I rowsort +select * from mv; +---- +{a,c} a 1 +{a,c} c 2 +{d,e} d 1 +{d,e} e 2 + statement ok drop materialized view mv; From 9364c06bc64333e5e919049fb3ea061fafd84cd0 Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 14 Sep 2023 11:16:59 +0800 Subject: [PATCH 8/9] add rowsort --- e2e_test/batch/basic/unnest.slt.part | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/e2e_test/batch/basic/unnest.slt.part b/e2e_test/batch/basic/unnest.slt.part index 04474a2b6bffe..0807637c88e2d 100644 --- a/e2e_test/batch/basic/unnest.slt.part +++ b/e2e_test/batch/basic/unnest.slt.part @@ -107,14 +107,14 @@ create table t(arr varchar[]); statement ok insert into t values (Array['a','b', 'c']), (Array['d','e']); -query I +query I rowsort select * from t cross join unnest(t.arr) WITH ORDINALITY AS x(elts, num); ---- -{a,b,c} c 3 -{a,b,c} b 2 {a,b,c} a 1 +{a,b,c} b 2 +{a,b,c} c 3 {d,e} d 1 +{d,e} e 2 statement ok drop table t; From da6c2cbd6f3919d4d91121042316970871f38147 Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 14 Sep 2023 11:19:41 +0800 Subject: [PATCH 9/9] use alias columns explicitly --- .../tests/testdata/input/with_ordinality.yaml | 7 ++++++ .../testdata/output/with_ordinality.yaml | 25 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml b/src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml index 4c21c47e0d9b3..08dfafa2c7310 100644 --- a/src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml +++ b/src/frontend/planner_test/tests/testdata/input/with_ordinality.yaml @@ -29,6 +29,13 @@ expected_outputs: - batch_plan - stream_plan +- name: use alias columns explicitly + sql: | + create table t(x int , arr int[]); + select x, arr, a, ord from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord); + expected_outputs: + - batch_plan + - stream_plan - sql: | create table t(x int , arr int[]); select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord,bar); expected_outputs: - binder_error diff --git a/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml b/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml index 892cf136e248e..208eec58ecdba 100644 --- a/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml +++ b/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml @@ -106,6 +106,31 @@ └─StreamHashAgg { group_key: [t.arr], aggs: [count] } └─StreamExchange { dist: HashShard(t.arr) } └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- name: use alias columns explicitly + sql: | + create table t(x int , arr int[]); + select x, arr, a, ord from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord); + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1] } + └─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: all } + ├─BatchExchange { order: [], dist: HashShard(t.arr) } + │ └─BatchScan { table: t, columns: [t.x, t.arr], distribution: SomeShard } + └─BatchProjectSet { select_list: [$0,
Unnest($0)] } + └─BatchHashAgg { group_key: [t.arr], aggs: [] } + └─BatchExchange { order: [], dist: HashShard(t.arr) } + └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [x, arr, a, ord, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, t.arr, projected_row_id] } + └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] } + ├─StreamExchange { dist: HashShard(t.arr) } + │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProjectSet { select_list: [$0, Unnest($0)] } + └─StreamProject { exprs: [t.arr] } + └─StreamHashAgg { group_key: [t.arr], aggs: [count] } + └─StreamExchange { dist: HashShard(t.arr) } + └─StreamTableScan { table: t, columns: [t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } - sql: | create table t(x int , arr int[]); select * from t cross join unnest(arr) WITH ORDINALITY as foo(a,ord,bar);