Skip to content

Commit

Permalink
feat(expr): support #> and #>> operator for extracting jsonb at a…
Browse files Browse the repository at this point in the history
… path (#13110)

Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 authored Oct 30, 2023
1 parent 98edea6 commit e392db0
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 82 deletions.
10 changes: 7 additions & 3 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,14 @@ message ExprNode {

// Jsonb functions

// jsonb -> int, jsonb -> text, jsonb #> text[] that returns jsonb
JSONB_ACCESS_INNER = 600;
// jsonb ->> int, jsonb ->> text, jsonb #>> text[] that returns text
// jsonb -> int, jsonb -> text that returns jsonb
JSONB_ACCESS = 600;
// jsonb ->> int, jsonb ->> text that returns text
JSONB_ACCESS_STR = 601;
// jsonb #> text[] -> jsonb
JSONB_ACCESS_MULTI = 613;
// jsonb #>> text[] -> text
JSONB_ACCESS_MULTI_STR = 614;
JSONB_TYPEOF = 602;
JSONB_ARRAY_LENGTH = 603;
IS_JSON = 604;
Expand Down
10 changes: 10 additions & 0 deletions src/common/src/types/jsonb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,16 @@ impl<'a> JsonbRef<'a> {
self.0.as_null().is_some()
}

/// Returns `true` if this value is a jsonb array.
///
/// The check is a plain pattern match on the wrapped value: only the
/// `ValueRef::Array` variant yields `true`; every other variant yields `false`.
pub fn is_array(&self) -> bool {
matches!(self.0, ValueRef::Array(_))
}

/// Returns `true` if this value is a jsonb object.
///
/// The check is a plain pattern match on the wrapped value: only the
/// `ValueRef::Object` variant yields `true`; every other variant yields `false`.
pub fn is_object(&self) -> bool {
matches!(self.0, ValueRef::Object(_))
}

/// Returns the type name of this jsonb.
///
/// Possible values are: `null`, `boolean`, `number`, `string`, `array`, `object`.
Expand Down
142 changes: 139 additions & 3 deletions src/expr/impl/src/scalar/jsonb_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,45 @@

use std::fmt::Write;

use risingwave_common::types::JsonbRef;
use risingwave_common::types::{JsonbRef, ListRef};
use risingwave_expr::function;

#[function("jsonb_access_inner(jsonb, varchar) -> jsonb")]
/// Extracts JSON object field with the given key.
///
/// `jsonb -> text → jsonb`
///
/// This is a thin wrapper: it forwards `p` to `v.access_object_field(p)` and returns
/// that result unchanged, so a `None` from the lookup is passed straight through.
/// NOTE(review): presumably `None` (SQL NULL) means `v` is not an object or has no
/// field named `p` — confirm against `access_object_field`.
///
/// # Examples
///
/// ```slt
/// query T
/// select '{"a": {"b":"foo"}}'::jsonb -> 'a';
/// ----
/// {"b": "foo"}
/// ```
#[function("jsonb_access(jsonb, varchar) -> jsonb")]
pub fn jsonb_object_field<'a>(v: JsonbRef<'a>, p: &str) -> Option<JsonbRef<'a>> {
v.access_object_field(p)
}

#[function("jsonb_access_inner(jsonb, int4) -> jsonb")]
/// Extracts n'th element of JSON array (array elements are indexed from zero,
/// but negative integers count from the end).
///
/// `jsonb -> integer → jsonb`
///
/// # Examples
///
/// ```slt
/// query T
/// select '[{"a":"foo"},{"b":"bar"},{"c":"baz"}]'::jsonb -> 2;
/// ----
/// {"c": "baz"}
///
/// query T
/// select '[{"a":"foo"},{"b":"bar"},{"c":"baz"}]'::jsonb -> -3;
/// ----
/// {"a": "foo"}
/// ```
#[function("jsonb_access(jsonb, int4) -> jsonb")]
pub fn jsonb_array_element(v: JsonbRef<'_>, p: i32) -> Option<JsonbRef<'_>> {
let idx = if p < 0 {
let Ok(len) = v.array_len() else {
Expand All @@ -39,6 +69,59 @@ pub fn jsonb_array_element(v: JsonbRef<'_>, p: i32) -> Option<JsonbRef<'_>> {
v.access_array_element(idx)
}

/// Extracts the JSON sub-object reached by following `path` from `v`, one element at a time.
/// A path element is used as an array index while the current value is an array, and as a
/// field key while the current value is an object.
///
/// `jsonb #> text[] → jsonb`
///
/// Returns `None` (SQL NULL) as soon as a step cannot be taken:
///
/// - the path element is null;
/// - the current value is an array but the element does not parse as an `i32`;
/// - the array-element or object-field lookup itself returns `None`;
/// - the current value is neither an array nor an object.
///
/// An empty `path` returns `v` unchanged.
///
/// # Examples
///
/// ```slt
/// query T
/// select '{"a": {"b": ["foo","bar"]}}'::jsonb #> '{a,b,1}'::text[];
/// ----
/// "bar"
///
/// query T
/// select '{"a": {"b": ["foo","bar"]}}'::jsonb #> '{a,b,null}'::text[];
/// ----
/// NULL
/// ```
#[function("jsonb_access_multi(jsonb, varchar[]) -> jsonb")]
pub fn jsonb_access_multi<'a>(v: JsonbRef<'a>, path: ListRef<'_>) -> Option<JsonbRef<'a>> {
    path.iter().try_fold(v, |current, step| {
        // A null path element makes the whole lookup null.
        let step = step?.into_utf8();
        if current.is_array() {
            // Array steps must be integers; anything else makes the lookup null.
            let index = step.parse::<i32>().ok()?;
            jsonb_array_element(current, index)
        } else if current.is_object() {
            jsonb_object_field(current, step)
        } else {
            // Scalars cannot be descended into.
            None
        }
    })
}

/// Extracts JSON object field with the given key, as text.
///
/// `jsonb ->> text → text`
///
/// # Examples
///
/// ```slt
/// query T
/// select '{"a":1,"b":2}'::jsonb ->> 'b';
/// ----
/// 2
///
/// query T
/// select '{"a":1,"b":null}'::jsonb ->> 'b';
/// ----
/// NULL
/// ```
#[function("jsonb_access_str(jsonb, varchar) -> varchar")]
pub fn jsonb_object_field_str(v: JsonbRef<'_>, p: &str, writer: &mut impl Write) -> Option<()> {
let jsonb = jsonb_object_field(v, p)?;
Expand All @@ -49,6 +132,23 @@ pub fn jsonb_object_field_str(v: JsonbRef<'_>, p: &str, writer: &mut impl Write)
Some(())
}

/// Extracts n'th element of JSON array, as text.
///
/// `jsonb ->> integer → text`
///
/// # Examples
///
/// ```slt
/// query T
/// select '[1,2,3]'::jsonb ->> 2;
/// ----
/// 3
///
/// query T
/// select '[1,2,null]'::jsonb ->> 2;
/// ----
/// NULL
/// ```
#[function("jsonb_access_str(jsonb, int4) -> varchar")]
pub fn jsonb_array_element_str(v: JsonbRef<'_>, p: i32, writer: &mut impl Write) -> Option<()> {
let jsonb = jsonb_array_element(v, p)?;
Expand All @@ -58,3 +158,39 @@ pub fn jsonb_array_element_str(v: JsonbRef<'_>, p: i32, writer: &mut impl Write)
jsonb.force_str(writer).unwrap();
Some(())
}

/// Extracts the JSON sub-object at the given path and writes it to `writer` as text.
///
/// `jsonb #>> text[] → text`
///
/// The value is located with [`jsonb_access_multi`]. If that lookup returns `None`, or the
/// value it finds is a JSON `null`, nothing is written and `None` (SQL NULL) is returned.
/// Otherwise the value is rendered into `writer` via `force_str` and `Some(())` is returned.
///
/// # Panics
///
/// Panics if `force_str` reports an error while writing to `writer`.
///
/// # Examples
///
/// ```slt
/// query T
/// select '{"a": {"b": ["foo","bar"]}}'::jsonb #>> '{a,b,1}'::text[];
/// ----
/// bar
///
/// query T
/// select '{"a": {"b": ["foo",null]}}'::jsonb #>> '{a,b,1}'::text[];
/// ----
/// NULL
///
/// query T
/// select '{"a": {"b": ["foo","bar"]}}'::jsonb #>> '{a,b,null}'::text[];
/// ----
/// NULL
/// ```
#[function("jsonb_access_multi_str(jsonb, varchar[]) -> varchar")]
pub fn jsonb_access_multi_str(
    v: JsonbRef<'_>,
    path: ListRef<'_>,
    writer: &mut impl Write,
) -> Option<()> {
    match jsonb_access_multi(v, path) {
        // Only a non-null JSON value is rendered.
        Some(found) if !found.is_jsonb_null() => {
            found.force_str(writer).unwrap();
            Some(())
        }
        // Missing path or JSON null: SQL NULL.
        _ => None,
    }
}
12 changes: 6 additions & 6 deletions src/frontend/planner_test/tests/testdata/output/cse_expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
select v1->'a'->'c' x, v1->'a'->'b' y from t;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [JsonbAccessInner($expr1, 'c':Varchar) as $expr2, JsonbAccessInner($expr1, 'b':Varchar) as $expr3] }
└─BatchProject { exprs: [t.v1, JsonbAccessInner(t.v1, 'a':Varchar) as $expr1] }
└─BatchProject { exprs: [JsonbAccess($expr1, 'c':Varchar) as $expr2, JsonbAccess($expr1, 'b':Varchar) as $expr3] }
└─BatchProject { exprs: [t.v1, JsonbAccess(t.v1, 'a':Varchar) as $expr1] }
└─BatchScan { table: t, columns: [t.v1], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [x, y, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [JsonbAccessInner($expr1, 'c':Varchar) as $expr2, JsonbAccessInner($expr1, 'b':Varchar) as $expr3, t._row_id] }
└─StreamProject { exprs: [t.v1, JsonbAccessInner(t.v1, 'a':Varchar) as $expr1, t._row_id] }
└─StreamProject { exprs: [JsonbAccess($expr1, 'c':Varchar) as $expr2, JsonbAccess($expr1, 'b':Varchar) as $expr3, t._row_id] }
└─StreamProject { exprs: [t.v1, JsonbAccess(t.v1, 'a':Varchar) as $expr1, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- name: Common sub expression extract2
sql: |
Expand All @@ -20,12 +20,12 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [$expr1, $expr1] }
└─BatchProject { exprs: [t.v1, JsonbAccessInner(JsonbAccessInner(t.v1, 'a':Varchar), 'c':Varchar) as $expr1] }
└─BatchProject { exprs: [t.v1, JsonbAccess(JsonbAccess(t.v1, 'a':Varchar), 'c':Varchar) as $expr1] }
└─BatchScan { table: t, columns: [t.v1], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [x, y, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [$expr1, $expr1, t._row_id] }
└─StreamProject { exprs: [t.v1, JsonbAccessInner(JsonbAccessInner(t.v1, 'a':Varchar), 'c':Varchar) as $expr1, t._row_id] }
└─StreamProject { exprs: [t.v1, JsonbAccess(JsonbAccess(t.v1, 'a':Varchar), 'c':Varchar) as $expr1, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- name: Common sub expression shouldn't extract impure function
sql: |
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/binder/expr/binary_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,10 @@ impl Binder {
BinaryOperator::PGBitwiseXor => ExprType::BitwiseXor,
BinaryOperator::PGBitwiseShiftLeft => ExprType::BitwiseShiftLeft,
BinaryOperator::PGBitwiseShiftRight => ExprType::BitwiseShiftRight,
BinaryOperator::Arrow => ExprType::JsonbAccessInner,
BinaryOperator::Arrow => ExprType::JsonbAccess,
BinaryOperator::LongArrow => ExprType::JsonbAccessStr,
BinaryOperator::HashArrow => ExprType::JsonbAccessMulti,
BinaryOperator::HashLongArrow => ExprType::JsonbAccessMultiStr,
BinaryOperator::Prefix => ExprType::StartsWith,
BinaryOperator::Contains => ExprType::JsonbContains,
BinaryOperator::Contained => ExprType::JsonbContained,
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/binder/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -871,8 +871,8 @@ impl Binder {
// int256
("hex_to_int256", raw_call(ExprType::HexToInt256)),
// jsonb
("jsonb_object_field", raw_call(ExprType::JsonbAccessInner)),
("jsonb_array_element", raw_call(ExprType::JsonbAccessInner)),
("jsonb_object_field", raw_call(ExprType::JsonbAccess)),
("jsonb_array_element", raw_call(ExprType::JsonbAccess)),
("jsonb_object_field_text", raw_call(ExprType::JsonbAccessStr)),
("jsonb_array_element_text", raw_call(ExprType::JsonbAccessStr)),
("jsonb_typeof", raw_call(ExprType::JsonbTypeof)),
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/expr/pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,10 @@ impl ExprVisitor for ImpureAnalyzer {
| expr_node::Type::ArrayPosition
| expr_node::Type::HexToInt256
| expr_node::Type::JsonbCat
| expr_node::Type::JsonbAccessInner
| expr_node::Type::JsonbAccess
| expr_node::Type::JsonbAccessStr
| expr_node::Type::JsonbAccessMulti
| expr_node::Type::JsonbAccessMultiStr
| expr_node::Type::JsonbTypeof
| expr_node::Type::JsonbArrayLength
| expr_node::Type::JsonbObject
Expand Down
Loading

0 comments on commit e392db0

Please sign in to comment.