From 8e76fb81a53f8c4aedc5bb26e60d2fb7aa041292 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Wed, 20 Sep 2023 11:03:23 +0800 Subject: [PATCH 1/6] add `_pg_expandarray` function Signed-off-by: Runji Wang --- proto/expr.proto | 1 + src/expr/macro/src/gen.rs | 18 +++++-- src/expr/macro/src/lib.rs | 4 +- src/expr/src/table_function/mod.rs | 1 + src/expr/src/table_function/pg_expandarray.rs | 51 +++++++++++++++++++ 5 files changed, 70 insertions(+), 5 deletions(-) create mode 100644 src/expr/src/table_function/pg_expandarray.rs diff --git a/proto/expr.proto b/proto/expr.proto index 7d24241850918..566929166013b 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -247,6 +247,7 @@ message TableFunction { REGEXP_MATCHES = 3; RANGE = 4; GENERATE_SUBSCRIPTS = 5; + _PG_EXPANDARRAY = 6; // Jsonb functions JSONB_ARRAY_ELEMENTS = 10; JSONB_ARRAY_ELEMENTS_TEXT = 11; diff --git a/src/expr/macro/src/gen.rs b/src/expr/macro/src/gen.rs index f940a705754a5..1319df5d5fbfa 100644 --- a/src/expr/macro/src/gen.rs +++ b/src/expr/macro/src/gen.rs @@ -839,15 +839,26 @@ impl FunctionAttr { .map(|i| quote! { self.return_type.as_struct().types().nth(#i).unwrap().clone() }) .collect() }; + #[allow(clippy::disallowed_methods)] + let optioned_outputs = user_fn + .core_return_type + .split(',') + .map(|t| t.contains("Option")) + // example: "(Option<&str>, i32)" => [true, false] + .zip(&outputs) + .map(|(optional, o)| match optional { + false => quote! { Some(#o.as_scalar_ref()) }, + true => quote! { #o.map(|o| o.as_scalar_ref()) }, + }) + .collect_vec(); let build_value_array = if return_types.len() == 1 { quote! { let [value_array] = value_arrays; } } else { quote! 
{ - let bitmap = value_arrays[0].null_bitmap().clone(); let value_array = StructArray::new( self.return_type.as_struct().clone(), value_arrays.to_vec(), - bitmap, + Bitmap::ones(len), ).into_ref(); } }; @@ -938,11 +949,12 @@ impl FunctionAttr { for output in #iter { index_builder.append(Some(i as i32)); match #output { - Some((#(#outputs),*)) => { #(#builders.append(Some(#outputs.as_scalar_ref()));)* } + Some((#(#outputs),*)) => { #(#builders.append(#optioned_outputs);)* } None => { #(#builders.append_null();)* } } if index_builder.len() == self.chunk_size { + let len = index_builder.len(); let index_array = std::mem::replace(&mut index_builder, I32ArrayBuilder::new(self.chunk_size)).finish().into_ref(); let value_arrays = [#(std::mem::replace(&mut #builders, #builder_types::with_type(self.chunk_size, #return_types)).finish().into_ref()),*]; #build_value_array diff --git a/src/expr/macro/src/lib.rs b/src/expr/macro/src/lib.rs index c6ebffdff660f..484f093ff8883 100644 --- a/src/expr/macro/src/lib.rs +++ b/src/expr/macro/src/lib.rs @@ -510,9 +510,9 @@ struct UserFunctionAttr { async_: bool, /// Whether contains argument `&Context`. context: bool, - /// The last argument type is `&mut dyn Write`. + /// Whether contains argument `&mut impl Write`. write: bool, - /// The last argument type is `retract: bool`. + /// Whether the last argument type is `retract: bool`. retract: bool, /// The argument type are `Option`s. 
arg_option: bool, diff --git a/src/expr/src/table_function/mod.rs b/src/expr/src/table_function/mod.rs index 245eebd7f3720..2f64f8449a23d 100644 --- a/src/expr/src/table_function/mod.rs +++ b/src/expr/src/table_function/mod.rs @@ -31,6 +31,7 @@ mod empty; mod generate_series; mod generate_subscripts; mod jsonb; +mod pg_expandarray; mod regexp_matches; mod repeat; mod unnest; diff --git a/src/expr/src/table_function/pg_expandarray.rs b/src/expr/src/table_function/pg_expandarray.rs new file mode 100644 index 0000000000000..1e840002be3d1 --- /dev/null +++ b/src/expr/src/table_function/pg_expandarray.rs @@ -0,0 +1,51 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::{ListRef, ScalarRefImpl, StructType}; +use risingwave_expr_macro::function; + +use super::*; + +/// Returns the input array as a set of rows with an index. +/// +/// ```slt +/// query II +/// select * from _pg_expandarray(array[1,2,null]); +/// ---- +/// 1 1 +/// 2 2 +/// NULL 3 +/// +/// query TI +/// select * from _pg_expandarray(array['one', null, 'three']); +/// ---- +/// one 1 +/// NULL 2 +/// three 3 +/// ``` +#[function( + "_pg_expandarray(list) -> setof struct", + type_infer = "infer_type" +)] +fn _pg_expandarray(array: ListRef<'_>) -> impl Iterator<Item = (Option<ScalarRefImpl<'_>>, i32)> { + #[allow(clippy::disallowed_methods)] + array.iter().zip(1..)
+} + +fn infer_type(args: &[DataType]) -> Result<DataType> { + Ok(DataType::Struct(StructType::new(vec![ + ("x", args[0].as_list().clone()), + ("n", DataType::Int32), + ]))) +} From d4a8242fd7b21a5a2132e761af6099254dd291df Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 20 Sep 2023 12:13:31 +0800 Subject: [PATCH 2/6] allow information_schema._pg_expandarray & add schema tests --- e2e_test/ddl/search_path.slt | 39 ++++++++++++++++++++++++ src/frontend/src/binder/expr/function.rs | 27 ++++++++++++---- 2 files changed, 60 insertions(+), 6 deletions(-) diff --git a/e2e_test/ddl/search_path.slt b/e2e_test/ddl/search_path.slt index bbb927bdd8976..06db7f3f45c90 100644 --- a/e2e_test/ddl/search_path.slt +++ b/e2e_test/ddl/search_path.slt @@ -101,5 +101,44 @@ drop schema search_path_test1; statement ok drop schema search_path_test2; +# Schema for functions https://github.com/risingwavelabs/risingwave/issues/12422 + +query TI +select * from information_schema._pg_expandarray(Array['a','b','c']) +---- +a 1 +b 2 +c 3 + +# FIXME: This should not be available since information_schema is not in the search path +query TI +select * from _pg_expandarray(Array['a','b','c']) +---- +a 1 +b 2 +c 3 + + +statement ok +set search_path to information_schema; + +query TI +select * from _pg_expandarray(Array['a','b','c']) +---- +a 1 +b 2 +c 3 + +# built-in functions (pg_catalog) are always available +query I +select abs(1) +---- +1 + +query I +select pg_catalog.abs(1) +---- +1 + statement ok set search_path to "$user", public; diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 1146050e8dde4..904474fe0e5af 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -20,7 +20,7 @@ use std::sync::LazyLock; use bk_tree::{metrics, BKTree}; use itertools::Itertools; use risingwave_common::array::ListValue; -use risingwave_common::catalog::PG_CATALOG_SCHEMA_NAME; +use
risingwave_common::catalog::{INFORMATION_SCHEMA_SCHEMA_NAME, PG_CATALOG_SCHEMA_NAME}; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::session_config::USER_NAME_WILD_CARD; use risingwave_common::types::{DataType, ScalarImpl, Timestamptz}; @@ -59,12 +59,27 @@ impl Binder { [schema, name] => { let schema_name = schema.real_value(); if schema_name == PG_CATALOG_SCHEMA_NAME { + // pg_catalog is always effectively part of the search path, so we can always bind the function. + // Ref: https://www.postgresql.org/docs/current/ddl-schemas.html#DDL-SCHEMAS-CATALOG name.real_value() + } else if schema_name == INFORMATION_SCHEMA_SCHEMA_NAME { + // definition of information_schema: https://github.com/postgres/postgres/blob/e0b2eed047df9045664da6f724cb42c10f8b12f0/src/backend/catalog/information_schema.sql + // + // FIXME: handle schema correctly, so that the functions are hidden if the schema is not in the search path. + let function_name = name.real_value(); + if function_name != "_pg_expandarray" { + return Err(ErrorCode::NotImplemented( + format!("Unsupported function name under schema: {}", schema_name), + 12422.into(), + ) + .into()); + } + function_name } else { - return Err(ErrorCode::BindError(format!( - "Unsupported function name under schema: {}", - schema_name - )) + return Err(ErrorCode::NotImplemented( + format!("Unsupported function name under schema: {}", schema_name), + 12422.into(), + ) .into()); } } @@ -138,7 +153,7 @@ impl Binder { } // user defined function - // TODO: resolve schema name + // TODO: resolve schema name https://github.com/risingwavelabs/risingwave/issues/12422 if let Ok(schema) = self.first_valid_schema() && let Some(func) = schema.get_function_by_name_args( &function_name, From 55b39a5b7fe57f0c25263a5147676f94fd7d1358 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 20 Sep 2023 12:20:15 +0800 Subject: [PATCH 3/6] add metabase query --- e2e_test/batch/catalog/metabase.slt.part | 24 ++++++++++++++++++++++++ 1 
file changed, 24 insertions(+) create mode 100644 e2e_test/batch/catalog/metabase.slt.part diff --git a/e2e_test/batch/catalog/metabase.slt.part b/e2e_test/batch/catalog/metabase.slt.part new file mode 100644 index 0000000000000..5363630677770 --- /dev/null +++ b/e2e_test/batch/catalog/metabase.slt.part @@ -0,0 +1,24 @@ +query +SELECT + NULL AS TABLE_CAT, + n.nspname AS TABLE_SCHEM, + ct.relname AS TABLE_NAME, + a.attname AS COLUMN_NAME, + ( + information_schema._pg_expandarray(i.indkey) + ).n AS KEY_SEQ, + ci.relname AS PK_NAME, + information_schema._pg_expandarray(i.indkey) AS KEYS, + a.attnum AS A_ATTNUM +FROM + pg_catalog.pg_class ct + JOIN pg_catalog.pg_attribute a ON (ct.oid = a.attrelid) + JOIN pg_catalog.pg_namespace n ON (ct.relnamespace = n.oid) + JOIN pg_catalog.pg_index i ON (a.attrelid = i.indrelid) + JOIN pg_catalog.pg_class ci ON (ci.oid = i.indexrelid) +WHERE + true + AND n.nspname = 'public' + AND ct.relname = 'sentences' + AND i.indisprimary +---- From bfe6712a2930045aa3cfb72da20eabf086cc33b0 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Wed, 20 Sep 2023 18:46:22 +0800 Subject: [PATCH 4/6] remove trailing spaces Signed-off-by: TennyZhuang --- e2e_test/batch/catalog/metabase.slt.part | 38 ++++++++++++------------ 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/e2e_test/batch/catalog/metabase.slt.part b/e2e_test/batch/catalog/metabase.slt.part index 5363630677770..0172930f6e5f2 100644 --- a/e2e_test/batch/catalog/metabase.slt.part +++ b/e2e_test/batch/catalog/metabase.slt.part @@ -1,24 +1,24 @@ query -SELECT - NULL AS TABLE_CAT, - n.nspname AS TABLE_SCHEM, - ct.relname AS TABLE_NAME, - a.attname AS COLUMN_NAME, +SELECT + NULL AS TABLE_CAT, + n.nspname AS TABLE_SCHEM, + ct.relname AS TABLE_NAME, + a.attname AS COLUMN_NAME, ( information_schema._pg_expandarray(i.indkey) - ).n AS KEY_SEQ, - ci.relname AS PK_NAME, - information_schema._pg_expandarray(i.indkey) AS KEYS, - a.attnum AS A_ATTNUM -FROM - pg_catalog.pg_class ct - JOIN 
pg_catalog.pg_attribute a ON (ct.oid = a.attrelid) - JOIN pg_catalog.pg_namespace n ON (ct.relnamespace = n.oid) - JOIN pg_catalog.pg_index i ON (a.attrelid = i.indrelid) - JOIN pg_catalog.pg_class ci ON (ci.oid = i.indexrelid) -WHERE - true - AND n.nspname = 'public' - AND ct.relname = 'sentences' + ).n AS KEY_SEQ, + ci.relname AS PK_NAME, + information_schema._pg_expandarray(i.indkey) AS KEYS, + a.attnum AS A_ATTNUM +FROM + pg_catalog.pg_class ct + JOIN pg_catalog.pg_attribute a ON (ct.oid = a.attrelid) + JOIN pg_catalog.pg_namespace n ON (ct.relnamespace = n.oid) + JOIN pg_catalog.pg_index i ON (a.attrelid = i.indrelid) + JOIN pg_catalog.pg_class ci ON (ci.oid = i.indexrelid) +WHERE + true + AND n.nspname = 'public' + AND ct.relname = 'sentences' AND i.indisprimary ---- From 65414dbdaa1364f527f5ecd24953d3e62422f66f Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Wed, 20 Sep 2023 22:38:38 +0800 Subject: [PATCH 5/6] ignore UPPER_SNAKE_CASE Signed-off-by: Runji Wang --- proto/buf.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/proto/buf.yaml b/proto/buf.yaml index a2870930bb4ac..3f05dbacbb67f 100644 --- a/proto/buf.yaml +++ b/proto/buf.yaml @@ -9,6 +9,9 @@ lint: # This proto is copied from https://github.com/grpc/grpc/blob/v1.15.0/doc/health-checking.md # It violates some lint rules, so we ignore it. - health.proto + ignore_only: + ENUM_VALUE_UPPER_SNAKE_CASE: + - expr.proto enum_zero_value_suffix: UNSPECIFIED except: - ENUM_VALUE_PREFIX # Enum variant doesn't have to prefix with enum name. 
From 7d1b6fda603ac0102ee1d1c6b761dbf175dfa862 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Thu, 21 Sep 2023 12:23:28 +0800 Subject: [PATCH 6/6] allow comment ignores Signed-off-by: Runji Wang --- proto/buf.yaml | 4 +--- proto/expr.proto | 1 + 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/proto/buf.yaml b/proto/buf.yaml index 3f05dbacbb67f..1aa31816ce0af 100644 --- a/proto/buf.yaml +++ b/proto/buf.yaml @@ -9,9 +9,7 @@ lint: # This proto is copied from https://github.com/grpc/grpc/blob/v1.15.0/doc/health-checking.md # It violates some lint rules, so we ignore it. - health.proto - ignore_only: - ENUM_VALUE_UPPER_SNAKE_CASE: - - expr.proto + allow_comment_ignores: true enum_zero_value_suffix: UNSPECIFIED except: - ENUM_VALUE_PREFIX # Enum variant doesn't have to prefix with enum name. diff --git a/proto/expr.proto b/proto/expr.proto index 566929166013b..0c7290705f826 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -247,6 +247,7 @@ message TableFunction { REGEXP_MATCHES = 3; RANGE = 4; GENERATE_SUBSCRIPTS = 5; + // buf:lint:ignore ENUM_VALUE_UPPER_SNAKE_CASE _PG_EXPANDARRAY = 6; // Jsonb functions JSONB_ARRAY_ELEMENTS = 10;