Skip to content

Commit

Permalink
ban logical type
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jul 9, 2024
1 parent 9d08add commit a305c1c
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 30 deletions.
84 changes: 82 additions & 2 deletions src/connector/codec/src/decoder/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ impl<'a> AvroParseOptions<'a> {
// ---- Union (with >=2 non null variants), and nullable Union ([null, record]) -----
(DataType::Struct(struct_type_info), Value::Union(variant, v)) => {
let Some(Schema::Union(u)) = self.schema else {
// XXX: Is this branch actually unreachable? (if self.schema is correctly used)
return Err(create_error());
};

Expand All @@ -131,7 +132,8 @@ impl<'a> AvroParseOptions<'a> {
// Here we compare the field name, instead of using the variant idx to find the field idx.
// The latter approach might also work, but might be more error-prone.
// We will need to get the index of the "null" variant, and then re-map the variant index to the field index.
let expected_field_name = avro_schema_to_struct_field_name(variant_schema);
// XXX: probably we can unwrap here (if self.schema is correctly used)
let expected_field_name = avro_schema_to_struct_field_name(variant_schema)?;

let mut fields = Vec::with_capacity(struct_type_info.len());
for (field_name, field_type) in struct_type_info
Expand Down Expand Up @@ -635,6 +637,54 @@ mod tests {
)
"#]]
.assert_debug_eq(&s);
// multiple named types
let s = Schema::parse_str(
r#"[
"null",
{"type":"fixed","name":"a","size":16},
{"type":"fixed","name":"b","size":32}
]
"#,
);
expect![[r#"
Ok(
Union(
UnionSchema {
schemas: [
Null,
Fixed(
FixedSchema {
name: Name {
name: "a",
namespace: None,
},
aliases: None,
doc: None,
size: 16,
attributes: {},
},
),
Fixed(
FixedSchema {
name: Name {
name: "b",
namespace: None,
},
aliases: None,
doc: None,
size: 32,
attributes: {},
},
),
],
variant_index: {
Null: 0,
},
},
),
)
"#]]
.assert_debug_eq(&s);

// union in union
let s = Schema::parse_str(r#"["int", ["null", "int"]]"#);
Expand Down Expand Up @@ -662,7 +712,7 @@ mod tests {
)
"#]]
.assert_debug_eq(&s);

// Note: the Java Avro lib rejects this (a union of a logical type with its own physical type)
let s = Schema::parse_str(r#"["string", {"type":"string","logicalType":"uuid"}]"#).unwrap();
expect![[r#"
Union(
Expand All @@ -679,6 +729,36 @@ mod tests {
)
"#]]
.assert_debug_eq(&s);
// Note: the Java Avro lib rejects this (a union of a logical type with its own physical type)
let s = Schema::parse_str(r#"["int", {"type":"int", "logicalType": "date"}]"#).unwrap();
expect![[r#"
Union(
UnionSchema {
schemas: [
Int,
Date,
],
variant_index: {
Int: 0,
Date: 1,
},
},
)
"#]]
.assert_debug_eq(&s);
// Note: the Java Avro lib allows this (two decimals with different "name"s)
let s = Schema::parse_str(
r#"[
{"type":"fixed","name":"Decimal128","size":16,"logicalType":"decimal","precision":38,"scale":2},
{"type":"fixed","name":"Decimal256","size":32,"logicalType":"decimal","precision":50,"scale":2}
]"#,
);
expect![[r#"
Err(
Unions cannot contain duplicate types,
)
"#]]
.assert_debug_eq(&s);
}

#[test]
Expand Down
72 changes: 44 additions & 28 deletions src/connector/codec/src/decoder/avro/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ use anyhow::Context;
use apache_avro::schema::{DecimalSchema, RecordSchema, ResolvedSchema, Schema};
use apache_avro::AvroResult;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::error::NotImplemented;
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{DataType, Decimal};
use risingwave_common::{bail, bail_not_implemented};
use risingwave_pb::plan_common::{AdditionalColumn, ColumnDesc, ColumnDescVersion};

use super::get_nullable_union_inner;
Expand Down Expand Up @@ -230,9 +231,9 @@ fn avro_type_mapping(
// null will mean the whole struct is null
.filter(|variant| !matches!(variant, &&Schema::Null))
.map(|variant| {
avro_type_mapping(variant, map_handling).map(|t| {
let name = avro_schema_to_struct_field_name(variant);
(t, name)
avro_type_mapping(variant, map_handling).and_then(|t| {
let name = avro_schema_to_struct_field_name(variant)?;
Ok((t, name))
})
})
.process_results(|it| it.unzip::<_, _, Vec<_>, Vec<_>>())
Expand All @@ -248,7 +249,7 @@ fn avro_type_mapping(
{
DataType::Decimal
} else {
bail!("unsupported Avro type: {:?}", schema);
bail_not_implemented!("Avro type: {:?}", schema);
}
}
Schema::Map(value_schema) => {
Expand All @@ -258,10 +259,11 @@ fn avro_type_mapping(
if supported_avro_to_json_type(value_schema) {
DataType::Jsonb
} else {
bail!(
"unsupported Avro type, cannot convert map to jsonb: {:?}",
bail_not_implemented!(
issue = 16963,
"Avro map type to jsonb: {:?}",
schema
)
);
}
}
None => {
Expand All @@ -275,7 +277,7 @@ fn avro_type_mapping(
}
Schema::Uuid => DataType::Varchar,
Schema::Null | Schema::Fixed(_) => {
bail!("unsupported Avro type: {:?}", schema)
bail_not_implemented!("Avro type: {:?}", schema);
}
};

Expand Down Expand Up @@ -315,8 +317,8 @@ fn supported_avro_to_json_type(schema: &Schema) -> bool {
}

/// The field name when converting Avro union type to RisingWave struct type.
pub(super) fn avro_schema_to_struct_field_name(schema: &Schema) -> String {
match schema {
pub(super) fn avro_schema_to_struct_field_name(schema: &Schema) -> Result<String, NotImplemented> {
Ok(match schema {
Schema::Null => unreachable!(),
Schema::Union(_) => unreachable!(),
// Primitive types
Expand All @@ -332,23 +334,24 @@ pub(super) fn avro_schema_to_struct_field_name(schema: &Schema) -> String {
Schema::Map(_) => "map".to_string(),
// Named Complex types
// TODO: Verify whether the namespace is correct here
Schema::Enum(_) | Schema::Ref { name: _ } | Schema::Fixed(_) => todo!(),
Schema::Record(_) => schema.name().unwrap().fullname(None),
// Logical types
// XXX: should we use the real type or the logical type as the field name?
// It seems not to matter much, as we always have the index of the field when we get a Union Value.
//
// Currently choose the logical type because it might be more user-friendly.
//
// Example about the representation:
// schema: ["null", {"type":"string","logicalType":"uuid"}]
// data: {"string": "67e55044-10b1-426f-9247-bb680e5fe0c8"}
//
// Note: for union with logical type AND the real type, e.g., ["string", {"type":"string","logicalType":"uuid"}]
// In this case, the uuid cannot be constructed.
// Actually this should be an invalid schema according to the spec. https://issues.apache.org/jira/browse/AVRO-2380
// But some library like Python and Rust both allow it. See `risingwave_connector_codec::decoder::avro::tests::test_avro_lib_union`
Schema::Enum(_) | Schema::Ref { name: _ } | Schema::Fixed(_) | Schema::Record(_) => {
schema.name().unwrap().fullname(None)
}

// Logical types are currently banned. See https://github.com/risingwavelabs/risingwave/issues/17616

/*
Schema::Uuid => "uuid".to_string(),
// Decimal is the most tricky. https://avro.apache.org/docs/1.11.1/specification/_print/#decimal
// - A decimal logical type annotates Avro bytes _or_ fixed types.
// - It has attributes `precision` and `scale`.
// "For the purposes of schema resolution, two schemas that are decimal logical types match if their scales and precisions match."
// - When the physical type is fixed, it's a named type. And a schema containing 2 decimals is possible:
// [
// {"type":"fixed","name":"Decimal128","size":16,"logicalType":"decimal","precision":38,"scale":2},
// {"type":"fixed","name":"Decimal256","size":32,"logicalType":"decimal","precision":50,"scale":2}
// ]
// In this case (a logical type's physical type is a named type), perhaps we should use the physical type's `name`.
Schema::Decimal(_) => "decimal".to_string(),
Schema::Date => "date".to_string(),
// Note: in Avro, the name style is "time-millis", etc.
Expand All @@ -362,5 +365,18 @@ pub(super) fn avro_schema_to_struct_field_name(schema: &Schema) -> String {
Schema::LocalTimestampMillis => "local_timestamp_millis".to_string(),
Schema::LocalTimestampMicros => "local_timestamp_micros".to_string(),
Schema::Duration => "duration".to_string(),
}
*/
Schema::Uuid
| Schema::Decimal(_)
| Schema::Date
| Schema::TimeMillis
| Schema::TimeMicros
| Schema::TimestampMillis
| Schema::TimestampMicros
| Schema::LocalTimestampMillis
| Schema::LocalTimestampMicros
| Schema::Duration => {
bail_not_implemented!(issue=17616, "Avro logicalType used in Union type: {:?}", schema)
}
})
}
4 changes: 4 additions & 0 deletions src/connector/codec/src/decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod avro;
pub mod json;
pub mod utils;

use risingwave_common::error::NotImplemented;
use risingwave_common::types::{DataType, Datum, DatumCow, ToOwnedDatum};
use thiserror::Error;
use thiserror_ext::Macro;
Expand All @@ -40,6 +41,9 @@ pub enum AccessError {
/// Errors that are not categorized into variants above.
#[error("{message}")]
Uncategorized { message: String },

#[error(transparent)]
NotImplemented(#[from] NotImplemented),
}

pub type AccessResult<T = Datum> = std::result::Result<T, AccessError>;
Expand Down

0 comments on commit a305c1c

Please sign in to comment.