From e8db4eb2ef7b744ebcfc1d6633a2709c8ad1d8e1 Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Thu, 12 Oct 2023 14:55:48 +0800 Subject: [PATCH] owned without lifetime as per ##12515 --- src/connector/src/sink/encoder/avro.rs | 57 +++++++++++++------------ src/connector/src/sink/encoder/proto.rs | 22 +++++----- 2 files changed, 41 insertions(+), 38 deletions(-) diff --git a/src/connector/src/sink/encoder/avro.rs b/src/connector/src/sink/encoder/avro.rs index 54d042a154f93..fc2db75eb4c38 100644 --- a/src/connector/src/sink/encoder/avro.rs +++ b/src/connector/src/sink/encoder/avro.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use apache_avro::schema::Schema as AvroSchema; use apache_avro::types::{Record, Value}; use apache_avro::Writer; @@ -24,32 +26,32 @@ use super::{FieldEncodeError, Result as SinkResult, RowEncoder, SerTo}; type Result = std::result::Result; -pub struct AvroEncoder<'a> { - schema: &'a Schema, - col_indices: Option<&'a [usize]>, - avro_schema: &'a AvroSchema, +pub struct AvroEncoder { + schema: Schema, + col_indices: Option>, + avro_schema: Arc, } -impl<'a> AvroEncoder<'a> { +impl AvroEncoder { pub fn new( - schema: &'a Schema, - col_indices: Option<&'a [usize]>, - avro_schema: &'a AvroSchema, + schema: Schema, + col_indices: Option>, + avro_schema: Arc, ) -> SinkResult { - match col_indices { + match &col_indices { Some(col_indices) => validate_fields( col_indices.iter().map(|idx| { let f = &schema[*idx]; (f.name.as_str(), &f.data_type) }), - avro_schema, + &avro_schema, )?, None => validate_fields( schema .fields .iter() .map(|f| (f.name.as_str(), &f.data_type)), - avro_schema, + &avro_schema, )?, }; @@ -61,15 +63,15 @@ impl<'a> AvroEncoder<'a> { } } -impl<'a> RowEncoder for AvroEncoder<'a> { - type Output = (Record<'a>, &'a AvroSchema); +impl RowEncoder for AvroEncoder { + type Output = (Value, Arc); fn schema(&self) -> &Schema { - self.schema + &self.schema } fn col_indices(&self) -> Option<&[usize]> { - self.col_indices + self.col_indices.as_deref() } fn encode_cols( @@ -82,15 +84,15 @@ impl<'a> RowEncoder for AvroEncoder<'a> { let f = &self.schema[idx]; ((f.name.as_str(), &f.data_type), row.datum_at(idx)) }), - self.avro_schema, + &self.avro_schema, )?; - Ok((record, self.avro_schema)) + Ok((record.into(), self.avro_schema.clone())) } } -impl<'a> SerTo> for (Record<'a>, &'a AvroSchema) { +impl SerTo> for (Value, Arc) { fn ser_to(self) -> SinkResult> { - let mut w = Writer::new(self.1, Vec::new()); + let mut w = Writer::new(&self.1, Vec::new()); w.append(self.0) .and_then(|_| w.into_inner()) .map_err(|e| crate::sink::SinkError::Encode(e.to_string())) @@ -614,7 +616,7 @@ mod tests { .unwrap(); let mut record = Record::new(&avro_schema).unwrap(); record.put("f0", Value::String("2".into())); - let res: SinkResult> = (record, &avro_schema).ser_to(); + let res: SinkResult> = (Value::from(record), Arc::new(avro_schema)).ser_to(); assert_eq!(res.unwrap_err().to_string(), "Encode error: Value does not match schema: Reason: Unsupported value-schema combination"); } @@ -631,6 +633,7 @@ mod tests { }"#, ) .unwrap(); + let avro_schema = Arc::new(avro_schema); let schema = Schema::new(vec![ Field::with_name(DataType::Int64, "opt"), @@ -640,10 +643,10 @@ mod tests { Some(ScalarImpl::Int64(31)), Some(ScalarImpl::Int32(15)), ]); - let encoder = AvroEncoder::new(&schema, None, &avro_schema).unwrap(); + let encoder = AvroEncoder::new(schema, None, avro_schema.clone()).unwrap(); let actual = encoder.encode(row).unwrap(); assert_eq!( - Value::from(actual.0), + actual.0, Value::Record(vec![ ("req".into(), Value::Int(15)), ("opt".into(), Value::Union(1, Value::Long(31).into())), @@ -652,10 +655,10 @@ mod tests { let schema = Schema::new(vec![Field::with_name(DataType::Int32, "req")]); let row = OwnedRow::new(vec![Some(ScalarImpl::Int32(15))]); - let encoder = AvroEncoder::new(&schema, None, &avro_schema).unwrap(); + let encoder = AvroEncoder::new(schema, None, avro_schema.clone()).unwrap(); let actual = encoder.encode(row).unwrap(); assert_eq!( - Value::from(actual.0), + actual.0, Value::Record(vec![ ("req".into(), Value::Int(15)), ("opt".into(), Value::Union(0, Value::Null.into())), @@ -663,7 +666,7 @@ mod tests { ); let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]); - let Err(err) = AvroEncoder::new(&schema, None, &avro_schema) else { + let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone()) else { panic!() }; assert_eq!( @@ -676,7 +679,7 @@ mod tests { Field::with_name(DataType::Int32, "req"), Field::with_name(DataType::Varchar, "extra"), ]); - let Err(err) = AvroEncoder::new(&schema, None, &avro_schema) else { + let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone()) else { panic!() }; assert_eq!( @@ -686,7 +689,7 @@ mod tests { let avro_schema = AvroSchema::parse_str(r#"["null", "long"]"#).unwrap(); let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]); - let Err(err) = AvroEncoder::new(&schema, None, &avro_schema) else { + let Err(err) = AvroEncoder::new(schema, None, avro_schema.into()) else { panic!() }; assert_eq!( diff --git a/src/connector/src/sink/encoder/proto.rs b/src/connector/src/sink/encoder/proto.rs index 106621fbd240b..489023f80e145 100644 --- a/src/connector/src/sink/encoder/proto.rs +++ b/src/connector/src/sink/encoder/proto.rs @@ -26,19 +26,19 @@ use super::{FieldEncodeError, Result as SinkResult, RowEncoder, SerTo}; type Result = std::result::Result; -pub struct ProtoEncoder<'a> { - schema: &'a Schema, - col_indices: Option<&'a [usize]>, +pub struct ProtoEncoder { + schema: Schema, + col_indices: Option>, descriptor: MessageDescriptor, } -impl<'a> ProtoEncoder<'a> { +impl ProtoEncoder { pub fn new( - schema: &'a Schema, - col_indices: Option<&'a [usize]>, + schema: Schema, + col_indices: Option>, descriptor: MessageDescriptor, ) -> SinkResult { - match col_indices { + match &col_indices { Some(col_indices) => validate_fields( col_indices.iter().map(|idx| { let f = &schema[*idx]; @@ -63,15 +63,15 @@ impl<'a> ProtoEncoder<'a> { } } -impl<'a> RowEncoder for ProtoEncoder<'a> { +impl RowEncoder for ProtoEncoder { type Output = DynamicMessage; fn schema(&self) -> &Schema { - self.schema + &self.schema } fn col_indices(&self) -> Option<&[usize]> { - self.col_indices + self.col_indices.as_deref() } fn encode_cols( @@ -371,7 +371,7 @@ mod tests { Some(ScalarImpl::Timestamptz(Timestamptz::from_micros(3))), ]); - let encoder = ProtoEncoder::new(&schema, None, descriptor.clone()).unwrap(); + let encoder = ProtoEncoder::new(schema, None, descriptor.clone()).unwrap(); let m = encoder.encode(row).unwrap(); let encoded: Vec = m.ser_to().unwrap(); assert_eq!(