diff --git a/proto/expr.proto b/proto/expr.proto index 802397f0456f9..4c55ee2b614b9 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -274,6 +274,8 @@ message ExprNode { JSONB_PATH_MATCH = 621; JSONB_PATH_QUERY_ARRAY = 622; JSONB_PATH_QUERY_FIRST = 623; + JSONB_POPULATE_RECORD = 629; + JSONB_TO_RECORD = 630; // Non-pure functions below (> 1000) // ------------------------ @@ -328,6 +330,8 @@ message TableFunction { JSONB_EACH_TEXT = 13; JSONB_OBJECT_KEYS = 14; JSONB_PATH_QUERY = 15; + JSONB_POPULATE_RECORDSET = 16; + JSONB_TO_RECORDSET = 17; // User defined table function UDTF = 100; } diff --git a/src/common/src/types/jsonb.rs b/src/common/src/types/jsonb.rs index 6b48893ac9ede..522ec788d8646 100644 --- a/src/common/src/types/jsonb.rs +++ b/src/common/src/types/jsonb.rs @@ -19,7 +19,9 @@ use bytes::Buf; use jsonbb::{Value, ValueRef}; use risingwave_common_estimate_size::EstimateSize; -use crate::types::{Scalar, ScalarRef}; +use super::{Datum, IntoOrdered, ListValue, ScalarImpl, StructRef, ToOwnedDatum, F64}; +use crate::types::{DataType, Scalar, ScalarRef, StructType, StructValue}; +use crate::util::iter_util::ZipEqDebug; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct JsonbVal(pub(crate) Value); @@ -297,11 +299,12 @@ impl<'a> JsonbRef<'a> { /// /// According to RFC 8259, only number within IEEE 754 binary64 (double precision) has good /// interoperability. We do not support arbitrary precision like PostgreSQL `numeric` right now. - pub fn as_number(&self) -> Result { + pub fn as_number(&self) -> Result { self.0 .as_number() .ok_or_else(|| format!("cannot cast jsonb {} to type number", self.type_name()))? .as_f64() + .map(|f| f.into_ordered()) .ok_or_else(|| "jsonb number out of range".into()) } @@ -380,6 +383,107 @@ impl<'a> JsonbRef<'a> { self.0.serialize(&mut ser).map_err(|_| std::fmt::Error) } + /// Convert the jsonb value to a datum. + pub fn to_datum(self, ty: &DataType) -> Result { + if !matches!( + ty, + DataType::Jsonb + | DataType::Boolean + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Float32 + | DataType::Float64 + | DataType::Varchar + | DataType::List(_) + | DataType::Struct(_) + ) { + return Err(format!("cannot cast jsonb to {ty}")); + } + if self.0.as_null().is_some() { + return Ok(None); + } + Ok(Some(match ty { + DataType::Jsonb => ScalarImpl::Jsonb(self.into()), + DataType::Boolean => ScalarImpl::Bool(self.as_bool()?), + DataType::Int16 => ScalarImpl::Int16(self.as_number()?.try_into()?), + DataType::Int32 => ScalarImpl::Int32(self.as_number()?.try_into()?), + DataType::Int64 => ScalarImpl::Int64(self.as_number()?.try_into()?), + DataType::Float32 => ScalarImpl::Float32(self.as_number()?.try_into()?), + DataType::Float64 => ScalarImpl::Float64(self.as_number()?), + DataType::Varchar => ScalarImpl::Utf8(self.force_string().into()), + DataType::List(t) => ScalarImpl::List(self.to_list(t)?), + DataType::Struct(s) => ScalarImpl::Struct(self.to_struct(s)?), + _ => unreachable!(), + })) + } + + /// Convert the jsonb value to a list value. + pub fn to_list(self, elem_type: &DataType) -> Result { + let array = self + .0 + .as_array() + .ok_or_else(|| format!("expected JSON array, but found {self}"))?; + let mut builder = elem_type.create_array_builder(array.len()); + for v in array.iter() { + builder.append(Self(v).to_datum(elem_type)?); + } + Ok(ListValue::new(builder.finish())) + } + + /// Convert the jsonb value to a struct value. + pub fn to_struct(self, ty: &StructType) -> Result { + let object = self.0.as_object().ok_or_else(|| { + format!( + "cannot call populate_composite on a jsonb {}", + self.type_name() + ) + })?; + let mut fields = Vec::with_capacity(ty.len()); + for (name, ty) in ty.iter() { + let datum = match object.get(name) { + Some(v) => Self(v).to_datum(ty)?, + None => None, + }; + fields.push(datum); + } + Ok(StructValue::new(fields)) + } + + /// Expands the top-level JSON object to a row having the struct type of the `base` argument. + pub fn populate_struct( + self, + ty: &StructType, + base: Option>, + ) -> Result { + let Some(base) = base else { + return self.to_struct(ty); + }; + let object = self.0.as_object().ok_or_else(|| { + format!( + "cannot call populate_composite on a jsonb {}", + self.type_name() + ) + })?; + let mut fields = Vec::with_capacity(ty.len()); + for ((name, ty), base_field) in ty.iter().zip_eq_debug(base.iter_fields_ref()) { + let datum = match object.get(name) { + Some(v) => match ty { + // recursively populate the nested struct + DataType::Struct(s) => Some( + Self(v) + .populate_struct(s, base_field.map(|s| s.into_struct()))? + .into(), + ), + _ => Self(v).to_datum(ty)?, + }, + None => base_field.to_owned_datum(), + }; + fields.push(datum); + } + Ok(StructValue::new(fields)) + } + /// Returns the capacity of the underlying buffer. pub fn capacity(self) -> usize { self.0.capacity() diff --git a/src/expr/impl/src/scalar/cast.rs b/src/expr/impl/src/scalar/cast.rs index bff225ad687ff..cb990bdcec6a7 100644 --- a/src/expr/impl/src/scalar/cast.rs +++ b/src/expr/impl/src/scalar/cast.rs @@ -21,7 +21,7 @@ use itertools::Itertools; use risingwave_common::array::{ArrayImpl, DataChunk, ListRef, ListValue, StructRef, StructValue}; use risingwave_common::cast; use risingwave_common::row::OwnedRow; -use risingwave_common::types::{Int256, IntoOrdered, JsonbRef, ToText, F64}; +use risingwave_common::types::{Int256, JsonbRef, ToText, F64}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::expr::{build_func, Context, ExpressionBoxExt, InputRefExpression}; use risingwave_expr::{function, ExprError, Result}; @@ -79,7 +79,6 @@ pub fn jsonb_to_bool(v: JsonbRef<'_>) -> Result { pub fn jsonb_to_number>(v: JsonbRef<'_>) -> Result { v.as_number() .map_err(|e| ExprError::Parse(e.into()))? - .into_ordered() .try_into() .map_err(|_| ExprError::NumericOutOfRange) } diff --git a/src/expr/impl/src/scalar/jsonb_record.rs b/src/expr/impl/src/scalar/jsonb_record.rs new file mode 100644 index 0000000000000..fcc9606897ab7 --- /dev/null +++ b/src/expr/impl/src/scalar/jsonb_record.rs @@ -0,0 +1,145 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::{JsonbRef, StructRef, StructValue}; +use risingwave_expr::expr::Context; +use risingwave_expr::{function, ExprError, Result}; + +/// Expands the top-level JSON object to a row having the composite type of the base argument. +/// The JSON object is scanned for fields whose names match column names of the output row type, +/// and their values are inserted into those columns of the output. (Fields that do not correspond +/// to any output column name are ignored.) In typical use, the value of base is just NULL, which +/// means that any output columns that do not match any object field will be filled with nulls. +/// However, if base isn't NULL then the values it contains will be used for unmatched columns. +/// +/// # Examples +/// +/// ```slt +/// query ITT +/// select (jsonb_populate_record( +/// null::struct>, +/// '{"a": 1, "b": ["2", "a b"], "c": {"d": 4, "e": "a b c"}, "x": "foo"}' +/// )).*; +/// ---- +/// 1 {2,"a b"} (4,"a b c") +/// +/// query ITT +/// select (jsonb_populate_record( +/// row(1, null, row(4, '5'))::struct>, +/// '{"b": ["2", "a b"], "c": {"e": "a b c"}, "x": "foo"}' +/// )).*; +/// ---- +/// 1 {2,"a b"} (4,"a b c") +/// ``` +#[function("jsonb_populate_record(struct, jsonb) -> struct")] +fn jsonb_populate_record( + base: Option>, + jsonb: JsonbRef<'_>, + ctx: &Context, +) -> Result { + let output_type = ctx.return_type.as_struct(); + jsonb.populate_struct(output_type, base).map_err(parse_err) +} + +/// Expands the top-level JSON array of objects to a set of rows having the composite type of the +/// base argument. Each element of the JSON array is processed as described above for +/// `jsonb_populate_record`. +/// +/// # Examples +/// +/// ```slt +/// query II +/// select * from jsonb_populate_recordset( +/// null::struct, +/// '[{"a":1,"b":2}, {"a":3,"b":4}]'::jsonb +/// ); +/// ---- +/// 1 2 +/// 3 4 +/// +/// query II +/// select * from jsonb_populate_recordset( +/// row(0, 0)::struct, +/// '[{}, {"a":1}, {"b":2}, {"a":1,"b":2}]'::jsonb +/// ); +/// ---- +/// 0 0 +/// 1 0 +/// 0 2 +/// 1 2 +/// ``` +#[function("jsonb_populate_recordset(struct, jsonb) -> setof struct")] +fn jsonb_populate_recordset<'a>( + base: Option>, + jsonb: JsonbRef<'a>, + ctx: &'a Context, +) -> Result> + 'a> { + let output_type = ctx.return_type.as_struct(); + Ok(jsonb + .array_elements() + .map_err(parse_err)? + .map(move |elem| elem.populate_struct(output_type, base).map_err(parse_err))) +} + +/// Expands the top-level JSON object to a row having the composite type defined by an AS clause. +/// The output record is filled from fields of the JSON object, in the same way as described above +/// for `jsonb_populate_record`. Since there is no input record value, unmatched columns are always +/// filled with nulls. +/// +/// # Examples +/// +/// // FIXME(runji): this query is blocked by parser and frontend support. +/// ```slt,ignore +/// query T +/// select * from jsonb_to_record('{"a":1,"b":[1,2,3],"c":[1,2,3],"e":"bar","r": {"a": 123, "b": "a b c"}}') +/// as x(a int, b text, c int[], d text, r struct); +/// ---- +/// 1 [1,2,3] {1,2,3} NULL (123,"a b c") +/// ``` +#[function("jsonb_to_record(jsonb) -> struct", type_infer = "panic")] +fn jsonb_to_record(jsonb: JsonbRef<'_>, ctx: &Context) -> Result { + let output_type = ctx.return_type.as_struct(); + jsonb.to_struct(output_type).map_err(parse_err) +} + +/// Expands the top-level JSON array of objects to a set of rows having the composite type defined +/// by an AS clause. Each element of the JSON array is processed as described above for +/// `jsonb_populate_record`. +/// +/// # Examples +/// +/// // FIXME(runji): this query is blocked by parser and frontend support. +/// ```slt,ignore +/// query IT +/// select * from jsonb_to_recordset('[{"a":1,"b":"foo"}, {"a":"2","c":"bar"}]') as x(a int, b text); +/// ---- +/// 1 foo +/// 2 NULL +/// ``` +#[function("jsonb_to_recordset(jsonb) -> setof struct", type_infer = "panic")] +fn jsonb_to_recordset<'a>( + jsonb: JsonbRef<'a>, + ctx: &'a Context, +) -> Result> + 'a> { + let output_type = ctx.return_type.as_struct(); + Ok(jsonb + .array_elements() + .map_err(parse_err)? + .map(|elem| elem.to_struct(output_type).map_err(parse_err))) +} + +/// Construct a parse error from String. +fn parse_err(s: String) -> ExprError { + ExprError::Parse(s.into()) +} diff --git a/src/expr/impl/src/scalar/mod.rs b/src/expr/impl/src/scalar/mod.rs index 27135a739763b..d2f528ece0c6e 100644 --- a/src/expr/impl/src/scalar/mod.rs +++ b/src/expr/impl/src/scalar/mod.rs @@ -57,6 +57,7 @@ mod jsonb_delete; mod jsonb_info; mod jsonb_object; mod jsonb_path; +mod jsonb_record; mod length; mod lower; mod make_time; diff --git a/src/expr/macro/src/gen.rs b/src/expr/macro/src/gen.rs index 684a0e2159751..ca3203033a63e 100644 --- a/src/expr/macro/src/gen.rs +++ b/src/expr/macro/src/gen.rs @@ -1037,10 +1037,10 @@ impl FunctionAttr { .map(|ty| format_ident!("{}Builder", types::array_type(ty))) .collect_vec(); let return_types = if return_types.len() == 1 { - vec![quote! { self.return_type.clone() }] + vec![quote! { self.context.return_type.clone() }] } else { (0..return_types.len()) - .map(|i| quote! { self.return_type.as_struct().types().nth(#i).unwrap().clone() }) + .map(|i| quote! { self.context.return_type.as_struct().types().nth(#i).unwrap().clone() }) .collect() }; #[allow(clippy::disallowed_methods)] @@ -1060,14 +1060,15 @@ impl FunctionAttr { } else { quote! { let value_array = StructArray::new( - self.return_type.as_struct().clone(), + self.context.return_type.as_struct().clone(), value_arrays.to_vec(), Bitmap::ones(len), ).into_ref(); } }; + let context = user_fn.context.then(|| quote! { &self.context, }); let prebuilt_arg = match &self.prebuild { - Some(_) => quote! { &self.prebuilt_arg }, + Some(_) => quote! { &self.prebuilt_arg, }, None => quote! {}, }; let prebuilt_arg_type = match &self.prebuild { @@ -1081,12 +1082,32 @@ impl FunctionAttr { .expect("invalid prebuild syntax"), None => quote! { () }, }; - let iter = match user_fn.return_type_kind { - ReturnTypeKind::T => quote! { iter }, - ReturnTypeKind::Option => quote! { if let Some(it) = iter { it } else { continue; } }, - ReturnTypeKind::Result => quote! { iter? }, + let iter = quote! { #fn_name(#(#inputs,)* #prebuilt_arg #context) }; + let mut iter = match user_fn.return_type_kind { + ReturnTypeKind::T => quote! { #iter }, + ReturnTypeKind::Result => quote! { #iter? }, + ReturnTypeKind::Option => quote! { if let Some(it) = #iter { it } else { continue; } }, ReturnTypeKind::ResultOption => { - quote! { if let Some(it) = iter? { it } else { continue; } } + quote! { if let Some(it) = #iter? { it } else { continue; } } + } + }; + // if user function accepts non-option arguments, we assume the function + // returns empty on null input, so we need to unwrap the inputs before calling. + #[allow(clippy::disallowed_methods)] // allow zip + let some_inputs = inputs + .iter() + .zip(user_fn.args_option.iter()) + .map(|(input, opt)| { + if *opt { + quote! { #input } + } else { + quote! { Some(#input) } + } + }); + iter = quote! { + match (#(#inputs,)*) { + (#(#some_inputs,)*) => #iter, + _ => continue, } }; let iterator_item_type = user_fn.iterator_item_kind.clone().ok_or_else(|| { @@ -1108,11 +1129,18 @@ impl FunctionAttr { use risingwave_common::types::*; use risingwave_common::buffer::Bitmap; use risingwave_common::util::iter_util::ZipEqFast; - use risingwave_expr::expr::BoxedExpression; + use risingwave_expr::expr::{BoxedExpression, Context}; use risingwave_expr::{Result, ExprError}; use risingwave_expr::codegen::*; risingwave_expr::ensure!(children.len() == #num_args); + + let context = Context { + return_type: return_type.clone(), + arg_types: children.iter().map(|c| c.return_type()).collect(), + variadic: false, + }; + let mut iter = children.into_iter(); #(let #all_child = iter.next().unwrap();)* #( @@ -1126,7 +1154,7 @@ impl FunctionAttr { #[derive(Debug)] struct #struct_name { - return_type: DataType, + context: Context, chunk_size: usize, #(#child: BoxedExpression,)* prebuilt_arg: #prebuilt_arg_type, @@ -1134,7 +1162,7 @@ impl FunctionAttr { #[async_trait] impl risingwave_expr::table_function::TableFunction for #struct_name { fn return_type(&self) -> DataType { - self.return_type.clone() + self.context.return_type.clone() } async fn eval<'a>(&'a self, input: &'a DataChunk) -> BoxStream<'a, Result> { self.eval_inner(input) @@ -1151,23 +1179,23 @@ impl FunctionAttr { let mut index_builder = I32ArrayBuilder::new(self.chunk_size); #(let mut #builders = #builder_types::with_type(self.chunk_size, #return_types);)* - for (i, (row, visible)) in multizip((#(#arrays.iter(),)*)).zip_eq_fast(input.visibility().iter()).enumerate() { - if let (#(Some(#inputs),)*) = row && visible { - let iter = #fn_name(#(#inputs,)* #prebuilt_arg); - for output in #iter { - index_builder.append(Some(i as i32)); - match #output { - Some((#(#outputs),*)) => { #(#builders.append(#optioned_outputs);)* } - None => { #(#builders.append_null();)* } - } + for (i, ((#(#inputs,)*), visible)) in multizip((#(#arrays.iter(),)*)).zip_eq_fast(input.visibility().iter()).enumerate() { + if !visible { + continue; + } + for output in #iter { + index_builder.append(Some(i as i32)); + match #output { + Some((#(#outputs),*)) => { #(#builders.append(#optioned_outputs);)* } + None => { #(#builders.append_null();)* } + } - if index_builder.len() == self.chunk_size { - let len = index_builder.len(); - let index_array = std::mem::replace(&mut index_builder, I32ArrayBuilder::new(self.chunk_size)).finish().into_ref(); - let value_arrays = [#(std::mem::replace(&mut #builders, #builder_types::with_type(self.chunk_size, #return_types)).finish().into_ref()),*]; - #build_value_array - yield DataChunk::new(vec![index_array, value_array], self.chunk_size); - } + if index_builder.len() == self.chunk_size { + let len = index_builder.len(); + let index_array = std::mem::replace(&mut index_builder, I32ArrayBuilder::new(self.chunk_size)).finish().into_ref(); + let value_arrays = [#(std::mem::replace(&mut #builders, #builder_types::with_type(self.chunk_size, #return_types)).finish().into_ref()),*]; + #build_value_array + yield DataChunk::new(vec![index_array, value_array], self.chunk_size); } } } @@ -1183,7 +1211,7 @@ impl FunctionAttr { } Ok(Box::new(#struct_name { - return_type, + context, chunk_size, #(#child,)* prebuilt_arg: #prebuilt_arg_value, diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 1d424f7be4f4f..1b36e9ee2fd73 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -1121,6 +1121,7 @@ impl Binder { ("to_jsonb", raw_call(ExprType::ToJsonb)), ("jsonb_build_array", raw_call(ExprType::JsonbBuildArray)), ("jsonb_build_object", raw_call(ExprType::JsonbBuildObject)), + ("jsonb_populate_record", raw_call(ExprType::JsonbPopulateRecord)), ("jsonb_path_match", raw_call(ExprType::JsonbPathMatch)), ("jsonb_path_exists", raw_call(ExprType::JsonbPathExists)), ("jsonb_path_query_array", raw_call(ExprType::JsonbPathQueryArray)), diff --git a/src/frontend/src/expr/pure.rs b/src/frontend/src/expr/pure.rs index bc56733a4423b..310e7a9187dab 100644 --- a/src/frontend/src/expr/pure.rs +++ b/src/frontend/src/expr/pure.rs @@ -201,6 +201,8 @@ impl ExprVisitor for ImpureAnalyzer { | expr_node::Type::JsonbBuildArray | expr_node::Type::JsonbBuildArrayVariadic | expr_node::Type::JsonbBuildObject + | expr_node::Type::JsonbPopulateRecord + | expr_node::Type::JsonbToRecord | expr_node::Type::JsonbBuildObjectVariadic | expr_node::Type::JsonbPathExists | expr_node::Type::JsonbPathMatch diff --git a/src/frontend/src/optimizer/plan_expr_visitor/strong.rs b/src/frontend/src/optimizer/plan_expr_visitor/strong.rs index 79905a076e426..a3d2e0edfb15d 100644 --- a/src/frontend/src/optimizer/plan_expr_visitor/strong.rs +++ b/src/frontend/src/optimizer/plan_expr_visitor/strong.rs @@ -285,6 +285,8 @@ impl Strong { | ExprType::JsonbPathMatch | ExprType::JsonbPathQueryArray | ExprType::JsonbPathQueryFirst + | ExprType::JsonbPopulateRecord + | ExprType::JsonbToRecord | ExprType::Vnode | ExprType::Proctime | ExprType::PgSleep