diff --git a/src/connector/benches/json_vs_plain_parser.rs b/src/connector/benches/json_vs_plain_parser.rs index b9a1139dcb03b..603911156eb04 100644 --- a/src/connector/benches/json_vs_plain_parser.rs +++ b/src/connector/benches/json_vs_plain_parser.rs @@ -83,7 +83,8 @@ mod old_json_parser { let mut errors = Vec::new(); for value in values { let accessor = JsonAccess::new(value); - match writer.do_insert(|column| accessor.access(&[&column.name], &column.data_type)) + match writer + .do_insert(|column| accessor.access_cow(&[&column.name], &column.data_type)) { Ok(_) => {} Err(err) => errors.push(err), diff --git a/src/connector/codec/src/decoder/avro/mod.rs b/src/connector/codec/src/decoder/avro/mod.rs index 57ec0ad61dbab..fd71409668a4d 100644 --- a/src/connector/codec/src/decoder/avro/mod.rs +++ b/src/connector/codec/src/decoder/avro/mod.rs @@ -266,6 +266,7 @@ impl<'a> AvroParseOptions<'a> { } } +// TODO: No need to use two lifetimes here. pub struct AvroAccess<'a, 'b> { value: &'a Value, options: AvroParseOptions<'b>, diff --git a/src/connector/codec/src/decoder/mod.rs b/src/connector/codec/src/decoder/mod.rs index b38d1db0a89db..de948855517eb 100644 --- a/src/connector/codec/src/decoder/mod.rs +++ b/src/connector/codec/src/decoder/mod.rs @@ -15,7 +15,7 @@ pub mod avro; pub mod utils; -use risingwave_common::types::{DataType, Datum}; +use risingwave_common::types::{DataType, Datum, DatumCow, ToOwnedDatum}; use thiserror::Error; use thiserror_ext::Macro; @@ -43,6 +43,9 @@ pub enum AccessError { pub type AccessResult = std::result::Result; +/// Access to a field in the data structure. +/// +/// Only one of these two methods should be implemented. See documentation for more details. pub trait Access { /// Accesses `path` in the data structure (*parsed* Avro/JSON/Protobuf data), /// and then converts it to RisingWave `Datum`. @@ -53,5 +56,23 @@ pub trait Access { /// and we use different `path` to access one column at a time. /// /// e.g., for Avro, we access `["col_name"]`; for Debezium Avro, we access `["before", "col_name"]`. - fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult; + // #[deprecated(note = "Use `access_cow` instead.")] + fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult { + self.access_cow(path, type_expected) + .map(ToOwnedDatum::to_owned_datum) + } + + /// Similar to `access`, but may return a borrowed [`DatumCow::Ref`] to avoid unnecessary allocation. + /// If not overridden, it will call forward to `access` and always wrap the result in [`DatumCow::Owned`]. + /// + /// This should be preferred over `access` for both callers and implementors. + // TODO: remove `access` and make this the only method. + fn access_cow<'a>( + &'a self, + path: &[&str], + type_expected: &DataType, + ) -> AccessResult> { + // #[expect(deprecated)] + self.access(path, type_expected).map(Into::into) + } } diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 78898fbdef58a..6b5901d8dd302 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -87,7 +87,6 @@ mod util; pub use debezium::DEBEZIUM_IGNORE_KEY; use risingwave_common::buffer::BitmapBuilder; -pub use unified::json::JsonBorrowAccess; pub use unified::{AccessError, AccessResult}; /// A builder for building a [`StreamChunk`] from [`SourceColumnDesc`]. diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index d88520742cd73..c2f74c375f2bb 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -22,8 +22,8 @@ use risingwave_common::array::{ListValue, StructValue}; use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz, str_to_bytea}; use risingwave_common::log::LogSuppresser; use risingwave_common::types::{ - DataType, Date, Decimal, Int256, Interval, JsonbVal, ScalarImpl, ScalarRefImpl, Time, - Timestamp, Timestamptz, + DataType, Date, Datum, Decimal, Int256, Interval, JsonbVal, ScalarImpl, Time, Timestamp, + Timestamptz, ToOwnedDatum, }; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_connector_codec::decoder::utils::extract_decimal; @@ -201,20 +201,11 @@ impl JsonParseOptions { } } - pub fn parse_borrow<'a>( + pub fn parse<'a>( &self, value: &'a BorrowedValue<'a>, type_expected: &DataType, ) -> AccessResult> { - Ok(match (type_expected, value.value_type()) { - (DataType::Varchar, ValueType::String) => { - DatumCow::Ref(Some(ScalarRefImpl::Utf8(value.as_str().unwrap().into()))) - } - _ => self.parse(value, type_expected)?.into(), - }) - } - - pub fn parse(&self, value: &BorrowedValue<'_>, type_expected: &DataType) -> AccessResult { let create_error = || AccessError::TypeError { expected: format!("{:?}", type_expected), got: value.value_type().to_string(), @@ -222,9 +213,9 @@ impl JsonParseOptions { }; let v: ScalarImpl = match (type_expected, value.value_type()) { - (_, ValueType::Null) => return Ok(None), + (_, ValueType::Null) => return Ok(Datum::None.into()), // ---- Boolean ----- - (DataType::Boolean , ValueType::Bool) => value.as_bool().unwrap().into(), + (DataType::Boolean, ValueType::Bool) => value.as_bool().unwrap().into(), ( DataType::Boolean, @@ -438,7 +429,7 @@ impl JsonParseOptions { .map_err(|_| create_error())? .into(), // ---- Varchar ----- - (DataType::Varchar , ValueType::String) => value.as_str().unwrap().into(), + (DataType::Varchar, ValueType::String) => return Ok(DatumCow::Ref(Some(value.as_str().unwrap().into()))), ( DataType::Varchar, ValueType::Bool @@ -548,7 +539,7 @@ impl JsonParseOptions { } &BorrowedValue::Static(simd_json::StaticNode::Null) }); - self.parse(field_value, field_type) + self.parse(field_value, field_type).map(|d| d.to_owned_datum()) }) .collect::>()?, ) @@ -564,7 +555,7 @@ impl JsonParseOptions { let mut value = value.as_str().unwrap().as_bytes().to_vec(); let value = simd_json::to_borrowed_value(&mut value[..]).map_err(|_| create_error())?; - return self.parse(&value, type_expected); + return self.parse(&value, type_expected).map(|d| d.to_owned_datum().into()); } // ---- List ----- @@ -619,30 +610,31 @@ impl JsonParseOptions { (_expected, _got) => Err(create_error())?, }; - Ok(Some(v)) + Ok(DatumCow::Owned(Some(v))) } } -pub struct JsonAccess<'a, 'b> { - value: BorrowedValue<'b>, +pub struct JsonAccess<'a> { + value: BorrowedValue<'a>, options: &'a JsonParseOptions, } -impl<'a, 'b> JsonAccess<'a, 'b> { - pub fn new_with_options(value: BorrowedValue<'b>, options: &'a JsonParseOptions) -> Self { +impl<'a> JsonAccess<'a> { + pub fn new_with_options(value: BorrowedValue<'a>, options: &'a JsonParseOptions) -> Self { Self { value, options } } - pub fn new(value: BorrowedValue<'b>) -> Self { + pub fn new(value: BorrowedValue<'a>) -> Self { Self::new_with_options(value, &JsonParseOptions::DEFAULT) } } -impl<'a, 'b> Access for JsonAccess<'a, 'b> -where - 'a: 'b, -{ - fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult { +impl Access for JsonAccess<'_> { + fn access_cow<'a>( + &'a self, + path: &[&str], + type_expected: &DataType, + ) -> AccessResult> { let mut value = &self.value; for (idx, &key) in path.iter().enumerate() { if let Some(sub_value) = if self.options.ignoring_keycase { @@ -662,46 +654,3 @@ where self.options.parse(value, type_expected) } } - -#[derive(Clone, Copy)] -pub struct JsonBorrowAccess<'a, 'b> { - value: &'b BorrowedValue<'b>, - options: &'a JsonParseOptions, -} - -impl<'a, 'b> JsonBorrowAccess<'a, 'b> { - pub fn new(value: &'b BorrowedValue<'b>) -> Self { - Self { - value, - options: &JsonParseOptions::DEFAULT, - } - } -} - -impl<'a, 'b> JsonBorrowAccess<'a, 'b> -where - 'a: 'b, -{ - pub fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult> { - let value = { - let mut value = self.value; - for (idx, &key) in path.iter().enumerate() { - if let Some(sub_value) = if self.options.ignoring_keycase { - json_object_get_case_insensitive(value, key) - } else { - value.get(key) - } { - value = sub_value; - } else { - Err(AccessError::Undefined { - name: key.to_string(), - path: path.iter().take(idx).join("."), - })?; - } - } - value - }; - - self.options.parse_borrow(value, type_expected) - } -} diff --git a/src/connector/src/parser/unified/kv_event.rs b/src/connector/src/parser/unified/kv_event.rs index b67952e66c167..afc059772e210 100644 --- a/src/connector/src/parser/unified/kv_event.rs +++ b/src/connector/src/parser/unified/kv_event.rs @@ -12,10 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::types::DataType; +use risingwave_common::types::{DataType, DatumCow}; use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType; -use super::Access; +use super::{Access, AccessResult}; use crate::parser::unified::AccessError; use crate::source::SourceColumnDesc; @@ -54,9 +54,9 @@ where K: Access, V: Access, { - fn access_key(&self, path: &[&str], type_expected: &DataType) -> super::AccessResult { + fn access_key(&self, path: &[&str], type_expected: &DataType) -> AccessResult> { if let Some(ka) = &self.key_accessor { - ka.access(path, type_expected) + ka.access_cow(path, type_expected) } else { Err(AccessError::Undefined { name: "key".to_string(), @@ -65,9 +65,9 @@ where } } - fn access_value(&self, path: &[&str], type_expected: &DataType) -> super::AccessResult { + fn access_value(&self, path: &[&str], type_expected: &DataType) -> AccessResult> { if let Some(va) = &self.value_accessor { - va.access(path, type_expected) + va.access_cow(path, type_expected) } else { Err(AccessError::Undefined { name: "value".to_string(), @@ -76,7 +76,7 @@ where } } - pub fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult { + pub fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult> { match desc.additional_column.column_type { Some(AdditionalColumnType::Key(_)) => self.access_key(&[&desc.name], &desc.data_type), None => self.access_value(&[&desc.name], &desc.data_type), diff --git a/src/connector/src/parser/unified/mod.rs b/src/connector/src/parser/unified/mod.rs index 234d7c2ad7492..fb6dadb09c0f7 100644 --- a/src/connector/src/parser/unified/mod.rs +++ b/src/connector/src/parser/unified/mod.rs @@ -15,7 +15,7 @@ //! Unified parsers for both normal events or CDC events of multiple message formats use auto_impl::auto_impl; -use risingwave_common::types::DataType; +use risingwave_common::types::{DataType, DatumCow}; pub use risingwave_connector_codec::decoder::{ bail_uncategorized, uncategorized, Access, AccessError, AccessResult, }; @@ -40,8 +40,8 @@ pub enum AccessImpl<'a, 'b> { Avro(AvroAccess<'a, 'b>), Bytes(BytesAccess<'a>), Protobuf(ProtobufAccess), - Json(JsonAccess<'a, 'b>), - MongoJson(MongoJsonAccess>), + Json(JsonAccess<'a>), + MongoJson(MongoJsonAccess>), } impl Access for AccessImpl<'_, '_> { @@ -54,6 +54,20 @@ impl Access for AccessImpl<'_, '_> { Self::MongoJson(accessor) => accessor.access(path, type_expected), } } + + fn access_cow<'a>( + &'a self, + path: &[&str], + type_expected: &DataType, + ) -> AccessResult> { + match self { + Self::Avro(accessor) => accessor.access_cow(path, type_expected), + Self::Bytes(accessor) => accessor.access_cow(path, type_expected), + Self::Protobuf(accessor) => accessor.access_cow(path, type_expected), + Self::Json(accessor) => accessor.access_cow(path, type_expected), + Self::MongoJson(accessor) => accessor.access_cow(path, type_expected), + } + } } #[derive(Debug, Clone, Copy)] @@ -68,6 +82,7 @@ pub trait ChangeEvent { /// Access the operation type. fn op(&self) -> AccessResult; /// Access the field. + // TODO: return `DatumCow` fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult; }