Skip to content

Commit

Permalink
use cow in path of json
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jun 7, 2024
1 parent 1a0f3ce commit 2ec8e20
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 85 deletions.
3 changes: 2 additions & 1 deletion src/connector/benches/json_vs_plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ mod old_json_parser {
let mut errors = Vec::new();
for value in values {
let accessor = JsonAccess::new(value);
match writer.do_insert(|column| accessor.access(&[&column.name], &column.data_type))
match writer
.do_insert(|column| accessor.access_cow(&[&column.name], &column.data_type))
{
Ok(_) => {}
Err(err) => errors.push(err),
Expand Down
1 change: 1 addition & 0 deletions src/connector/codec/src/decoder/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ impl<'a> AvroParseOptions<'a> {
}
}

// TODO: No need to use two lifetimes here.
pub struct AvroAccess<'a, 'b> {
value: &'a Value,
options: AvroParseOptions<'b>,
Expand Down
25 changes: 23 additions & 2 deletions src/connector/codec/src/decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
pub mod avro;
pub mod utils;

use risingwave_common::types::{DataType, Datum};
use risingwave_common::types::{DataType, Datum, DatumCow, ToOwnedDatum};
use thiserror::Error;
use thiserror_ext::Macro;

Expand Down Expand Up @@ -43,6 +43,9 @@ pub enum AccessError {

pub type AccessResult<T = Datum> = std::result::Result<T, AccessError>;

/// Access to a field in the data structure.
///
/// Only one of these two methods should be implemented. See documentation for more details.
pub trait Access {
/// Accesses `path` in the data structure (*parsed* Avro/JSON/Protobuf data),
/// and then converts it to RisingWave `Datum`.
Expand All @@ -53,5 +56,23 @@ pub trait Access {
/// and we use different `path` to access one column at a time.
///
/// e.g., for Avro, we access `["col_name"]`; for Debezium Avro, we access `["before", "col_name"]`.
fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult<Datum>;
// #[deprecated(note = "Use `access_cow` instead.")]
fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult<Datum> {
self.access_cow(path, type_expected)
.map(ToOwnedDatum::to_owned_datum)
}

/// Similar to `access`, but may return a borrowed [`DatumCow::Ref`] to avoid unnecessary allocation.
/// If not overridden, it will call forward to `access` and always wrap the result in [`DatumCow::Owned`].
///
/// This should be preferred over `access` for both callers and implementors.
// TODO: remove `access` and make this the only method.
fn access_cow<'a>(
&'a self,
path: &[&str],
type_expected: &DataType,
) -> AccessResult<DatumCow<'a>> {
// #[expect(deprecated)]
self.access(path, type_expected).map(Into::into)
}
}
1 change: 0 additions & 1 deletion src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ mod util;

pub use debezium::DEBEZIUM_IGNORE_KEY;
use risingwave_common::buffer::BitmapBuilder;
pub use unified::json::JsonBorrowAccess;
pub use unified::{AccessError, AccessResult};

/// A builder for building a [`StreamChunk`] from [`SourceColumnDesc`].
Expand Down
91 changes: 20 additions & 71 deletions src/connector/src/parser/unified/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use risingwave_common::array::{ListValue, StructValue};
use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz, str_to_bytea};
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{
DataType, Date, Decimal, Int256, Interval, JsonbVal, ScalarImpl, ScalarRefImpl, Time,
Timestamp, Timestamptz,
DataType, Date, Datum, Decimal, Int256, Interval, JsonbVal, ScalarImpl, Time, Timestamp,
Timestamptz, ToOwnedDatum,
};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector_codec::decoder::utils::extract_decimal;
Expand Down Expand Up @@ -201,30 +201,21 @@ impl JsonParseOptions {
}
}

pub fn parse_borrow<'a>(
pub fn parse<'a>(
&self,
value: &'a BorrowedValue<'a>,
type_expected: &DataType,
) -> AccessResult<DatumCow<'a>> {
Ok(match (type_expected, value.value_type()) {
(DataType::Varchar, ValueType::String) => {
DatumCow::Ref(Some(ScalarRefImpl::Utf8(value.as_str().unwrap().into())))
}
_ => self.parse(value, type_expected)?.into(),
})
}

pub fn parse(&self, value: &BorrowedValue<'_>, type_expected: &DataType) -> AccessResult {
let create_error = || AccessError::TypeError {
expected: format!("{:?}", type_expected),
got: value.value_type().to_string(),
value: value.to_string(),
};

let v: ScalarImpl = match (type_expected, value.value_type()) {
(_, ValueType::Null) => return Ok(None),
(_, ValueType::Null) => return Ok(Datum::None.into()),
// ---- Boolean -----
(DataType::Boolean , ValueType::Bool) => value.as_bool().unwrap().into(),
(DataType::Boolean, ValueType::Bool) => value.as_bool().unwrap().into(),

(
DataType::Boolean,
Expand Down Expand Up @@ -438,7 +429,7 @@ impl JsonParseOptions {
.map_err(|_| create_error())?
.into(),
// ---- Varchar -----
(DataType::Varchar , ValueType::String) => value.as_str().unwrap().into(),
(DataType::Varchar, ValueType::String) => return Ok(DatumCow::Ref(Some(value.as_str().unwrap().into()))),
(
DataType::Varchar,
ValueType::Bool
Expand Down Expand Up @@ -548,7 +539,7 @@ impl JsonParseOptions {
}
&BorrowedValue::Static(simd_json::StaticNode::Null)
});
self.parse(field_value, field_type)
self.parse(field_value, field_type).map(|d| d.to_owned_datum())
})
.collect::<Result<_, _>>()?,
)
Expand All @@ -564,7 +555,7 @@ impl JsonParseOptions {
let mut value = value.as_str().unwrap().as_bytes().to_vec();
let value =
simd_json::to_borrowed_value(&mut value[..]).map_err(|_| create_error())?;
return self.parse(&value, type_expected);
return self.parse(&value, type_expected).map(|d| d.to_owned_datum().into());
}

// ---- List -----
Expand Down Expand Up @@ -619,30 +610,31 @@ impl JsonParseOptions {

(_expected, _got) => Err(create_error())?,
};
Ok(Some(v))
Ok(DatumCow::Owned(Some(v)))
}
}

pub struct JsonAccess<'a, 'b> {
value: BorrowedValue<'b>,
pub struct JsonAccess<'a> {
value: BorrowedValue<'a>,
options: &'a JsonParseOptions,
}

impl<'a, 'b> JsonAccess<'a, 'b> {
pub fn new_with_options(value: BorrowedValue<'b>, options: &'a JsonParseOptions) -> Self {
impl<'a> JsonAccess<'a> {
pub fn new_with_options(value: BorrowedValue<'a>, options: &'a JsonParseOptions) -> Self {
Self { value, options }
}

pub fn new(value: BorrowedValue<'b>) -> Self {
pub fn new(value: BorrowedValue<'a>) -> Self {
Self::new_with_options(value, &JsonParseOptions::DEFAULT)
}
}

impl<'a, 'b> Access for JsonAccess<'a, 'b>
where
'a: 'b,
{
fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult {
impl Access for JsonAccess<'_> {
fn access_cow<'a>(
&'a self,
path: &[&str],
type_expected: &DataType,
) -> AccessResult<DatumCow<'a>> {
let mut value = &self.value;
for (idx, &key) in path.iter().enumerate() {
if let Some(sub_value) = if self.options.ignoring_keycase {
Expand All @@ -662,46 +654,3 @@ where
self.options.parse(value, type_expected)
}
}

#[derive(Clone, Copy)]
pub struct JsonBorrowAccess<'a, 'b> {
value: &'b BorrowedValue<'b>,
options: &'a JsonParseOptions,
}

impl<'a, 'b> JsonBorrowAccess<'a, 'b> {
pub fn new(value: &'b BorrowedValue<'b>) -> Self {
Self {
value,
options: &JsonParseOptions::DEFAULT,
}
}
}

impl<'a, 'b> JsonBorrowAccess<'a, 'b>
where
'a: 'b,
{
pub fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'b>> {
let value = {
let mut value = self.value;
for (idx, &key) in path.iter().enumerate() {
if let Some(sub_value) = if self.options.ignoring_keycase {
json_object_get_case_insensitive(value, key)
} else {
value.get(key)
} {
value = sub_value;
} else {
Err(AccessError::Undefined {
name: key.to_string(),
path: path.iter().take(idx).join("."),
})?;
}
}
value
};

self.options.parse_borrow(value, type_expected)
}
}
14 changes: 7 additions & 7 deletions src/connector/src/parser/unified/kv_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::DataType;
use risingwave_common::types::{DataType, DatumCow};
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;

use super::Access;
use super::{Access, AccessResult};
use crate::parser::unified::AccessError;
use crate::source::SourceColumnDesc;

Expand Down Expand Up @@ -54,9 +54,9 @@ where
K: Access,
V: Access,
{
fn access_key(&self, path: &[&str], type_expected: &DataType) -> super::AccessResult {
fn access_key(&self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'_>> {
if let Some(ka) = &self.key_accessor {
ka.access(path, type_expected)
ka.access_cow(path, type_expected)
} else {
Err(AccessError::Undefined {
name: "key".to_string(),
Expand All @@ -65,9 +65,9 @@ where
}
}

fn access_value(&self, path: &[&str], type_expected: &DataType) -> super::AccessResult {
fn access_value(&self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'_>> {
if let Some(va) = &self.value_accessor {
va.access(path, type_expected)
va.access_cow(path, type_expected)
} else {
Err(AccessError::Undefined {
name: "value".to_string(),
Expand All @@ -76,7 +76,7 @@ where
}
}

pub fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult {
pub fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult<DatumCow<'_>> {
match desc.additional_column.column_type {
Some(AdditionalColumnType::Key(_)) => self.access_key(&[&desc.name], &desc.data_type),
None => self.access_value(&[&desc.name], &desc.data_type),
Expand Down
21 changes: 18 additions & 3 deletions src/connector/src/parser/unified/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//! Unified parsers for both normal events or CDC events of multiple message formats
use auto_impl::auto_impl;
use risingwave_common::types::DataType;
use risingwave_common::types::{DataType, DatumCow};
pub use risingwave_connector_codec::decoder::{
bail_uncategorized, uncategorized, Access, AccessError, AccessResult,
};
Expand All @@ -40,8 +40,8 @@ pub enum AccessImpl<'a, 'b> {
Avro(AvroAccess<'a, 'b>),
Bytes(BytesAccess<'a>),
Protobuf(ProtobufAccess),
Json(JsonAccess<'a, 'b>),
MongoJson(MongoJsonAccess<JsonAccess<'a, 'b>>),
Json(JsonAccess<'a>),
MongoJson(MongoJsonAccess<JsonAccess<'a>>),
}

impl Access for AccessImpl<'_, '_> {
Expand All @@ -54,6 +54,20 @@ impl Access for AccessImpl<'_, '_> {
Self::MongoJson(accessor) => accessor.access(path, type_expected),
}
}

fn access_cow<'a>(
&'a self,
path: &[&str],
type_expected: &DataType,
) -> AccessResult<DatumCow<'a>> {
match self {
Self::Avro(accessor) => accessor.access_cow(path, type_expected),
Self::Bytes(accessor) => accessor.access_cow(path, type_expected),
Self::Protobuf(accessor) => accessor.access_cow(path, type_expected),
Self::Json(accessor) => accessor.access_cow(path, type_expected),
Self::MongoJson(accessor) => accessor.access_cow(path, type_expected),
}
}
}

#[derive(Debug, Clone, Copy)]
Expand All @@ -68,6 +82,7 @@ pub trait ChangeEvent {
/// Access the operation type.
fn op(&self) -> AccessResult<ChangeEventOperation>;
/// Access the field.
// TODO: return `DatumCow`
fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult;
}

Expand Down

0 comments on commit 2ec8e20

Please sign in to comment.