Skip to content

Commit

Permalink
owned without lifetime as per ##12515
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Oct 12, 2023
1 parent 268e581 commit e8db4eb
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 38 deletions.
57 changes: 30 additions & 27 deletions src/connector/src/sink/encoder/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use apache_avro::schema::Schema as AvroSchema;
use apache_avro::types::{Record, Value};
use apache_avro::Writer;
Expand All @@ -24,32 +26,32 @@ use super::{FieldEncodeError, Result as SinkResult, RowEncoder, SerTo};

type Result<T> = std::result::Result<T, FieldEncodeError>;

pub struct AvroEncoder<'a> {
schema: &'a Schema,
col_indices: Option<&'a [usize]>,
avro_schema: &'a AvroSchema,
pub struct AvroEncoder {
schema: Schema,
col_indices: Option<Vec<usize>>,
avro_schema: Arc<AvroSchema>,
}

impl<'a> AvroEncoder<'a> {
impl AvroEncoder {
pub fn new(
schema: &'a Schema,
col_indices: Option<&'a [usize]>,
avro_schema: &'a AvroSchema,
schema: Schema,
col_indices: Option<Vec<usize>>,
avro_schema: Arc<AvroSchema>,
) -> SinkResult<Self> {
match col_indices {
match &col_indices {
Some(col_indices) => validate_fields(
col_indices.iter().map(|idx| {
let f = &schema[*idx];
(f.name.as_str(), &f.data_type)
}),
avro_schema,
&avro_schema,
)?,
None => validate_fields(
schema
.fields
.iter()
.map(|f| (f.name.as_str(), &f.data_type)),
avro_schema,
&avro_schema,
)?,
};

Expand All @@ -61,15 +63,15 @@ impl<'a> AvroEncoder<'a> {
}
}

impl<'a> RowEncoder for AvroEncoder<'a> {
type Output = (Record<'a>, &'a AvroSchema);
impl RowEncoder for AvroEncoder {
type Output = (Value, Arc<AvroSchema>);

fn schema(&self) -> &Schema {
self.schema
&self.schema
}

fn col_indices(&self) -> Option<&[usize]> {
self.col_indices
self.col_indices.as_deref()
}

fn encode_cols(
Expand All @@ -82,15 +84,15 @@ impl<'a> RowEncoder for AvroEncoder<'a> {
let f = &self.schema[idx];
((f.name.as_str(), &f.data_type), row.datum_at(idx))
}),
self.avro_schema,
&self.avro_schema,
)?;
Ok((record, self.avro_schema))
Ok((record.into(), self.avro_schema.clone()))
}
}

impl<'a> SerTo<Vec<u8>> for (Record<'a>, &'a AvroSchema) {
impl SerTo<Vec<u8>> for (Value, Arc<AvroSchema>) {
fn ser_to(self) -> SinkResult<Vec<u8>> {
let mut w = Writer::new(self.1, Vec::new());
let mut w = Writer::new(&self.1, Vec::new());
w.append(self.0)
.and_then(|_| w.into_inner())
.map_err(|e| crate::sink::SinkError::Encode(e.to_string()))
Expand Down Expand Up @@ -614,7 +616,7 @@ mod tests {
.unwrap();
let mut record = Record::new(&avro_schema).unwrap();
record.put("f0", Value::String("2".into()));
let res: SinkResult<Vec<u8>> = (record, &avro_schema).ser_to();
let res: SinkResult<Vec<u8>> = (Value::from(record), Arc::new(avro_schema)).ser_to();
assert_eq!(res.unwrap_err().to_string(), "Encode error: Value does not match schema: Reason: Unsupported value-schema combination");
}

Expand All @@ -631,6 +633,7 @@ mod tests {
}"#,
)
.unwrap();
let avro_schema = Arc::new(avro_schema);

let schema = Schema::new(vec![
Field::with_name(DataType::Int64, "opt"),
Expand All @@ -640,10 +643,10 @@ mod tests {
Some(ScalarImpl::Int64(31)),
Some(ScalarImpl::Int32(15)),
]);
let encoder = AvroEncoder::new(&schema, None, &avro_schema).unwrap();
let encoder = AvroEncoder::new(schema, None, avro_schema.clone()).unwrap();
let actual = encoder.encode(row).unwrap();
assert_eq!(
Value::from(actual.0),
actual.0,
Value::Record(vec![
("req".into(), Value::Int(15)),
("opt".into(), Value::Union(1, Value::Long(31).into())),
Expand All @@ -652,18 +655,18 @@ mod tests {

let schema = Schema::new(vec![Field::with_name(DataType::Int32, "req")]);
let row = OwnedRow::new(vec![Some(ScalarImpl::Int32(15))]);
let encoder = AvroEncoder::new(&schema, None, &avro_schema).unwrap();
let encoder = AvroEncoder::new(schema, None, avro_schema.clone()).unwrap();
let actual = encoder.encode(row).unwrap();
assert_eq!(
Value::from(actual.0),
actual.0,
Value::Record(vec![
("req".into(), Value::Int(15)),
("opt".into(), Value::Union(0, Value::Null.into())),
])
);

let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]);
let Err(err) = AvroEncoder::new(&schema, None, &avro_schema) else {
let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone()) else {
panic!()
};
assert_eq!(
Expand All @@ -676,7 +679,7 @@ mod tests {
Field::with_name(DataType::Int32, "req"),
Field::with_name(DataType::Varchar, "extra"),
]);
let Err(err) = AvroEncoder::new(&schema, None, &avro_schema) else {
let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone()) else {
panic!()
};
assert_eq!(
Expand All @@ -686,7 +689,7 @@ mod tests {

let avro_schema = AvroSchema::parse_str(r#"["null", "long"]"#).unwrap();
let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]);
let Err(err) = AvroEncoder::new(&schema, None, &avro_schema) else {
let Err(err) = AvroEncoder::new(schema, None, avro_schema.into()) else {
panic!()
};
assert_eq!(
Expand Down
22 changes: 11 additions & 11 deletions src/connector/src/sink/encoder/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,19 @@ use super::{FieldEncodeError, Result as SinkResult, RowEncoder, SerTo};

type Result<T> = std::result::Result<T, FieldEncodeError>;

pub struct ProtoEncoder<'a> {
schema: &'a Schema,
col_indices: Option<&'a [usize]>,
pub struct ProtoEncoder {
schema: Schema,
col_indices: Option<Vec<usize>>,
descriptor: MessageDescriptor,
}

impl<'a> ProtoEncoder<'a> {
impl ProtoEncoder {
pub fn new(
schema: &'a Schema,
col_indices: Option<&'a [usize]>,
schema: Schema,
col_indices: Option<Vec<usize>>,
descriptor: MessageDescriptor,
) -> SinkResult<Self> {
match col_indices {
match &col_indices {
Some(col_indices) => validate_fields(
col_indices.iter().map(|idx| {
let f = &schema[*idx];
Expand All @@ -63,15 +63,15 @@ impl<'a> ProtoEncoder<'a> {
}
}

impl<'a> RowEncoder for ProtoEncoder<'a> {
impl RowEncoder for ProtoEncoder {
type Output = DynamicMessage;

fn schema(&self) -> &Schema {
self.schema
&self.schema
}

fn col_indices(&self) -> Option<&[usize]> {
self.col_indices
self.col_indices.as_deref()
}

fn encode_cols(
Expand Down Expand Up @@ -371,7 +371,7 @@ mod tests {
Some(ScalarImpl::Timestamptz(Timestamptz::from_micros(3))),
]);

let encoder = ProtoEncoder::new(&schema, None, descriptor.clone()).unwrap();
let encoder = ProtoEncoder::new(schema, None, descriptor.clone()).unwrap();
let m = encoder.encode(row).unwrap();
let encoded: Vec<u8> = m.ser_to().unwrap();
assert_eq!(
Expand Down

0 comments on commit e8db4eb

Please sign in to comment.