Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(common): introduce JsonbVal as Scalar and JsonbRef as ScalarRef #7977

Merged
merged 3 commits into from
Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ num-traits = "0.2"
parking_lot = "0.12"
parse-display = "0.6"
paste = "1"
postgres-types = { version = "0.2.4", features = ["derive","with-chrono-0_4"] }
postgres-types = { version = "0.2.4", features = ["derive","with-chrono-0_4","with-serde_json-1"] }
prometheus = { version = "0.13" }
prost = "0.11"
rand = "0.8"
Expand Down
114 changes: 114 additions & 0 deletions src/common/src/array/jsonb_array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use postgres_types::{ToSql as _, Type};
use serde_json::Value;

use crate::types::{Scalar, ScalarImpl, ScalarRef};

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JsonbVal(Box<Value>);

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct JsonbRef<'a>(&'a Value);

impl Scalar for JsonbVal {
type ScalarRefType<'a> = JsonbRef<'a>;

fn as_scalar_ref(&self) -> Self::ScalarRefType<'_> {
JsonbRef(self.0.as_ref())
}

fn to_scalar_value(self) -> ScalarImpl {
ScalarImpl::Jsonb(self)
}
}

impl<'a> ScalarRef<'a> for JsonbRef<'a> {
type ScalarType = JsonbVal;

fn to_owned_scalar(&self) -> Self::ScalarType {
JsonbVal(self.0.clone().into())
}

fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) {
use std::hash::Hash as _;
self.0.to_string().hash(state)
}
}

impl PartialOrd for JsonbVal {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for JsonbVal {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.as_scalar_ref().cmp(&other.as_scalar_ref())
}
}

impl PartialOrd for JsonbRef<'_> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for JsonbRef<'_> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.0.to_string().cmp(&other.0.to_string())
}
}

impl crate::types::to_text::ToText for JsonbRef<'_> {
fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
write!(f, "{}", self.0)
}

fn write_with_type<W: std::fmt::Write>(
&self,
_ty: &crate::types::DataType,
f: &mut W,
) -> std::fmt::Result {
self.write(f)
}
}

impl crate::types::to_binary::ToBinary for JsonbRef<'_> {
fn to_binary_with_type(
&self,
_ty: &crate::types::DataType,
) -> crate::error::Result<Option<bytes::Bytes>> {
let mut output = bytes::BytesMut::new();
self.0.to_sql(&Type::JSONB, &mut output).unwrap();
Ok(Some(output.freeze()))
}
}

impl JsonbRef<'_> {
pub fn memcmp_serialize(
&self,
serializer: &mut memcomparable::Serializer<impl bytes::BufMut>,
) -> memcomparable::Result<()> {
let s = self.0.to_string();
serde::Serialize::serialize(&s, serializer)
}

pub fn value_serialize(&self) -> Vec<u8> {
let mut output = bytes::BytesMut::new();
self.0.to_sql(&Type::JSONB, &mut output).unwrap();
output.freeze().into()
}
}
2 changes: 2 additions & 0 deletions src/common/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod decimal_array;
pub mod error;
pub mod interval_array;
mod iterator;
mod jsonb_array;
pub mod list_array;
mod macros;
mod primitive_array;
Expand All @@ -52,6 +53,7 @@ pub use data_chunk_iter::RowRef;
pub use decimal_array::{DecimalArray, DecimalArrayBuilder};
pub use interval_array::{IntervalArray, IntervalArrayBuilder};
pub use iterator::ArrayIterator;
pub use jsonb_array::{JsonbRef, JsonbVal};
pub use list_array::{ListArray, ListArrayBuilder, ListRef, ListValue};
use paste::paste;
pub use primitive_array::{PrimitiveArray, PrimitiveArrayBuilder, PrimitiveArrayItemType};
Expand Down
6 changes: 4 additions & 2 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ use self::struct_type::StructType;
use self::to_binary::ToBinary;
use self::to_text::ToText;
use crate::array::{
read_interval_unit, ArrayBuilderImpl, ListRef, ListValue, PrimitiveArrayItemType, StructRef,
StructValue,
read_interval_unit, ArrayBuilderImpl, JsonbRef, JsonbVal, ListRef, ListValue,
PrimitiveArrayItemType, StructRef, StructValue,
};
use crate::error::Result as RwResult;

Expand Down Expand Up @@ -469,6 +469,7 @@ macro_rules! for_all_scalar_variants {
{ NaiveDate, naivedate, NaiveDateWrapper, NaiveDateWrapper },
{ NaiveDateTime, naivedatetime, NaiveDateTimeWrapper, NaiveDateTimeWrapper },
{ NaiveTime, naivetime, NaiveTimeWrapper, NaiveTimeWrapper },
{ Jsonb, jsonb, JsonbVal, JsonbRef<'scalar> },
{ Struct, struct, StructValue, StructRef<'scalar> },
{ List, list, ListValue, ListRef<'scalar> },
{ Bytea, bytea, Box<[u8]>, &'scalar [u8] }
Expand Down Expand Up @@ -854,6 +855,7 @@ impl ScalarRefImpl<'_> {
v.0.num_seconds_from_midnight().serialize(&mut *ser)?;
v.0.nanosecond().serialize(ser)?;
}
Self::Jsonb(v) => v.memcmp_serialize(ser)?,
Self::Struct(v) => v.memcmp_serialize(ser)?,
Self::List(v) => v.memcmp_serialize(ser)?,
};
Expand Down
1 change: 1 addition & 0 deletions src/common/src/types/to_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ impl ToBinary for ScalarRefImpl<'_> {
ScalarRefImpl::NaiveDateTime(v) => v.to_binary_with_type(ty),
ScalarRefImpl::NaiveTime(v) => v.to_binary_with_type(ty),
ScalarRefImpl::Bytea(v) => v.to_binary_with_type(ty),
ScalarRefImpl::Jsonb(v) => v.to_binary_with_type(ty),
ScalarRefImpl::Struct(_) => todo!(),
ScalarRefImpl::List(_) => todo!(),
}
Expand Down
1 change: 1 addition & 0 deletions src/common/src/util/value_encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ fn serialize_scalar(value: ScalarRefImpl<'_>, buf: &mut impl BufMut) {
ScalarRefImpl::NaiveTime(v) => {
serialize_naivetime(v.0.num_seconds_from_midnight(), v.0.nanosecond(), buf)
}
ScalarRefImpl::Jsonb(v) => serialize_str(&v.value_serialize(), buf),
ScalarRefImpl::Struct(s) => serialize_struct(s, buf),
ScalarRefImpl::List(v) => serialize_list(v, buf),
}
Expand Down