Skip to content

Commit

Permalink
refactor(source): resolve avro Ref
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Nov 27, 2024
1 parent 3faa0bf commit 3e1c4a6
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 21 deletions.
98 changes: 79 additions & 19 deletions src/connector/codec/src/decoder/avro/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::{Arc, LazyLock};

use anyhow::Context;
use apache_avro::schema::{DecimalSchema, RecordSchema, ResolvedSchema, Schema};
use apache_avro::schema::{DecimalSchema, NamesRef, RecordSchema, ResolvedSchema, Schema};
use apache_avro::AvroResult;
use itertools::Itertools;
use risingwave_common::error::NotImplemented;
Expand Down Expand Up @@ -76,20 +76,28 @@ impl MapHandling {
}
}

/// This function expects resolved schema (no `Ref`).
/// FIXME: require passing resolved schema here.
/// This function expects original schema (with `Ref`).
/// TODO: change `map_handling` to some `Config`, and also unify debezium.
/// TODO: use `ColumnDesc` in common instead of PB.
pub fn avro_schema_to_column_descs(
schema: &Schema,
map_handling: Option<MapHandling>,
) -> anyhow::Result<Vec<ColumnDesc>> {
let resolved = ResolvedSchema::try_from(schema)?;
if let Schema::Record(RecordSchema { fields, .. }) = schema {
let mut index = 0;
let mut ancestor_records: Vec<String> = vec![];
let fields = fields
.iter()
.map(|field| {
avro_field_to_column_desc(&field.name, &field.schema, &mut index, map_handling)
avro_field_to_column_desc(
&field.name,
&field.schema,
&mut index,
&mut ancestor_records,
resolved.get_names(),
map_handling,
)
})
.collect::<anyhow::Result<_>>()?;
Ok(fields)
Expand All @@ -105,18 +113,39 @@ fn avro_field_to_column_desc(
name: &str,
schema: &Schema,
index: &mut i32,
ancestor_records: &mut Vec<String>,
refs: &NamesRef<'_>,
map_handling: Option<MapHandling>,
) -> anyhow::Result<ColumnDesc> {
let data_type = avro_type_mapping(schema, map_handling)?;
let data_type = avro_type_mapping(schema, ancestor_records, refs, map_handling)?;
match schema {
Schema::Ref { name: ref_name } => {
avro_field_to_column_desc(
name,
refs[ref_name], // `ResolvedSchema::try_from` already handles lookup failure
index,
ancestor_records,
refs,
map_handling,
)
}
Schema::Record(RecordSchema {
name: schema_name,
fields,
..
}) => {
let vec_column = fields
.iter()
.map(|f| avro_field_to_column_desc(&f.name, &f.schema, index, map_handling))
.map(|f| {
avro_field_to_column_desc(
&f.name,
&f.schema,
index,
ancestor_records,
refs,
map_handling,
)
})
.collect::<anyhow::Result<_>>()?;
*index += 1;
Ok(ColumnDesc {
Expand Down Expand Up @@ -146,9 +175,11 @@ fn avro_field_to_column_desc(
}
}

/// This function expects resolved schema (no `Ref`).
/// This function expects original schema (with `Ref`).
fn avro_type_mapping(
schema: &Schema,
ancestor_records: &mut Vec<String>,
refs: &NamesRef<'_>,
map_handling: Option<MapHandling>,
) -> anyhow::Result<DataType> {
let data_type = match schema {
Expand Down Expand Up @@ -190,16 +221,34 @@ fn avro_type_mapping(
return Ok(DataType::Decimal);
}

StructType::new(
let unique_name = name.fullname(None);
if ancestor_records.contains(&unique_name) {
bail!(
"circular reference detected: {} -> {}",
ancestor_records.join(" -> "),
unique_name
);
}

ancestor_records.push(unique_name);
let ty = StructType::new(
fields
.iter()
.map(|f| Ok((&f.name, avro_type_mapping(&f.schema, map_handling)?)))
.map(|f| {
Ok((
&f.name,
avro_type_mapping(&f.schema, ancestor_records, refs, map_handling)?,
))
})
.collect::<anyhow::Result<Vec<_>>>()?,
)
.into()
.into();
ancestor_records.pop();
ty
}
Schema::Array(item_schema) => {
let item_type = avro_type_mapping(item_schema.as_ref(), map_handling)?;
let item_type =
avro_type_mapping(item_schema.as_ref(), ancestor_records, refs, map_handling)?;
DataType::List(Box::new(item_type))
}
Schema::Union(union_schema) => {
Expand All @@ -219,7 +268,7 @@ fn avro_type_mapping(
"Union contains duplicate types: {union_schema:?}",
);
match get_nullable_union_inner(union_schema) {
Some(inner) => avro_type_mapping(inner, map_handling)?,
Some(inner) => avro_type_mapping(inner, ancestor_records, refs, map_handling)?,
None => {
// Convert the union to a struct, each field of the struct represents a variant of the union.
// Refer to https://github.com/risingwavelabs/risingwave/issues/16273#issuecomment-2179761345 to see why it's not perfect.
Expand All @@ -232,10 +281,11 @@ fn avro_type_mapping(
// null will mean the whole struct is null
.filter(|variant| !matches!(variant, &&Schema::Null))
.map(|variant| {
avro_type_mapping(variant, map_handling).and_then(|t| {
let name = avro_schema_to_struct_field_name(variant)?;
Ok((name, t))
})
avro_type_mapping(variant, ancestor_records, refs, map_handling)
.and_then(|t| {
let name = avro_schema_to_struct_field_name(variant)?;
Ok((name, t))
})
})
.try_collect::<_, Vec<_>, _>()
.context("failed to convert Avro union to struct")?;
Expand All @@ -250,7 +300,12 @@ fn avro_type_mapping(
{
DataType::Decimal
} else {
bail_not_implemented!("Avro type: {:?}", schema);
avro_type_mapping(
refs[name], // `ResolvedSchema::try_from` already handles lookup failure
ancestor_records,
refs,
map_handling,
)?
}
}
Schema::Map(value_schema) => {
Expand All @@ -268,8 +323,13 @@ fn avro_type_mapping(
}
}
Some(MapHandling::Map) | None => {
let value = avro_type_mapping(value_schema.as_ref(), map_handling)
.context("failed to convert Avro map type")?;
let value = avro_type_mapping(
value_schema.as_ref(),
ancestor_records,
refs,
map_handling,
)
.context("failed to convert Avro map type")?;
DataType::Map(MapType::from_kv(DataType::Varchar, value))
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/codec/tests/integration_tests/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ fn avro_schema_str_to_risingwave_schema(
ResolvedAvroSchema::create(avro_schema.into()).context("failed to resolve Avro schema")?;

let rw_schema =
avro_schema_to_column_descs(&resolved_schema.resolved_schema, config.map_handling)
avro_schema_to_column_descs(&resolved_schema.original_schema, config.map_handling)
.context("failed to convert Avro schema to RisingWave schema")?
.iter()
.map(ColumnDesc::from)
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl AvroParserConfig {
}

pub fn map_to_columns(&self) -> ConnectorResult<Vec<ColumnDesc>> {
avro_schema_to_column_descs(&self.schema.resolved_schema, self.map_handling)
avro_schema_to_column_descs(&self.schema.original_schema, self.map_handling)
.map_err(Into::into)
}
}
Expand Down

0 comments on commit 3e1c4a6

Please sign in to comment.