diff --git a/src/connector/benches/json_vs_plain_parser.rs b/src/connector/benches/json_vs_plain_parser.rs index 603911156eb04..b9a1139dcb03b 100644 --- a/src/connector/benches/json_vs_plain_parser.rs +++ b/src/connector/benches/json_vs_plain_parser.rs @@ -83,8 +83,7 @@ mod old_json_parser { let mut errors = Vec::new(); for value in values { let accessor = JsonAccess::new(value); - match writer - .do_insert(|column| accessor.access_cow(&[&column.name], &column.data_type)) + match writer.do_insert(|column| accessor.access(&[&column.name], &column.data_type)) { Ok(_) => {} Err(err) => errors.push(err), diff --git a/src/connector/codec/src/decoder/avro/mod.rs b/src/connector/codec/src/decoder/avro/mod.rs index 187ddaf4db7be..8822847c200a1 100644 --- a/src/connector/codec/src/decoder/avro/mod.rs +++ b/src/connector/codec/src/decoder/avro/mod.rs @@ -282,7 +282,7 @@ impl<'a, 'b> Access for AvroAccess<'a, 'b> where 'a: 'b, { - fn access_cow<'aa>( + fn access<'aa>( &'aa self, path: &[&str], type_expected: &DataType, diff --git a/src/connector/codec/src/decoder/mod.rs b/src/connector/codec/src/decoder/mod.rs index 27befdda89f98..11c641474a143 100644 --- a/src/connector/codec/src/decoder/mod.rs +++ b/src/connector/codec/src/decoder/mod.rs @@ -44,40 +44,40 @@ pub enum AccessError { pub type AccessResult = std::result::Result; /// Access to a field in the data structure. -/// -/// Only one of these two methods should be implemented. See documentation for more details. pub trait Access { - // /// Accesses `path` in the data structure (*parsed* Avro/JSON/Protobuf data), - // /// and then converts it to RisingWave `Datum`. - // /// `type_expected` might or might not be used during the conversion depending on the implementation. - // /// - // /// We usually expect the data is a record (struct), and `path` represents field path. - // /// The data (or part of the data) represents the whole row (`Vec`), - // /// and we use different `path` to access one column at a time. - // /// - // /// e.g., for Avro, we access `["col_name"]`; for Debezium Avro, we access `["before", "col_name"]`. - // #[deprecated] - // fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult { - // self.access_cow(path, type_expected) - // .map(ToOwnedDatum::to_owned_datum) - // } - + /// Accesses `path` in the data structure (*parsed* Avro/JSON/Protobuf data), + /// and then converts it to RisingWave `Datum`. + /// `type_expected` might or might not be used during the conversion depending on the implementation. + /// + /// # Path + /// + /// We usually expect the data is a record (struct), and `path` represents field path. + /// The data (or part of the data) represents the whole row (`Vec`), + /// and we use different `path` to access one column at a time. + /// + /// e.g., for Avro, we access `["col_name"]`; for Debezium Avro, we access `["before", "col_name"]`. /// Similar to `access`, but may return a borrowed [`DatumCow::Borrowed`] to avoid unnecessary allocation. /// If not overridden, it will call forward to `access` and always wrap the result in [`DatumCow::Owned`]. /// /// This should be preferred over `access` for both callers and implementors. - // TODO: implement this method in all parsers and remove `access` method. - fn access_cow<'a>( - &'a self, - path: &[&str], - type_expected: &DataType, - ) -> AccessResult>; + /// + /// # Returns + /// + /// The implementation should prefer to return a borrowed [`DatumRef`](risingwave_common::types::DatumRef) + /// through [`DatumCow::Borrowed`] to avoid unnecessary allocation if possible, especially for fields + /// with string or bytes data. If that's not the case, it may return an owned [`Datum`] through + /// [`DatumCow::Owned`]. + fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult>; } +// Note: made an extension trait to disallow implementing or overriding `access_owned`. #[easy_ext::ext(AccessExt)] impl A { - pub fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult { - self.access_cow(path, type_expected) + /// Similar to `access`, but always returns an owned [`Datum`]. See [`Access::access`] for more details. + /// + /// Always prefer calling `access` directly if possible to avoid unnecessary allocation. + pub fn access_owned(&self, path: &[&str], type_expected: &DataType) -> AccessResult { + self.access(path, type_expected) .map(ToOwnedDatum::to_owned_datum) } } diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index 48309a44bb1d3..447eec7c78021 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -896,7 +896,7 @@ mod test { fn pb_eq(a: &ProtobufAccess, field_name: &str, value: ScalarImpl) { let dummy_type = DataType::Varchar; - let d = a.access(&[field_name], &dummy_type).unwrap().unwrap(); + let d = a.access_owned(&[field_name], &dummy_type).unwrap().unwrap(); assert_eq!(d, value, "field: {} value: {:?}", field_name, d); } diff --git a/src/connector/src/parser/unified/bytes.rs b/src/connector/src/parser/unified/bytes.rs index c2c657f9d18e1..2f8f27124a307 100644 --- a/src/connector/src/parser/unified/bytes.rs +++ b/src/connector/src/parser/unified/bytes.rs @@ -31,11 +31,7 @@ impl<'a> BytesAccess<'a> { impl Access for BytesAccess<'_> { /// path is empty currently, `type_expected` should be `Bytea` - fn access_cow<'a>( - &'a self, - path: &[&str], - type_expected: &DataType, - ) -> AccessResult> { + fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult> { if let DataType::Bytea = type_expected { if self.column_name.is_none() || (path.len() == 1 && self.column_name.as_ref().unwrap() == path[0]) diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index d2d7f508b5d59..d90463698577d 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -89,10 +89,10 @@ pub fn parse_transaction_meta( ) -> AccessResult { if let (Some(ScalarRefImpl::Utf8(status)), Some(ScalarRefImpl::Utf8(id))) = ( accessor - .access_cow(&[TRANSACTION_STATUS], &DataType::Varchar)? + .access(&[TRANSACTION_STATUS], &DataType::Varchar)? .to_datum_ref(), accessor - .access_cow(&[TRANSACTION_ID], &DataType::Varchar)? + .access(&[TRANSACTION_ID], &DataType::Varchar)? .to_datum_ref(), ) { // The id field has different meanings for different databases: @@ -181,16 +181,16 @@ where .key_accessor .as_ref() .expect("key_accessor must be provided for delete operation") - .access_cow(&[&desc.name], &desc.data_type); + .access(&[&desc.name], &desc.data_type); } if let Some(va) = self.value_accessor.as_ref() { - va.access_cow(&[BEFORE, &desc.name], &desc.data_type) + va.access(&[BEFORE, &desc.name], &desc.data_type) } else { self.key_accessor .as_ref() .unwrap() - .access_cow(&[&desc.name], &desc.data_type) + .access(&[&desc.name], &desc.data_type) } } @@ -202,7 +202,7 @@ where self.value_accessor .as_ref() .expect("value_accessor must be provided for upsert operation") - .access_cow(&[AFTER, &desc.name], &desc.data_type) + .access(&[AFTER, &desc.name], &desc.data_type) }, |additional_column_type| { match *additional_column_type { @@ -212,7 +212,7 @@ where .value_accessor .as_ref() .expect("value_accessor must be provided for upsert operation") - .access(&[SOURCE, SOURCE_TS_MS], &DataType::Int64)?; + .access_owned(&[SOURCE, SOURCE_TS_MS], &DataType::Int64)?; Ok(DatumCow::Owned(ts_ms.map(|scalar| { Timestamptz::from_millis(scalar.into_int64()) .expect("source.ts_ms must in millisecond") @@ -223,22 +223,22 @@ where .value_accessor .as_ref() .expect("value_accessor must be provided for upsert operation") - .access_cow(&[SOURCE, SOURCE_DB], &desc.data_type), + .access(&[SOURCE, SOURCE_DB], &desc.data_type), ColumnType::SchemaName(_) => self .value_accessor .as_ref() .expect("value_accessor must be provided for upsert operation") - .access_cow(&[SOURCE, SOURCE_SCHEMA], &desc.data_type), + .access(&[SOURCE, SOURCE_SCHEMA], &desc.data_type), ColumnType::TableName(_) => self .value_accessor .as_ref() .expect("value_accessor must be provided for upsert operation") - .access_cow(&[SOURCE, SOURCE_TABLE], &desc.data_type), + .access(&[SOURCE, SOURCE_TABLE], &desc.data_type), ColumnType::CollectionName(_) => self .value_accessor .as_ref() .expect("value_accessor must be provided for upsert operation") - .access_cow(&[SOURCE, SOURCE_COLLECTION], &desc.data_type), + .access(&[SOURCE, SOURCE_COLLECTION], &desc.data_type), _ => Err(AccessError::UnsupportedAdditionalColumn { name: desc.name.clone(), }), @@ -251,9 +251,8 @@ where fn op(&self) -> Result { if let Some(accessor) = &self.value_accessor { - if let Some(ScalarRefImpl::Utf8(op)) = accessor - .access_cow(&[OP], &DataType::Varchar)? - .to_datum_ref() + if let Some(ScalarRefImpl::Utf8(op)) = + accessor.access(&[OP], &DataType::Varchar)?.to_datum_ref() { match op { DEBEZIUM_READ_OP | DEBEZIUM_CREATE_OP | DEBEZIUM_UPDATE_OP => { @@ -341,14 +340,10 @@ impl Access for MongoJsonAccess where A: Access, { - fn access_cow<'a>( - &'a self, - path: &[&str], - type_expected: &DataType, - ) -> AccessResult> { + fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult> { match path { ["after" | "before", "_id"] => { - let payload = self.access(&[path[0]], &DataType::Jsonb)?; + let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?; if let Some(ScalarImpl::Jsonb(bson_doc)) = payload { Ok(extract_bson_id(type_expected, &bson_doc.take())?.into()) } else { @@ -359,14 +354,14 @@ where })? } } - ["after" | "before", "payload"] => self.access_cow(&[path[0]], &DataType::Jsonb), + ["after" | "before", "payload"] => self.access(&[path[0]], &DataType::Jsonb), // To handle a DELETE message, we need to extract the "_id" field from the message key, because it is not in the payload. // In addition, the "_id" field is named as "id" in the key. An example of message key: // {"schema":null,"payload":{"id":"{\"$oid\": \"65bc9fb6c485f419a7a877fe\"}"}} ["_id"] => { - let ret = self.accessor.access_cow(path, type_expected); + let ret = self.accessor.access(path, type_expected); if matches!(ret, Err(AccessError::Undefined { .. })) { - let id_bson = self.accessor.access(&["id"], &DataType::Jsonb)?; + let id_bson = self.accessor.access_owned(&["id"], &DataType::Jsonb)?; if let Some(ScalarImpl::Jsonb(bson_doc)) = id_bson { Ok(extract_bson_id(type_expected, &bson_doc.take())?.into()) } else { @@ -380,7 +375,7 @@ where ret } } - _ => self.accessor.access_cow(path, type_expected), + _ => self.accessor.access(path, type_expected), } } } diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index a467f1a0f98ce..e72234cff04f9 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -630,11 +630,7 @@ impl<'a> JsonAccess<'a> { } impl Access for JsonAccess<'_> { - fn access_cow<'a>( - &'a self, - path: &[&str], - type_expected: &DataType, - ) -> AccessResult> { + fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult> { let mut value = &self.value; for (idx, &key) in path.iter().enumerate() { if let Some(sub_value) = if self.options.ignoring_keycase { diff --git a/src/connector/src/parser/unified/kv_event.rs b/src/connector/src/parser/unified/kv_event.rs index afc059772e210..7e52d2f4c3c24 100644 --- a/src/connector/src/parser/unified/kv_event.rs +++ b/src/connector/src/parser/unified/kv_event.rs @@ -56,7 +56,7 @@ where { fn access_key(&self, path: &[&str], type_expected: &DataType) -> AccessResult> { if let Some(ka) = &self.key_accessor { - ka.access_cow(path, type_expected) + ka.access(path, type_expected) } else { Err(AccessError::Undefined { name: "key".to_string(), @@ -67,7 +67,7 @@ where fn access_value(&self, path: &[&str], type_expected: &DataType) -> AccessResult> { if let Some(va) = &self.value_accessor { - va.access_cow(path, type_expected) + va.access(path, type_expected) } else { Err(AccessError::Undefined { name: "value".to_string(), diff --git a/src/connector/src/parser/unified/maxwell.rs b/src/connector/src/parser/unified/maxwell.rs index 7cb952e2b9740..0bc6e7ab1d94a 100644 --- a/src/connector/src/parser/unified/maxwell.rs +++ b/src/connector/src/parser/unified/maxwell.rs @@ -37,7 +37,7 @@ where fn op(&self) -> std::result::Result { const OP: &str = "type"; if let Some(ScalarRefImpl::Utf8(op)) = - self.0.access_cow(&[OP], &DataType::Varchar)?.to_datum_ref() + self.0.access(&[OP], &DataType::Varchar)?.to_datum_ref() { match op { MAXWELL_INSERT_OP | MAXWELL_UPDATE_OP => return Ok(ChangeEventOperation::Upsert), @@ -53,6 +53,6 @@ where fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult> { const DATA: &str = "data"; - self.0.access_cow(&[DATA, &desc.name], &desc.data_type) + self.0.access(&[DATA, &desc.name], &desc.data_type) } } diff --git a/src/connector/src/parser/unified/mod.rs b/src/connector/src/parser/unified/mod.rs index ca5ce3e4bbc84..876658f24c29d 100644 --- a/src/connector/src/parser/unified/mod.rs +++ b/src/connector/src/parser/unified/mod.rs @@ -45,17 +45,13 @@ pub enum AccessImpl<'a, 'b> { } impl Access for AccessImpl<'_, '_> { - fn access_cow<'a>( - &'a self, - path: &[&str], - type_expected: &DataType, - ) -> AccessResult> { + fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult> { match self { - Self::Avro(accessor) => accessor.access_cow(path, type_expected), - Self::Bytes(accessor) => accessor.access_cow(path, type_expected), - Self::Protobuf(accessor) => accessor.access_cow(path, type_expected), - Self::Json(accessor) => accessor.access_cow(path, type_expected), - Self::MongoJson(accessor) => accessor.access_cow(path, type_expected), + Self::Avro(accessor) => accessor.access(path, type_expected), + Self::Bytes(accessor) => accessor.access(path, type_expected), + Self::Protobuf(accessor) => accessor.access(path, type_expected), + Self::Json(accessor) => accessor.access(path, type_expected), + Self::MongoJson(accessor) => accessor.access(path, type_expected), } } } @@ -85,6 +81,6 @@ where } fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult> { - self.1.access_cow(&[desc.name.as_str()], &desc.data_type) + self.1.access(&[desc.name.as_str()], &desc.data_type) } } diff --git a/src/connector/src/parser/unified/protobuf.rs b/src/connector/src/parser/unified/protobuf.rs index 83237d3dcf381..4bea2cbab306b 100644 --- a/src/connector/src/parser/unified/protobuf.rs +++ b/src/connector/src/parser/unified/protobuf.rs @@ -38,7 +38,7 @@ impl ProtobufAccess { } impl Access for ProtobufAccess { - fn access_cow<'a>( + fn access<'a>( &'a self, path: &[&str], _type_expected: &DataType,