Skip to content

Commit

Permalink
cleanup avro lifetime
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jun 10, 2024
1 parent 06010a1 commit d21f36b
Show file tree
Hide file tree
Showing 11 changed files with 20 additions and 32 deletions.
20 changes: 6 additions & 14 deletions src/connector/codec/src/decoder/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,27 +266,19 @@ impl<'a> AvroParseOptions<'a> {
}
}

// TODO: No need to use two lifetimes here.
pub struct AvroAccess<'a, 'b> {
pub struct AvroAccess<'a> {
value: &'a Value,
options: AvroParseOptions<'b>,
options: AvroParseOptions<'a>,
}

impl<'a, 'b> AvroAccess<'a, 'b> {
pub fn new(value: &'a Value, options: AvroParseOptions<'b>) -> Self {
impl<'a> AvroAccess<'a> {
pub fn new(value: &'a Value, options: AvroParseOptions<'a>) -> Self {
Self { value, options }
}
}

impl<'a, 'b> Access for AvroAccess<'a, 'b>
where
'a: 'b,
{
fn access<'aa>(
&'aa self,
path: &[&str],
type_expected: &DataType,
) -> AccessResult<DatumCow<'aa>> {
impl Access for AvroAccess<'_> {
fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
let mut value = self.value;
let mut options: AvroParseOptions<'_> = self.options.clone();

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct AvroAccessBuilder {
}

impl AccessBuilder for AvroAccessBuilder {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
self.value = self.parse_avro_value(&payload).await?;
Ok(AccessImpl::Avro(AvroAccess::new(
self.value.as_ref().unwrap(),
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/bytes_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct BytesAccessBuilder {

impl AccessBuilder for BytesAccessBuilder {
#[allow(clippy::unused_async)]
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
Ok(AccessImpl::Bytes(BytesAccess::new(
&self.column_name,
payload,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/debezium/avro_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct DebeziumAvroAccessBuilder {

// TODO: reduce encodingtype match
impl AccessBuilder for DebeziumAvroAccessBuilder {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
let (schema_id, mut raw_payload) = extract_schema_id(&payload)?;
let schema = self.schema_resolver.get_by_id(schema_id).await?;
self.value = Some(from_avro_datum(schema.as_ref(), &mut raw_payload, None)?);
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/parser/debezium/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl DebeziumJsonAccessBuilder {

impl AccessBuilder for DebeziumJsonAccessBuilder {
#[allow(clippy::unused_async)]
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
self.value = Some(payload);
let mut event: BorrowedValue<'_> =
simd_json::to_borrowed_value(self.value.as_mut().unwrap())
Expand Down Expand Up @@ -79,7 +79,7 @@ impl DebeziumMongoJsonAccessBuilder {

impl AccessBuilder for DebeziumMongoJsonAccessBuilder {
#[allow(clippy::unused_async)]
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
self.value = Some(payload);
let mut event: BorrowedValue<'_> =
simd_json::to_borrowed_value(self.value.as_mut().unwrap())
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct JsonAccessBuilder {

impl AccessBuilder for JsonAccessBuilder {
#[allow(clippy::unused_async)]
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
// XXX: When will we enter this branch?
if payload.is_empty() {
self.value = Some("{}".into());
Expand Down
7 changes: 2 additions & 5 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
}

pub trait AccessBuilder {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>>;
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>>;
}

#[derive(Debug)]
Expand Down Expand Up @@ -887,10 +887,7 @@ impl AccessBuilderImpl {
Ok(accessor)
}

pub async fn generate_accessor(
&mut self,
payload: Vec<u8>,
) -> ConnectorResult<AccessImpl<'_, '_>> {
pub async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
let accessor = match self {
Self::Avro(builder) => builder.generate_accessor(payload).await?,
Self::Protobuf(builder) => builder.generate_accessor(payload).await?,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl PlainParser {
};
}

let mut row_op: KvEvent<AccessImpl<'_, '_>, AccessImpl<'_, '_>> = KvEvent::default();
let mut row_op: KvEvent<AccessImpl<'_>, AccessImpl<'_>> = KvEvent::default();

if let Some(data) = key
&& let Some(key_builder) = self.key_builder.as_mut()
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct ProtobufAccessBuilder {

impl AccessBuilder for ProtobufAccessBuilder {
#[allow(clippy::unused_async)]
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
let payload = if self.confluent_wire_type {
resolve_pb_header(&payload)?
} else {
Expand Down
7 changes: 3 additions & 4 deletions src/connector/src/parser/unified/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ pub mod maxwell;
pub mod protobuf;
pub mod util;

pub enum AccessImpl<'a, 'b> {
Avro(AvroAccess<'a, 'b>),
pub enum AccessImpl<'a> {
Avro(AvroAccess<'a>),
Bytes(BytesAccess<'a>),
Protobuf(ProtobufAccess),
Json(JsonAccess<'a>),
MongoJson(MongoJsonAccess<JsonAccess<'a>>),
}

impl Access for AccessImpl<'_, '_> {
impl Access for AccessImpl<'_> {
fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
match self {
Self::Avro(accessor) => accessor.access(path, type_expected),
Expand All @@ -68,7 +68,6 @@ pub trait ChangeEvent {
/// Access the operation type.
fn op(&self) -> AccessResult<ChangeEventOperation>;
/// Access the field.
// TODO: return `DatumCow`
fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult<DatumCow<'_>>;
}

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/upsert_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl UpsertParser {
payload: Option<Vec<u8>>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> ConnectorResult<()> {
let mut row_op: KvEvent<AccessImpl<'_, '_>, AccessImpl<'_, '_>> = KvEvent::default();
let mut row_op: KvEvent<AccessImpl<'_>, AccessImpl<'_>> = KvEvent::default();
if let Some(data) = key {
row_op.with_key(self.key_builder.generate_accessor(data).await?);
}
Expand Down

0 comments on commit d21f36b

Please sign in to comment.