feat(common): support insert and select for jsonb type (#7994)
Part of the `jsonb` type support:
* #7977
* #7986
* Introduce `DataType::Jsonb` **(this PR)**
* Add more expressions #7714

In this PR:
* Add `DataType::Jsonb`.
* Support constructing it from a string and displaying it as a string (see the sketch below).
* Add e2e tests for `insert` and `select`.

Also tested the following, but not added to CI:
* prepared statement with BINARY format
* kafka+json source with a `jsonb` field
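
For illustration only (not part of the commit): a minimal Rust sketch of the string round-trip described above, exercising the `FromStr` impl added in `jsonb_array.rs`. Whether the display side goes through `JsonbVal` or `JsonbRef` is an assumption here.

```rust
// Hedged sketch: parse a JSON literal into the new JsonbVal and turn it back
// into text. `JsonbVal` and its `FromStr` impl are from this commit; the
// `to_string()` call assumes a Display/ToText path that may live on JsonbRef.
use risingwave_common::array::JsonbVal;

fn demo() -> Result<(), Box<dyn std::error::Error>> {
    let v: JsonbVal = r#"{"a": [2, true, "", {}]}"#.parse()?;
    let text = v.to_string(); // assumed display path
    assert!(text.contains("true"));
    Ok(())
}
```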

Approved-By: BugenZhao
xiangjinwu authored Feb 17, 2023
1 parent 298a950 commit 32c55d0
Showing 22 changed files with 196 additions and 13 deletions.
1 change: 1 addition & 0 deletions clippy.toml
@@ -2,6 +2,7 @@ disallowed-methods = [
{ path = "std::iter::Iterator::zip", reason = "Please use `zip_eq_fast` if it's available. Otherwise use `zip_eq_debug`" },
{ path = "itertools::Itertools::zip_eq", reason = "Please use `zip_eq_fast` if it's available. Otherwise use `zip_eq_debug`" },
{ path = "futures::stream::select_all", reason = "Please use `risingwave_common::util::select_all` instead." },
{ path = "risingwave_common::array::JsonbVal::from_serde", reason = "Please add dedicated methods as part of `JsonbRef`/`JsonbVal`, rather than take inner `serde_json::Value` out, process, and put back." },
]
doc-valid-idents = [
"RisingWave",
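The new lint entry above steers jsonb handling toward dedicated methods on `JsonbRef`/`JsonbVal` instead of round-tripping through the inner `serde_json::Value`. A hedged sketch of that shape, using a simplified stand-in type and a hypothetical `array_len` helper (neither is in the commit):

```rust
use serde_json::Value;

// Simplified stand-in for the real JsonbVal, for illustration only.
pub struct JsonbVal(Box<Value>);

impl JsonbVal {
    // Preferred: keep the serde_json::Value encapsulated and expose a
    // dedicated method for the operation callers actually need.
    pub fn array_len(&self) -> Option<usize> {
        self.0.as_array().map(|arr| arr.len())
    }
}

// Discouraged (what the lint flags): construct via `JsonbVal::from_serde`,
// pull the inner Value out at call sites, mutate it, and wrap it back up.
```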
6 changes: 6 additions & 0 deletions dashboard/proto/gen/data.ts

(Generated file; diff not rendered.)

26 changes: 14 additions & 12 deletions e2e_test/batch/catalog/pg_cast.slt.part
@@ -55,18 +55,20 @@ SELECT * FROM pg_catalog.pg_cast
51 1043 1114 EXPLICIT
52 1043 1184 EXPLICIT
53 1043 1186 EXPLICIT
54 1083 1043 ASSIGN
55 1083 1186 IMPLICIT
56 1114 1082 ASSIGN
57 1114 1043 ASSIGN
58 1114 1083 ASSIGN
59 1114 1184 IMPLICIT
60 1184 1082 ASSIGN
61 1184 1043 ASSIGN
62 1184 1083 ASSIGN
63 1184 1114 ASSIGN
64 1186 1043 ASSIGN
65 1186 1083 ASSIGN
54 1043 3802 EXPLICIT
55 1083 1043 ASSIGN
56 1083 1186 IMPLICIT
57 1114 1082 ASSIGN
58 1114 1043 ASSIGN
59 1114 1083 ASSIGN
60 1114 1184 IMPLICIT
61 1184 1082 ASSIGN
62 1184 1043 ASSIGN
63 1184 1083 ASSIGN
64 1184 1114 ASSIGN
65 1186 1043 ASSIGN
66 1186 1083 ASSIGN
67 3802 1043 ASSIGN

query TT rowsort
SELECT s.typname, t.typname
37 changes: 37 additions & 0 deletions e2e_test/batch/types/jsonb.slt.part
@@ -0,0 +1,37 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

query T rowsort
values ('{"a":[2, true, "", {}]}'::jsonb), ('1'), ('true'), ('null'), (null), ('[1, true]');
----
1
NULL
[1,true]
null
true
{"a":[2,true,"",{}]}

statement ok
create table t (v1 jsonb);

statement ok
insert into t values ('1'), ('true'), ('null'), (null);

query T rowsort
select * from t;
----
1
NULL
null
true

query T
select * from t order by v1::varchar;
----
1
null
true
NULL

statement ok
drop table t;
44 changes: 44 additions & 0 deletions e2e_test/batch/types/jsonb_ord.slt.part
@@ -0,0 +1,44 @@
# We do not intend to support using the `jsonb` type for `group by` / `order by` / `primary key`.
# Before #7981 is done, we need these tests to make sure our system does not panic.
# After #7981, we need them to make sure proper errors are returned to the user.

statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
values ('{"a":[2, true, "", {}]}'::jsonb), ('1'), ('true'), ('null'), (null), ('[1, true]') order by 1;

statement ok
create table t (v1 jsonb);

statement ok
insert into t values ('1'), ('true'), ('null'), (null);

statement ok
select * from t order by v1;

# deserialize length
statement ok
create materialized view mv1 as select * from t group by v1;

statement ok
select * from mv1;

statement ok
drop materialized view mv1;

# deserialize pk
statement ok
create table t2 (v1 jsonb primary key);

statement ok
insert into t2 values ('1'), ('true'), ('null'), (null);

statement ok
select * from t2;

statement ok
drop table t2;

statement ok
drop table t;
1 change: 1 addition & 0 deletions proto/data.proto
@@ -49,6 +49,7 @@ message DataType {
STRUCT = 15;
LIST = 16;
BYTEA = 17;
JSONB = 18;
}
TypeName type_name = 1;
// Data length for char.
24 changes: 24 additions & 0 deletions src/common/src/array/jsonb_array.rs
@@ -116,12 +116,36 @@ impl crate::types::to_binary::ToBinary for JsonbRef<'_> {
}
}

impl std::str::FromStr for JsonbVal {
type Err = <Value as std::str::FromStr>::Err;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let v: Value = s.parse()?;
Ok(Self(v.into()))
}
}

impl JsonbVal {
/// Avoid this function (or `impl From<Value>`), which is a leak of abstraction.
/// In most cases you would be using `JsonbRef`.
pub fn from_serde(v: Value) -> Self {
Self(v.into())
}

/// Constructs a value without specific meaning. Usually used as a lightweight placeholder.
pub fn dummy() -> Self {
Self(Value::Null.into())
}

pub fn memcmp_deserialize(
deserializer: &mut memcomparable::Deserializer<impl bytes::Buf>,
) -> memcomparable::Result<Self> {
let v: Value = <String as serde::Deserialize>::deserialize(deserializer)?
.parse()
.map_err(|_| memcomparable::Error::Message("invalid json".into()))?;
Ok(Self(v.into()))
}

pub fn value_deserialize(buf: &[u8]) -> Option<Self> {
let v = Value::from_sql(&Type::JSONB, buf).ok()?;
Some(Self(v.into()))
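As a hedged aside (not in the diff): `memcmp_deserialize` above reads a jsonb value back as a memcomparable-encoded string of JSON text, so a round-trip could look roughly like the sketch below. The serialization side is an assumption about the encoder; only the deserialize call mirrors the commit.

```rust
use risingwave_common::array::JsonbVal;
use serde::Serialize;

fn roundtrip() -> memcomparable::Result<JsonbVal> {
    let text = r#"{"k": [1, true, null]}"#.to_string();

    // Assumed encoder side: a jsonb value is stored as its JSON text,
    // serialized with the memcomparable string encoding.
    let mut ser = memcomparable::Serializer::new(vec![]);
    text.serialize(&mut ser)?;
    let buf = ser.into_inner();

    // Decoder side mirrors JsonbVal::memcmp_deserialize from this commit.
    let mut de = memcomparable::Deserializer::new(&buf[..]);
    JsonbVal::memcmp_deserialize(&mut de)
}
```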
1 change: 1 addition & 0 deletions src/common/src/hash/dispatcher.rs
@@ -101,6 +101,7 @@ fn hash_key_size(data_type: &DataType) -> HashKeySize {

DataType::Varchar => HashKeySize::Variable,
DataType::Bytea => HashKeySize::Variable,
DataType::Jsonb => HashKeySize::Variable,
DataType::Struct { .. } => HashKeySize::Variable,
DataType::List { .. } => HashKeySize::Variable,
}
13 changes: 13 additions & 0 deletions src/common/src/types/mod.rs
@@ -128,6 +128,9 @@ pub enum DataType {
#[display("bytea")]
#[from_str(regex = "(?i)^bytea$")]
Bytea,
#[display("jsonb")]
#[from_str(regex = "(?i)^jsonb$")]
Jsonb,
}

impl std::str::FromStr for Box<DataType> {
@@ -154,6 +157,7 @@ impl DataTypeName {
| DataTypeName::Timestamptz
| DataTypeName::Time
| DataTypeName::Bytea
| DataTypeName::Jsonb
| DataTypeName::Interval => true,

DataTypeName::Struct | DataTypeName::List => false,
@@ -176,6 +180,7 @@ impl DataTypeName {
DataTypeName::Timestamptz => DataType::Timestamptz,
DataTypeName::Time => DataType::Time,
DataTypeName::Interval => DataType::Interval,
DataTypeName::Jsonb => DataType::Jsonb,
DataTypeName::Struct | DataTypeName::List => {
return None;
}
@@ -214,6 +219,7 @@ impl From<&ProstDataType> for DataType {
TypeName::Decimal => DataType::Decimal,
TypeName::Interval => DataType::Interval,
TypeName::Bytea => DataType::Bytea,
TypeName::Jsonb => DataType::Jsonb,
TypeName::Struct => {
let fields: Vec<DataType> = proto.field_type.iter().map(|f| f.into()).collect_vec();
let field_names: Vec<String> = proto.field_names.iter().cloned().collect_vec();
@@ -259,6 +265,7 @@ impl DataType {
DataType::Timestamp => NaiveDateTimeArrayBuilder::new(capacity).into(),
DataType::Timestamptz => PrimitiveArrayBuilder::<i64>::new(capacity).into(),
DataType::Interval => IntervalArrayBuilder::new(capacity).into(),
DataType::Jsonb => JsonbArrayBuilder::new(capacity).into(),
DataType::Struct(t) => {
StructArrayBuilder::with_meta(capacity, t.to_array_meta()).into()
}
@@ -288,6 +295,7 @@ impl DataType {
DataType::Timestamptz => TypeName::Timestamptz,
DataType::Decimal => TypeName::Decimal,
DataType::Interval => TypeName::Interval,
DataType::Jsonb => TypeName::Jsonb,
DataType::Struct { .. } => TypeName::Struct,
DataType::List { .. } => TypeName::List,
DataType::Bytea => TypeName::Bytea,
@@ -373,6 +381,7 @@ impl DataType {
DataType::Timestamptz => ScalarImpl::Int64(i64::MIN),
DataType::Decimal => ScalarImpl::Decimal(Decimal::NegativeInf),
DataType::Interval => ScalarImpl::Interval(IntervalUnit::MIN),
DataType::Jsonb => ScalarImpl::Jsonb(JsonbVal::dummy()), // NOT `min` #7981
DataType::Struct(data_types) => ScalarImpl::Struct(StructValue::new(
data_types
.fields
@@ -907,6 +916,7 @@ impl ScalarImpl {
NaiveDateWrapper::with_days(days)
.map_err(|e| memcomparable::Error::Message(format!("{e}")))?
}),
Ty::Jsonb => Self::Jsonb(JsonbVal::memcmp_deserialize(de)?),
Ty::Struct(t) => StructValue::memcmp_deserialize(&t.fields, de)?.to_scalar_value(),
Ty::List { datatype } => ListValue::memcmp_deserialize(datatype, de)?.to_scalar_value(),
})
@@ -952,6 +962,7 @@ impl ScalarImpl {
.iter()
.map(|field| Self::encoding_data_size(field, deserializer))
.try_fold(0, |a, b| b.map(|b| a + b))?,
DataType::Jsonb => deserializer.skip_bytes()?,
DataType::Varchar => deserializer.skip_bytes()?,
DataType::Bytea => deserializer.skip_bytes()?,
};
@@ -988,6 +999,7 @@ pub fn literal_type_match(data_type: &DataType, literal: Option<&ScalarImpl>) ->
| (DataType::Timestamptz, ScalarImpl::Int64(_))
| (DataType::Decimal, ScalarImpl::Decimal(_))
| (DataType::Interval, ScalarImpl::Interval(_))
| (DataType::Jsonb, ScalarImpl::Jsonb(_))
| (DataType::Struct { .. }, ScalarImpl::Struct(_))
| (DataType::List { .. }, ScalarImpl::List(_))
)
@@ -1189,6 +1201,7 @@ mod tests {
ScalarImpl::Interval(IntervalUnit::new(2, 3, 3333)),
DataType::Interval,
),
DataTypeName::Jsonb => (ScalarImpl::Jsonb(JsonbVal::dummy()), DataType::Jsonb),
DataTypeName::Struct => (
ScalarImpl::Struct(StructValue::new(vec![
ScalarImpl::Int64(233).into(),
7 changes: 7 additions & 0 deletions src/common/src/types/postgres_type.rs
@@ -31,6 +31,7 @@ impl DataType {
| DataType::Varchar
| DataType::Bytea
| DataType::Interval
| DataType::Jsonb
| DataType::Struct(_)
| DataType::List { .. } => -1,
}
@@ -57,6 +58,7 @@ impl DataType {
1114 => Ok(DataType::Timestamp),
1184 => Ok(DataType::Timestamptz),
1186 => Ok(DataType::Interval),
3802 => Ok(DataType::Jsonb),
1000 => Ok(DataType::List {
datatype: Box::new(DataType::Boolean),
}),
@@ -99,6 +101,9 @@ impl DataType {
1187 => Ok(DataType::List {
datatype: Box::new(DataType::Interval),
}),
3807 => Ok(DataType::List {
datatype: Box::new(DataType::Jsonb),
}),
_ => Err(ErrorCode::InternalError(format!("Unsupported oid {}", oid)).into()),
}
}
@@ -121,6 +126,7 @@ impl DataType {
// NOTE: Struct type don't have oid in postgres, here we use varchar oid so that struct
// will be considered as a varchar.
DataType::Struct(_) => 1043,
DataType::Jsonb => 3802,
DataType::Bytea => 17,
DataType::List { datatype } => match unnested_list_type(datatype.as_ref().clone()) {
DataType::Boolean => 1000,
@@ -137,6 +143,7 @@ impl DataType {
DataType::Timestamp => 1115,
DataType::Timestamptz => 1185,
DataType::Interval => 1187,
DataType::Jsonb => 3807,
DataType::Struct(_) => 1015,
DataType::List { .. } => unreachable!("Never reach here!"),
},
2 changes: 2 additions & 0 deletions src/common/src/util/value_encoding/error.rs
@@ -28,6 +28,8 @@ pub enum ValueEncodingError {
InvalidNaiveTimeEncoding(u32, u32),
#[error("Invalid null tag value encoding: {0}")]
InvalidTagEncoding(u8),
#[error("Invalid jsonb encoding")]
InvalidJsonbEncoding,
#[error("Invalid struct encoding: {0}")]
InvalidStructEncoding(crate::array::ArrayError),
#[error("Invalid list encoding: {0}")]
6 changes: 5 additions & 1 deletion src/common/src/util/value_encoding/mod.rs
@@ -19,7 +19,7 @@ use bytes::{Buf, BufMut};
use chrono::{Datelike, Timelike};
use itertools::Itertools;

use crate::array::{ListRef, ListValue, StructRef, StructValue};
use crate::array::{JsonbVal, ListRef, ListValue, StructRef, StructValue};
use crate::types::struct_type::StructType;
use crate::types::{
DataType, Datum, Decimal, IntervalUnit, NaiveDateTimeWrapper, NaiveDateWrapper,
@@ -156,6 +156,10 @@ fn deserialize_value(ty: &DataType, data: &mut impl Buf) -> Result<ScalarImpl> {
DataType::Timestamp => ScalarImpl::NaiveDateTime(deserialize_naivedatetime(data)?),
DataType::Timestamptz => ScalarImpl::Int64(data.get_i64_le()),
DataType::Date => ScalarImpl::NaiveDate(deserialize_naivedate(data)?),
DataType::Jsonb => ScalarImpl::Jsonb(
JsonbVal::value_deserialize(&deserialize_bytea(data))
.ok_or(ValueEncodingError::InvalidJsonbEncoding)?,
),
DataType::Struct(struct_def) => deserialize_struct(struct_def, data)?,
DataType::Bytea => ScalarImpl::Bytea(deserialize_bytea(data).into()),
DataType::List {
5 changes: 5 additions & 0 deletions src/connector/src/parser/common.rs
@@ -62,6 +62,11 @@ fn do_parse_simd_json_value(dtype: &DataType, v: &BorrowedValue<'_>) -> Result<S
BorrowedValue::Static(_) => i64_to_timestamptz(ensure_int!(v, i64))?.into(),
_ => anyhow::bail!("expect timestamptz, but found {v}"),
},
DataType::Jsonb => {
let v: serde_json::Value = v.clone().try_into()?;
#[expect(clippy::disallowed_methods)]
ScalarImpl::Jsonb(risingwave_common::array::JsonbVal::from_serde(v))
}
DataType::Struct(struct_type_info) => {
let fields = struct_type_info
.field_names
12 changes: 12 additions & 0 deletions src/expr/src/expr/data_types.rs
@@ -71,6 +71,18 @@ macro_rules! list {

pub(crate) use list;

#[macro_export]
macro_rules! jsonb {
($macro:ident) => {
$macro! {
risingwave_common::types::DataType::Jsonb,
risingwave_common::array::JsonbArray
}
};
}

pub(crate) use jsonb;

#[macro_export]
macro_rules! int16 {
($macro:ident) => {
(Diffs for the remaining changed files are not shown.)
