Skip to content

Commit

Permalink
rename and refine docs
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jun 10, 2024
1 parent 1e28008 commit 06010a1
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 79 deletions.
3 changes: 1 addition & 2 deletions src/connector/benches/json_vs_plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ mod old_json_parser {
let mut errors = Vec::new();
for value in values {
let accessor = JsonAccess::new(value);
match writer
.do_insert(|column| accessor.access_cow(&[&column.name], &column.data_type))
match writer.do_insert(|column| accessor.access(&[&column.name], &column.data_type))
{
Ok(_) => {}
Err(err) => errors.push(err),
Expand Down
2 changes: 1 addition & 1 deletion src/connector/codec/src/decoder/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl<'a, 'b> Access for AvroAccess<'a, 'b>
where
'a: 'b,
{
fn access_cow<'aa>(
fn access<'aa>(
&'aa self,
path: &[&str],
type_expected: &DataType,
Expand Down
50 changes: 25 additions & 25 deletions src/connector/codec/src/decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,40 +44,40 @@ pub enum AccessError {
pub type AccessResult<T = Datum> = std::result::Result<T, AccessError>;

/// Access to a field in the data structure.
///
/// Only one of these two methods should be implemented. See documentation for more details.
pub trait Access {
// /// Accesses `path` in the data structure (*parsed* Avro/JSON/Protobuf data),
// /// and then converts it to RisingWave `Datum`.
// /// `type_expected` might or might not be used during the conversion depending on the implementation.
// ///
// /// We usually expect the data is a record (struct), and `path` represents field path.
// /// The data (or part of the data) represents the whole row (`Vec<Datum>`),
// /// and we use different `path` to access one column at a time.
// ///
// /// e.g., for Avro, we access `["col_name"]`; for Debezium Avro, we access `["before", "col_name"]`.
// #[deprecated]
// fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult<Datum> {
// self.access_cow(path, type_expected)
// .map(ToOwnedDatum::to_owned_datum)
// }

/// Accesses `path` in the data structure (*parsed* Avro/JSON/Protobuf data),
/// and then converts it to RisingWave `Datum`.
/// `type_expected` might or might not be used during the conversion depending on the implementation.
///
/// # Path
///
/// We usually expect the data to be a record (struct), and `path` represents the field path.
/// The data (or part of the data) represents the whole row (`Vec<Datum>`),
/// and we use different `path` to access one column at a time.
///
/// e.g., for Avro, we access `["col_name"]`; for Debezium Avro, we access `["before", "col_name"]`.
/// Similar to `access`, but may return a borrowed [`DatumCow::Borrowed`] to avoid unnecessary allocation.
/// If not overridden, it will call forward to `access` and always wrap the result in [`DatumCow::Owned`].
///
/// This should be preferred over `access` for both callers and implementors.
// TODO: implement this method in all parsers and remove `access` method.
fn access_cow<'a>(
&'a self,
path: &[&str],
type_expected: &DataType,
) -> AccessResult<DatumCow<'a>>;
///
/// # Returns
///
/// The implementation should prefer to return a borrowed [`DatumRef`](risingwave_common::types::DatumRef)
/// through [`DatumCow::Borrowed`] to avoid unnecessary allocation if possible, especially for fields
/// with string or bytes data. If that's not the case, it may return an owned [`Datum`] through
/// [`DatumCow::Owned`].
fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>>;
}

// Note: this is an extension trait so that `access_owned` cannot be implemented or overridden.
#[easy_ext::ext(AccessExt)]
impl<A: Access> A {
pub fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult<Datum> {
self.access_cow(path, type_expected)
/// Similar to `access`, but always returns an owned [`Datum`]. See [`Access::access`] for more details.
///
/// Always prefer calling `access` directly if possible to avoid unnecessary allocation.
pub fn access_owned(&self, path: &[&str], type_expected: &DataType) -> AccessResult<Datum> {
self.access(path, type_expected)
.map(ToOwnedDatum::to_owned_datum)
}
}
2 changes: 1 addition & 1 deletion src/connector/src/parser/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ mod test {

fn pb_eq(a: &ProtobufAccess, field_name: &str, value: ScalarImpl) {
let dummy_type = DataType::Varchar;
let d = a.access(&[field_name], &dummy_type).unwrap().unwrap();
let d = a.access_owned(&[field_name], &dummy_type).unwrap().unwrap();
assert_eq!(d, value, "field: {} value: {:?}", field_name, d);
}

Expand Down
6 changes: 1 addition & 5 deletions src/connector/src/parser/unified/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@ impl<'a> BytesAccess<'a> {

impl Access for BytesAccess<'_> {
/// path is empty currently, `type_expected` should be `Bytea`
fn access_cow<'a>(
&'a self,
path: &[&str],
type_expected: &DataType,
) -> AccessResult<DatumCow<'a>> {
fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
if let DataType::Bytea = type_expected {
if self.column_name.is_none()
|| (path.len() == 1 && self.column_name.as_ref().unwrap() == path[0])
Expand Down
43 changes: 19 additions & 24 deletions src/connector/src/parser/unified/debezium.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ pub fn parse_transaction_meta(
) -> AccessResult<TransactionControl> {
if let (Some(ScalarRefImpl::Utf8(status)), Some(ScalarRefImpl::Utf8(id))) = (
accessor
.access_cow(&[TRANSACTION_STATUS], &DataType::Varchar)?
.access(&[TRANSACTION_STATUS], &DataType::Varchar)?
.to_datum_ref(),
accessor
.access_cow(&[TRANSACTION_ID], &DataType::Varchar)?
.access(&[TRANSACTION_ID], &DataType::Varchar)?
.to_datum_ref(),
) {
// The id field has different meanings for different databases:
Expand Down Expand Up @@ -181,16 +181,16 @@ where
.key_accessor
.as_ref()
.expect("key_accessor must be provided for delete operation")
.access_cow(&[&desc.name], &desc.data_type);
.access(&[&desc.name], &desc.data_type);
}

if let Some(va) = self.value_accessor.as_ref() {
va.access_cow(&[BEFORE, &desc.name], &desc.data_type)
va.access(&[BEFORE, &desc.name], &desc.data_type)
} else {
self.key_accessor
.as_ref()
.unwrap()
.access_cow(&[&desc.name], &desc.data_type)
.access(&[&desc.name], &desc.data_type)
}
}

Expand All @@ -202,7 +202,7 @@ where
self.value_accessor
.as_ref()
.expect("value_accessor must be provided for upsert operation")
.access_cow(&[AFTER, &desc.name], &desc.data_type)
.access(&[AFTER, &desc.name], &desc.data_type)
},
|additional_column_type| {
match *additional_column_type {
Expand All @@ -212,7 +212,7 @@ where
.value_accessor
.as_ref()
.expect("value_accessor must be provided for upsert operation")
.access(&[SOURCE, SOURCE_TS_MS], &DataType::Int64)?;
.access_owned(&[SOURCE, SOURCE_TS_MS], &DataType::Int64)?;
Ok(DatumCow::Owned(ts_ms.map(|scalar| {
Timestamptz::from_millis(scalar.into_int64())
.expect("source.ts_ms must in millisecond")
Expand All @@ -223,22 +223,22 @@ where
.value_accessor
.as_ref()
.expect("value_accessor must be provided for upsert operation")
.access_cow(&[SOURCE, SOURCE_DB], &desc.data_type),
.access(&[SOURCE, SOURCE_DB], &desc.data_type),
ColumnType::SchemaName(_) => self
.value_accessor
.as_ref()
.expect("value_accessor must be provided for upsert operation")
.access_cow(&[SOURCE, SOURCE_SCHEMA], &desc.data_type),
.access(&[SOURCE, SOURCE_SCHEMA], &desc.data_type),
ColumnType::TableName(_) => self
.value_accessor
.as_ref()
.expect("value_accessor must be provided for upsert operation")
.access_cow(&[SOURCE, SOURCE_TABLE], &desc.data_type),
.access(&[SOURCE, SOURCE_TABLE], &desc.data_type),
ColumnType::CollectionName(_) => self
.value_accessor
.as_ref()
.expect("value_accessor must be provided for upsert operation")
.access_cow(&[SOURCE, SOURCE_COLLECTION], &desc.data_type),
.access(&[SOURCE, SOURCE_COLLECTION], &desc.data_type),
_ => Err(AccessError::UnsupportedAdditionalColumn {
name: desc.name.clone(),
}),
Expand All @@ -251,9 +251,8 @@ where

fn op(&self) -> Result<ChangeEventOperation, AccessError> {
if let Some(accessor) = &self.value_accessor {
if let Some(ScalarRefImpl::Utf8(op)) = accessor
.access_cow(&[OP], &DataType::Varchar)?
.to_datum_ref()
if let Some(ScalarRefImpl::Utf8(op)) =
accessor.access(&[OP], &DataType::Varchar)?.to_datum_ref()
{
match op {
DEBEZIUM_READ_OP | DEBEZIUM_CREATE_OP | DEBEZIUM_UPDATE_OP => {
Expand Down Expand Up @@ -341,14 +340,10 @@ impl<A> Access for MongoJsonAccess<A>
where
A: Access,
{
fn access_cow<'a>(
&'a self,
path: &[&str],
type_expected: &DataType,
) -> AccessResult<DatumCow<'a>> {
fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
match path {
["after" | "before", "_id"] => {
let payload = self.access(&[path[0]], &DataType::Jsonb)?;
let payload = self.access_owned(&[path[0]], &DataType::Jsonb)?;
if let Some(ScalarImpl::Jsonb(bson_doc)) = payload {
Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
} else {
Expand All @@ -359,14 +354,14 @@ where
})?
}
}
["after" | "before", "payload"] => self.access_cow(&[path[0]], &DataType::Jsonb),
["after" | "before", "payload"] => self.access(&[path[0]], &DataType::Jsonb),
// To handle a DELETE message, we need to extract the "_id" field from the message key, because it is not in the payload.
// In addition, the "_id" field is named "id" in the key. An example of a message key:
// {"schema":null,"payload":{"id":"{\"$oid\": \"65bc9fb6c485f419a7a877fe\"}"}}
["_id"] => {
let ret = self.accessor.access_cow(path, type_expected);
let ret = self.accessor.access(path, type_expected);
if matches!(ret, Err(AccessError::Undefined { .. })) {
let id_bson = self.accessor.access(&["id"], &DataType::Jsonb)?;
let id_bson = self.accessor.access_owned(&["id"], &DataType::Jsonb)?;
if let Some(ScalarImpl::Jsonb(bson_doc)) = id_bson {
Ok(extract_bson_id(type_expected, &bson_doc.take())?.into())
} else {
Expand All @@ -380,7 +375,7 @@ where
ret
}
}
_ => self.accessor.access_cow(path, type_expected),
_ => self.accessor.access(path, type_expected),
}
}
}
6 changes: 1 addition & 5 deletions src/connector/src/parser/unified/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,11 +630,7 @@ impl<'a> JsonAccess<'a> {
}

impl Access for JsonAccess<'_> {
fn access_cow<'a>(
&'a self,
path: &[&str],
type_expected: &DataType,
) -> AccessResult<DatumCow<'a>> {
fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
let mut value = &self.value;
for (idx, &key) in path.iter().enumerate() {
if let Some(sub_value) = if self.options.ignoring_keycase {
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/parser/unified/kv_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ where
{
fn access_key(&self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'_>> {
if let Some(ka) = &self.key_accessor {
ka.access_cow(path, type_expected)
ka.access(path, type_expected)
} else {
Err(AccessError::Undefined {
name: "key".to_string(),
Expand All @@ -67,7 +67,7 @@ where

fn access_value(&self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'_>> {
if let Some(va) = &self.value_accessor {
va.access_cow(path, type_expected)
va.access(path, type_expected)
} else {
Err(AccessError::Undefined {
name: "value".to_string(),
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/parser/unified/maxwell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ where
fn op(&self) -> std::result::Result<super::ChangeEventOperation, super::AccessError> {
const OP: &str = "type";
if let Some(ScalarRefImpl::Utf8(op)) =
self.0.access_cow(&[OP], &DataType::Varchar)?.to_datum_ref()
self.0.access(&[OP], &DataType::Varchar)?.to_datum_ref()
{
match op {
MAXWELL_INSERT_OP | MAXWELL_UPDATE_OP => return Ok(ChangeEventOperation::Upsert),
Expand All @@ -53,6 +53,6 @@ where

fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult<DatumCow<'_>> {
const DATA: &str = "data";
self.0.access_cow(&[DATA, &desc.name], &desc.data_type)
self.0.access(&[DATA, &desc.name], &desc.data_type)
}
}
18 changes: 7 additions & 11 deletions src/connector/src/parser/unified/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,13 @@ pub enum AccessImpl<'a, 'b> {
}

impl Access for AccessImpl<'_, '_> {
fn access_cow<'a>(
&'a self,
path: &[&str],
type_expected: &DataType,
) -> AccessResult<DatumCow<'a>> {
fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
match self {
Self::Avro(accessor) => accessor.access_cow(path, type_expected),
Self::Bytes(accessor) => accessor.access_cow(path, type_expected),
Self::Protobuf(accessor) => accessor.access_cow(path, type_expected),
Self::Json(accessor) => accessor.access_cow(path, type_expected),
Self::MongoJson(accessor) => accessor.access_cow(path, type_expected),
Self::Avro(accessor) => accessor.access(path, type_expected),
Self::Bytes(accessor) => accessor.access(path, type_expected),
Self::Protobuf(accessor) => accessor.access(path, type_expected),
Self::Json(accessor) => accessor.access(path, type_expected),
Self::MongoJson(accessor) => accessor.access(path, type_expected),
}
}
}
Expand Down Expand Up @@ -85,6 +81,6 @@ where
}

fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult<DatumCow<'_>> {
self.1.access_cow(&[desc.name.as_str()], &desc.data_type)
self.1.access(&[desc.name.as_str()], &desc.data_type)
}
}
2 changes: 1 addition & 1 deletion src/connector/src/parser/unified/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl ProtobufAccess {
}

impl Access for ProtobufAccess {
fn access_cow<'a>(
fn access<'a>(
&'a self,
path: &[&str],
_type_expected: &DataType,
Expand Down

0 comments on commit 06010a1

Please sign in to comment.