Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Browse files Browse the repository at this point in the history
…nto li0k/storage_fix_select_unexpected_level_count
  • Loading branch information
Li0k committed Oct 25, 2023
2 parents eeb51c7 + 7f82929 commit cce986d
Show file tree
Hide file tree
Showing 18 changed files with 491 additions and 307 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ hyper = "0.14"
hytra = { workspace = true }
itertools = "0.11"
itoa = "1.0"
jsonbb = "0.1"
lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "cb2d7c7" }
memcomparable = { version = "0.2", features = ["decimal"] }
num-integer = "0.1"
Expand Down
128 changes: 46 additions & 82 deletions src/common/src/array/jsonb_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,35 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::mem::size_of;

use risingwave_pb::data::{PbArray, PbArrayType};
use serde_json::Value;

use super::{Array, ArrayBuilder};
use super::{Array, ArrayBuilder, ArrayImpl, ArrayResult};
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::estimate_size::EstimateSize;
use crate::types::{DataType, JsonbRef, JsonbVal, F32, F64};
use crate::util::iter_util::ZipEqFast;
use crate::types::{DataType, JsonbRef, JsonbVal, Scalar};

#[derive(Debug)]
pub struct JsonbArrayBuilder {
bitmap: BitmapBuilder,
data: Vec<Value>,
builder: jsonbb::Builder,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JsonbArray {
bitmap: Bitmap,
data: Vec<Value>,
/// Elements are stored as a single JSONB array value.
data: jsonbb::Value,
}

impl ArrayBuilder for JsonbArrayBuilder {
type ArrayType = JsonbArray;

fn new(capacity: usize) -> Self {
let mut builder = jsonbb::Builder::with_capacity(capacity);
builder.begin_array();
Self {
bitmap: BitmapBuilder::with_capacity(capacity),
data: Vec::with_capacity(capacity),
builder,
}
}

Expand All @@ -54,13 +53,15 @@ impl ArrayBuilder for JsonbArrayBuilder {
match value {
Some(x) => {
self.bitmap.append_n(n, true);
self.data
.extend(std::iter::repeat(x).take(n).map(|x| x.0.clone()));
for _ in 0..n {
self.builder.add_value(x.0);
}
}
None => {
self.bitmap.append_n(n, false);
self.data
.extend(std::iter::repeat(*JsonbVal::dummy().0).take(n));
for _ in 0..n {
self.builder.add_null();
}
}
}
}
Expand All @@ -69,29 +70,44 @@ impl ArrayBuilder for JsonbArrayBuilder {
for bit in other.bitmap.iter() {
self.bitmap.append(bit);
}
self.data.extend_from_slice(&other.data);
for value in other.data.as_array().unwrap().iter() {
self.builder.add_value(value);
}
}

fn pop(&mut self) -> Option<()> {
self.data.pop().map(|_| self.bitmap.pop().unwrap())
self.bitmap.pop()?;
self.builder.pop();
Some(())
}

fn len(&self) -> usize {
self.bitmap.len()
}

fn finish(self) -> Self::ArrayType {
fn finish(mut self) -> Self::ArrayType {
self.builder.end_array();
Self::ArrayType {
bitmap: self.bitmap.finish(),
data: self.data,
data: self.builder.finish(),
}
}
}

impl JsonbArrayBuilder {
pub fn append_move(&mut self, value: JsonbVal) {
self.bitmap.append(true);
self.data.push(*value.0);
impl JsonbArray {
/// Loads a `JsonbArray` from a protobuf array.
///
/// See also `JsonbArray::to_protobuf`.
pub fn from_protobuf(array: &PbArray) -> ArrayResult<ArrayImpl> {
ensure!(
array.values.len() == 1,
"Must have exactly 1 buffer in a jsonb array"
);
let arr = JsonbArray {
bitmap: array.get_null_bitmap()?.into(),
data: jsonbb::Value::from_bytes(&array.values[0].body),
};
Ok(arr.into())
}
}

Expand All @@ -101,52 +117,23 @@ impl Array for JsonbArray {
type RefItem<'a> = JsonbRef<'a>;

unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> {
JsonbRef(self.data.get_unchecked(idx))
JsonbRef(self.data.as_array().unwrap().get(idx).unwrap())
}

fn len(&self) -> usize {
self.data.len()
self.bitmap.len()
}

fn to_protobuf(&self) -> PbArray {
// The memory layout contains `serde_json::Value` trees, but in protobuf we transmit this as
// variable length bytes in value encoding. That is, one buffer of length n+1 containing
// start and end offsets into the 2nd buffer containing all value bytes concatenated.

use risingwave_pb::common::buffer::CompressionType;
use risingwave_pb::common::Buffer;

let mut offset_buffer =
Vec::<u8>::with_capacity((1 + self.data.len()) * std::mem::size_of::<u64>());
let mut data_buffer = Vec::<u8>::with_capacity(self.data.len());

let mut offset = 0;
for (v, not_null) in self.data.iter().zip_eq_fast(self.null_bitmap().iter()) {
if !not_null {
continue;
}
let d = JsonbRef(v).value_serialize();
offset_buffer.extend_from_slice(&(offset as u64).to_be_bytes());
data_buffer.extend_from_slice(&d);
offset += d.len();
}
offset_buffer.extend_from_slice(&(offset as u64).to_be_bytes());

let values = vec![
Buffer {
compression: CompressionType::None as i32,
body: offset_buffer,
},
Buffer {
compression: CompressionType::None as i32,
body: data_buffer,
},
];

let null_bitmap = self.null_bitmap().to_protobuf();
PbArray {
null_bitmap: Some(null_bitmap),
values,
null_bitmap: Some(self.null_bitmap().to_protobuf()),
values: vec![Buffer {
compression: CompressionType::None as i32,
body: self.data.as_bytes().to_vec(),
}],
array_type: PbArrayType::Jsonb as i32,
struct_array_data: None,
list_array_data: None,
Expand Down Expand Up @@ -176,7 +163,7 @@ impl FromIterator<Option<JsonbVal>> for JsonbArray {
let mut builder = <Self as Array>::Builder::new(iter.size_hint().0);
for i in iter {
match i {
Some(x) => builder.append_move(x),
Some(x) => builder.append(Some(x.as_scalar_ref())),
None => builder.append(None),
}
}
Expand All @@ -190,31 +177,8 @@ impl FromIterator<JsonbVal> for JsonbArray {
}
}

// TODO: We need to fix this later.
impl EstimateSize for JsonbArray {
fn estimated_heap_size(&self) -> usize {
self.bitmap.estimated_heap_size() + self.data.capacity() * size_of::<Value>()
}
}

impl From<F32> for Value {
fn from(v: F32) -> Value {
serde_json::Number::from_f64(v.0 as f64)
.expect("todo: convert Inf/NaN to jsonb")
.into()
}
}

impl From<F64> for Value {
fn from(v: F64) -> Value {
serde_json::Number::from_f64(v.0)
.expect("todo: convert Inf/NaN to jsonb")
.into()
}
}

impl From<JsonbRef<'_>> for Value {
fn from(v: JsonbRef<'_>) -> Value {
v.0.clone()
self.bitmap.estimated_heap_size() + self.data.capacity()
}
}
4 changes: 1 addition & 3 deletions src/common/src/array/proto_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ impl ArrayImpl {
PbArrayType::Timestamp => read_timestamp_array(array, cardinality)?,
PbArrayType::Timestamptz => read_timestamptz_array(array, cardinality)?,
PbArrayType::Interval => read_interval_array(array, cardinality)?,
PbArrayType::Jsonb => {
read_string_array::<JsonbArrayBuilder, JsonbValueReader>(array, cardinality)?
}
PbArrayType::Jsonb => JsonbArray::from_protobuf(array)?,
PbArrayType::Struct => StructArray::from_protobuf(array)?,
PbArrayType::List => ListArray::from_protobuf(array)?,
PbArrayType::Unspecified => unreachable!(),
Expand Down
15 changes: 1 addition & 14 deletions src/common/src/array/value_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use byteorder::{BigEndian, ReadBytesExt};

use super::ArrayResult;
use crate::array::{
ArrayBuilder, BytesArrayBuilder, JsonbArrayBuilder, PrimitiveArrayItemType, Serial,
Utf8ArrayBuilder,
ArrayBuilder, BytesArrayBuilder, PrimitiveArrayItemType, Serial, Utf8ArrayBuilder,
};
use crate::types::{Decimal, F32, F64};

Expand Down Expand Up @@ -89,15 +88,3 @@ impl VarSizedValueReader<BytesArrayBuilder> for BytesValueReader {
Ok(())
}
}

pub struct JsonbValueReader;

impl VarSizedValueReader<JsonbArrayBuilder> for JsonbValueReader {
fn read(buf: &[u8], builder: &mut JsonbArrayBuilder) -> ArrayResult<()> {
let Some(v) = super::JsonbVal::value_deserialize(buf) else {
bail!("failed to read jsonb from bytes");
};
builder.append_move(v);
Ok(())
}
}
2 changes: 1 addition & 1 deletion src/common/src/test_utils/rand_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl RandValue for Int256 {

impl RandValue for JsonbVal {
fn rand_value<R: rand::Rng>(_rand: &mut R) -> Self {
JsonbVal::dummy()
JsonbVal::null()
}
}

Expand Down
Loading

0 comments on commit cce986d

Please sign in to comment.