Skip to content

Commit

Permalink
refactor(source): more refactor on avro
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jun 4, 2024
1 parent 6593cfe commit d291c08
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 9 deletions.
1 change: 1 addition & 0 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ impl AvroParserConfig {
}
}

/// Maps the resolved Avro schema held by this config to RisingWave column
/// descriptors, forwarding this config's `map_handling` setting to the conversion.
pub fn map_to_columns(&self) -> ConnectorResult<Vec<ColumnDesc>> {
    let resolved = &self.schema.resolved_schema;
    let map_handling = self.map_handling;
    avro_schema_to_column_descs(resolved, map_handling)
}
Expand Down
39 changes: 32 additions & 7 deletions src/connector/src/parser/debezium/avro_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::error::ConnectorResult;
use crate::parser::avro::schema_resolver::ConfluentSchemaCache;
use crate::parser::avro::util::{avro_schema_to_column_descs, ResolvedAvroSchema};
use crate::parser::unified::avro::{
avro_extract_field_schema, avro_schema_skip_union, AvroAccess, AvroParseOptions,
avro_extract_field_schema, avro_schema_skip_nullable_union, AvroAccess, AvroParseOptions,
};
use crate::parser::unified::AccessImpl;
use crate::parser::{AccessBuilder, EncodingProperties, EncodingType};
Expand Down Expand Up @@ -128,12 +128,37 @@ impl DebeziumAvroParserConfig {
)
}

/// Performs type mapping from avro schema to RisingWave schema
pub fn map_to_columns(&self) -> ConnectorResult<Vec<ColumnDesc>> {
// Refer to debezium_avro_msg_schema.avsc for what the schema looks like:

// "fields": [
// {
// "name": "before",
// "type": [
// "null",
// {
// "type": "record",
// "name": "Value",
// "fields": [...],
// }
// ],
// "default": null
// },
// {
// "name": "after",
// "type": [
// "null",
// "Value"
// ],
// "default": null
// },
// ...]
avro_schema_to_column_descs(
avro_schema_skip_union(avro_extract_field_schema(
// FIXME: use resolved schema here.
// Currently it works because "after" refers to a subtree in "before",
// but in theory, inside "before" there could also be a reference.
// FIXME: use resolved schema here.
// Currently it works because "after" refers to a subtree in "before",
// but in theory, inside "before" there could also be a reference.
avro_schema_skip_nullable_union(avro_extract_field_schema(
&self.outer_schema,
Some("before"),
)?)?,
Expand Down Expand Up @@ -230,7 +255,7 @@ mod tests {

let outer_schema = get_outer_schema();
let expected_inner_schema = Schema::parse_str(inner_shema_str).unwrap();
let extracted_inner_schema = avro_schema_skip_union(
let extracted_inner_schema = avro_schema_skip_nullable_union(
avro_extract_field_schema(&outer_schema, Some("before")).unwrap(),
)
.unwrap();
Expand Down Expand Up @@ -321,7 +346,7 @@ mod tests {
fn test_map_to_columns() {
let outer_schema = get_outer_schema();
let columns = avro_schema_to_column_descs(
avro_schema_skip_union(
avro_schema_skip_nullable_union(
avro_extract_field_schema(&outer_schema, Some("before")).unwrap(),
)
.unwrap(),
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub async fn schema_to_columns(
serde_json::from_slice(&bytes)?
};
let context = Context::default();
// XXX: Is it reasonable to rely on avro for JSON Schema?
let avro_schema = convert_avro(&json_schema, context).to_string();
let schema = Schema::parse_str(&avro_schema).context("failed to parse avro schema")?;
// TODO: do we need to support map type here?
Expand Down
5 changes: 3 additions & 2 deletions src/connector/src/parser/unified/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,8 @@ pub(crate) fn extract_decimal(bytes: Vec<u8>) -> AccessResult<(u32, u32, u32)> {
}
}

pub fn avro_schema_skip_union(schema: &Schema) -> ConnectorResult<&Schema> {
/// Convert unions like `[null, T]` to `T`.
pub fn avro_schema_skip_nullable_union(schema: &Schema) -> ConnectorResult<&Schema> {
match schema {
Schema::Union(union_schema) => {
let inner_schema = union_schema
Expand Down Expand Up @@ -401,7 +402,7 @@ pub fn avro_extract_field_schema<'a>(
Ok(&field.schema)
}
Schema::Array(schema) => Ok(schema),
Schema::Union(_) => avro_schema_skip_union(schema),
Schema::Union(_) => avro_schema_skip_nullable_union(schema),
Schema::Map(schema) => Ok(schema),
_ => bail!("avro schema does not have inner item, schema: {:?}", schema),
}
Expand Down

0 comments on commit d291c08

Please sign in to comment.