Skip to content

Commit

Permalink
refine current union code
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jul 2, 2024
1 parent 7b24228 commit 7f6220a
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 30 deletions.
82 changes: 69 additions & 13 deletions src/connector/codec/src/decoder/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
mod schema;
use std::sync::LazyLock;

use apache_avro::schema::{DecimalSchema, RecordSchema};
use apache_avro::schema::{DecimalSchema, RecordSchema, UnionSchema};
use apache_avro::types::{Value, ValueKind};
use apache_avro::{Decimal as AvroDecimal, Schema};
use chrono::Datelike;
Expand Down Expand Up @@ -338,18 +338,35 @@ pub(crate) fn avro_decimal_to_rust_decimal(
))
}

pub fn avro_schema_skip_union(schema: &Schema) -> anyhow::Result<&Schema> {
/// If the union schema is `[null, T]` or `[T, null]`, returns `Some(T)`; otherwise returns `None`.
///
/// A union is considered "nullable" only when it has exactly two variants AND exactly one of
/// them is `null`. Any other shape (a general union such as `[string, int]`, a union with
/// more than two variants such as `[null, int, string]`, or a degenerate `[null]`) yields
/// `None`, so callers can decide how to handle general unions themselves.
fn get_nullable_union_inner(union_schema: &UnionSchema) -> Option<&'_ Schema> {
    match union_schema.variants() {
        // Exactly two variants, one of which is `null`. The guard rejects `[null, null]`,
        // so the returned schema is always the non-null `T`.
        [Schema::Null, inner] | [inner, Schema::Null] if !matches!(inner, Schema::Null) => {
            Some(inner)
        }
        _ => None,
    }
}

pub fn avro_schema_skip_nullable_union(schema: &Schema) -> anyhow::Result<&Schema> {
match schema {
Schema::Union(union_schema) => {
let inner_schema = union_schema
.variants()
.iter()
.find(|s| !matches!(s, &&Schema::Null))
.ok_or_else(|| {
anyhow::format_err!("illegal avro record schema {:?}", union_schema)
})?;
Ok(inner_schema)
}
Schema::Union(union_schema) => match get_nullable_union_inner(union_schema) {
Some(s) => Ok(s),
None => Err(anyhow::format_err!(
"illegal avro union schema, expected [null, T], got {:?}",
union_schema
)),
},
other => Ok(other),
}
}
Expand All @@ -372,7 +389,7 @@ pub fn avro_extract_field_schema<'a>(
Ok(&field.schema)
}
Schema::Array(schema) => Ok(schema),
Schema::Union(_) => avro_schema_skip_union(schema),
Schema::Union(_) => avro_schema_skip_nullable_union(schema),
Schema::Map(schema) => Ok(schema),
_ => bail!("avro schema does not have inner item, schema: {:?}", schema),
}
Expand Down Expand Up @@ -477,10 +494,49 @@ mod tests {
use std::str::FromStr;

use apache_avro::Decimal as AvroDecimal;
use expect_test::expect;
use risingwave_common::types::{Datum, Decimal};

use super::*;

/// Test the behavior of the Rust Avro lib for handling union with logical type.
///
/// The snapshots below record how `Schema::parse_str` represents such unions: the logical
/// type appears as its own variant (`Uuid`) in both `schemas` and `variant_index`.
#[test]
fn test_union_logical_type() {
// Case 1: a nullable logical type, `["null", uuid]`.
let s = Schema::parse_str(r#"["null", {"type":"string","logicalType":"uuid"}]"#).unwrap();
expect![[r#"
Union(
UnionSchema {
schemas: [
Null,
Uuid,
],
variant_index: {
Null: 0,
Uuid: 1,
},
},
)
"#]]
.assert_debug_eq(&s);

// Case 2: a union mixing the underlying type (`string`) with its logical type (`uuid`).
// Parsing succeeds and the two variants are kept distinct (`String` and `Uuid`).
let s = Schema::parse_str(r#"["string", {"type":"string","logicalType":"uuid"}]"#).unwrap();
expect![[r#"
Union(
UnionSchema {
schemas: [
String,
Uuid,
],
variant_index: {
String: 0,
Uuid: 1,
},
},
)
"#]]
.assert_debug_eq(&s);
}

#[test]
fn test_convert_decimal() {
// 280
Expand Down
95 changes: 83 additions & 12 deletions src/connector/codec/src/decoder/avro/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::sync::{Arc, LazyLock};

use anyhow::Context;
use apache_avro::schema::{DecimalSchema, RecordSchema, ResolvedSchema, Schema};
use apache_avro::AvroResult;
use itertools::Itertools;
Expand All @@ -22,6 +23,8 @@ use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{DataType, Decimal};
use risingwave_pb::plan_common::{AdditionalColumn, ColumnDesc, ColumnDescVersion};

use super::get_nullable_union_inner;

/// Avro schema with `Ref` inlined. The newtype is used to indicate whether the schema is resolved.
///
/// TODO: Actually most of the place should use resolved schema, but currently they just happen to work (Some edge cases are not met yet).
Expand Down Expand Up @@ -198,20 +201,84 @@ fn avro_type_mapping(
DataType::List(Box::new(item_type))
}
Schema::Union(union_schema) => {
// Unions may not contain more than one schema with the same type, except for the named types record, fixed and enum.
// https://avro.apache.org/docs/1.11.1/specification/_print/#unions
debug_assert!(
union_schema
.variants()
.iter()
.map(Schema::canonical_form) // Schema doesn't implement Eq, but only PartialEq.
.duplicates()
.next()
.is_none(),
"Union contains duplicate types: {union_schema:?}",
);
// We only support using union to represent nullable fields, not general unions.
let variants = union_schema.variants();
if variants.len() != 2 || !variants.contains(&Schema::Null) {
bail!(
"unsupported Avro type, only unions like [null, T] is supported: {:?}",
schema
);
}
let nested_schema = variants
.iter()
.find_or_first(|s| !matches!(s, Schema::Null))
.unwrap();
match get_nullable_union_inner(union_schema) {
Some(inner) => avro_type_mapping(inner, map_handling)?,
None => {
// Convert the union to a struct, each field of the struct represents a variant of the union.
// Refer to https://github.com/risingwavelabs/risingwave/issues/16273#issuecomment-2179761345 to see why it's not perfect.
// Note: Avro union's variant tag is type name, not field name (unlike Rust enum, or Protobuf oneof).

// XXX: do we need to introduce union.handling.mode?

avro_type_mapping(nested_schema, map_handling)?
let (fields, field_names) = union_schema
.variants()
.iter()
.filter(|variant| !matches!(variant, &&Schema::Null))
.map(|variant| {
avro_type_mapping(variant, map_handling).map(|t| {
let name = match variant {
Schema::Null => unreachable!(),
Schema::Boolean => "boolean".to_string(),
Schema::Int => "integer".to_string(),
Schema::Long => "bigint".to_string(),
Schema::Float => "real".to_string(),
Schema::Double => "double precision".to_string(),
Schema::Bytes => "bytea".to_string(),
Schema::String => "text".to_string(),
Schema::Array(_) => "array".to_string(),
Schema::Map(_) =>"map".to_string(),
Schema::Union(_) => "union".to_string(),
// For logical types, should we use the real type or the logical type as the field name?
//
// Example about the representation:
// schema: ["null", {"type":"string","logicalType":"uuid"}]
// data: {"string": "67e55044-10b1-426f-9247-bb680e5fe0c8"}
//
// Note: for a union with a logical type AND its underlying real type, e.g., ["string", {"type":"string","logicalType":"uuid"}],
// the uuid variant cannot be constructed, at least in some libraries. See
// https://issues.apache.org/jira/browse/AVRO-2380
Schema::Uuid => "uuid".to_string(),
Schema::Decimal(_) => todo!(),
Schema::Date => "date".to_string(),
Schema::TimeMillis => "time without time zone".to_string(),
Schema::TimeMicros => "time without time zone".to_string(),
Schema::TimestampMillis => "timestamp without time zone".to_string(),
Schema::TimestampMicros => "timestamp without time zone".to_string(),
Schema::LocalTimestampMillis => "timestamp without time zone".to_string(),
Schema::LocalTimestampMicros => "timestamp without time zone".to_string(),
Schema::Duration => "interval".to_string(),
Schema::Enum(_)
| Schema::Ref { name: _ }
| Schema::Fixed(_) => todo!(),
| Schema::Record(_) => variant.name().unwrap().fullname(None), // XXX: Is the namespace correct here?
};
(t, name)
})
})
.process_results(|it| it.unzip::<_, _, Vec<_>, Vec<_>>())
.context("failed to convert Avro union to struct")?;

DataType::new_struct(fields, field_names);

bail!(
"unsupported Avro type, only unions like [null, T] is supported: {:?}",
schema
);
}
}
}
Schema::Ref { name } => {
if name.name == DBZ_VARIABLE_SCALE_DECIMAL_NAME
Expand All @@ -236,6 +303,10 @@ fn avro_type_mapping(
}
}
None => {
// We require it to be specified, because we don't want to have a bad default behavior.
// But perhaps changing the default behavior won't be a breaking change,
// because it only affects, at creation time, what the resulting ColumnDesc will be, and the ColumnDesc is then persisted.
// This is unlike timestamp.handling.mode, which affects the parser's behavior at runtime.
bail!("`map.handling.mode` not specified in ENCODE AVRO (...). Currently supported modes: `jsonb`")
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/parser/debezium/avro_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use apache_avro::types::Value;
use apache_avro::{from_avro_datum, Schema};
use risingwave_common::try_match_expand;
use risingwave_connector_codec::decoder::avro::{
avro_extract_field_schema, avro_schema_skip_union, avro_schema_to_column_descs, AvroAccess,
AvroParseOptions, ResolvedAvroSchema,
avro_extract_field_schema, avro_schema_skip_nullable_union, avro_schema_to_column_descs,
AvroAccess, AvroParseOptions, ResolvedAvroSchema,
};
use risingwave_pb::catalog::PbSchemaRegistryNameStrategy;
use risingwave_pb::plan_common::ColumnDesc;
Expand Down Expand Up @@ -126,7 +126,7 @@ impl DebeziumAvroParserConfig {

pub fn map_to_columns(&self) -> ConnectorResult<Vec<ColumnDesc>> {
avro_schema_to_column_descs(
avro_schema_skip_union(avro_extract_field_schema(
avro_schema_skip_nullable_union(avro_extract_field_schema(
// FIXME: use resolved schema here.
// Currently it works because "after" refers to a subtree in "before",
// but in theory, inside "before" there could also be a reference.
Expand Down Expand Up @@ -227,7 +227,7 @@ mod tests {

let outer_schema = get_outer_schema();
let expected_inner_schema = Schema::parse_str(inner_shema_str).unwrap();
let extracted_inner_schema = avro_schema_skip_union(
let extracted_inner_schema = avro_schema_skip_nullable_union(
avro_extract_field_schema(&outer_schema, Some("before")).unwrap(),
)
.unwrap();
Expand Down Expand Up @@ -318,7 +318,7 @@ mod tests {
fn test_map_to_columns() {
let outer_schema = get_outer_schema();
let columns = avro_schema_to_column_descs(
avro_schema_skip_union(
avro_schema_skip_nullable_union(
avro_extract_field_schema(&outer_schema, Some("before")).unwrap(),
)
.unwrap(),
Expand Down

0 comments on commit 7f6220a

Please sign in to comment.