diff --git a/dashboard/proto/gen/data.ts b/dashboard/proto/gen/data.ts index 16e721937978d..efb0fde525ab1 100644 --- a/dashboard/proto/gen/data.ts +++ b/dashboard/proto/gen/data.ts @@ -20,6 +20,7 @@ export const RwArrayType = { STRUCT: "STRUCT", LIST: "LIST", BYTEA: "BYTEA", + JSONB: "JSONB", UNRECOGNIZED: "UNRECOGNIZED", } as const; @@ -75,6 +76,9 @@ export function rwArrayTypeFromJSON(object: any): RwArrayType { case 15: case "BYTEA": return RwArrayType.BYTEA; + case 16: + case "JSONB": + return RwArrayType.JSONB; case -1: case "UNRECOGNIZED": default: @@ -116,6 +120,8 @@ export function rwArrayTypeToJSON(object: RwArrayType): string { return "LIST"; case RwArrayType.BYTEA: return "BYTEA"; + case RwArrayType.JSONB: + return "JSONB"; case RwArrayType.UNRECOGNIZED: default: return "UNRECOGNIZED"; diff --git a/proto/data.proto b/proto/data.proto index 1d101f0a489b8..2f371b0eaf983 100644 --- a/proto/data.proto +++ b/proto/data.proto @@ -95,6 +95,7 @@ enum ArrayType { STRUCT = 13; LIST = 14; BYTEA = 15; + JSONB = 16; } message Array { diff --git a/src/common/src/array/arrow.rs b/src/common/src/array/arrow.rs index 0b368e1f94ceb..4ef6b89a0b5fa 100644 --- a/src/common/src/array/arrow.rs +++ b/src/common/src/array/arrow.rs @@ -443,6 +443,7 @@ impl From<&ListArray> for arrow_array::ListArray { Time64NanosecondBuilder::with_capacity(a.len()), |b, v| b.append_option(v.map(|d| d.into_arrow())), ), + ArrayImpl::Jsonb(_) => todo!("list of jsonb"), ArrayImpl::Struct(_) => todo!("list of struct"), ArrayImpl::List(_) => todo!("list of list"), ArrayImpl::Bytea(a) => build( diff --git a/src/common/src/array/column_proto_readers.rs b/src/common/src/array/column_proto_readers.rs index 3ce97206c66e0..3ee32e7443067 100644 --- a/src/common/src/array/column_proto_readers.rs +++ b/src/common/src/array/column_proto_readers.rs @@ -189,8 +189,7 @@ pub fn read_string_array>( offset ) })?; - let v = R::read(buf.as_slice())?; - builder.append(Some(v)); + R::read(buf.as_slice(), &mut builder)?; } else { builder.append(None); } diff --git a/src/common/src/array/jsonb_array.rs b/src/common/src/array/jsonb_array.rs index 1b40cb65b5f50..18489aa0f133e 100644 --- a/src/common/src/array/jsonb_array.rs +++ b/src/common/src/array/jsonb_array.rs @@ -12,10 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use postgres_types::{ToSql as _, Type}; +use postgres_types::{FromSql as _, ToSql as _, Type}; use serde_json::Value; +use super::{Array, ArrayBuilder}; +use crate::buffer::{Bitmap, BitmapBuilder}; use crate::types::{Scalar, ScalarImpl, ScalarRef}; +use crate::util::iter_util::ZipEqFast; #[derive(Debug, Clone, PartialEq, Eq)] pub struct JsonbVal(Box); // The `Box` is just to keep `size_of::` smaller. @@ -113,6 +116,18 @@ impl crate::types::to_binary::ToBinary for JsonbRef<'_> { } } +impl JsonbVal { + /// Constructs a value without specific meaning. Usually used as a lightweight placeholder. + pub fn dummy() -> Self { + Self(Value::Null.into()) + } + + pub fn value_deserialize(buf: &[u8]) -> Option { + let v = Value::from_sql(&Type::JSONB, buf).ok()?; + Some(Self(v.into())) + } +} + impl JsonbRef<'_> { pub fn memcmp_serialize( &self, @@ -134,3 +149,145 @@ impl JsonbRef<'_> { output.freeze().into() } } + +#[derive(Debug)] +pub struct JsonbArrayBuilder { + bitmap: BitmapBuilder, + data: Vec, +} + +#[derive(Debug, Clone)] +pub struct JsonbArray { + bitmap: Bitmap, + data: Vec, +} + +impl ArrayBuilder for JsonbArrayBuilder { + type ArrayType = JsonbArray; + + fn with_meta(capacity: usize, _meta: super::ArrayMeta) -> Self { + Self { + bitmap: BitmapBuilder::with_capacity(capacity), + data: Vec::with_capacity(capacity), + } + } + + fn append_n(&mut self, n: usize, value: Option<::RefItem<'_>>) { + match value { + Some(x) => { + self.bitmap.append_n(n, true); + self.data + .extend(std::iter::repeat(x).take(n).map(|x| x.0.clone())); + } + None => { + self.bitmap.append_n(n, false); + self.data + .extend(std::iter::repeat(*JsonbVal::dummy().0).take(n)); + } + } + } + + fn append_array(&mut self, other: &Self::ArrayType) { + for bit in other.bitmap.iter() { + self.bitmap.append(bit); + } + self.data.extend_from_slice(&other.data); + } + + fn pop(&mut self) -> Option<()> { + self.data.pop().map(|_| self.bitmap.pop().unwrap()) + } + + fn finish(self) -> Self::ArrayType { + Self::ArrayType { + bitmap: self.bitmap.finish(), + data: self.data, + } + } +} + +impl JsonbArrayBuilder { + pub fn append_move( + &mut self, + value: <::ArrayType as Array>::OwnedItem, + ) { + self.bitmap.append(true); + self.data.push(*value.0); + } +} + +impl Array for JsonbArray { + type Builder = JsonbArrayBuilder; + type OwnedItem = JsonbVal; + type RefItem<'a> = JsonbRef<'a>; + + unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> { + JsonbRef(self.data.get_unchecked(idx)) + } + + fn len(&self) -> usize { + self.data.len() + } + + fn to_protobuf(&self) -> super::ProstArray { + // The memory layout contains `serde_json::Value` trees, but in protobuf we transmit this as + // variable length bytes in value encoding. That is, one buffer of length n+1 containing + // start and end offsets into the 2nd buffer containing all value bytes concatenated. + + use risingwave_pb::common::buffer::CompressionType; + use risingwave_pb::common::Buffer; + + let mut offset_buffer = + Vec::::with_capacity((1 + self.data.len()) * std::mem::size_of::()); + let mut data_buffer = Vec::::with_capacity(self.data.len()); + + let mut offset = 0; + for (v, not_null) in self.data.iter().zip_eq_fast(self.null_bitmap().iter()) { + if !not_null { + continue; + } + let d = JsonbRef(v).value_serialize(); + offset_buffer.extend_from_slice(&(offset as u64).to_be_bytes()); + data_buffer.extend_from_slice(&d); + offset += d.len(); + } + offset_buffer.extend_from_slice(&(offset as u64).to_be_bytes()); + + let values = vec![ + Buffer { + compression: CompressionType::None as i32, + body: offset_buffer, + }, + Buffer { + compression: CompressionType::None as i32, + body: data_buffer, + }, + ]; + + let null_bitmap = self.null_bitmap().to_protobuf(); + super::ProstArray { + null_bitmap: Some(null_bitmap), + values, + array_type: super::ProstArrayType::Jsonb as i32, + struct_array_data: None, + list_array_data: None, + } + } + + fn null_bitmap(&self) -> &Bitmap { + &self.bitmap + } + + fn into_null_bitmap(self) -> Bitmap { + self.bitmap + } + + fn set_bitmap(&mut self, bitmap: Bitmap) { + self.bitmap = bitmap; + } + + fn create_builder(&self, capacity: usize) -> super::ArrayBuilderImpl { + let array_builder = Self::Builder::new(capacity); + super::ArrayBuilderImpl::Jsonb(array_builder) + } +} diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs index fb487e28a8d6a..48dd6d7543497 100644 --- a/src/common/src/array/mod.rs +++ b/src/common/src/array/mod.rs @@ -53,7 +53,7 @@ pub use data_chunk_iter::RowRef; pub use decimal_array::{DecimalArray, DecimalArrayBuilder}; pub use interval_array::{IntervalArray, IntervalArrayBuilder}; pub use iterator::ArrayIterator; -pub use jsonb_array::{JsonbRef, JsonbVal}; +pub use jsonb_array::{JsonbArray, JsonbArrayBuilder, JsonbRef, JsonbVal}; pub use list_array::{ListArray, ListArrayBuilder, ListRef, ListValue}; use paste::paste; pub use primitive_array::{PrimitiveArray, PrimitiveArrayBuilder, PrimitiveArrayItemType}; @@ -345,6 +345,7 @@ macro_rules! for_all_variants { { NaiveDate, naivedate, NaiveDateArray, NaiveDateArrayBuilder }, { NaiveDateTime, naivedatetime, NaiveDateTimeArray, NaiveDateTimeArrayBuilder }, { NaiveTime, naivetime, NaiveTimeArray, NaiveTimeArrayBuilder }, + { Jsonb, jsonb, JsonbArray, JsonbArrayBuilder }, { Struct, struct, StructArray, StructArrayBuilder }, { List, list, ListArray, ListArrayBuilder }, { Bytea, bytea, BytesArray, BytesArrayBuilder} @@ -383,6 +384,12 @@ impl From for ArrayImpl { } } +impl From for ArrayImpl { + fn from(arr: JsonbArray) -> Self { + Self::Jsonb(arr) + } +} + impl From for ArrayImpl { fn from(arr: StructArray) -> Self { Self::Struct(arr) @@ -676,6 +683,9 @@ impl ArrayImpl { ProstArrayType::Time => read_naive_time_array(array, cardinality)?, ProstArrayType::Timestamp => read_naive_date_time_array(array, cardinality)?, ProstArrayType::Interval => read_interval_unit_array(array, cardinality)?, + ProstArrayType::Jsonb => { + read_string_array::(array, cardinality)? + } ProstArrayType::Struct => StructArray::from_protobuf(array)?, ProstArrayType::List => ListArray::from_protobuf(array)?, ProstArrayType::Unspecified => unreachable!(), diff --git a/src/common/src/array/value_reader.rs b/src/common/src/array/value_reader.rs index d9dea3573bd78..371f08bcd8002 100644 --- a/src/common/src/array/value_reader.rs +++ b/src/common/src/array/value_reader.rs @@ -19,7 +19,7 @@ use byteorder::{BigEndian, ReadBytesExt}; use super::ArrayResult; use crate::array::{ - Array, ArrayBuilder, BytesArrayBuilder, PrimitiveArrayItemType, Utf8ArrayBuilder, + ArrayBuilder, BytesArrayBuilder, JsonbArrayBuilder, PrimitiveArrayItemType, Utf8ArrayBuilder, }; use crate::types::{Decimal, OrderedF32, OrderedF64}; @@ -62,24 +62,39 @@ impl PrimitiveValueReader for DecimalValueReader { } pub trait VarSizedValueReader { - fn read(buf: &[u8]) -> ArrayResult<<::ArrayType as Array>::RefItem<'_>>; + fn read(buf: &[u8], builder: &mut AB) -> ArrayResult<()>; } pub struct Utf8ValueReader; impl VarSizedValueReader for Utf8ValueReader { - fn read(buf: &[u8]) -> ArrayResult<&str> { - match from_utf8(buf) { - Ok(s) => Ok(s), + fn read(buf: &[u8], builder: &mut Utf8ArrayBuilder) -> ArrayResult<()> { + let s = match from_utf8(buf) { + Ok(s) => s, Err(e) => bail!("failed to read utf8 string from bytes: {}", e), - } + }; + builder.append(Some(s)); + Ok(()) } } pub struct BytesValueReader; impl VarSizedValueReader for BytesValueReader { - fn read(buf: &[u8]) -> ArrayResult<&[u8]> { - Ok(buf) + fn read(buf: &[u8], builder: &mut BytesArrayBuilder) -> ArrayResult<()> { + builder.append(Some(buf)); + Ok(()) + } +} + +pub struct JsonbValueReader; + +impl VarSizedValueReader for JsonbValueReader { + fn read(buf: &[u8], builder: &mut JsonbArrayBuilder) -> ArrayResult<()> { + let Some(v) = super::JsonbVal::value_deserialize(buf) else { + bail!("failed to read jsonb from bytes"); + }; + builder.append_move(v); + Ok(()) } } diff --git a/src/common/src/hash/key.rs b/src/common/src/hash/key.rs index 8439f87e9a1f2..48901111f3288 100644 --- a/src/common/src/hash/key.rs +++ b/src/common/src/hash/key.rs @@ -31,8 +31,8 @@ use chrono::{Datelike, Timelike}; use fixedbitset::FixedBitSet; use crate::array::{ - Array, ArrayBuilder, ArrayBuilderImpl, ArrayError, ArrayImpl, ArrayResult, DataChunk, ListRef, - StructRef, + Array, ArrayBuilder, ArrayBuilderImpl, ArrayError, ArrayImpl, ArrayResult, DataChunk, JsonbRef, + ListRef, StructRef, }; use crate::collection::estimate_size::EstimateSize; use crate::hash::VirtualNode; @@ -463,6 +463,20 @@ impl HashKeySerDe<'_> for NaiveTimeWrapper { } } +impl<'a> HashKeySerDe<'a> for JsonbRef<'a> { + type S = Vec; + + /// This should never be called + fn serialize(self) -> Self::S { + todo!() + } + + /// This should never be called + fn deserialize(_source: &mut R) -> Self { + todo!() + } +} + impl<'a> HashKeySerDe<'a> for StructRef<'a> { type S = Vec; diff --git a/src/common/src/test_utils/rand_array.rs b/src/common/src/test_utils/rand_array.rs index 340b6e7dca70a..01365d85c325d 100644 --- a/src/common/src/test_utils/rand_array.rs +++ b/src/common/src/test_utils/rand_array.rs @@ -25,7 +25,7 @@ use rand::prelude::Distribution; use rand::rngs::SmallRng; use rand::{Rng, SeedableRng}; -use crate::array::{Array, ArrayBuilder, ArrayRef, ListValue, StructValue}; +use crate::array::{Array, ArrayBuilder, ArrayRef, JsonbVal, ListValue, StructValue}; use crate::types::{ Decimal, IntervalUnit, NaiveDateTimeWrapper, NaiveDateWrapper, NaiveTimeWrapper, NativeType, Scalar, @@ -117,6 +117,12 @@ impl RandValue for bool { } } +impl RandValue for JsonbVal { + fn rand_value(_rand: &mut R) -> Self { + JsonbVal::dummy() + } +} + impl RandValue for StructValue { fn rand_value(_rand: &mut R) -> Self { StructValue::new(vec![])