diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs index 12ab86cdb8d3c..7cc9cf14c1f84 100644 --- a/src/connector/src/parser/avro/util.rs +++ b/src/connector/src/parser/avro/util.rs @@ -139,6 +139,18 @@ fn avro_type_mapping(schema: &Schema) -> anyhow::Result { avro_type_mapping(nested_schema)? } + Schema::Ref { name } => { + if name.name == DBZ_VARIABLE_SCALE_DECIMAL_NAME + && name.namespace == Some(DBZ_VARIABLE_SCALE_DECIMAL_NAMESPACE.into()) + { + DataType::Decimal + } else { + return Err(anyhow::format_err!( + "unsupported type in Avro: {:?}", + schema + )); + } + } _ => { return Err(anyhow::format_err!( "unsupported type in Avro: {:?}", diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index b4d1f0d145a75..e0a486fe18c98 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -151,6 +151,7 @@ mod tests { use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_pb::catalog::StreamSourceInfo; + use risingwave_pb::data::data_type::TypeName; use risingwave_pb::plan_common::{PbEncodeType, PbFormatType}; use super::*; @@ -256,6 +257,64 @@ mod tests { assert_eq!(names, vec!["id".to_owned()]) } + #[test] + fn test_ref_avro_type() { + let test_schema_str = r#"{ + "type": "record", + "name": "Key", + "namespace": "dbserver1.inventory.customers", + "fields": [{ + "name": "id", + "type": "int" + }, + { + "name": "unconstrained_decimal", + "type": [ + "null", + { + "type": "record", + "name": "VariableScaleDecimal", + "namespace": "io.debezium.data", + "fields": [ + { + "name": "scale", + "type": "int" + }, + { + "name": "value", + "type": "bytes" + } + ], + "connect.doc": "Variable scaled decimal", + "connect.name": "io.debezium.data.VariableScaleDecimal", + "connect.version": 1 + } + ], + "default": null + }, + { + "name": "unconstrained_numeric", + "type": [ + "null", + "io.debezium.data.VariableScaleDecimal" + ], + "default": null + } + ], + "connect.name": "dbserver1.inventory.customers.Key" +} +"#; + let schema = Schema::parse_str(test_schema_str).unwrap(); + let columns = avro_schema_to_column_descs(&schema).unwrap(); + for col in &columns { + let dtype = col.column_type.as_ref().unwrap(); + println!("name = {}, type = {:?}", col.name, dtype.type_name); + if col.name.contains("unconstrained") { + assert_eq!(dtype.type_name, TypeName::Decimal as i32); + } + } + } + #[test] fn test_map_to_columns() { let outer_schema = get_outer_schema();