diff --git a/e2e_test/commands/sr_register b/e2e_test/commands/sr_register new file mode 100755 index 0000000000000..57dc65e50610d --- /dev/null +++ b/e2e_test/commands/sr_register @@ -0,0 +1,27 @@ +#!/usr/bin/env bash + +set -euo pipefail + +# Register a schema to schema registry +# +# Usage: sr_register +# +# https://docs.confluent.io/platform/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions + +# Validate arguments +if [[ $# -ne 2 ]]; then + echo "Usage: sr_register " + exit 1 +fi + +subject="$1" +schema="$2" + + +if [[ -z $subject || -z $schema ]]; then + echo "Error: Arguments cannot be empty" + exit 1 +fi + +echo "$schema" | jq '{"schema": tojson}' \ +| curl -X POST -H 'content-type:application/vnd.schemaregistry.v1+json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/${subject}/versions" diff --git a/e2e_test/source_inline/kafka/avro/alter_source.slt b/e2e_test/source_inline/kafka/avro/alter_source.slt index 446fc6196d32b..57677af57cd92 100644 --- a/e2e_test/source_inline/kafka/avro/alter_source.slt +++ b/e2e_test/source_inline/kafka/avro/alter_source.slt @@ -12,8 +12,7 @@ system ok rpk topic create 'avro_alter_source_test' system ok -echo '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \ -| curl -X POST -H 'content-type:application/json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions" +sr_register avro_alter_source_test-value '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}' statement ok create source s @@ -27,8 +26,7 @@ FORMAT PLAIN ENCODE AVRO ( # create a new version of schema and produce a message system ok -echo '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \ -| curl -X POST -H 'content-type:application/json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions" +sr_register avro_alter_source_test-value '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}' system ok echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_alter_source_test diff --git a/e2e_test/source_inline/kafka/avro/union.slt b/e2e_test/source_inline/kafka/avro/union.slt new file mode 100644 index 0000000000000..44e1db659d120 --- /dev/null +++ b/e2e_test/source_inline/kafka/avro/union.slt @@ -0,0 +1,175 @@ +control substitution on + +system ok +rpk topic delete 'avro-union' || true; \ +(rpk sr subject delete 'avro-union-value' && rpk sr subject delete 'avro-union-value' --permanent) || true; +rpk topic create avro-union + +system ok +sr_register avro-union-value ' +{ + "type": "record", + "name": "Root", + "fields": [ + { + "name": "unionType", + "type": ["int", "string"] + }, + { + "name": "unionTypeComplex", + "type": [ + "null", + {"type": "record", "name": "Email","fields": [{"name":"inner","type":"string"}]}, + {"type": "record", "name": "Fax","fields": [{"name":"inner","type":"int"}]}, + {"type": "record", "name": "Sms","fields": [{"name":"inner","type":"int"}]} + ] + }, + { + "name": "enumField", + "type": ["null", "int", { + "type": "enum", + "name": "myEnum", + "namespace": "my.namespace", + "symbols": ["A", "B", "C", "D"] + }], + "default": null + } + ] +} +' + +system ok +cat<: ScalarBounds> + 'a + Copy { macro_rules! scalar_impl_enum { ($( { $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty } ),*) => { /// `ScalarImpl` embeds all possible scalars in the evaluation framework. + /// + /// See `for_all_variants` for the definition. #[derive(Debug, Clone, PartialEq, Eq, EstimateSize)] pub enum ScalarImpl { $( $variant_name($scalar) ),* diff --git a/src/connector/codec/src/decoder/avro/mod.rs b/src/connector/codec/src/decoder/avro/mod.rs index cdd9aea416c8f..2cd6cf5ac77c3 100644 --- a/src/connector/codec/src/decoder/avro/mod.rs +++ b/src/connector/codec/src/decoder/avro/mod.rs @@ -15,7 +15,7 @@ mod schema; use std::sync::LazyLock; -use apache_avro::schema::{DecimalSchema, RecordSchema}; +use apache_avro::schema::{DecimalSchema, RecordSchema, UnionSchema}; use apache_avro::types::{Value, ValueKind}; use apache_avro::{Decimal as AvroDecimal, Schema}; use chrono::Datelike; @@ -33,6 +33,7 @@ use risingwave_common::util::iter_util::ZipEqFast; pub use self::schema::{avro_schema_to_column_descs, MapHandling, ResolvedAvroSchema}; use super::utils::extract_decimal; use super::{bail_uncategorized, uncategorized, Access, AccessError, AccessResult}; +use crate::decoder::avro::schema::avro_schema_to_struct_field_name; #[derive(Clone)] /// Options for parsing an `AvroValue` into Datum, with an optional avro schema. @@ -107,6 +108,54 @@ impl<'a> AvroParseOptions<'a> { let v: ScalarImpl = match (type_expected, value) { (_, Value::Null) => return Ok(DatumCow::NULL), + // ---- Union (with >=2 non null variants), and nullable Union ([null, record]) ----- + (DataType::Struct(struct_type_info), Value::Union(variant, v)) => { + let Some(Schema::Union(u)) = self.schema else { + // XXX: Is this branch actually unreachable? (if self.schema is correctly used) + return Err(create_error()); + }; + + if let Some(inner) = get_nullable_union_inner(u) { + // nullable Union ([null, record]) + return Self { + schema: Some(inner), + relax_numeric: self.relax_numeric, + } + .convert_to_datum(v, type_expected); + } + let variant_schema = &u.variants()[*variant as usize]; + + if matches!(variant_schema, &Schema::Null) { + return Ok(DatumCow::NULL); + } + + // Here we compare the field name, instead of using the variant idx to find the field idx. + // The latter approach might also work, but might be more error-prone. + // We will need to get the index of the "null" variant, and then re-map the variant index to the field index. + // XXX: probably we can unwrap here (if self.schema is correctly used) + let expected_field_name = avro_schema_to_struct_field_name(variant_schema)?; + + let mut fields = Vec::with_capacity(struct_type_info.len()); + for (field_name, field_type) in struct_type_info + .names() + .zip_eq_fast(struct_type_info.types()) + { + if field_name == expected_field_name { + let datum = Self { + schema: Some(variant_schema), + relax_numeric: self.relax_numeric, + } + .convert_to_datum(v, field_type)? + .to_owned_datum(); + + fields.push(datum) + } else { + fields.push(None) + } + } + StructValue::new(fields).into() + } + // nullable Union ([null, T]) (_, Value::Union(_, v)) => { let schema = self.extract_inner_schema(None); return Self { @@ -290,6 +339,12 @@ impl Access for AvroAccess<'_> { let mut value = self.value; let mut options: AvroParseOptions<'_> = self.options.clone(); + debug_assert!( + path.len() == 1 + || (path.len() == 2 && matches!(path[0], "before" | "after" | "source")), + "unexpected path access: {:?}", + path + ); let mut i = 0; while i < path.len() { let key = path[i]; @@ -299,6 +354,29 @@ impl Access for AvroAccess<'_> { }; match value { Value::Union(_, v) => { + // The debezium "before" field is a nullable union. + // "fields": [ + // { + // "name": "before", + // "type": [ + // "null", + // { + // "type": "record", + // "name": "Value", + // "fields": [...], + // } + // ], + // "default": null + // }, + // { + // "name": "after", + // "type": [ + // "null", + // "Value" + // ], + // "default": null + // }, + // ...] value = v; options.schema = options.extract_inner_schema(None); continue; @@ -338,18 +416,30 @@ pub(crate) fn avro_decimal_to_rust_decimal( )) } -pub fn avro_schema_skip_union(schema: &Schema) -> anyhow::Result<&Schema> { +/// If the union schema is `[null, T]` or `[T, null]`, returns `Some(T)`; otherwise returns `None`. +fn get_nullable_union_inner(union_schema: &UnionSchema) -> Option<&'_ Schema> { + let variants = union_schema.variants(); + // Note: `[null, null] is invalid`, we don't need to worry about that. + if variants.len() == 2 && variants.contains(&Schema::Null) { + let inner_schema = variants + .iter() + .find(|s| !matches!(s, &&Schema::Null)) + .unwrap(); + Some(inner_schema) + } else { + None + } +} + +pub fn avro_schema_skip_nullable_union(schema: &Schema) -> anyhow::Result<&Schema> { match schema { - Schema::Union(union_schema) => { - let inner_schema = union_schema - .variants() - .iter() - .find(|s| !matches!(s, &&Schema::Null)) - .ok_or_else(|| { - anyhow::format_err!("illegal avro record schema {:?}", union_schema) - })?; - Ok(inner_schema) - } + Schema::Union(union_schema) => match get_nullable_union_inner(union_schema) { + Some(s) => Ok(s), + None => Err(anyhow::format_err!( + "illegal avro union schema, expected [null, T], got {:?}", + union_schema + )), + }, other => Ok(other), } } @@ -372,7 +462,9 @@ pub fn avro_extract_field_schema<'a>( Ok(&field.schema) } Schema::Array(schema) => Ok(schema), - Schema::Union(_) => avro_schema_skip_union(schema), + // Only nullable union should be handled here. + // We will not extract inner schema for real union (and it's not extractable). + Schema::Union(_) => avro_schema_skip_nullable_union(schema), Schema::Map(schema) => Ok(schema), _ => bail!("avro schema does not have inner item, schema: {:?}", schema), } @@ -476,11 +568,337 @@ pub(crate) fn avro_to_jsonb(avro: &Value, builder: &mut jsonbb::Builder) -> Acce mod tests { use std::str::FromStr; - use apache_avro::Decimal as AvroDecimal; + use apache_avro::{from_avro_datum, Decimal as AvroDecimal}; + use expect_test::expect; use risingwave_common::types::{Datum, Decimal}; use super::*; + /// Test the behavior of the Rust Avro lib for handling union with logical type. + #[test] + fn test_avro_lib_union() { + // duplicate types + let s = Schema::parse_str(r#"["null", "null"]"#); + expect![[r#" + Err( + Unions cannot contain duplicate types, + ) + "#]] + .assert_debug_eq(&s); + let s = Schema::parse_str(r#"["int", "int"]"#); + expect![[r#" + Err( + Unions cannot contain duplicate types, + ) + "#]] + .assert_debug_eq(&s); + // multiple map/array are considered as the same type, regardless of the element type! + let s = Schema::parse_str( + r#"[ +"null", +{ + "type": "map", + "values" : "long", + "default": {} +}, +{ + "type": "map", + "values" : "int", + "default": {} +} +] +"#, + ); + expect![[r#" + Err( + Unions cannot contain duplicate types, + ) + "#]] + .assert_debug_eq(&s); + let s = Schema::parse_str( + r#"[ +"null", +{ + "type": "array", + "items" : "long", + "default": {} +}, +{ + "type": "array", + "items" : "int", + "default": {} +} +] +"#, + ); + expect![[r#" + Err( + Unions cannot contain duplicate types, + ) + "#]] + .assert_debug_eq(&s); + // multiple named types + let s = Schema::parse_str( + r#"[ +"null", +{"type":"fixed","name":"a","size":16}, +{"type":"fixed","name":"b","size":32} +] +"#, + ); + expect![[r#" + Ok( + Union( + UnionSchema { + schemas: [ + Null, + Fixed( + FixedSchema { + name: Name { + name: "a", + namespace: None, + }, + aliases: None, + doc: None, + size: 16, + attributes: {}, + }, + ), + Fixed( + FixedSchema { + name: Name { + name: "b", + namespace: None, + }, + aliases: None, + doc: None, + size: 32, + attributes: {}, + }, + ), + ], + variant_index: { + Null: 0, + }, + }, + ), + ) + "#]] + .assert_debug_eq(&s); + + // union in union + let s = Schema::parse_str(r#"["int", ["null", "int"]]"#); + expect![[r#" + Err( + Unions may not directly contain a union, + ) + "#]] + .assert_debug_eq(&s); + + // logical type + let s = Schema::parse_str(r#"["null", {"type":"string","logicalType":"uuid"}]"#).unwrap(); + expect![[r#" + Union( + UnionSchema { + schemas: [ + Null, + Uuid, + ], + variant_index: { + Null: 0, + Uuid: 1, + }, + }, + ) + "#]] + .assert_debug_eq(&s); + // Note: Java Avro lib rejects this (logical type unions with its physical type) + let s = Schema::parse_str(r#"["string", {"type":"string","logicalType":"uuid"}]"#).unwrap(); + expect![[r#" + Union( + UnionSchema { + schemas: [ + String, + Uuid, + ], + variant_index: { + String: 0, + Uuid: 1, + }, + }, + ) + "#]] + .assert_debug_eq(&s); + // Note: Java Avro lib rejects this (logical type unions with its physical type) + let s = Schema::parse_str(r#"["int", {"type":"int", "logicalType": "date"}]"#).unwrap(); + expect![[r#" + Union( + UnionSchema { + schemas: [ + Int, + Date, + ], + variant_index: { + Int: 0, + Date: 1, + }, + }, + ) + "#]] + .assert_debug_eq(&s); + // Note: Java Avro lib allows this (2 decimal with different "name") + let s = Schema::parse_str( + r#"[ +{"type":"fixed","name":"Decimal128","size":16,"logicalType":"decimal","precision":38,"scale":2}, +{"type":"fixed","name":"Decimal256","size":32,"logicalType":"decimal","precision":50,"scale":2} +]"#, + ); + expect![[r#" + Err( + Unions cannot contain duplicate types, + ) + "#]] + .assert_debug_eq(&s); + } + + #[test] + fn test_avro_lib_union_record_bug() { + // multiple named types (record) + let s = Schema::parse_str( + r#" + { + "type": "record", + "name": "Root", + "fields": [ + { + "name": "unionTypeComplex", + "type": [ + "null", + {"type": "record", "name": "Email","fields": [{"name":"inner","type":"string"}]}, + {"type": "record", "name": "Fax","fields": [{"name":"inner","type":"int"}]}, + {"type": "record", "name": "Sms","fields": [{"name":"inner","type":"int"}]} + ] + } + ] + } + "#, + ) + .unwrap(); + + let bytes = hex::decode("060c").unwrap(); + // Correct should be variant 3 (Sms) + let correct_value = from_avro_datum(&s, &mut bytes.as_slice(), None); + expect![[r#" + Ok( + Record( + [ + ( + "unionTypeComplex", + Union( + 3, + Record( + [ + ( + "inner", + Int( + 6, + ), + ), + ], + ), + ), + ), + ], + ), + ) + "#]] + .assert_debug_eq(&correct_value); + // Bug: We got variant 2 (Fax) here, if we pass the reader schema. + let wrong_value = from_avro_datum(&s, &mut bytes.as_slice(), Some(&s)); + expect![[r#" + Ok( + Record( + [ + ( + "unionTypeComplex", + Union( + 2, + Record( + [ + ( + "inner", + Int( + 6, + ), + ), + ], + ), + ), + ), + ], + ), + ) + "#]] + .assert_debug_eq(&wrong_value); + + // The bug below can explain what happened. + // The two records below are actually incompatible: https://avro.apache.org/docs/1.11.1/specification/_print/#schema-resolution + // > both schemas are records with the _same (unqualified) name_ + // In from_avro_datum, it first reads the value with the writer schema, and then + // it just uses the reader schema to interpret the value. + // The value doesn't have record "name" information. So it wrongly passed the conversion. + // The correct way is that we need to use both the writer and reader schema in the second step to interpret the value. + + let s = Schema::parse_str( + r#" + { + "type": "record", + "name": "Root", + "fields": [ + { + "name": "a", + "type": "int" + } + ] + } + "#, + ) + .unwrap(); + let s2 = Schema::parse_str( + r#" +{ + "type": "record", + "name": "Root222", + "fields": [ + { + "name": "a", + "type": "int" + } + ] +} + "#, + ) + .unwrap(); + + let bytes = hex::decode("0c").unwrap(); + let value = from_avro_datum(&s, &mut bytes.as_slice(), Some(&s2)); + expect![[r#" + Ok( + Record( + [ + ( + "a", + Int( + 6, + ), + ), + ], + ), + ) + "#]] + .assert_debug_eq(&value); + } + #[test] fn test_convert_decimal() { // 280 diff --git a/src/connector/codec/src/decoder/avro/schema.rs b/src/connector/codec/src/decoder/avro/schema.rs index fe96495d089ea..324b7fd426a56 100644 --- a/src/connector/codec/src/decoder/avro/schema.rs +++ b/src/connector/codec/src/decoder/avro/schema.rs @@ -14,14 +14,18 @@ use std::sync::{Arc, LazyLock}; +use anyhow::Context; use apache_avro::schema::{DecimalSchema, RecordSchema, ResolvedSchema, Schema}; use apache_avro::AvroResult; use itertools::Itertools; -use risingwave_common::bail; +use risingwave_common::error::NotImplemented; use risingwave_common::log::LogSuppresser; use risingwave_common::types::{DataType, Decimal}; +use risingwave_common::{bail, bail_not_implemented}; use risingwave_pb::plan_common::{AdditionalColumn, ColumnDesc, ColumnDescVersion}; +use super::get_nullable_union_inner; + /// Avro schema with `Ref` inlined. The newtype is used to indicate whether the schema is resolved. /// /// TODO: Actually most of the place should use resolved schema, but currently they just happen to work (Some edge cases are not met yet). @@ -198,20 +202,46 @@ fn avro_type_mapping( DataType::List(Box::new(item_type)) } Schema::Union(union_schema) => { - // We only support using union to represent nullable fields, not general unions. - let variants = union_schema.variants(); - if variants.len() != 2 || !variants.contains(&Schema::Null) { - bail!( - "unsupported Avro type, only unions like [null, T] is supported: {:?}", - schema - ); - } - let nested_schema = variants - .iter() - .find_or_first(|s| !matches!(s, Schema::Null)) - .unwrap(); + // Note: Unions may not immediately contain other unions. So a `null` must represent a top-level null. + // e.g., ["null", ["null", "string"]] is not allowed + + // Note: Unions may not contain more than one schema with the same type, except for the named types record, fixed and enum. + // https://avro.apache.org/docs/1.11.1/specification/_print/#unions + debug_assert!( + union_schema + .variants() + .iter() + .map(Schema::canonical_form) // Schema doesn't implement Eq, but only PartialEq. + .duplicates() + .next() + .is_none(), + "Union contains duplicate types: {union_schema:?}", + ); + match get_nullable_union_inner(union_schema) { + Some(inner) => avro_type_mapping(inner, map_handling)?, + None => { + // Convert the union to a struct, each field of the struct represents a variant of the union. + // Refer to https://github.com/risingwavelabs/risingwave/issues/16273#issuecomment-2179761345 to see why it's not perfect. + // Note: Avro union's variant tag is type name, not field name (unlike Rust enum, or Protobuf oneof). + + // XXX: do we need to introduce union.handling.mode? + let (fields, field_names) = union_schema + .variants() + .iter() + // null will mean the whole struct is null + .filter(|variant| !matches!(variant, &&Schema::Null)) + .map(|variant| { + avro_type_mapping(variant, map_handling).and_then(|t| { + let name = avro_schema_to_struct_field_name(variant)?; + Ok((t, name)) + }) + }) + .process_results(|it| it.unzip::<_, _, Vec<_>, Vec<_>>()) + .context("failed to convert Avro union to struct")?; - avro_type_mapping(nested_schema, map_handling)? + DataType::new_struct(fields, field_names) + } + } } Schema::Ref { name } => { if name.name == DBZ_VARIABLE_SCALE_DECIMAL_NAME @@ -219,7 +249,7 @@ fn avro_type_mapping( { DataType::Decimal } else { - bail!("unsupported Avro type: {:?}", schema); + bail_not_implemented!("Avro type: {:?}", schema); } } Schema::Map(value_schema) => { @@ -229,20 +259,25 @@ fn avro_type_mapping( if supported_avro_to_json_type(value_schema) { DataType::Jsonb } else { - bail!( - "unsupported Avro type, cannot convert map to jsonb: {:?}", + bail_not_implemented!( + issue = 16963, + "Avro map type to jsonb: {:?}", schema - ) + ); } } None => { + // We require it to be specified, because we don't want to have a bad default behavior. + // But perhaps changing the default behavior won't be a breaking change, + // because it affects only on creation time, what the result ColumnDesc will be, and the ColumnDesc will be persisted. + // This is unlike timestamp.handing.mode, which affects parser's behavior on the runtime. bail!("`map.handling.mode` not specified in ENCODE AVRO (...). Currently supported modes: `jsonb`") } } } Schema::Uuid => DataType::Varchar, Schema::Null | Schema::Fixed(_) => { - bail!("unsupported Avro type: {:?}", schema) + bail_not_implemented!("Avro type: {:?}", schema); } }; @@ -280,3 +315,71 @@ fn supported_avro_to_json_type(schema: &Schema) -> bool { | Schema::Union(_) => false, } } + +/// The field name when converting Avro union type to RisingWave struct type. +pub(super) fn avro_schema_to_struct_field_name(schema: &Schema) -> Result { + Ok(match schema { + Schema::Null => unreachable!(), + Schema::Union(_) => unreachable!(), + // Primitive types + Schema::Boolean => "boolean".to_string(), + Schema::Int => "int".to_string(), + Schema::Long => "long".to_string(), + Schema::Float => "float".to_string(), + Schema::Double => "double".to_string(), + Schema::Bytes => "bytes".to_string(), + Schema::String => "string".to_string(), + // Unnamed Complex types + Schema::Array(_) => "array".to_string(), + Schema::Map(_) => "map".to_string(), + // Named Complex types + Schema::Enum(_) | Schema::Ref { name: _ } | Schema::Fixed(_) | Schema::Record(_) => { + // schema.name().unwrap().fullname(None) + // See test_avro_lib_union_record_bug + // https://github.com/risingwavelabs/risingwave/issues/17632 + bail_not_implemented!(issue=17632, "Avro named type used in Union type: {:?}", schema) + + } + + // Logical types are currently banned. See https://github.com/risingwavelabs/risingwave/issues/17616 + +/* + Schema::Uuid => "uuid".to_string(), + // Decimal is the most tricky. https://avro.apache.org/docs/1.11.1/specification/_print/#decimal + // - A decimal logical type annotates Avro bytes _or_ fixed types. + // - It has attributes `precision` and `scale`. + // "For the purposes of schema resolution, two schemas that are decimal logical types match if their scales and precisions match." + // - When the physical type is fixed, it's a named type. And a schema containing 2 decimals is possible: + // [ + // {"type":"fixed","name":"Decimal128","size":16,"logicalType":"decimal","precision":38,"scale":2}, + // {"type":"fixed","name":"Decimal256","size":32,"logicalType":"decimal","precision":50,"scale":2} + // ] + // In this case (a logical type's physical type is a named type), perhaps we should use the physical type's `name`. + Schema::Decimal(_) => "decimal".to_string(), + Schema::Date => "date".to_string(), + // Note: in Avro, the name style is "time-millis", etc. + // But in RisingWave (Postgres), it will require users to use quotes, i.e., + // select (struct)."time-millis", (struct).time_millies from t; + // The latter might be more user-friendly. + Schema::TimeMillis => "time_millis".to_string(), + Schema::TimeMicros => "time_micros".to_string(), + Schema::TimestampMillis => "timestamp_millis".to_string(), + Schema::TimestampMicros => "timestamp_micros".to_string(), + Schema::LocalTimestampMillis => "local_timestamp_millis".to_string(), + Schema::LocalTimestampMicros => "local_timestamp_micros".to_string(), + Schema::Duration => "duration".to_string(), +*/ + Schema::Uuid + | Schema::Decimal(_) + | Schema::Date + | Schema::TimeMillis + | Schema::TimeMicros + | Schema::TimestampMillis + | Schema::TimestampMicros + | Schema::LocalTimestampMillis + | Schema::LocalTimestampMicros + | Schema::Duration => { + bail_not_implemented!(issue=17616, "Avro logicalType used in Union type: {:?}", schema) + } + }) +} diff --git a/src/connector/codec/src/decoder/mod.rs b/src/connector/codec/src/decoder/mod.rs index c7e04ab210a6e..cd7fe14ab74ea 100644 --- a/src/connector/codec/src/decoder/mod.rs +++ b/src/connector/codec/src/decoder/mod.rs @@ -16,6 +16,7 @@ pub mod avro; pub mod json; pub mod utils; +use risingwave_common::error::NotImplemented; use risingwave_common::types::{DataType, Datum, DatumCow, ToOwnedDatum}; use thiserror::Error; use thiserror_ext::Macro; @@ -40,23 +41,41 @@ pub enum AccessError { /// Errors that are not categorized into variants above. #[error("{message}")] Uncategorized { message: String }, + + #[error(transparent)] + NotImplemented(#[from] NotImplemented), } pub type AccessResult = std::result::Result; /// Access to a field in the data structure. Created by `AccessBuilder`. +/// +/// It's the `ENCODE ...` part in `FORMAT ... ENCODE ...` pub trait Access { /// Accesses `path` in the data structure (*parsed* Avro/JSON/Protobuf data), /// and then converts it to RisingWave `Datum`. + /// /// `type_expected` might or might not be used during the conversion depending on the implementation. /// /// # Path /// - /// We usually expect the data is a record (struct), and `path` represents field path. + /// We usually expect the data (`Access` instance) is a record (struct), and `path` represents field path. /// The data (or part of the data) represents the whole row (`Vec`), /// and we use different `path` to access one column at a time. /// - /// e.g., for Avro, we access `["col_name"]`; for Debezium Avro, we access `["before", "col_name"]`. + /// TODO: the meaning of `path` is a little confusing and maybe over-abstracted. + /// `access` does not need to serve arbitrarily deep `path` access, but just "top-level" access. + /// The API creates an illusion that arbitrary access is supported, but it's not. + /// Perhapts we should separate out another trait like `ToDatum`, + /// which only does type mapping, without caring about the path. And `path` itself is only an `enum` instead of `&[&str]`. + /// + /// What `path` to access is decided by the CDC layer, i.e., the `FORMAT ...` part (`ChangeEvent`). + /// e.g., + /// - `DebeziumChangeEvent` accesses `["before", "col_name"]` for value, + /// `["source", "db"]`, `["source", "table"]` etc. for additional columns' values, + /// `["op"]` for op type. + /// - `MaxwellChangeEvent` accesses `["data", "col_name"]` for value, `["type"]` for op type. + /// - In the simplest case, for `FORMAT PLAIN/UPSERT` (`KvEvent`), they just access `["col_name"]` for value, and op type is derived. /// /// # Returns /// diff --git a/src/connector/codec/tests/integration_tests/avro.rs b/src/connector/codec/tests/integration_tests/avro.rs index fab143b2bf9e7..11421c151d7a5 100644 --- a/src/connector/codec/tests/integration_tests/avro.rs +++ b/src/connector/codec/tests/integration_tests/avro.rs @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use anyhow::Context; use apache_avro::from_avro_datum; use risingwave_connector_codec::decoder::avro::{ avro_schema_to_column_descs, AvroAccess, AvroParseOptions, MapHandling, ResolvedAvroSchema, }; use risingwave_connector_codec::decoder::Access; use risingwave_connector_codec::AvroSchema; +use thiserror_ext::AsReport; use crate::utils::*; @@ -44,6 +46,24 @@ struct Config { data_encoding: TestDataEncoding, } +fn avro_schema_str_to_risingwave_schema( + avro_schema: &str, + config: &Config, +) -> anyhow::Result<(ResolvedAvroSchema, Vec)> { + // manually implement some logic in AvroParserConfig::map_to_columns + let avro_schema = AvroSchema::parse_str(avro_schema).context("failed to parse Avro schema")?; + let resolved_schema = + ResolvedAvroSchema::create(avro_schema.into()).context("failed to resolve Avro schema")?; + + let rw_schema = + avro_schema_to_column_descs(&resolved_schema.resolved_schema, config.map_handling) + .context("failed to convert Avro schema to RisingWave schema")? + .iter() + .map(ColumnDesc::from) + .collect_vec(); + Ok((resolved_schema, rw_schema)) +} + /// Data driven testing for converting Avro Schema to RisingWave Schema, and then converting Avro data into RisingWave data. /// /// The expected results can be automatically updated. To run and update the tests: @@ -79,17 +99,15 @@ fn check( expected_risingwave_schema: expect_test::Expect, expected_risingwave_data: expect_test::Expect, ) { - // manually implement some logic in AvroParserConfig::map_to_columns - let avro_schema = AvroSchema::parse_str(avro_schema).expect("failed to parse Avro schema"); - let resolved_schema = - ResolvedAvroSchema::create(avro_schema.into()).expect("failed to resolve Avro schema"); - - let rw_schema = - avro_schema_to_column_descs(&resolved_schema.resolved_schema, config.map_handling) - .expect("failed to convert Avro schema to RisingWave schema") - .iter() - .map(ColumnDesc::from) - .collect_vec(); + let (resolved_schema, rw_schema) = + match avro_schema_str_to_risingwave_schema(avro_schema, &config) { + Ok(res) => res, + Err(e) => { + expected_risingwave_schema.assert_eq(&format!("{}", e.as_report())); + expected_risingwave_data.assert_eq(""); + return; + } + }; expected_risingwave_schema.assert_eq(&format!( "{:#?}", rw_schema.iter().map(ColumnDescTestDisplay).collect_vec() @@ -554,3 +572,316 @@ fn test_1() { Owned(Float64(OrderedFloat(NaN)))"#]], ); } + +#[test] +fn test_union() { + // A basic test + check( + r#" +{ + "type": "record", + "name": "Root", + "fields": [ + { + "name": "unionType", + "type": ["int", "string"] + }, + { + "name": "unionTypeComplex", + "type": [ + "null", + {"type": "record", "name": "Email","fields": [{"name":"inner","type":"string"}]}, + {"type": "record", "name": "Fax","fields": [{"name":"inner","type":"int"}]}, + {"type": "record", "name": "Sms","fields": [{"name":"inner","type":"int"}]} + ] + }, + { + "name": "nullableString", + "type": ["null", "string"] + } + ] +} + "#, + &[ + // { + // "unionType": {"int": 114514}, + // "unionTypeComplex": {"Sms": {"inner":6}}, + // "nullableString": null + // } + "00a4fd0d060c00", + // { + // "unionType": {"int": 114514}, + // "unionTypeComplex": {"Fax": {"inner":6}}, + // "nullableString": null + // } + "00a4fd0d040c00", + // { + // "unionType": {"string": "oops"}, + // "unionTypeComplex": null, + // "nullableString": {"string": "hello"} + // } + "02086f6f707300020a68656c6c6f", + // { + // "unionType": {"string": "oops"}, + // "unionTypeComplex": {"Email": {"inner":"a@b.c"}}, + // "nullableString": null + // } + "02086f6f7073020a6140622e6300", + ], + Config { + map_handling: None, + data_encoding: TestDataEncoding::HexBinary, + }, + // FIXME: why the struct type doesn't have field_descs? https://github.com/risingwavelabs/risingwave/issues/17128 + expect![[r#" + failed to convert Avro schema to RisingWave schema: failed to convert Avro union to struct: Feature is not yet implemented: Avro named type used in Union type: Record(RecordSchema { name: Name { name: "Email", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "inner", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }], lookup: {"inner": 0}, attributes: {} }) + Tracking issue: https://github.com/risingwavelabs/risingwave/issues/17632"#]], + expect![""], + ); + + // logicalType is currently rejected + // https://github.com/risingwavelabs/risingwave/issues/17616 + check( + r#" +{ +"type": "record", +"name": "Root", +"fields": [ + { + "name": "unionLogical", + "type": ["int", {"type":"int", "logicalType": "date"}] + } +] +} + "#, + &[], + Config { + map_handling: None, + data_encoding: TestDataEncoding::HexBinary, + }, + expect![[r#" + failed to convert Avro schema to RisingWave schema: failed to convert Avro union to struct: Feature is not yet implemented: Avro logicalType used in Union type: Date + Tracking issue: https://github.com/risingwavelabs/risingwave/issues/17616"#]], + expect![""], + ); + + // test named type. Consider namespace. + // https://avro.apache.org/docs/1.11.1/specification/_print/#names + // List of things to take care: + // - Record fields and enum symbols DO NOT have namespace. + // - If the name specified contains a dot, then it is assumed to be a fullname, and any namespace also specified is IGNORED. + // - If a name doesn't have its own namespace, it will look for its most tightly enclosing named schema. + check( + r#" +{ + "type": "record", + "name": "Root", + "namespace": "RootNamespace", + "fields": [ + { + "name": "littleFieldToMakeNestingLooksBetter", + "type": ["null","int"], "default": null + }, + { + "name": "recordField", + "type": ["null", "int", { + "type": "record", + "name": "my.name.spaced.record", + "namespace": "when.name.contains.dot.namespace.is.ignored", + "fields": [ + {"name": "hello", "type": {"type": "int", "default": 1}}, + {"name": "world", "type": {"type": "double", "default": 1}} + ] + }], + "default": null + }, + { + "name": "enumField", + "type": ["null", "int", { + "type": "enum", + "name": "myEnum", + "namespace": "my.namespace", + "symbols": ["A", "B", "C", "D"] + }], + "default": null + }, + { + "name": "anotherEnumFieldUsingRootNamespace", + "type": ["null", "int", { + "type": "enum", + "name": "myEnum", + "symbols": ["A", "B", "C", "D"] + }], + "default": null + } + ] +} +"#, + &[ + // { + // "enumField":{"my.namespace.myEnum":"A"}, + // "anotherEnumFieldUsingRootNamespace":{"RootNamespace.myEnum": "D"} + // } + "000004000406", + ], + Config { + map_handling: None, + data_encoding: TestDataEncoding::HexBinary, + }, + expect![[r#" + failed to convert Avro schema to RisingWave schema: failed to convert Avro union to struct: Feature is not yet implemented: Avro named type used in Union type: Record(RecordSchema { name: Name { name: "record", namespace: Some("my.name.spaced") }, aliases: None, doc: None, fields: [RecordField { name: "hello", doc: None, aliases: None, default: None, schema: Int, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "world", doc: None, aliases: None, default: None, schema: Double, order: Ascending, position: 1, custom_attributes: {} }], lookup: {"hello": 0, "world": 1}, attributes: {} }) + Tracking issue: https://github.com/risingwavelabs/risingwave/issues/17632"#]], + expect![""], + ); + + // This is provided by a user https://github.com/risingwavelabs/risingwave/issues/16273#issuecomment-2051480710 + check( + r#" +{ + "namespace": "com.abc.efg.mqtt", + "name": "also.DataMessage", + "type": "record", + "fields": [ + { + "name": "metrics", + "type": { + "type": "array", + "items": { + "name": "also_data_metric", + "type": "record", + "fields": [ + { + "name": "id", + "type": "string" + }, + { + "name": "name", + "type": "string" + }, + { + "name": "norm_name", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "uom", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "data", + "type": { + "type": "array", + "items": { + "name": "dataItem", + "type": "record", + "fields": [ + { + "name": "ts", + "type": "string", + "doc": "Timestamp of the metric." + }, + { + "name": "value", + "type": [ + "null", + "boolean", + "double", + "string" + ], + "doc": "Value of the metric." + } + ] + } + }, + "doc": "The data message" + } + ], + "doc": "A metric object" + } + }, + "doc": "A list of metrics." + } + ] +} + "#, + &[ + // { + // "metrics": [ + // {"id":"foo", "name":"a", "data": [] } + // ] + // } + "0206666f6f026100000000", + // { + // "metrics": [ + // {"id":"foo", "name":"a", "norm_name": null, "uom": {"string":"c"}, "data": [{"ts":"1", "value":null}, {"ts":"2", "value": {"boolean": true }}] } + // ] + // } + "0206666f6f02610002026304023100023202010000", + ], + Config { + map_handling: None, + data_encoding: TestDataEncoding::HexBinary, + }, + expect![[r#" + [ + metrics(#1): List( + Struct { + id: Varchar, + name: Varchar, + norm_name: Varchar, + uom: Varchar, + data: List( + Struct { + ts: Varchar, + value: Struct { + boolean: Boolean, + double: Float64, + string: Varchar, + }, + }, + ), + }, + ), + ]"#]], + expect![[r#" + Owned([ + StructValue( + Utf8("foo"), + Utf8("a"), + null, + null, + [], + ), + ]) + ---- + Owned([ + StructValue( + Utf8("foo"), + Utf8("a"), + null, + Utf8("c"), + [ + StructValue( + Utf8("1"), + null, + ), + StructValue( + Utf8("2"), + StructValue( + Bool(true), + null, + null, + ), + ), + ], + ), + ])"#]], + ); +} diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index 1a40b87c9d498..04f80ebba1ca1 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -19,8 +19,8 @@ use apache_avro::types::Value; use apache_avro::{from_avro_datum, Schema}; use risingwave_common::try_match_expand; use risingwave_connector_codec::decoder::avro::{ - avro_extract_field_schema, avro_schema_skip_union, avro_schema_to_column_descs, AvroAccess, - AvroParseOptions, ResolvedAvroSchema, + avro_extract_field_schema, avro_schema_skip_nullable_union, avro_schema_to_column_descs, + AvroAccess, AvroParseOptions, ResolvedAvroSchema, }; use risingwave_pb::catalog::PbSchemaRegistryNameStrategy; use risingwave_pb::plan_common::ColumnDesc; @@ -125,8 +125,40 @@ impl DebeziumAvroParserConfig { } pub fn map_to_columns(&self) -> ConnectorResult> { + // Refer to debezium_avro_msg_schema.avsc for how the schema looks like: + + // "fields": [ + // { + // "name": "before", + // "type": [ + // "null", + // { + // "type": "record", + // "name": "Value", + // "fields": [...], + // } + // ], + // "default": null + // }, + // { + // "name": "after", + // "type": [ + // "null", + // "Value" + // ], + // "default": null + // }, + // ...] + + // Other fields are: + // - source: describes the source metadata for the event + // - op + // - ts_ms + // - transaction + // See + avro_schema_to_column_descs( - avro_schema_skip_union(avro_extract_field_schema( + avro_schema_skip_nullable_union(avro_extract_field_schema( // FIXME: use resolved schema here. // Currently it works because "after" refers to a subtree in "before", // but in theory, inside "before" there could also be a reference. @@ -227,7 +259,7 @@ mod tests { let outer_schema = get_outer_schema(); let expected_inner_schema = Schema::parse_str(inner_shema_str).unwrap(); - let extracted_inner_schema = avro_schema_skip_union( + let extracted_inner_schema = avro_schema_skip_nullable_union( avro_extract_field_schema(&outer_schema, Some("before")).unwrap(), ) .unwrap(); @@ -318,7 +350,7 @@ mod tests { fn test_map_to_columns() { let outer_schema = get_outer_schema(); let columns = avro_schema_to_column_descs( - avro_schema_skip_union( + avro_schema_skip_nullable_union( avro_extract_field_schema(&outer_schema, Some("before")).unwrap(), ) .unwrap(), diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index e4a229bb61b98..ca709e2eebc73 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -534,7 +534,7 @@ impl JsonParseOptions { (DataType::Struct(struct_type_info), ValueType::Object) => { // Collecting into a Result> doesn't reserve the capacity in advance, so we `Vec::with_capacity` instead. // https://github.com/rust-lang/rust/issues/48994 - let mut fields = Vec::with_capacity(struct_type_info.types().len()); + let mut fields = Vec::with_capacity(struct_type_info.len()); for (field_name, field_type) in struct_type_info .names() .zip_eq_fast(struct_type_info.types())