Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(source): resolve avro Ref during avro_schema_to_column_descs without hack #19601

Merged
merged 3 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions e2e_test/source_inline/kafka/avro/ref.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
control substitution on


system ok
rpk topic create avro-ref


system ok
sr_register avro-ref-value AVRO <<EOF
{
"type": "record",
"name": "Node",
"fields": [
{
"name": "value",
"type": "int"
},
{
"name": "next",
"type": ["null", "Node"]
}
]
}
EOF


statement error
create source s WITH (${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'avro-ref') FORMAT PLAIN ENCODE AVRO (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: connector error
2: circular reference detected in Avro schema: Node -> Node


system ok
curl -X DELETE "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro-ref-value"


system ok
curl -X DELETE "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro-ref-value?permanent=true"


system ok
sr_register avro-ref-value AVRO <<EOF
{
"type": "record",
"name": "Root",
"fields": [
{
"name": "foo",
"type": {
"type": "record",
"name": "Seg",
"fields": [
{
"name": "a",
"type": {
"type": "record",
"name": "Point",
"fields": [
{
"name": "x",
"type": "int"
},
{
"name": "y",
"type": "int"
}
]
}
},
{
"name": "b",
"type": "Point"
}
]
}
},
{
"name": "bar",
"type": "Seg"
}
]
}
EOF


statement ok
create source s WITH (${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'avro-ref') FORMAT PLAIN ENCODE AVRO (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');


system ok
rpk topic produce avro-ref --schema-id=topic <<EOF
{"foo":{"a":{"x":3,"y":4},"b":{"x":5,"y":6}},"bar":{"a":{"x":6,"y":5},"b":{"x":4,"y":3}}}
EOF


query IIIIIIII
select
(foo).a.x,
(foo).a.y,
(foo).b.x,
(foo).b.y,
(bar).a.x,
(bar).a.y,
(bar).b.x,
(bar).b.y
from s;
----
3 4 5 6 6 5 4 3


statement ok
drop source s;


system ok
curl -X DELETE "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro-ref-value"


system ok
curl -X DELETE "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro-ref-value?permanent=true"


system ok
rpk topic delete 'avro-ref'
98 changes: 79 additions & 19 deletions src/connector/codec/src/decoder/avro/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::{Arc, LazyLock};

use anyhow::Context;
use apache_avro::schema::{DecimalSchema, RecordSchema, ResolvedSchema, Schema};
use apache_avro::schema::{DecimalSchema, NamesRef, RecordSchema, ResolvedSchema, Schema};
use apache_avro::AvroResult;
use itertools::Itertools;
use risingwave_common::error::NotImplemented;
Expand Down Expand Up @@ -76,20 +76,28 @@ impl MapHandling {
}
}

/// This function expects resolved schema (no `Ref`).
/// FIXME: require passing resolved schema here.
/// This function expects original schema (with `Ref`).
/// TODO: change `map_handling` to some `Config`, and also unify debezium.
/// TODO: use `ColumnDesc` in common instead of PB.
pub fn avro_schema_to_column_descs(
schema: &Schema,
map_handling: Option<MapHandling>,
) -> anyhow::Result<Vec<ColumnDesc>> {
let resolved = ResolvedSchema::try_from(schema)?;
if let Schema::Record(RecordSchema { fields, .. }) = schema {
let mut index = 0;
let mut ancestor_records: Vec<String> = vec![];
let fields = fields
.iter()
.map(|field| {
avro_field_to_column_desc(&field.name, &field.schema, &mut index, map_handling)
avro_field_to_column_desc(
&field.name,
&field.schema,
&mut index,
&mut ancestor_records,
resolved.get_names(),
map_handling,
)
})
.collect::<anyhow::Result<_>>()?;
Ok(fields)
Expand All @@ -105,18 +113,39 @@ fn avro_field_to_column_desc(
name: &str,
schema: &Schema,
index: &mut i32,
ancestor_records: &mut Vec<String>,
refs: &NamesRef<'_>,
map_handling: Option<MapHandling>,
) -> anyhow::Result<ColumnDesc> {
let data_type = avro_type_mapping(schema, map_handling)?;
let data_type = avro_type_mapping(schema, ancestor_records, refs, map_handling)?;
match schema {
Schema::Ref { name: ref_name } => {
avro_field_to_column_desc(
name,
refs[ref_name], // `ResolvedSchema::try_from` already handles lookup failure
index,
ancestor_records,
refs,
map_handling,
)
}
Schema::Record(RecordSchema {
name: schema_name,
fields,
..
}) => {
let vec_column = fields
.iter()
.map(|f| avro_field_to_column_desc(&f.name, &f.schema, index, map_handling))
.map(|f| {
avro_field_to_column_desc(
&f.name,
&f.schema,
index,
ancestor_records,
refs,
map_handling,
)
})
.collect::<anyhow::Result<_>>()?;
*index += 1;
Ok(ColumnDesc {
Expand Down Expand Up @@ -146,9 +175,11 @@ fn avro_field_to_column_desc(
}
}

/// This function expects resolved schema (no `Ref`).
/// This function expects original schema (with `Ref`).
fn avro_type_mapping(
schema: &Schema,
ancestor_records: &mut Vec<String>,
refs: &NamesRef<'_>,
map_handling: Option<MapHandling>,
) -> anyhow::Result<DataType> {
let data_type = match schema {
Expand Down Expand Up @@ -190,16 +221,34 @@ fn avro_type_mapping(
return Ok(DataType::Decimal);
}

StructType::new(
let unique_name = name.fullname(None);
if ancestor_records.contains(&unique_name) {
bail!(
"circular reference detected in Avro schema: {} -> {}",
ancestor_records.join(" -> "),
unique_name
);
}

ancestor_records.push(unique_name);
let ty = StructType::new(
fields
.iter()
.map(|f| Ok((&f.name, avro_type_mapping(&f.schema, map_handling)?)))
.map(|f| {
Ok((
&f.name,
avro_type_mapping(&f.schema, ancestor_records, refs, map_handling)?,
))
})
.collect::<anyhow::Result<Vec<_>>>()?,
)
.into()
.into();
ancestor_records.pop();
ty
}
Schema::Array(item_schema) => {
let item_type = avro_type_mapping(item_schema.as_ref(), map_handling)?;
let item_type =
avro_type_mapping(item_schema.as_ref(), ancestor_records, refs, map_handling)?;
DataType::List(Box::new(item_type))
}
Schema::Union(union_schema) => {
Expand All @@ -219,7 +268,7 @@ fn avro_type_mapping(
"Union contains duplicate types: {union_schema:?}",
);
match get_nullable_union_inner(union_schema) {
Some(inner) => avro_type_mapping(inner, map_handling)?,
Some(inner) => avro_type_mapping(inner, ancestor_records, refs, map_handling)?,
None => {
// Convert the union to a struct, each field of the struct represents a variant of the union.
// Refer to https://github.com/risingwavelabs/risingwave/issues/16273#issuecomment-2179761345 to see why it's not perfect.
Expand All @@ -232,10 +281,11 @@ fn avro_type_mapping(
// null will mean the whole struct is null
.filter(|variant| !matches!(variant, &&Schema::Null))
.map(|variant| {
avro_type_mapping(variant, map_handling).and_then(|t| {
let name = avro_schema_to_struct_field_name(variant)?;
Ok((name, t))
})
avro_type_mapping(variant, ancestor_records, refs, map_handling)
.and_then(|t| {
let name = avro_schema_to_struct_field_name(variant)?;
Ok((name, t))
})
})
.try_collect::<_, Vec<_>, _>()
.context("failed to convert Avro union to struct")?;
Expand All @@ -250,7 +300,12 @@ fn avro_type_mapping(
{
DataType::Decimal
} else {
bail_not_implemented!("Avro type: {:?}", schema);
avro_type_mapping(
refs[name], // `ResolvedSchema::try_from` already handles lookup failure
ancestor_records,
refs,
map_handling,
)?
}
}
Schema::Map(value_schema) => {
Expand All @@ -268,8 +323,13 @@ fn avro_type_mapping(
}
}
Some(MapHandling::Map) | None => {
let value = avro_type_mapping(value_schema.as_ref(), map_handling)
.context("failed to convert Avro map type")?;
let value = avro_type_mapping(
value_schema.as_ref(),
ancestor_records,
refs,
map_handling,
)
.context("failed to convert Avro map type")?;
DataType::Map(MapType::from_kv(DataType::Varchar, value))
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/codec/tests/integration_tests/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ fn avro_schema_str_to_risingwave_schema(
ResolvedAvroSchema::create(avro_schema.into()).context("failed to resolve Avro schema")?;

let rw_schema =
avro_schema_to_column_descs(&resolved_schema.resolved_schema, config.map_handling)
avro_schema_to_column_descs(&resolved_schema.original_schema, config.map_handling)
.context("failed to convert Avro schema to RisingWave schema")?
.iter()
.map(ColumnDesc::from)
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl AvroParserConfig {
}

pub fn map_to_columns(&self) -> ConnectorResult<Vec<ColumnDesc>> {
avro_schema_to_column_descs(&self.schema.resolved_schema, self.map_handling)
avro_schema_to_column_descs(&self.schema.original_schema, self.map_handling)
.map_err(Into::into)
}
}
Expand Down