diff --git a/Cargo.lock b/Cargo.lock index 2423dea315854..5d73cdf712f51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10675,6 +10675,7 @@ dependencies = [ "anyhow", "apache-avro 0.16.0", "chrono", + "easy-ext", "itertools 0.12.1", "jsonbb", "num-bigint", diff --git a/src/common/src/types/cow.rs b/src/common/src/types/cow.rs index bc0b6065dd605..928af654b0366 100644 --- a/src/common/src/types/cow.rs +++ b/src/common/src/types/cow.rs @@ -18,6 +18,19 @@ use super::{Datum, DatumRef, ToDatumRef, ToOwnedDatum}; /// /// We do not use [`std::borrow::Cow`] because it requires the borrowed variant /// to be a reference, whereas what we have is a [`DatumRef`] with a lifetime. +/// +/// # Usage +/// +/// Generally, you don't need to match on the variants of `DatumCow` to access +/// the underlying datum. Instead, you can... +/// +/// - call [`to_datum_ref`](ToDatumRef::to_datum_ref) to get a borrowed +/// [`DatumRef`] without any allocation, which can be used to append to an +/// array builder or to encode into the storage representation, +/// +/// - call [`to_owned_datum`](ToOwnedDatum::to_owned_datum) to get an owned +/// [`Datum`] with potentially an allocation, which can be used to store in a +/// struct without lifetime constraints. #[derive(Debug, Clone)] pub enum DatumCow<'a> { Borrowed(DatumRef<'a>), diff --git a/src/connector/benches/json_vs_plain_parser.rs b/src/connector/benches/json_vs_plain_parser.rs index 603911156eb04..b9a1139dcb03b 100644 --- a/src/connector/benches/json_vs_plain_parser.rs +++ b/src/connector/benches/json_vs_plain_parser.rs @@ -83,8 +83,7 @@ mod old_json_parser { let mut errors = Vec::new(); for value in values { let accessor = JsonAccess::new(value); - match writer - .do_insert(|column| accessor.access_cow(&[&column.name], &column.data_type)) + match writer.do_insert(|column| accessor.access(&[&column.name], &column.data_type)) { Ok(_) => {} Err(err) => errors.push(err), diff --git a/src/connector/codec/Cargo.toml b/src/connector/codec/Cargo.toml index a7b45aaa640bc..ef12b325d446d 100644 --- a/src/connector/codec/Cargo.toml +++ b/src/connector/codec/Cargo.toml @@ -26,6 +26,7 @@ chrono = { version = "0.4", default-features = false, features = [ "clock", "std", ] } +easy-ext = "1" itertools = { workspace = true } jsonbb = { workspace = true } num-bigint = "0.4" diff --git a/src/connector/codec/src/decoder/avro/mod.rs b/src/connector/codec/src/decoder/avro/mod.rs index fd71409668a4d..e48eecfb7e2cf 100644 --- a/src/connector/codec/src/decoder/avro/mod.rs +++ b/src/connector/codec/src/decoder/avro/mod.rs @@ -25,7 +25,7 @@ use risingwave_common::array::{ListValue, StructValue}; use risingwave_common::bail; use risingwave_common::log::LogSuppresser; use risingwave_common::types::{ - DataType, Date, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz, + DataType, Date, DatumCow, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz, }; use risingwave_common::util::iter_util::ZipEqFast; @@ -266,23 +266,19 @@ impl<'a> AvroParseOptions<'a> { } } -// TODO: No need to use two lifetimes here. -pub struct AvroAccess<'a, 'b> { +pub struct AvroAccess<'a> { value: &'a Value, - options: AvroParseOptions<'b>, + options: AvroParseOptions<'a>, } -impl<'a, 'b> AvroAccess<'a, 'b> { - pub fn new(value: &'a Value, options: AvroParseOptions<'b>) -> Self { +impl<'a> AvroAccess<'a> { + pub fn new(value: &'a Value, options: AvroParseOptions<'a>) -> Self { Self { value, options } } } -impl<'a, 'b> Access for AvroAccess<'a, 'b> -where - 'a: 'b, -{ - fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult { +impl Access for AvroAccess<'_> { + fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult> { let mut value = self.value; let mut options: AvroParseOptions<'_> = self.options.clone(); @@ -312,7 +308,10 @@ where Err(create_error())?; } - options.convert_to_datum(value, type_expected) + // TODO: may borrow the value directly + options + .convert_to_datum(value, type_expected) + .map(Into::into) } } diff --git a/src/connector/codec/src/decoder/mod.rs b/src/connector/codec/src/decoder/mod.rs index 8e5d15221970f..d71186815697e 100644 --- a/src/connector/codec/src/decoder/mod.rs +++ b/src/connector/codec/src/decoder/mod.rs @@ -44,33 +44,36 @@ pub enum AccessError { pub type AccessResult = std::result::Result; /// Access to a field in the data structure. -/// -/// Only one of these two methods should be implemented. See documentation for more details. pub trait Access { /// Accesses `path` in the data structure (*parsed* Avro/JSON/Protobuf data), /// and then converts it to RisingWave `Datum`. /// `type_expected` might or might not be used during the conversion depending on the implementation. /// + /// # Path + /// /// We usually expect the data is a record (struct), and `path` represents field path. /// The data (or part of the data) represents the whole row (`Vec`), /// and we use different `path` to access one column at a time. /// /// e.g., for Avro, we access `["col_name"]`; for Debezium Avro, we access `["before", "col_name"]`. - fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult { - self.access_cow(path, type_expected) - .map(ToOwnedDatum::to_owned_datum) - } + /// + /// # Returns + /// + /// The implementation should prefer to return a borrowed [`DatumRef`](risingwave_common::types::DatumRef) + /// through [`DatumCow::Borrowed`] to avoid unnecessary allocation if possible, especially for fields + /// with string or bytes data. If that's not the case, it may return an owned [`Datum`] through + /// [`DatumCow::Owned`]. + fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult>; +} - /// Similar to `access`, but may return a borrowed [`DatumCow::Borrowed`] to avoid unnecessary allocation. - /// If not overridden, it will call forward to `access` and always wrap the result in [`DatumCow::Owned`]. +// Note: made an extension trait to disallow implementing or overriding `access_owned`. +#[easy_ext::ext(AccessExt)] +impl A { + /// Similar to `access`, but always returns an owned [`Datum`]. See [`Access::access`] for more details. /// - /// This should be preferred over `access` for both callers and implementors. - // TODO: implement this method in all parsers and remove `access` method. - fn access_cow<'a>( - &'a self, - path: &[&str], - type_expected: &DataType, - ) -> AccessResult> { - self.access(path, type_expected).map(Into::into) + /// Always prefer calling `access` directly if possible to avoid unnecessary allocation. + pub fn access_owned(&self, path: &[&str], type_expected: &DataType) -> AccessResult { + self.access(path, type_expected) + .map(ToOwnedDatum::to_owned_datum) } } diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index e1b2b546b3de7..4b73c6ac7c80f 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -42,7 +42,7 @@ pub struct AvroAccessBuilder { } impl AccessBuilder for AvroAccessBuilder { - async fn generate_accessor(&mut self, payload: Vec) -> ConnectorResult> { + async fn generate_accessor(&mut self, payload: Vec) -> ConnectorResult> { self.value = self.parse_avro_value(&payload).await?; Ok(AccessImpl::Avro(AvroAccess::new( self.value.as_ref().unwrap(), diff --git a/src/connector/src/parser/bytes_parser.rs b/src/connector/src/parser/bytes_parser.rs index 255c3ef829c79..5df7fa28118d3 100644 --- a/src/connector/src/parser/bytes_parser.rs +++ b/src/connector/src/parser/bytes_parser.rs @@ -26,7 +26,7 @@ pub struct BytesAccessBuilder { impl AccessBuilder for BytesAccessBuilder { #[allow(clippy::unused_async)] - async fn generate_accessor(&mut self, payload: Vec) -> ConnectorResult> { + async fn generate_accessor(&mut self, payload: Vec) -> ConnectorResult> { Ok(AccessImpl::Bytes(BytesAccess::new( &self.column_name, payload, diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index 140d354312516..430d5072a88db 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -44,7 +44,7 @@ pub struct DebeziumAvroAccessBuilder { // TODO: reduce encodingtype match impl AccessBuilder for DebeziumAvroAccessBuilder { - async fn generate_accessor(&mut self, payload: Vec) -> ConnectorResult> { + async fn generate_accessor(&mut self, payload: Vec) -> ConnectorResult> { let (schema_id, mut raw_payload) = extract_schema_id(&payload)?; let schema = self.schema_resolver.get_by_id(schema_id).await?; self.value = Some(from_avro_datum(schema.as_ref(), &mut raw_payload, None)?); diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index c02ae8f655150..85399ed772768 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -41,7 +41,7 @@ impl DebeziumJsonAccessBuilder { impl AccessBuilder for DebeziumJsonAccessBuilder { #[allow(clippy::unused_async)] - async fn generate_accessor(&mut self, payload: Vec) -> ConnectorResult> { + async fn generate_accessor(&mut self, payload: Vec) -> ConnectorResult> { self.value = Some(payload); let mut event: BorrowedValue<'_> = simd_json::to_borrowed_value(self.value.as_mut().unwrap()) @@ -79,7 +79,7 @@ impl DebeziumMongoJsonAccessBuilder { impl AccessBuilder for DebeziumMongoJsonAccessBuilder { #[allow(clippy::unused_async)] - async fn generate_accessor(&mut self, payload: Vec) -> ConnectorResult> { + async fn generate_accessor(&mut self, payload: Vec) -> ConnectorResult> { self.value = Some(payload); let mut event: BorrowedValue<'_> = simd_json::to_borrowed_value(self.value.as_mut().unwrap()) diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index e327307327cea..9d62d76eff7a7 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -46,7 +46,7 @@ pub struct JsonAccessBuilder { impl AccessBuilder for JsonAccessBuilder { #[allow(clippy::unused_async)] - async fn generate_accessor(&mut self, payload: Vec) -> ConnectorResult> { + async fn generate_accessor(&mut self, payload: Vec) -> ConnectorResult> { // XXX: When will we enter this branch? if payload.is_empty() { self.value = Some("{}".into()); diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 6b5901d8dd302..d439da2491e6b 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -842,7 +842,7 @@ async fn into_chunk_stream_inner( } pub trait AccessBuilder { - async fn generate_accessor(&mut self, payload: Vec) -> ConnectorResult>; + async fn generate_accessor(&mut self, payload: Vec) -> ConnectorResult>; } #[derive(Debug)] @@ -887,10 +887,7 @@ impl AccessBuilderImpl { Ok(accessor) } - pub async fn generate_accessor( - &mut self, - payload: Vec, - ) -> ConnectorResult> { + pub async fn generate_accessor(&mut self, payload: Vec) -> ConnectorResult> { let accessor = match self { Self::Avro(builder) => builder.generate_accessor(payload).await?, Self::Protobuf(builder) => builder.generate_accessor(payload).await?, diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index 6fc2b1b935ec7..f34fd29837174 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -102,7 +102,7 @@ impl PlainParser { }; } - let mut row_op: KvEvent, AccessImpl<'_, '_>> = KvEvent::default(); + let mut row_op: KvEvent, AccessImpl<'_>> = KvEvent::default(); if let Some(data) = key && let Some(key_builder) = self.key_builder.as_mut() diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index ed57c146516f0..b5df0aeb83909 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -46,7 +46,7 @@ pub struct ProtobufAccessBuilder { impl AccessBuilder for ProtobufAccessBuilder { #[allow(clippy::unused_async)] - async fn generate_accessor(&mut self, payload: Vec) -> ConnectorResult> { + async fn generate_accessor(&mut self, payload: Vec) -> ConnectorResult> { let payload = if self.confluent_wire_type { resolve_pb_header(&payload)? } else { @@ -583,6 +583,7 @@ mod test { use prost::Message; use risingwave_common::types::StructType; + use risingwave_connector_codec::decoder::AccessExt; use risingwave_pb::catalog::StreamSourceInfo; use risingwave_pb::data::data_type::PbTypeName; use risingwave_pb::plan_common::{PbEncodeType, PbFormatType}; @@ -591,7 +592,6 @@ mod test { use super::*; use crate::parser::protobuf::recursive::all_types::{EnumType, ExampleOneof, NestedMessage}; use crate::parser::protobuf::recursive::AllTypes; - use crate::parser::unified::Access; use crate::parser::SpecificParserConfig; fn schema_dir() -> String { @@ -896,7 +896,7 @@ mod test { fn pb_eq(a: &ProtobufAccess, field_name: &str, value: ScalarImpl) { let dummy_type = DataType::Varchar; - let d = a.access(&[field_name], &dummy_type).unwrap().unwrap(); + let d = a.access_owned(&[field_name], &dummy_type).unwrap().unwrap(); assert_eq!(d, value, "field: {} value: {:?}", field_name, d); } diff --git a/src/connector/src/parser/unified/bytes.rs b/src/connector/src/parser/unified/bytes.rs index f9064c3ec3079..2f8f27124a307 100644 --- a/src/connector/src/parser/unified/bytes.rs +++ b/src/connector/src/parser/unified/bytes.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::types::{DataType, ScalarImpl}; +use risingwave_common::types::{DataType, DatumCow, ScalarRefImpl}; use super::{Access, AccessError, AccessResult}; @@ -29,14 +29,16 @@ impl<'a> BytesAccess<'a> { } } -impl<'a> Access for BytesAccess<'a> { +impl Access for BytesAccess<'_> { /// path is empty currently, `type_expected` should be `Bytea` - fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult { + fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult> { if let DataType::Bytea = type_expected { if self.column_name.is_none() || (path.len() == 1 && self.column_name.as_ref().unwrap() == path[0]) { - return Ok(Some(ScalarImpl::Bytea(Box::from(self.bytes.as_slice())))); + return Ok(DatumCow::Borrowed(Some(ScalarRefImpl::Bytea( + self.bytes.as_slice(), + )))); } return Err(AccessError::Undefined { name: path[0].to_string(), diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index a353a39369c32..d90463698577d 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -12,7 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::types::{DataType, Datum, Scalar, ScalarImpl, Timestamptz}; +use risingwave_common::types::{ + DataType, Datum, DatumCow, Scalar, ScalarImpl, ScalarRefImpl, Timestamptz, ToDatumRef, +}; +use risingwave_connector_codec::decoder::AccessExt; use risingwave_pb::plan_common::additional_column::ColumnType; use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation}; @@ -84,20 +87,26 @@ pub fn parse_transaction_meta( accessor: &impl Access, connector_props: &ConnectorProperties, ) -> AccessResult { - if let (Some(ScalarImpl::Utf8(status)), Some(ScalarImpl::Utf8(id))) = ( - accessor.access(&[TRANSACTION_STATUS], &DataType::Varchar)?, - accessor.access(&[TRANSACTION_ID], &DataType::Varchar)?, + if let (Some(ScalarRefImpl::Utf8(status)), Some(ScalarRefImpl::Utf8(id))) = ( + accessor + .access(&[TRANSACTION_STATUS], &DataType::Varchar)? + .to_datum_ref(), + accessor + .access(&[TRANSACTION_ID], &DataType::Varchar)? + .to_datum_ref(), ) { // The id field has different meanings for different databases: // PG: txID:LSN // MySQL: source_id:transaction_id (e.g. 3E11FA47-71CA-11E1-9E33-C80AA9429562:23) - match status.as_ref() { + match status { DEBEZIUM_TRANSACTION_STATUS_BEGIN => match *connector_props { ConnectorProperties::PostgresCdc(_) => { let (tx_id, _) = id.split_once(':').unwrap(); return Ok(TransactionControl::Begin { id: tx_id.into() }); } - ConnectorProperties::MysqlCdc(_) => return Ok(TransactionControl::Begin { id }), + ConnectorProperties::MysqlCdc(_) => { + return Ok(TransactionControl::Begin { id: id.into() }) + } _ => {} }, DEBEZIUM_TRANSACTION_STATUS_COMMIT => match *connector_props { @@ -105,7 +114,9 @@ pub fn parse_transaction_meta( let (tx_id, _) = id.split_once(':').unwrap(); return Ok(TransactionControl::Commit { id: tx_id.into() }); } - ConnectorProperties::MysqlCdc(_) => return Ok(TransactionControl::Commit { id }), + ConnectorProperties::MysqlCdc(_) => { + return Ok(TransactionControl::Commit { id: id.into() }) + } _ => {} }, _ => {} @@ -160,7 +171,7 @@ impl ChangeEvent for DebeziumChangeEvent where A: Access, { - fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult { + fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult> { match self.op()? { ChangeEventOperation::Delete => { // For delete events of MongoDB, the "before" and "after" field both are null in the value, @@ -201,12 +212,12 @@ where .value_accessor .as_ref() .expect("value_accessor must be provided for upsert operation") - .access(&[SOURCE, SOURCE_TS_MS], &DataType::Int64)?; - Ok(ts_ms.map(|scalar| { + .access_owned(&[SOURCE, SOURCE_TS_MS], &DataType::Int64)?; + Ok(DatumCow::Owned(ts_ms.map(|scalar| { Timestamptz::from_millis(scalar.into_int64()) .expect("source.ts_ms must in millisecond") .to_scalar_value() - })) + }))) } ColumnType::DatabaseName(_) => self .value_accessor @@ -240,8 +251,10 @@ where fn op(&self) -> Result { if let Some(accessor) = &self.value_accessor { - if let Some(ScalarImpl::Utf8(op)) = accessor.access(&[OP], &DataType::Varchar)? { - match op.as_ref() { + if let Some(ScalarRefImpl::Utf8(op)) = + accessor.access(&[OP], &DataType::Varchar)?.to_datum_ref() + { + match op { DEBEZIUM_READ_OP | DEBEZIUM_CREATE_OP | DEBEZIUM_UPDATE_OP => { return Ok(ChangeEventOperation::Upsert) } @@ -327,12 +340,12 @@ impl Access for MongoJsonAccess where A: Access, { - fn access(&self, path: &[&str], type_expected: &DataType) -> super::AccessResult { + fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult> { match path { ["after" | "before", "_id"] => { - let payload = self.access(&[path[0]], &DataType::Jsonb)?; + let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?; if let Some(ScalarImpl::Jsonb(bson_doc)) = payload { - Ok(extract_bson_id(type_expected, &bson_doc.take())?) + Ok(extract_bson_id(type_expected, &bson_doc.take())?.into()) } else { // fail to extract the "_id" field from the message payload Err(AccessError::Undefined { @@ -348,9 +361,9 @@ where ["_id"] => { let ret = self.accessor.access(path, type_expected); if matches!(ret, Err(AccessError::Undefined { .. })) { - let id_bson = self.accessor.access(&["id"], &DataType::Jsonb)?; + let id_bson = self.accessor.access_owned(&["id"], &DataType::Jsonb)?; if let Some(ScalarImpl::Jsonb(bson_doc)) = id_bson { - Ok(extract_bson_id(type_expected, &bson_doc.take())?) + Ok(extract_bson_id(type_expected, &bson_doc.take())?.into()) } else { // fail to extract the "_id" field from the message key Err(AccessError::Undefined { diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index a467f1a0f98ce..e72234cff04f9 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -630,11 +630,7 @@ impl<'a> JsonAccess<'a> { } impl Access for JsonAccess<'_> { - fn access_cow<'a>( - &'a self, - path: &[&str], - type_expected: &DataType, - ) -> AccessResult> { + fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult> { let mut value = &self.value; for (idx, &key) in path.iter().enumerate() { if let Some(sub_value) = if self.options.ignoring_keycase { diff --git a/src/connector/src/parser/unified/kv_event.rs b/src/connector/src/parser/unified/kv_event.rs index afc059772e210..7e52d2f4c3c24 100644 --- a/src/connector/src/parser/unified/kv_event.rs +++ b/src/connector/src/parser/unified/kv_event.rs @@ -56,7 +56,7 @@ where { fn access_key(&self, path: &[&str], type_expected: &DataType) -> AccessResult> { if let Some(ka) = &self.key_accessor { - ka.access_cow(path, type_expected) + ka.access(path, type_expected) } else { Err(AccessError::Undefined { name: "key".to_string(), @@ -67,7 +67,7 @@ where fn access_value(&self, path: &[&str], type_expected: &DataType) -> AccessResult> { if let Some(va) = &self.value_accessor { - va.access_cow(path, type_expected) + va.access(path, type_expected) } else { Err(AccessError::Undefined { name: "value".to_string(), diff --git a/src/connector/src/parser/unified/maxwell.rs b/src/connector/src/parser/unified/maxwell.rs index 6abe86583988e..0bc6e7ab1d94a 100644 --- a/src/connector/src/parser/unified/maxwell.rs +++ b/src/connector/src/parser/unified/maxwell.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::types::{DataType, ScalarImpl}; +use risingwave_common::types::{DataType, DatumCow, ScalarRefImpl, ToDatumRef}; use super::{Access, ChangeEvent}; use crate::parser::unified::ChangeEventOperation; @@ -36,8 +36,10 @@ where { fn op(&self) -> std::result::Result { const OP: &str = "type"; - if let Some(ScalarImpl::Utf8(op)) = self.0.access(&[OP], &DataType::Varchar)? { - match op.as_ref() { + if let Some(ScalarRefImpl::Utf8(op)) = + self.0.access(&[OP], &DataType::Varchar)?.to_datum_ref() + { + match op { MAXWELL_INSERT_OP | MAXWELL_UPDATE_OP => return Ok(ChangeEventOperation::Upsert), MAXWELL_DELETE_OP => return Ok(ChangeEventOperation::Delete), _ => (), @@ -49,7 +51,7 @@ where }) } - fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult { + fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult> { const DATA: &str = "data"; self.0.access(&[DATA, &desc.name], &desc.data_type) } diff --git a/src/connector/src/parser/unified/mod.rs b/src/connector/src/parser/unified/mod.rs index fb6dadb09c0f7..6c8316042026f 100644 --- a/src/connector/src/parser/unified/mod.rs +++ b/src/connector/src/parser/unified/mod.rs @@ -36,16 +36,16 @@ pub mod maxwell; pub mod protobuf; pub mod util; -pub enum AccessImpl<'a, 'b> { - Avro(AvroAccess<'a, 'b>), +pub enum AccessImpl<'a> { + Avro(AvroAccess<'a>), Bytes(BytesAccess<'a>), Protobuf(ProtobufAccess), Json(JsonAccess<'a>), MongoJson(MongoJsonAccess>), } -impl Access for AccessImpl<'_, '_> { - fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult { +impl Access for AccessImpl<'_> { + fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult> { match self { Self::Avro(accessor) => accessor.access(path, type_expected), Self::Bytes(accessor) => accessor.access(path, type_expected), @@ -54,20 +54,6 @@ impl Access for AccessImpl<'_, '_> { Self::MongoJson(accessor) => accessor.access(path, type_expected), } } - - fn access_cow<'a>( - &'a self, - path: &[&str], - type_expected: &DataType, - ) -> AccessResult> { - match self { - Self::Avro(accessor) => accessor.access_cow(path, type_expected), - Self::Bytes(accessor) => accessor.access_cow(path, type_expected), - Self::Protobuf(accessor) => accessor.access_cow(path, type_expected), - Self::Json(accessor) => accessor.access_cow(path, type_expected), - Self::MongoJson(accessor) => accessor.access_cow(path, type_expected), - } - } } #[derive(Debug, Clone, Copy)] @@ -82,8 +68,7 @@ pub trait ChangeEvent { /// Access the operation type. fn op(&self) -> AccessResult; /// Access the field. - // TODO: return `DatumCow` - fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult; + fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult>; } impl ChangeEvent for (ChangeEventOperation, A) @@ -94,7 +79,7 @@ where Ok(self.0) } - fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult { + fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult> { self.1.access(&[desc.name.as_str()], &desc.data_type) } } diff --git a/src/connector/src/parser/unified/protobuf.rs b/src/connector/src/parser/unified/protobuf.rs index af2565a964082..4bea2cbab306b 100644 --- a/src/connector/src/parser/unified/protobuf.rs +++ b/src/connector/src/parser/unified/protobuf.rs @@ -16,7 +16,7 @@ use std::sync::{Arc, LazyLock}; use prost_reflect::{DescriptorPool, DynamicMessage, ReflectMessage}; use risingwave_common::log::LogSuppresser; -use risingwave_common::types::DataType; +use risingwave_common::types::{DataType, DatumCow}; use thiserror_ext::AsReport; use super::{Access, AccessResult}; @@ -38,7 +38,11 @@ impl ProtobufAccess { } impl Access for ProtobufAccess { - fn access(&self, path: &[&str], _type_expected: &DataType) -> AccessResult { + fn access<'a>( + &'a self, + path: &[&str], + _type_expected: &DataType, + ) -> AccessResult> { debug_assert_eq!(1, path.len()); let field_desc = self .message @@ -54,6 +58,7 @@ impl Access for ProtobufAccess { })?; let value = self.message.get_field(&field_desc); - from_protobuf_value(&field_desc, &value, &self.descriptor_pool) + // TODO: may borrow the value directly + from_protobuf_value(&field_desc, &value, &self.descriptor_pool).map(Into::into) } } diff --git a/src/connector/src/parser/upsert_parser.rs b/src/connector/src/parser/upsert_parser.rs index 604b6a840a7ff..df5e3b66e3136 100644 --- a/src/connector/src/parser/upsert_parser.rs +++ b/src/connector/src/parser/upsert_parser.rs @@ -96,7 +96,7 @@ impl UpsertParser { payload: Option>, mut writer: SourceStreamChunkRowWriter<'_>, ) -> ConnectorResult<()> { - let mut row_op: KvEvent, AccessImpl<'_, '_>> = KvEvent::default(); + let mut row_op: KvEvent, AccessImpl<'_>> = KvEvent::default(); if let Some(data) = key { row_op.with_key(self.key_builder.generate_accessor(data).await?); }