Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(expr): add _pg_expandarray function #12448

Merged
merged 6 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions e2e_test/batch/catalog/metabase.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
query
SELECT
NULL AS TABLE_CAT,
n.nspname AS TABLE_SCHEM,
ct.relname AS TABLE_NAME,
a.attname AS COLUMN_NAME,
(
information_schema._pg_expandarray(i.indkey)
).n AS KEY_SEQ,
ci.relname AS PK_NAME,
information_schema._pg_expandarray(i.indkey) AS KEYS,
a.attnum AS A_ATTNUM
FROM
pg_catalog.pg_class ct
JOIN pg_catalog.pg_attribute a ON (ct.oid = a.attrelid)
JOIN pg_catalog.pg_namespace n ON (ct.relnamespace = n.oid)
JOIN pg_catalog.pg_index i ON (a.attrelid = i.indrelid)
JOIN pg_catalog.pg_class ci ON (ci.oid = i.indexrelid)
WHERE
true
AND n.nspname = 'public'
AND ct.relname = 'sentences'
AND i.indisprimary
----
39 changes: 39 additions & 0 deletions e2e_test/ddl/search_path.slt
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,44 @@ drop schema search_path_test1;
statement ok
drop schema search_path_test2;

# Schema for functions https://github.com/risingwavelabs/risingwave/issues/12422

query TI
select * from information_schema._pg_expandarray(Array['a','b','c'])
----
a 1
b 2
c 3

# FIXME: This should not be available since information_schema is not in the search path
query TI
select * from _pg_expandarray(Array['a','b','c'])
----
a 1
b 2
c 3


statement ok
set search_path to information_schema;

query TI
select * from _pg_expandarray(Array['a','b','c'])
----
a 1
b 2
c 3

# built-in functions (pg_catalog) are always available
query I
select abs(1)
----
1

query I
select pg_catalog.abs(1)
----
1

statement ok
set search_path to "$user", public;
1 change: 1 addition & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ message TableFunction {
REGEXP_MATCHES = 3;
RANGE = 4;
GENERATE_SUBSCRIPTS = 5;
_PG_EXPANDARRAY = 6;
// Jsonb functions
JSONB_ARRAY_ELEMENTS = 10;
JSONB_ARRAY_ELEMENTS_TEXT = 11;
Expand Down
18 changes: 15 additions & 3 deletions src/expr/macro/src/gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,15 +839,26 @@ impl FunctionAttr {
.map(|i| quote! { self.return_type.as_struct().types().nth(#i).unwrap().clone() })
.collect()
};
#[allow(clippy::disallowed_methods)]
let optioned_outputs = user_fn
.core_return_type
.split(',')
.map(|t| t.contains("Option"))
// example: "(Option<&str>, i32)" => [true, false]
.zip(&outputs)
.map(|(optional, o)| match optional {
false => quote! { Some(#o.as_scalar_ref()) },
true => quote! { #o.map(|o| o.as_scalar_ref()) },
})
.collect_vec();
let build_value_array = if return_types.len() == 1 {
quote! { let [value_array] = value_arrays; }
} else {
quote! {
let bitmap = value_arrays[0].null_bitmap().clone();
let value_array = StructArray::new(
self.return_type.as_struct().clone(),
value_arrays.to_vec(),
bitmap,
Bitmap::ones(len),
).into_ref();
}
};
Expand Down Expand Up @@ -938,11 +949,12 @@ impl FunctionAttr {
for output in #iter {
index_builder.append(Some(i as i32));
match #output {
Some((#(#outputs),*)) => { #(#builders.append(Some(#outputs.as_scalar_ref()));)* }
Some((#(#outputs),*)) => { #(#builders.append(#optioned_outputs);)* }
None => { #(#builders.append_null();)* }
}

if index_builder.len() == self.chunk_size {
let len = index_builder.len();
let index_array = std::mem::replace(&mut index_builder, I32ArrayBuilder::new(self.chunk_size)).finish().into_ref();
let value_arrays = [#(std::mem::replace(&mut #builders, #builder_types::with_type(self.chunk_size, #return_types)).finish().into_ref()),*];
#build_value_array
Expand Down
4 changes: 2 additions & 2 deletions src/expr/macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,9 +510,9 @@ struct UserFunctionAttr {
async_: bool,
/// Whether contains argument `&Context`.
context: bool,
/// The last argument type is `&mut dyn Write`.
/// Whether contains argument `&mut impl Write`.
write: bool,
/// The last argument type is `retract: bool`.
/// Whether the last argument type is `retract: bool`.
retract: bool,
/// The argument type are `Option`s.
arg_option: bool,
Expand Down
1 change: 1 addition & 0 deletions src/expr/src/table_function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod empty;
mod generate_series;
mod generate_subscripts;
mod jsonb;
mod pg_expandarray;
mod regexp_matches;
mod repeat;
mod unnest;
Expand Down
51 changes: 51 additions & 0 deletions src/expr/src/table_function/pg_expandarray.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::{ListRef, ScalarRefImpl, StructType};
use risingwave_expr_macro::function;

use super::*;

/// Returns the input array as a set of rows with an index.
///
/// ```slt
/// query II
/// select * from _pg_expandarray(array[1,2,null]);
/// ----
/// 1 1
/// 2 2
/// NULL 3
///
/// query TI
/// select * from _pg_expandarray(array['one', null, 'three']);
/// ----
/// one 1
/// NULL 2
/// three 3
/// ```
#[function(
"_pg_expandarray(list) -> setof struct<x any, n int32>",
type_infer = "infer_type"
)]
fn _pg_expandarray(array: ListRef<'_>) -> impl Iterator<Item = (Option<ScalarRefImpl<'_>>, i32)> {
#[allow(clippy::disallowed_methods)]
array.iter().zip(1..)
}

fn infer_type(args: &[DataType]) -> Result<DataType> {
Ok(DataType::Struct(StructType::new(vec![
("x", args[0].as_list().clone()),
("n", DataType::Int32),
])))
}
27 changes: 21 additions & 6 deletions src/frontend/src/binder/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::LazyLock;
use bk_tree::{metrics, BKTree};
use itertools::Itertools;
use risingwave_common::array::ListValue;
use risingwave_common::catalog::PG_CATALOG_SCHEMA_NAME;
use risingwave_common::catalog::{INFORMATION_SCHEMA_SCHEMA_NAME, PG_CATALOG_SCHEMA_NAME};
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::session_config::USER_NAME_WILD_CARD;
use risingwave_common::types::{DataType, ScalarImpl, Timestamptz};
Expand Down Expand Up @@ -59,12 +59,27 @@ impl Binder {
[schema, name] => {
let schema_name = schema.real_value();
if schema_name == PG_CATALOG_SCHEMA_NAME {
// pg_catalog is always effectively part of the search path, so we can always bind the function.
// Ref: https://www.postgresql.org/docs/current/ddl-schemas.html#DDL-SCHEMAS-CATALOG
name.real_value()
} else if schema_name == INFORMATION_SCHEMA_SCHEMA_NAME {
// definition of information_schema: https://github.com/postgres/postgres/blob/e0b2eed047df9045664da6f724cb42c10f8b12f0/src/backend/catalog/information_schema.sql
//
// FIXME: handle schema correctly, so that the functions are hidden if the schema is not in the search path.
let function_name = name.real_value();
if function_name != "_pg_expandarray" {
return Err(ErrorCode::NotImplemented(
format!("Unsupported function name under schema: {}", schema_name),
12422.into(),
)
.into());
}
function_name
} else {
return Err(ErrorCode::BindError(format!(
"Unsupported function name under schema: {}",
schema_name
))
return Err(ErrorCode::NotImplemented(
format!("Unsupported function name under schema: {}", schema_name),
12422.into(),
)
.into());
}
}
Expand Down Expand Up @@ -138,7 +153,7 @@ impl Binder {
}

// user defined function
// TODO: resolve schema name
// TODO: resolve schema name https://github.com/risingwavelabs/risingwave/issues/12422
if let Ok(schema) = self.first_valid_schema() &&
let Some(func) = schema.get_function_by_name_args(
&function_name,
Expand Down