Skip to content

Commit

Permalink
make access owned as ext
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jun 10, 2024
1 parent 28ff9d9 commit 1e28008
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 27 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/connector/codec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ chrono = { version = "0.4", default-features = false, features = [
"clock",
"std",
] }
easy-ext = "1"
itertools = { workspace = true }
jsonbb = { workspace = true }
num-bigint = "0.4"
Expand Down
38 changes: 22 additions & 16 deletions src/connector/codec/src/decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,20 @@ pub type AccessResult<T = Datum> = std::result::Result<T, AccessError>;
///
/// Only one of these two methods should be implemented. See documentation for more details.
pub trait Access {
/// Accesses `path` in the data structure (*parsed* Avro/JSON/Protobuf data),
/// and then converts it to RisingWave `Datum`.
/// `type_expected` might or might not be used during the conversion depending on the implementation.
///
/// We usually expect the data is a record (struct), and `path` represents field path.
/// The data (or part of the data) represents the whole row (`Vec<Datum>`),
/// and we use different `path` to access one column at a time.
///
/// e.g., for Avro, we access `["col_name"]`; for Debezium Avro, we access `["before", "col_name"]`.
#[deprecated]
fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult<Datum> {
self.access_cow(path, type_expected)
.map(ToOwnedDatum::to_owned_datum)
}
// /// Accesses `path` in the data structure (*parsed* Avro/JSON/Protobuf data),
// /// and then converts it to RisingWave `Datum`.
// /// `type_expected` might or might not be used during the conversion depending on the implementation.
// ///
// /// We usually expect the data is a record (struct), and `path` represents field path.
// /// The data (or part of the data) represents the whole row (`Vec<Datum>`),
// /// and we use different `path` to access one column at a time.
// ///
// /// e.g., for Avro, we access `["col_name"]`; for Debezium Avro, we access `["before", "col_name"]`.
// #[deprecated]
// fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult<Datum> {
// self.access_cow(path, type_expected)
// .map(ToOwnedDatum::to_owned_datum)
// }

/// Similar to `access`, but may return a borrowed [`DatumCow::Borrowed`] to avoid unnecessary allocation.
/// If not overridden, it will call forward to `access` and always wrap the result in [`DatumCow::Owned`].
Expand All @@ -71,7 +71,13 @@ pub trait Access {
&'a self,
path: &[&str],
type_expected: &DataType,
) -> AccessResult<DatumCow<'a>> {
self.access(path, type_expected).map(Into::into)
) -> AccessResult<DatumCow<'a>>;
}

#[easy_ext::ext(AccessExt)]
impl<A: Access> A {
pub fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult<Datum> {
self.access_cow(path, type_expected)
.map(ToOwnedDatum::to_owned_datum)
}
}
2 changes: 1 addition & 1 deletion src/connector/src/parser/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ mod test {

use prost::Message;
use risingwave_common::types::StructType;
use risingwave_connector_codec::decoder::AccessExt;
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::data::data_type::PbTypeName;
use risingwave_pb::plan_common::{PbEncodeType, PbFormatType};
Expand All @@ -591,7 +592,6 @@ mod test {
use super::*;
use crate::parser::protobuf::recursive::all_types::{EnumType, ExampleOneof, NestedMessage};
use crate::parser::protobuf::recursive::AllTypes;
use crate::parser::unified::Access;
use crate::parser::SpecificParserConfig;

fn schema_dir() -> String {
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/parser/unified/debezium.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use risingwave_common::types::{
DataType, Datum, DatumCow, Scalar, ScalarImpl, ScalarRefImpl, Timestamptz, ToDatumRef,
};
use risingwave_connector_codec::decoder::AccessExt;
use risingwave_pb::plan_common::additional_column::ColumnType;

use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
Expand Down
10 changes: 0 additions & 10 deletions src/connector/src/parser/unified/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,6 @@ pub enum AccessImpl<'a, 'b> {
}

impl Access for AccessImpl<'_, '_> {
fn access(&self, path: &[&str], type_expected: &DataType) -> AccessResult {
match self {
Self::Avro(accessor) => accessor.access(path, type_expected),
Self::Bytes(accessor) => accessor.access(path, type_expected),
Self::Protobuf(accessor) => accessor.access(path, type_expected),
Self::Json(accessor) => accessor.access(path, type_expected),
Self::MongoJson(accessor) => accessor.access(path, type_expected),
}
}

fn access_cow<'a>(
&'a self,
path: &[&str],
Expand Down

0 comments on commit 1e28008

Please sign in to comment.