Skip to content

Commit

Permalink
feat(expr): add jsonb_populate_record(set) function (#13421)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 authored Apr 16, 2024
1 parent 546773a commit 4ba80c9
Show file tree
Hide file tree
Showing 9 changed files with 319 additions and 33 deletions.
4 changes: 4 additions & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ message ExprNode {
JSONB_PATH_MATCH = 621;
JSONB_PATH_QUERY_ARRAY = 622;
JSONB_PATH_QUERY_FIRST = 623;
JSONB_POPULATE_RECORD = 629;
JSONB_TO_RECORD = 630;

// Non-pure functions below (> 1000)
// ------------------------
Expand Down Expand Up @@ -328,6 +330,8 @@ message TableFunction {
JSONB_EACH_TEXT = 13;
JSONB_OBJECT_KEYS = 14;
JSONB_PATH_QUERY = 15;
JSONB_POPULATE_RECORDSET = 16;
JSONB_TO_RECORDSET = 17;
// User defined table function
UDTF = 100;
}
Expand Down
108 changes: 106 additions & 2 deletions src/common/src/types/jsonb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use bytes::Buf;
use jsonbb::{Value, ValueRef};
use risingwave_common_estimate_size::EstimateSize;

use crate::types::{Scalar, ScalarRef};
use super::{Datum, IntoOrdered, ListValue, ScalarImpl, StructRef, ToOwnedDatum, F64};
use crate::types::{DataType, Scalar, ScalarRef, StructType, StructValue};
use crate::util::iter_util::ZipEqDebug;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct JsonbVal(pub(crate) Value);
Expand Down Expand Up @@ -297,11 +299,12 @@ impl<'a> JsonbRef<'a> {
///
/// According to RFC 8259, only number within IEEE 754 binary64 (double precision) has good
/// interoperability. We do not support arbitrary precision like PostgreSQL `numeric` right now.
pub fn as_number(&self) -> Result<f64, String> {
pub fn as_number(&self) -> Result<F64, String> {
self.0
.as_number()
.ok_or_else(|| format!("cannot cast jsonb {} to type number", self.type_name()))?
.as_f64()
.map(|f| f.into_ordered())
.ok_or_else(|| "jsonb number out of range".into())
}

Expand Down Expand Up @@ -380,6 +383,107 @@ impl<'a> JsonbRef<'a> {
self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
}

/// Converts this jsonb value into a datum of the requested type `ty`.
///
/// Only jsonb, boolean, the integer and float types, varchar, list and struct targets are
/// accepted; any other target type yields a "cannot cast" error. A JSON `null` becomes a
/// NULL datum (`None`) for every accepted type.
///
/// # Errors
///
/// Returns an error message when `ty` is not an accepted target type, or when the
/// underlying conversion of the JSON value (e.g. a numeric narrowing) fails.
pub fn to_datum(self, ty: &DataType) -> Result<Datum, String> {
    // The target type is validated before the null check, so an unsupported type is
    // rejected even when the JSON value is null.
    let supported = matches!(
        ty,
        DataType::Jsonb
            | DataType::Boolean
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::Float32
            | DataType::Float64
            | DataType::Varchar
            | DataType::List(_)
            | DataType::Struct(_)
    );
    if !supported {
        return Err(format!("cannot cast jsonb to {ty}"));
    }
    if self.0.as_null().is_some() {
        return Ok(None);
    }
    let scalar = match ty {
        DataType::Jsonb => ScalarImpl::Jsonb(self.into()),
        DataType::Boolean => ScalarImpl::Bool(self.as_bool()?),
        DataType::Int16 => ScalarImpl::Int16(self.as_number()?.try_into()?),
        DataType::Int32 => ScalarImpl::Int32(self.as_number()?.try_into()?),
        DataType::Int64 => ScalarImpl::Int64(self.as_number()?.try_into()?),
        DataType::Float32 => ScalarImpl::Float32(self.as_number()?.try_into()?),
        DataType::Float64 => ScalarImpl::Float64(self.as_number()?),
        DataType::Varchar => ScalarImpl::Utf8(self.force_string().into()),
        DataType::List(t) => ScalarImpl::List(self.to_list(t)?),
        DataType::Struct(s) => ScalarImpl::Struct(self.to_struct(s)?),
        // Every other type was rejected by the `supported` check above.
        _ => unreachable!(),
    };
    Ok(Some(scalar))
}

/// Converts this jsonb value, which must be a JSON array, into a list whose elements
/// have type `elem_type`.
///
/// # Errors
///
/// Returns an error message when the value is not a JSON array, or when any element
/// cannot be converted to `elem_type`.
pub fn to_list(self, elem_type: &DataType) -> Result<ListValue, String> {
    let Some(items) = self.0.as_array() else {
        return Err(format!("expected JSON array, but found {self}"));
    };
    let mut builder = elem_type.create_array_builder(items.len());
    for item in items.iter() {
        // Each element goes through the same conversion as a top-level value.
        let datum = Self(item).to_datum(elem_type)?;
        builder.append(datum);
    }
    Ok(ListValue::new(builder.finish()))
}

/// Converts this jsonb value, which must be a JSON object, into a struct of type `ty`.
///
/// Each struct field is looked up in the object by name; a field with no matching key
/// becomes NULL. Keys that do not correspond to any struct field are never looked up.
///
/// # Errors
///
/// Returns an error message when the value is not a JSON object, or when a matched
/// value cannot be converted to its field type.
pub fn to_struct(self, ty: &StructType) -> Result<StructValue, String> {
    let Some(object) = self.0.as_object() else {
        return Err(format!(
            "cannot call populate_composite on a jsonb {}",
            self.type_name()
        ));
    };
    let mut fields = Vec::with_capacity(ty.len());
    for (name, field_ty) in ty.iter() {
        // Missing key => NULL; present key => converted value (which may itself be NULL).
        let datum = object
            .get(name)
            .map(|v| Self(v).to_datum(field_ty))
            .transpose()?
            .flatten();
        fields.push(datum);
    }
    Ok(StructValue::new(fields))
}

/// Expands the top-level JSON object to a row having the struct type of the `base` argument.
///
/// When `base` is `None` this is equivalent to [`Self::to_struct`]. Otherwise each field of
/// `ty` is looked up in the object by name:
///
/// - no matching key: the corresponding field of `base` is kept;
/// - matching key, struct-typed field: the nested struct is populated recursively, using
///   the base's nested struct (if any) as the nested base. An explicit JSON `null` yields
///   a NULL field, consistent with the result when `base` is `None`;
/// - matching key, any other type: the value is converted with [`Self::to_datum`].
///
/// # Errors
///
/// Returns an error message when the value is not a JSON object, or when a matched value
/// cannot be converted to its field type.
pub fn populate_struct(
    self,
    ty: &StructType,
    base: Option<StructRef<'_>>,
) -> Result<StructValue, String> {
    let Some(base) = base else {
        return self.to_struct(ty);
    };
    let object = self.0.as_object().ok_or_else(|| {
        format!(
            "cannot call populate_composite on a jsonb {}",
            self.type_name()
        )
    })?;
    let mut fields = Vec::with_capacity(ty.len());
    for ((name, ty), base_field) in ty.iter().zip_eq_debug(base.iter_fields_ref()) {
        let datum = match object.get(name) {
            Some(v) => {
                let value = Self(v);
                match ty {
                    // A JSON null for a nested struct is a NULL field. Without this guard
                    // the recursive call would reject it as "not an object", while the
                    // base-less path (`to_struct` -> `to_datum`) returns NULL for the
                    // very same input.
                    DataType::Struct(_) if value.0.as_null().is_some() => None,
                    // recursively populate the nested struct
                    DataType::Struct(s) => Some(
                        value
                            .populate_struct(s, base_field.map(|s| s.into_struct()))?
                            .into(),
                    ),
                    _ => value.to_datum(ty)?,
                }
            }
            // No matching key: fall back to the value carried by `base`.
            None => base_field.to_owned_datum(),
        };
        fields.push(datum);
    }
    Ok(StructValue::new(fields))
}

/// Returns the capacity of the underlying buffer.
pub fn capacity(self) -> usize {
self.0.capacity()
Expand Down
3 changes: 1 addition & 2 deletions src/expr/impl/src/scalar/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use itertools::Itertools;
use risingwave_common::array::{ArrayImpl, DataChunk, ListRef, ListValue, StructRef, StructValue};
use risingwave_common::cast;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{Int256, IntoOrdered, JsonbRef, ToText, F64};
use risingwave_common::types::{Int256, JsonbRef, ToText, F64};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::expr::{build_func, Context, ExpressionBoxExt, InputRefExpression};
use risingwave_expr::{function, ExprError, Result};
Expand Down Expand Up @@ -79,7 +79,6 @@ pub fn jsonb_to_bool(v: JsonbRef<'_>) -> Result<bool> {
pub fn jsonb_to_number<T: TryFrom<F64>>(v: JsonbRef<'_>) -> Result<T> {
v.as_number()
.map_err(|e| ExprError::Parse(e.into()))?
.into_ordered()
.try_into()
.map_err(|_| ExprError::NumericOutOfRange)
}
Expand Down
145 changes: 145 additions & 0 deletions src/expr/impl/src/scalar/jsonb_record.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::{JsonbRef, StructRef, StructValue};
use risingwave_expr::expr::Context;
use risingwave_expr::{function, ExprError, Result};

/// Expands the top-level JSON object into a row of the composite type of the `base` argument.
///
/// Object fields whose names match column names of the output row type supply the values
/// of those columns; object fields that match no output column are ignored. Typically
/// `base` is just NULL, so every output column without a matching object field is NULL.
/// When `base` is not NULL, its values are used for the unmatched columns instead.
///
/// # Examples
///
/// ```slt
/// query ITT
/// select (jsonb_populate_record(
/// null::struct<a int, b text[], c struct<d int, e text>>,
/// '{"a": 1, "b": ["2", "a b"], "c": {"d": 4, "e": "a b c"}, "x": "foo"}'
/// )).*;
/// ----
/// 1 {2,"a b"} (4,"a b c")
///
/// query ITT
/// select (jsonb_populate_record(
/// row(1, null, row(4, '5'))::struct<a int, b text[], c struct<d int, e text>>,
/// '{"b": ["2", "a b"], "c": {"e": "a b c"}, "x": "foo"}'
/// )).*;
/// ----
/// 1 {2,"a b"} (4,"a b c")
/// ```
#[function("jsonb_populate_record(struct, jsonb) -> struct")]
fn jsonb_populate_record(
    base: Option<StructRef<'_>>,
    jsonb: JsonbRef<'_>,
    ctx: &Context,
) -> Result<StructValue> {
    // The row shape comes from the function's return type, not from the JSON content.
    let row_type = ctx.return_type.as_struct();
    let row = jsonb.populate_struct(row_type, base).map_err(parse_err)?;
    Ok(row)
}

/// Expands the top-level JSON array of objects into a set of rows of the composite type of
/// the `base` argument. Every array element is handled exactly as `jsonb_populate_record`
/// handles its object, with the same `base` applied to each element.
///
/// # Examples
///
/// ```slt
/// query II
/// select * from jsonb_populate_recordset(
/// null::struct<a int, b int>,
/// '[{"a":1,"b":2}, {"a":3,"b":4}]'::jsonb
/// );
/// ----
/// 1 2
/// 3 4
///
/// query II
/// select * from jsonb_populate_recordset(
/// row(0, 0)::struct<a int, b int>,
/// '[{}, {"a":1}, {"b":2}, {"a":1,"b":2}]'::jsonb
/// );
/// ----
/// 0 0
/// 1 0
/// 0 2
/// 1 2
/// ```
#[function("jsonb_populate_recordset(struct, jsonb) -> setof struct")]
fn jsonb_populate_recordset<'a>(
    base: Option<StructRef<'a>>,
    jsonb: JsonbRef<'a>,
    ctx: &'a Context,
) -> Result<impl Iterator<Item = Result<StructValue>> + 'a> {
    let row_type = ctx.return_type.as_struct();
    // Failing to view the input as an array is reported up front; per-element failures
    // are reported lazily as the iterator is consumed.
    let elements = jsonb.array_elements().map_err(parse_err)?;
    let rows = elements.map(move |item| item.populate_struct(row_type, base).map_err(parse_err));
    Ok(rows)
}

/// Expands the top-level JSON object into a row of the composite type defined by an AS
/// clause. The output record is filled from the object's fields in the same way as
/// described for `jsonb_populate_record`. There is no input record value, so columns
/// without a matching field are always NULL.
///
/// # Examples
///
/// // FIXME(runji): this query is blocked by parser and frontend support.
/// ```slt,ignore
/// query T
/// select * from jsonb_to_record('{"a":1,"b":[1,2,3],"c":[1,2,3],"e":"bar","r": {"a": 123, "b": "a b c"}}')
/// as x(a int, b text, c int[], d text, r struct<a int, b text>);
/// ----
/// 1 [1,2,3] {1,2,3} NULL (123,"a b c")
/// ```
#[function("jsonb_to_record(jsonb) -> struct", type_infer = "panic")]
fn jsonb_to_record(jsonb: JsonbRef<'_>, ctx: &Context) -> Result<StructValue> {
    // The row shape comes from the function's return type.
    let row_type = ctx.return_type.as_struct();
    let row = jsonb.to_struct(row_type).map_err(parse_err)?;
    Ok(row)
}

/// Expands the top-level JSON array of objects into a set of rows of the composite type
/// defined by an AS clause. Every array element is handled as described for
/// `jsonb_populate_record`.
///
/// # Examples
///
/// // FIXME(runji): this query is blocked by parser and frontend support.
/// ```slt,ignore
/// query IT
/// select * from jsonb_to_recordset('[{"a":1,"b":"foo"}, {"a":"2","c":"bar"}]') as x(a int, b text);
/// ----
/// 1 foo
/// 2 NULL
/// ```
#[function("jsonb_to_recordset(jsonb) -> setof struct", type_infer = "panic")]
fn jsonb_to_recordset<'a>(
    jsonb: JsonbRef<'a>,
    ctx: &'a Context,
) -> Result<impl Iterator<Item = Result<StructValue>> + 'a> {
    let row_type = ctx.return_type.as_struct();
    // Failing to view the input as an array is reported up front; per-element failures
    // are reported lazily as the iterator is consumed.
    let elements = jsonb.array_elements().map_err(parse_err)?;
    let rows = elements.map(move |item| item.to_struct(row_type).map_err(parse_err));
    Ok(rows)
}

/// Wraps an error message produced by the jsonb conversion helpers in an
/// [`ExprError::Parse`].
fn parse_err(s: String) -> ExprError {
    let message = s.into();
    ExprError::Parse(message)
}
1 change: 1 addition & 0 deletions src/expr/impl/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ mod jsonb_delete;
mod jsonb_info;
mod jsonb_object;
mod jsonb_path;
mod jsonb_record;
mod length;
mod lower;
mod make_time;
Expand Down
Loading

0 comments on commit 4ba80c9

Please sign in to comment.