From 7f6220add10b40f54274ec15d3aeae92325bc460 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 25 Jun 2024 15:58:02 +0800 Subject: [PATCH] refine current union code --- src/connector/codec/src/decoder/avro/mod.rs | 82 +++++++++++++--- .../codec/src/decoder/avro/schema.rs | 95 ++++++++++++++++--- .../src/parser/debezium/avro_parser.rs | 10 +- 3 files changed, 157 insertions(+), 30 deletions(-) diff --git a/src/connector/codec/src/decoder/avro/mod.rs b/src/connector/codec/src/decoder/avro/mod.rs index cdd9aea416c8f..d67d1b5c8c8d0 100644 --- a/src/connector/codec/src/decoder/avro/mod.rs +++ b/src/connector/codec/src/decoder/avro/mod.rs @@ -15,7 +15,7 @@ mod schema; use std::sync::LazyLock; -use apache_avro::schema::{DecimalSchema, RecordSchema}; +use apache_avro::schema::{DecimalSchema, RecordSchema, UnionSchema}; use apache_avro::types::{Value, ValueKind}; use apache_avro::{Decimal as AvroDecimal, Schema}; use chrono::Datelike; @@ -338,18 +338,35 @@ pub(crate) fn avro_decimal_to_rust_decimal( )) } -pub fn avro_schema_skip_union(schema: &Schema) -> anyhow::Result<&Schema> { +/// If the union schema is `[null, T]` or `[T, null]`, returns `Some(T)`; otherwise returns `None`. +fn get_nullable_union_inner(union_schema: &UnionSchema) -> Option<&'_ Schema> { + let variants = union_schema.variants(); + if variants.len() == 2 + || variants + .iter() + .filter(|s| matches!(s, &&Schema::Null)) + .count() + == 1 + { + let inner_schema = variants + .iter() + .find(|s| !matches!(s, &&Schema::Null)) + .unwrap(); + Some(inner_schema) + } else { + None + } +} + +pub fn avro_schema_skip_nullable_union(schema: &Schema) -> anyhow::Result<&Schema> { match schema { - Schema::Union(union_schema) => { - let inner_schema = union_schema - .variants() - .iter() - .find(|s| !matches!(s, &&Schema::Null)) - .ok_or_else(|| { - anyhow::format_err!("illegal avro record schema {:?}", union_schema) - })?; - Ok(inner_schema) - } + Schema::Union(union_schema) => match get_nullable_union_inner(union_schema) { + Some(s) => Ok(s), + None => Err(anyhow::format_err!( + "illegal avro union schema, expected [null, T], got {:?}", + union_schema + )), + }, other => Ok(other), } } @@ -372,7 +389,7 @@ pub fn avro_extract_field_schema<'a>( Ok(&field.schema) } Schema::Array(schema) => Ok(schema), - Schema::Union(_) => avro_schema_skip_union(schema), + Schema::Union(_) => avro_schema_skip_nullable_union(schema), Schema::Map(schema) => Ok(schema), _ => bail!("avro schema does not have inner item, schema: {:?}", schema), } @@ -477,10 +494,49 @@ mod tests { use std::str::FromStr; use apache_avro::Decimal as AvroDecimal; + use expect_test::expect; use risingwave_common::types::{Datum, Decimal}; use super::*; + /// Test the behavior of the Rust Avro lib for handling union with logical type. + #[test] + fn test_union_logical_type() { + let s = Schema::parse_str(r#"["null", {"type":"string","logicalType":"uuid"}]"#).unwrap(); + expect![[r#" + Union( + UnionSchema { + schemas: [ + Null, + Uuid, + ], + variant_index: { + Null: 0, + Uuid: 1, + }, + }, + ) + "#]] + .assert_debug_eq(&s); + + let s = Schema::parse_str(r#"["string", {"type":"string","logicalType":"uuid"}]"#).unwrap(); + expect![[r#" + Union( + UnionSchema { + schemas: [ + String, + Uuid, + ], + variant_index: { + String: 0, + Uuid: 1, + }, + }, + ) + "#]] + .assert_debug_eq(&s); + } + #[test] fn test_convert_decimal() { // 280 diff --git a/src/connector/codec/src/decoder/avro/schema.rs b/src/connector/codec/src/decoder/avro/schema.rs index fe96495d089ea..066fc45273353 100644 --- a/src/connector/codec/src/decoder/avro/schema.rs +++ b/src/connector/codec/src/decoder/avro/schema.rs @@ -14,6 +14,7 @@ use std::sync::{Arc, LazyLock}; +use anyhow::Context; use apache_avro::schema::{DecimalSchema, RecordSchema, ResolvedSchema, Schema}; use apache_avro::AvroResult; use itertools::Itertools; @@ -22,6 +23,8 @@ use risingwave_common::log::LogSuppresser; use risingwave_common::types::{DataType, Decimal}; use risingwave_pb::plan_common::{AdditionalColumn, ColumnDesc, ColumnDescVersion}; +use super::get_nullable_union_inner; + /// Avro schema with `Ref` inlined. The newtype is used to indicate whether the schema is resolved. /// /// TODO: Actually most of the place should use resolved schema, but currently they just happen to work (Some edge cases are not met yet). @@ -198,20 +201,84 @@ fn avro_type_mapping( DataType::List(Box::new(item_type)) } Schema::Union(union_schema) => { + // Unions may not contain more than one schema with the same type, except for the named types record, fixed and enum. + // https://avro.apache.org/docs/1.11.1/specification/_print/#unions + debug_assert!( + union_schema + .variants() + .iter() + .map(Schema::canonical_form) // Schema doesn't implement Eq, but only PartialEq. + .duplicates() + .next() + .is_none(), + "Union contains duplicate types: {union_schema:?}", + ); // We only support using union to represent nullable fields, not general unions. - let variants = union_schema.variants(); - if variants.len() != 2 || !variants.contains(&Schema::Null) { - bail!( - "unsupported Avro type, only unions like [null, T] is supported: {:?}", - schema - ); - } - let nested_schema = variants - .iter() - .find_or_first(|s| !matches!(s, Schema::Null)) - .unwrap(); + match get_nullable_union_inner(union_schema) { + Some(inner) => avro_type_mapping(inner, map_handling)?, + None => { + // Convert the union to a struct, each field of the struct represents a variant of the union. + // Refer to https://github.com/risingwavelabs/risingwave/issues/16273#issuecomment-2179761345 to see why it's not perfect. + // Note: Avro union's variant tag is type name, not field name (unlike Rust enum, or Protobuf oneof). + + // XXX: do we need to introduce union.handling.mode? - avro_type_mapping(nested_schema, map_handling)? + let (fields, field_names) = union_schema + .variants() + .iter() + .filter(|variant| !matches!(variant, &&Schema::Null)) + .map(|variant| { + avro_type_mapping(variant, map_handling).map(|t| { + let name = match variant { + Schema::Null => unreachable!(), + Schema::Boolean => "boolean".to_string(), + Schema::Int => "integer".to_string(), + Schema::Long => "bigint".to_string(), + Schema::Float => "real".to_string(), + Schema::Double => "double precision".to_string(), + Schema::Bytes => "bytea".to_string(), + Schema::String => "text".to_string(), + Schema::Array(_) => "array".to_string(), + Schema::Map(_) =>"map".to_string(), + Schema::Union(_) => "union".to_string(), + // For logical types, should we use the real type or the logical type as the field name? + // + // Example about the representation: + // schema: ["null", {"type":"string","logicalType":"uuid"}] + // data: {"string": "67e55044-10b1-426f-9247-bb680e5fe0c8"} + // + // Note: for union with logical type AND the real type, e.g., ["string", {"type":"string","logicalType":"uuid"}] + // In this case, the uuid cannot be constructed. Some library + // https://issues.apache.org/jira/browse/AVRO-2380 + Schema::Uuid => "uuid".to_string(), + Schema::Decimal(_) => todo!(), + Schema::Date => "date".to_string(), + Schema::TimeMillis => "time without time zone".to_string(), + Schema::TimeMicros => "time without time zone".to_string(), + Schema::TimestampMillis => "timestamp without time zone".to_string(), + Schema::TimestampMicros => "timestamp without time zone".to_string(), + Schema::LocalTimestampMillis => "timestamp without time zone".to_string(), + Schema::LocalTimestampMicros => "timestamp without time zone".to_string(), + Schema::Duration => "interval".to_string(), + Schema::Enum(_) + | Schema::Ref { name: _ } + | Schema::Fixed(_) => todo!(), + | Schema::Record(_) => variant.name().unwrap().fullname(None), // XXX: Is the namespace correct here? + }; + (t, name) + }) + }) + .process_results(|it| it.unzip::<_, _, Vec<_>, Vec<_>>()) + .context("failed to convert Avro union to struct")?; + + DataType::new_struct(fields, field_names); + + bail!( + "unsupported Avro type, only unions like [null, T] is supported: {:?}", + schema + ); + } + } } Schema::Ref { name } => { if name.name == DBZ_VARIABLE_SCALE_DECIMAL_NAME @@ -236,6 +303,10 @@ fn avro_type_mapping( } } None => { + // We require it to be specified, because we don't want to have a bad default behavior. + // But perhaps changing the default behavior won't be a breaking change, + // because it affects only on creation time, what the result ColumnDesc will be, and the ColumnDesc will be persisted. + // This is unlike timestamp.handing.mode, which affects parser's behavior on the runtime. bail!("`map.handling.mode` not specified in ENCODE AVRO (...). Currently supported modes: `jsonb`") } } diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index 1a40b87c9d498..467fb4c7379da 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -19,8 +19,8 @@ use apache_avro::types::Value; use apache_avro::{from_avro_datum, Schema}; use risingwave_common::try_match_expand; use risingwave_connector_codec::decoder::avro::{ - avro_extract_field_schema, avro_schema_skip_union, avro_schema_to_column_descs, AvroAccess, - AvroParseOptions, ResolvedAvroSchema, + avro_extract_field_schema, avro_schema_skip_nullable_union, avro_schema_to_column_descs, + AvroAccess, AvroParseOptions, ResolvedAvroSchema, }; use risingwave_pb::catalog::PbSchemaRegistryNameStrategy; use risingwave_pb::plan_common::ColumnDesc; @@ -126,7 +126,7 @@ impl DebeziumAvroParserConfig { pub fn map_to_columns(&self) -> ConnectorResult> { avro_schema_to_column_descs( - avro_schema_skip_union(avro_extract_field_schema( + avro_schema_skip_nullable_union(avro_extract_field_schema( // FIXME: use resolved schema here. // Currently it works because "after" refers to a subtree in "before", // but in theory, inside "before" there could also be a reference. @@ -227,7 +227,7 @@ mod tests { let outer_schema = get_outer_schema(); let expected_inner_schema = Schema::parse_str(inner_shema_str).unwrap(); - let extracted_inner_schema = avro_schema_skip_union( + let extracted_inner_schema = avro_schema_skip_nullable_union( avro_extract_field_schema(&outer_schema, Some("before")).unwrap(), ) .unwrap(); @@ -318,7 +318,7 @@ mod tests { fn test_map_to_columns() { let outer_schema = get_outer_schema(); let columns = avro_schema_to_column_descs( - avro_schema_skip_union( + avro_schema_skip_nullable_union( avro_extract_field_schema(&outer_schema, Some("before")).unwrap(), ) .unwrap(),