From a6620340ada7ee70cb88fb277eb3522c67231218 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Wed, 24 Jan 2024 20:13:23 +0800 Subject: [PATCH 1/2] fix --- src/connector/src/parser/avro/util.rs | 12 ++++ .../src/parser/debezium/avro_parser.rs | 59 +++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs index f81f73fd1b7e..5657c906a323 100644 --- a/src/connector/src/parser/avro/util.rs +++ b/src/connector/src/parser/avro/util.rs @@ -139,6 +139,18 @@ fn avro_type_mapping(schema: &Schema) -> anyhow::Result { avro_type_mapping(nested_schema)? } + Schema::Ref { name } => { + if name.name == DBZ_VARIABLE_SCALE_DECIMAL_NAME + && name.namespace == Some(DBZ_VARIABLE_SCALE_DECIMAL_NAMESPACE.into()) + { + DataType::Decimal + } else { + return Err(anyhow::format_err!( + "unsupported type in Avro: {:?}", + schema + )); + } + } _ => { return Err(anyhow::format_err!( "unsupported type in Avro: {:?}", diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index b4d1f0d145a7..f82ec9419cce 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -151,6 +151,7 @@ mod tests { use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_pb::catalog::StreamSourceInfo; + use risingwave_pb::data::data_type::TypeName; use risingwave_pb::plan_common::{PbEncodeType, PbFormatType}; use super::*; @@ -256,6 +257,64 @@ mod tests { assert_eq!(names, vec!["id".to_owned()]) } + #[test] + fn test_ref_avro_type() { + let test_schema_str = r#"{ + "type": "record", + "name": "Key", + "namespace": "dbserver1.inventory.customers", + "fields": [{ + "name": "id", + "type": "int" + }, + { + "name": "unconstrained_decimal", + "type": [ + "null", + { + "type": "record", + "name": "VariableScaleDecimal", + "namespace": "io.debezium.data", + "fields": [ + { + "name": "scale", + "type": "int" + }, + { + "name": "value", + "type": "bytes" + } + ], + "connect.doc": "Variable scaled decimal", + "connect.name": "io.debezium.data.VariableScaleDecimal", + "connect.version": 1 + } + ], + "default": null + }, + { + "name": "unconstrained_numeric", + "type": [ + "null", + "io.debezium.data.VariableScaleDecimal" + ], + "default": null + } + ], + "connect.name": "dbserver1.inventory.customers.Key" +} +"#; + let schema = Schema::parse_str(test_schema_str).unwrap(); + let columns = avro_schema_to_column_descs(&schema).unwrap(); + for col in columns.iter() { + let dtype = col.column_type.as_ref().unwrap(); + println!("name = {}, type = {:?}", col.name, dtype.type_name); + if col.name.contains("unconstrained") { + assert_eq!(dtype.type_name, TypeName::Decimal as i32); + } + } + } + #[test] fn test_map_to_columns() { let outer_schema = get_outer_schema(); From 69f238ba444341fa9d864e1cdc87f11c5206b7ed Mon Sep 17 00:00:00 2001 From: StrikeW Date: Wed, 24 Jan 2024 20:21:26 +0800 Subject: [PATCH 2/2] minor --- src/connector/src/parser/debezium/avro_parser.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index f82ec9419cce..e0a486fe18c9 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -306,7 +306,7 @@ mod tests { "#; let schema = Schema::parse_str(test_schema_str).unwrap(); let columns = avro_schema_to_column_descs(&schema).unwrap(); - for col in columns.iter() { + for col in &columns { let dtype = col.column_type.as_ref().unwrap(); println!("name = {}, type = {:?}", col.name, dtype.type_name); if col.name.contains("unconstrained") {