diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index 2dd84d2515ea8..a9c3211d51228 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -175,6 +175,7 @@ impl AvroParserConfig { } } + /// Performs type mapping from avro schema to RisingWave schema pub fn map_to_columns(&self) -> ConnectorResult> { avro_schema_to_column_descs(&self.schema.resolved_schema, self.map_handling) } diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index 8d451cc36eaf8..1d1adceaab14d 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -25,7 +25,7 @@ use crate::error::ConnectorResult; use crate::parser::avro::schema_resolver::ConfluentSchemaCache; use crate::parser::avro::util::{avro_schema_to_column_descs, ResolvedAvroSchema}; use crate::parser::unified::avro::{ - avro_extract_field_schema, avro_schema_skip_union, AvroAccess, AvroParseOptions, + avro_extract_field_schema, avro_schema_skip_nullable_union, AvroAccess, AvroParseOptions, }; use crate::parser::unified::AccessImpl; use crate::parser::{AccessBuilder, EncodingProperties, EncodingType}; @@ -128,12 +128,37 @@ impl DebeziumAvroParserConfig { ) } + /// Performs type mapping from avro schema to RisingWave schema pub fn map_to_columns(&self) -> ConnectorResult> { + // Refer to debezium_avro_msg_schema.avsc for how the schema looks like: + + // "fields": [ + // { + // "name": "before", + // "type": [ + // "null", + // { + // "type": "record", + // "name": "Value", + // "fields": [...], + // } + // ], + // "default": null + // }, + // { + // "name": "after", + // "type": [ + // "null", + // "Value" + // ], + // "default": null + // }, + // ...] avro_schema_to_column_descs( - avro_schema_skip_union(avro_extract_field_schema( - // FIXME: use resolved schema here. - // Currently it works because "after" refers to a subtree in "before", - // but in theory, inside "before" there could also be a reference. + // FIXME: use resolved schema here. + // Currently it works because "after" refers to a subtree in "before", + // but in theory, inside "before" there could also be a reference. + avro_schema_skip_nullable_union(avro_extract_field_schema( &self.outer_schema, Some("before"), )?)?, @@ -230,7 +255,7 @@ mod tests { let outer_schema = get_outer_schema(); let expected_inner_schema = Schema::parse_str(inner_shema_str).unwrap(); - let extracted_inner_schema = avro_schema_skip_union( + let extracted_inner_schema = avro_schema_skip_nullable_union( avro_extract_field_schema(&outer_schema, Some("before")).unwrap(), ) .unwrap(); @@ -321,7 +346,7 @@ mod tests { fn test_map_to_columns() { let outer_schema = get_outer_schema(); let columns = avro_schema_to_column_descs( - avro_schema_skip_union( + avro_schema_skip_nullable_union( avro_extract_field_schema(&outer_schema, Some("before")).unwrap(), ) .unwrap(), diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index 8d8925d4768cd..a6be70f33d006 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -99,6 +99,7 @@ pub async fn schema_to_columns( serde_json::from_slice(&bytes)? }; let context = Context::default(); + // XXX: Is it reasonable to rely on avro for JSON Schema? let avro_schema = convert_avro(&json_schema, context).to_string(); let schema = Schema::parse_str(&avro_schema).context("failed to parse avro schema")?; // TODO: do we need to support map type here? diff --git a/src/connector/src/parser/unified/avro.rs b/src/connector/src/parser/unified/avro.rs index 29e25f1b46301..705a2cdede54c 100644 --- a/src/connector/src/parser/unified/avro.rs +++ b/src/connector/src/parser/unified/avro.rs @@ -368,7 +368,8 @@ pub(crate) fn extract_decimal(bytes: Vec) -> AccessResult<(u32, u32, u32)> { } } -pub fn avro_schema_skip_union(schema: &Schema) -> ConnectorResult<&Schema> { +/// Convert unions like `[null, T]` to `T`. +pub fn avro_schema_skip_nullable_union(schema: &Schema) -> ConnectorResult<&Schema> { match schema { Schema::Union(union_schema) => { let inner_schema = union_schema @@ -401,7 +402,7 @@ pub fn avro_extract_field_schema<'a>( Ok(&field.schema) } Schema::Array(schema) => Ok(schema), - Schema::Union(_) => avro_schema_skip_union(schema), + Schema::Union(_) => avro_schema_skip_nullable_union(schema), Schema::Map(schema) => Ok(schema), _ => bail!("avro schema does not have inner item, schema: {:?}", schema), }