Skip to content

Commit

Permalink
feat(common): introduce JsonbArrayBuilder and JsonbArray (#7986)
Browse files Browse the repository at this point in the history
Part of the `jsonb` type support (preview all on [this branch](https://github.com/risingwavelabs/risingwave/compare/XJ-jsonb-WIP-2?expand=1)):
* #7977
* Introduce `ArrayBuilder` and `Array` **(this PR)**
* Introduce `DataType::Jsonb`
* Add more expressions #7714

In this PR:
* The in-memory layout of `JsonbArray` is simply a `Vec` or roots of json trees. This is easier to operate but do have space for optimization.
* The protobuf layout is the same as `bytea` variable length array, with each element storing its value encoding. In case we switch to a newer protobuf layout, a new ArrayType enum value can be introduced without affecting parts other than `to_protobuf` and `from_protobuf`.
* Refactored `VarSizedValueReader` from returning `RefItem` into accepting `&mut ArrayBuilder`. This is to use `JsonbArrayBuilder::append_move` on the deserialized `OwnedItem`. It is impossible to get a `JsonbArray::RefItem` from `&[u8]`.
* Blanket implementation for `arrow` / `HashKeySerDe` / `RandValue`.

Approved-By: BugenZhao
  • Loading branch information
xiangjinwu authored Feb 17, 2023
1 parent cdb5d9b commit 7f9588a
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 15 deletions.
6 changes: 6 additions & 0 deletions dashboard/proto/gen/data.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions proto/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ enum ArrayType {
STRUCT = 13;
LIST = 14;
BYTEA = 15;
JSONB = 16;
}

message Array {
Expand Down
1 change: 1 addition & 0 deletions src/common/src/array/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ impl From<&ListArray> for arrow_array::ListArray {
Time64NanosecondBuilder::with_capacity(a.len()),
|b, v| b.append_option(v.map(|d| d.into_arrow())),
),
ArrayImpl::Jsonb(_) => todo!("list of jsonb"),
ArrayImpl::Struct(_) => todo!("list of struct"),
ArrayImpl::List(_) => todo!("list of list"),
ArrayImpl::Bytea(a) => build(
Expand Down
3 changes: 1 addition & 2 deletions src/common/src/array/column_proto_readers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,7 @@ pub fn read_string_array<B: ArrayBuilder, R: VarSizedValueReader<B>>(
offset
)
})?;
let v = R::read(buf.as_slice())?;
builder.append(Some(v));
R::read(buf.as_slice(), &mut builder)?;
} else {
builder.append(None);
}
Expand Down
159 changes: 158 additions & 1 deletion src/common/src/array/jsonb_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use postgres_types::{ToSql as _, Type};
use postgres_types::{FromSql as _, ToSql as _, Type};
use serde_json::Value;

use super::{Array, ArrayBuilder};
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::types::{Scalar, ScalarImpl, ScalarRef};
use crate::util::iter_util::ZipEqFast;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JsonbVal(Box<Value>); // The `Box` is just to keep `size_of::<ScalarImpl>` smaller.
Expand Down Expand Up @@ -113,6 +116,18 @@ impl crate::types::to_binary::ToBinary for JsonbRef<'_> {
}
}

impl JsonbVal {
/// Constructs a value without specific meaning. Usually used as a lightweight placeholder.
pub fn dummy() -> Self {
Self(Value::Null.into())
}

pub fn value_deserialize(buf: &[u8]) -> Option<Self> {
let v = Value::from_sql(&Type::JSONB, buf).ok()?;
Some(Self(v.into()))
}
}

impl JsonbRef<'_> {
pub fn memcmp_serialize(
&self,
Expand All @@ -134,3 +149,145 @@ impl JsonbRef<'_> {
output.freeze().into()
}
}

#[derive(Debug)]
pub struct JsonbArrayBuilder {
bitmap: BitmapBuilder,
data: Vec<Value>,
}

#[derive(Debug, Clone)]
pub struct JsonbArray {
bitmap: Bitmap,
data: Vec<Value>,
}

impl ArrayBuilder for JsonbArrayBuilder {
type ArrayType = JsonbArray;

fn with_meta(capacity: usize, _meta: super::ArrayMeta) -> Self {
Self {
bitmap: BitmapBuilder::with_capacity(capacity),
data: Vec::with_capacity(capacity),
}
}

fn append_n(&mut self, n: usize, value: Option<<Self::ArrayType as Array>::RefItem<'_>>) {
match value {
Some(x) => {
self.bitmap.append_n(n, true);
self.data
.extend(std::iter::repeat(x).take(n).map(|x| x.0.clone()));
}
None => {
self.bitmap.append_n(n, false);
self.data
.extend(std::iter::repeat(*JsonbVal::dummy().0).take(n));
}
}
}

fn append_array(&mut self, other: &Self::ArrayType) {
for bit in other.bitmap.iter() {
self.bitmap.append(bit);
}
self.data.extend_from_slice(&other.data);
}

fn pop(&mut self) -> Option<()> {
self.data.pop().map(|_| self.bitmap.pop().unwrap())
}

fn finish(self) -> Self::ArrayType {
Self::ArrayType {
bitmap: self.bitmap.finish(),
data: self.data,
}
}
}

impl JsonbArrayBuilder {
pub fn append_move(
&mut self,
value: <<JsonbArrayBuilder as ArrayBuilder>::ArrayType as Array>::OwnedItem,
) {
self.bitmap.append(true);
self.data.push(*value.0);
}
}

impl Array for JsonbArray {
type Builder = JsonbArrayBuilder;
type OwnedItem = JsonbVal;
type RefItem<'a> = JsonbRef<'a>;

unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> {
JsonbRef(self.data.get_unchecked(idx))
}

fn len(&self) -> usize {
self.data.len()
}

fn to_protobuf(&self) -> super::ProstArray {
// The memory layout contains `serde_json::Value` trees, but in protobuf we transmit this as
// variable length bytes in value encoding. That is, one buffer of length n+1 containing
// start and end offsets into the 2nd buffer containing all value bytes concatenated.

use risingwave_pb::common::buffer::CompressionType;
use risingwave_pb::common::Buffer;

let mut offset_buffer =
Vec::<u8>::with_capacity((1 + self.data.len()) * std::mem::size_of::<u64>());
let mut data_buffer = Vec::<u8>::with_capacity(self.data.len());

let mut offset = 0;
for (v, not_null) in self.data.iter().zip_eq_fast(self.null_bitmap().iter()) {
if !not_null {
continue;
}
let d = JsonbRef(v).value_serialize();
offset_buffer.extend_from_slice(&(offset as u64).to_be_bytes());
data_buffer.extend_from_slice(&d);
offset += d.len();
}
offset_buffer.extend_from_slice(&(offset as u64).to_be_bytes());

let values = vec![
Buffer {
compression: CompressionType::None as i32,
body: offset_buffer,
},
Buffer {
compression: CompressionType::None as i32,
body: data_buffer,
},
];

let null_bitmap = self.null_bitmap().to_protobuf();
super::ProstArray {
null_bitmap: Some(null_bitmap),
values,
array_type: super::ProstArrayType::Jsonb as i32,
struct_array_data: None,
list_array_data: None,
}
}

fn null_bitmap(&self) -> &Bitmap {
&self.bitmap
}

fn into_null_bitmap(self) -> Bitmap {
self.bitmap
}

fn set_bitmap(&mut self, bitmap: Bitmap) {
self.bitmap = bitmap;
}

fn create_builder(&self, capacity: usize) -> super::ArrayBuilderImpl {
let array_builder = Self::Builder::new(capacity);
super::ArrayBuilderImpl::Jsonb(array_builder)
}
}
12 changes: 11 additions & 1 deletion src/common/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub use data_chunk_iter::RowRef;
pub use decimal_array::{DecimalArray, DecimalArrayBuilder};
pub use interval_array::{IntervalArray, IntervalArrayBuilder};
pub use iterator::ArrayIterator;
pub use jsonb_array::{JsonbRef, JsonbVal};
pub use jsonb_array::{JsonbArray, JsonbArrayBuilder, JsonbRef, JsonbVal};
pub use list_array::{ListArray, ListArrayBuilder, ListRef, ListValue};
use paste::paste;
pub use primitive_array::{PrimitiveArray, PrimitiveArrayBuilder, PrimitiveArrayItemType};
Expand Down Expand Up @@ -345,6 +345,7 @@ macro_rules! for_all_variants {
{ NaiveDate, naivedate, NaiveDateArray, NaiveDateArrayBuilder },
{ NaiveDateTime, naivedatetime, NaiveDateTimeArray, NaiveDateTimeArrayBuilder },
{ NaiveTime, naivetime, NaiveTimeArray, NaiveTimeArrayBuilder },
{ Jsonb, jsonb, JsonbArray, JsonbArrayBuilder },
{ Struct, struct, StructArray, StructArrayBuilder },
{ List, list, ListArray, ListArrayBuilder },
{ Bytea, bytea, BytesArray, BytesArrayBuilder}
Expand Down Expand Up @@ -383,6 +384,12 @@ impl From<Utf8Array> for ArrayImpl {
}
}

impl From<JsonbArray> for ArrayImpl {
fn from(arr: JsonbArray) -> Self {
Self::Jsonb(arr)
}
}

impl From<StructArray> for ArrayImpl {
fn from(arr: StructArray) -> Self {
Self::Struct(arr)
Expand Down Expand Up @@ -676,6 +683,9 @@ impl ArrayImpl {
ProstArrayType::Time => read_naive_time_array(array, cardinality)?,
ProstArrayType::Timestamp => read_naive_date_time_array(array, cardinality)?,
ProstArrayType::Interval => read_interval_unit_array(array, cardinality)?,
ProstArrayType::Jsonb => {
read_string_array::<JsonbArrayBuilder, JsonbValueReader>(array, cardinality)?
}
ProstArrayType::Struct => StructArray::from_protobuf(array)?,
ProstArrayType::List => ListArray::from_protobuf(array)?,
ProstArrayType::Unspecified => unreachable!(),
Expand Down
31 changes: 23 additions & 8 deletions src/common/src/array/value_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use byteorder::{BigEndian, ReadBytesExt};

use super::ArrayResult;
use crate::array::{
Array, ArrayBuilder, BytesArrayBuilder, PrimitiveArrayItemType, Utf8ArrayBuilder,
ArrayBuilder, BytesArrayBuilder, JsonbArrayBuilder, PrimitiveArrayItemType, Utf8ArrayBuilder,
};
use crate::types::{Decimal, OrderedF32, OrderedF64};

Expand Down Expand Up @@ -62,24 +62,39 @@ impl PrimitiveValueReader<Decimal> for DecimalValueReader {
}

pub trait VarSizedValueReader<AB: ArrayBuilder> {
fn read(buf: &[u8]) -> ArrayResult<<<AB as ArrayBuilder>::ArrayType as Array>::RefItem<'_>>;
fn read(buf: &[u8], builder: &mut AB) -> ArrayResult<()>;
}

pub struct Utf8ValueReader;

impl VarSizedValueReader<Utf8ArrayBuilder> for Utf8ValueReader {
fn read(buf: &[u8]) -> ArrayResult<&str> {
match from_utf8(buf) {
Ok(s) => Ok(s),
fn read(buf: &[u8], builder: &mut Utf8ArrayBuilder) -> ArrayResult<()> {
let s = match from_utf8(buf) {
Ok(s) => s,
Err(e) => bail!("failed to read utf8 string from bytes: {}", e),
}
};
builder.append(Some(s));
Ok(())
}
}

pub struct BytesValueReader;

impl VarSizedValueReader<BytesArrayBuilder> for BytesValueReader {
fn read(buf: &[u8]) -> ArrayResult<&[u8]> {
Ok(buf)
fn read(buf: &[u8], builder: &mut BytesArrayBuilder) -> ArrayResult<()> {
builder.append(Some(buf));
Ok(())
}
}

pub struct JsonbValueReader;

impl VarSizedValueReader<JsonbArrayBuilder> for JsonbValueReader {
fn read(buf: &[u8], builder: &mut JsonbArrayBuilder) -> ArrayResult<()> {
let Some(v) = super::JsonbVal::value_deserialize(buf) else {
bail!("failed to read jsonb from bytes");
};
builder.append_move(v);
Ok(())
}
}
18 changes: 16 additions & 2 deletions src/common/src/hash/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ use chrono::{Datelike, Timelike};
use fixedbitset::FixedBitSet;

use crate::array::{
Array, ArrayBuilder, ArrayBuilderImpl, ArrayError, ArrayImpl, ArrayResult, DataChunk, ListRef,
StructRef,
Array, ArrayBuilder, ArrayBuilderImpl, ArrayError, ArrayImpl, ArrayResult, DataChunk, JsonbRef,
ListRef, StructRef,
};
use crate::collection::estimate_size::EstimateSize;
use crate::hash::VirtualNode;
Expand Down Expand Up @@ -463,6 +463,20 @@ impl HashKeySerDe<'_> for NaiveTimeWrapper {
}
}

impl<'a> HashKeySerDe<'a> for JsonbRef<'a> {
type S = Vec<u8>;

/// This should never be called
fn serialize(self) -> Self::S {
todo!()
}

/// This should never be called
fn deserialize<R: Read>(_source: &mut R) -> Self {
todo!()
}
}

impl<'a> HashKeySerDe<'a> for StructRef<'a> {
type S = Vec<u8>;

Expand Down
8 changes: 7 additions & 1 deletion src/common/src/test_utils/rand_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use rand::prelude::Distribution;
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};

use crate::array::{Array, ArrayBuilder, ArrayRef, ListValue, StructValue};
use crate::array::{Array, ArrayBuilder, ArrayRef, JsonbVal, ListValue, StructValue};
use crate::types::{
Decimal, IntervalUnit, NaiveDateTimeWrapper, NaiveDateWrapper, NaiveTimeWrapper, NativeType,
Scalar,
Expand Down Expand Up @@ -117,6 +117,12 @@ impl RandValue for bool {
}
}

impl RandValue for JsonbVal {
fn rand_value<R: rand::Rng>(_rand: &mut R) -> Self {
JsonbVal::dummy()
}
}

impl RandValue for StructValue {
fn rand_value<R: rand::Rng>(_rand: &mut R) -> Self {
StructValue::new(vec![])
Expand Down

0 comments on commit 7f9588a

Please sign in to comment.