diff --git a/e2e_test/source_inline/kafka/avro/ref.slt b/e2e_test/source_inline/kafka/avro/ref.slt new file mode 100644 index 0000000000000..15f36cf4c0d5d --- /dev/null +++ b/e2e_test/source_inline/kafka/avro/ref.slt @@ -0,0 +1,128 @@ +control substitution on + + +system ok +rpk topic create avro-ref + + +system ok +sr_register avro-ref-value AVRO < Node + + +system ok +curl -X DELETE "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro-ref-value" + + +system ok +curl -X DELETE "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro-ref-value?permanent=true" + + +system ok +sr_register avro-ref-value AVRO <, ) -> anyhow::Result> { + let resolved = ResolvedSchema::try_from(schema)?; if let Schema::Record(RecordSchema { fields, .. }) = schema { let mut index = 0; + let mut ancestor_records: Vec = vec![]; let fields = fields .iter() .map(|field| { - avro_field_to_column_desc(&field.name, &field.schema, &mut index, map_handling) + avro_field_to_column_desc( + &field.name, + &field.schema, + &mut index, + &mut ancestor_records, + resolved.get_names(), + map_handling, + ) }) .collect::>()?; Ok(fields) @@ -105,10 +113,22 @@ fn avro_field_to_column_desc( name: &str, schema: &Schema, index: &mut i32, + ancestor_records: &mut Vec, + refs: &NamesRef<'_>, map_handling: Option, ) -> anyhow::Result { - let data_type = avro_type_mapping(schema, map_handling)?; + let data_type = avro_type_mapping(schema, ancestor_records, refs, map_handling)?; match schema { + Schema::Ref { name: ref_name } => { + avro_field_to_column_desc( + name, + refs[ref_name], // `ResolvedSchema::try_from` already handles lookup failure + index, + ancestor_records, + refs, + map_handling, + ) + } Schema::Record(RecordSchema { name: schema_name, fields, @@ -116,7 +136,16 @@ fn avro_field_to_column_desc( }) => { let vec_column = fields .iter() - .map(|f| avro_field_to_column_desc(&f.name, &f.schema, index, map_handling)) + .map(|f| { + avro_field_to_column_desc( + &f.name, + &f.schema, + index, + ancestor_records, + refs, + map_handling, + ) + }) .collect::>()?; *index += 1; Ok(ColumnDesc { @@ -146,9 +175,11 @@ fn avro_field_to_column_desc( } } -/// This function expects resolved schema (no `Ref`). +/// This function expects original schema (with `Ref`). fn avro_type_mapping( schema: &Schema, + ancestor_records: &mut Vec, + refs: &NamesRef<'_>, map_handling: Option, ) -> anyhow::Result { let data_type = match schema { @@ -190,16 +221,34 @@ fn avro_type_mapping( return Ok(DataType::Decimal); } - StructType::new( + let unique_name = name.fullname(None); + if ancestor_records.contains(&unique_name) { + bail!( + "circular reference detected in Avro schema: {} -> {}", + ancestor_records.join(" -> "), + unique_name + ); + } + + ancestor_records.push(unique_name); + let ty = StructType::new( fields .iter() - .map(|f| Ok((&f.name, avro_type_mapping(&f.schema, map_handling)?))) + .map(|f| { + Ok(( + &f.name, + avro_type_mapping(&f.schema, ancestor_records, refs, map_handling)?, + )) + }) .collect::>>()?, ) - .into() + .into(); + ancestor_records.pop(); + ty } Schema::Array(item_schema) => { - let item_type = avro_type_mapping(item_schema.as_ref(), map_handling)?; + let item_type = + avro_type_mapping(item_schema.as_ref(), ancestor_records, refs, map_handling)?; DataType::List(Box::new(item_type)) } Schema::Union(union_schema) => { @@ -219,7 +268,7 @@ fn avro_type_mapping( "Union contains duplicate types: {union_schema:?}", ); match get_nullable_union_inner(union_schema) { - Some(inner) => avro_type_mapping(inner, map_handling)?, + Some(inner) => avro_type_mapping(inner, ancestor_records, refs, map_handling)?, None => { // Convert the union to a struct, each field of the struct represents a variant of the union. // Refer to https://github.com/risingwavelabs/risingwave/issues/16273#issuecomment-2179761345 to see why it's not perfect. @@ -232,10 +281,11 @@ fn avro_type_mapping( // null will mean the whole struct is null .filter(|variant| !matches!(variant, &&Schema::Null)) .map(|variant| { - avro_type_mapping(variant, map_handling).and_then(|t| { - let name = avro_schema_to_struct_field_name(variant)?; - Ok((name, t)) - }) + avro_type_mapping(variant, ancestor_records, refs, map_handling) + .and_then(|t| { + let name = avro_schema_to_struct_field_name(variant)?; + Ok((name, t)) + }) }) .try_collect::<_, Vec<_>, _>() .context("failed to convert Avro union to struct")?; @@ -250,7 +300,12 @@ fn avro_type_mapping( { DataType::Decimal } else { - bail_not_implemented!("Avro type: {:?}", schema); + avro_type_mapping( + refs[name], // `ResolvedSchema::try_from` already handles lookup failure + ancestor_records, + refs, + map_handling, + )? } } Schema::Map(value_schema) => { @@ -268,8 +323,13 @@ fn avro_type_mapping( } } Some(MapHandling::Map) | None => { - let value = avro_type_mapping(value_schema.as_ref(), map_handling) - .context("failed to convert Avro map type")?; + let value = avro_type_mapping( + value_schema.as_ref(), + ancestor_records, + refs, + map_handling, + ) + .context("failed to convert Avro map type")?; DataType::Map(MapType::from_kv(DataType::Varchar, value)) } } diff --git a/src/connector/codec/tests/integration_tests/avro.rs b/src/connector/codec/tests/integration_tests/avro.rs index d916dc1aba426..2221917cc2b05 100644 --- a/src/connector/codec/tests/integration_tests/avro.rs +++ b/src/connector/codec/tests/integration_tests/avro.rs @@ -56,7 +56,7 @@ fn avro_schema_str_to_risingwave_schema( ResolvedAvroSchema::create(avro_schema.into()).context("failed to resolve Avro schema")?; let rw_schema = - avro_schema_to_column_descs(&resolved_schema.resolved_schema, config.map_handling) + avro_schema_to_column_descs(&resolved_schema.original_schema, config.map_handling) .context("failed to convert Avro schema to RisingWave schema")? .iter() .map(ColumnDesc::from) diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index ac93ab3e69807..e817267778a50 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -229,7 +229,7 @@ impl AvroParserConfig { } pub fn map_to_columns(&self) -> ConnectorResult> { - avro_schema_to_column_descs(&self.schema.resolved_schema, self.map_handling) + avro_schema_to_column_descs(&self.schema.original_schema, self.map_handling) .map_err(Into::into) } }