From ffb0765629c5e404837e0df2d17d7a384fbac1cb Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Wed, 18 Oct 2023 19:22:02 +0800 Subject: [PATCH] refactor code --- src/common/src/types/struct_type.rs | 5 - src/connector/src/sink/encoder/json.rs | 158 +++++++++++++++++++++--- src/connector/src/sink/formatter/mod.rs | 12 +- 3 files changed, 145 insertions(+), 30 deletions(-) diff --git a/src/common/src/types/struct_type.rs b/src/common/src/types/struct_type.rs index c986c9a16299b..239f506db8267 100644 --- a/src/common/src/types/struct_type.rs +++ b/src/common/src/types/struct_type.rs @@ -94,11 +94,6 @@ impl StructType { self.0.field_types.is_empty() } - /// Returns `true` if the struct field is unnamed. - pub fn is_unnamed(&self) -> bool { - self.0.field_names.is_empty() - } - /// Gets an iterator over the names of the fields. /// /// If the struct field is unnamed, the iterator returns **no names**. diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index df8f37251ba8d..636cb6f4199d2 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -337,25 +337,14 @@ fn type_as_json_schema(rw_type: &DataType) -> Map { mapping.insert("optional".to_string(), json!("true")); match rw_type { DataType::Struct(struct_type) => { - let mut sub_fields = Vec::new(); - let map_sub_struct = |(sub_name, sub_type): (String, &DataType)| { - let mut sub_mapping = type_as_json_schema(sub_type); - sub_mapping.insert("field".to_string(), json!(sub_name)); - sub_fields.push(sub_mapping); - }; - if struct_type.is_unnamed() { - // unnamed struct, rename with `f{idx+1}` - struct_type - .types() - .enumerate() - .map(|(idx, sub_type)| (format!("f{}", idx + 1), sub_type)) - .for_each(map_sub_struct); - } else { - struct_type - .iter() - .map(|(sub_name, sub_type)| (sub_name.to_string(), sub_type)) - .for_each(map_sub_struct); - } + let sub_fields = struct_type + .iter() + .map(|(sub_name, sub_type)| { + let mut sub_mapping = type_as_json_schema(sub_type); + sub_mapping.insert("field".to_string(), json!(sub_name)); + sub_mapping + }) + .collect_vec(); mapping.insert("fields".to_string(), json!(sub_fields)); } DataType::List(sub_type) => { @@ -553,4 +542,135 @@ mod tests { .unwrap(); assert_eq!(interval_value, json!("{\"v3\":3,\"v2\":2,\"v1\":1}")); } + + #[test] + fn test_generate_json_converter_schema() { + let mock_field = Field { + data_type: DataType::Boolean, + name: Default::default(), + sub_fields: Default::default(), + type_name: Default::default(), + }; + let fields = vec![ + Field { + data_type: DataType::Boolean, + name: "v1".into(), + ..mock_field.clone() + }, + Field { + data_type: DataType::Int16, + name: "v2".into(), + ..mock_field.clone() + }, + Field { + data_type: DataType::Int32, + name: "v3".into(), + ..mock_field.clone() + }, + Field { + data_type: DataType::Float32, + name: "v4".into(), + ..mock_field.clone() + }, + Field { + data_type: DataType::Decimal, + name: "v5".into(), + ..mock_field.clone() + }, + Field { + data_type: DataType::Date, + name: "v6".into(), + ..mock_field.clone() + }, + Field { + data_type: DataType::Varchar, + name: "v7".into(), + ..mock_field.clone() + }, + Field { + data_type: DataType::Time, + name: "v8".into(), + ..mock_field.clone() + }, + Field { + data_type: DataType::Interval, + name: "v9".into(), + ..mock_field.clone() + }, + Field { + data_type: DataType::Struct(StructType::new(vec![ + ("a", DataType::Timestamp), + ("b", DataType::Timestamptz), + ( + "c", + DataType::Struct(StructType::new(vec![ + ("aa", DataType::Int64), + ("bb", DataType::Float64), + ])), + ), + ])), + name: "v10".into(), + sub_fields: vec![ + Field { + data_type: DataType::Timestamp, + name: "a".into(), + ..mock_field.clone() + }, + Field { + data_type: DataType::Timestamptz, + name: "b".into(), + ..mock_field.clone() + }, + Field { + data_type: DataType::Struct(StructType::new(vec![ + ("aa", DataType::Int64), + ("bb", DataType::Float64), + ])), + name: "c".into(), + sub_fields: vec![ + Field { + data_type: DataType::Int64, + name: "aa".into(), + ..mock_field.clone() + }, + Field { + data_type: DataType::Float64, + name: "bb".into(), + ..mock_field.clone() + }, + ], + ..mock_field.clone() + }, + ], + ..mock_field.clone() + }, + Field { + data_type: DataType::List(Box::new(DataType::List(Box::new(DataType::Struct( + StructType::new(vec![("aa", DataType::Int64), ("bb", DataType::Float64)]), + ))))), + name: "v11".into(), + ..mock_field.clone() + }, + Field { + data_type: DataType::Jsonb, + name: "12".into(), + ..mock_field.clone() + }, + Field { + data_type: DataType::Serial, + name: "13".into(), + ..mock_field.clone() + }, + Field { + data_type: DataType::Int256, + name: "14".into(), + ..mock_field.clone() + }, + ]; + let schema = json_converter_with_schema(json!({}), "test".to_owned(), fields.iter()) + ["schema"] + .to_string(); + let ans = r#"{"fields":[{"field":"v1","optional":"true","type":"boolean"},{"field":"v2","optional":"true","type":"int16"},{"field":"v3","optional":"true","type":"int32"},{"field":"v4","optional":"true","type":"float"},{"field":"v5","optional":"true","type":"string"},{"field":"v6","optional":"true","type":"int32"},{"field":"v7","optional":"true","type":"string"},{"field":"v8","optional":"true","type":"int64"},{"field":"v9","optional":"true","type":"string"},{"field":"v10","fields":[{"field":"a","optional":"true","type":"int64"},{"field":"b","optional":"true","type":"string"},{"field":"c","fields":[{"field":"aa","optional":"true","type":"int64"},{"field":"bb","optional":"true","type":"double"}],"optional":"true","type":"struct"}],"optional":"true","type":"struct"},{"field":"v11","items":{"items":{"fields":[{"field":"aa","optional":"true","type":"int64"},{"field":"bb","optional":"true","type":"double"}],"optional":"true","type":"struct"},"optional":"true","type":"array"},"optional":"true","type":"array"},{"field":"12","optional":"true","type":"string"},{"field":"13","optional":"true","type":"int32"},{"field":"14","optional":"true","type":"string"}],"name":"test","optional":"false","type":"struct"}"#; + assert_eq!(schema, ans); + } } diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 1865461a65e55..682907db447f8 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -115,20 +115,20 @@ impl SinkFormatterImpl { let mut val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli); if let Some(s) = format_desc.options.get("schemas.enable") { - match s.to_lowercase().as_str() { - "true" => { + match s.to_lowercase().parse::() { + Ok(true) => { let kafka_connect = KafkaConnectParams { schema_name: format!("{}.{}", db_name, sink_from_name), }; key_encoder = key_encoder.with_kafka_connect(kafka_connect.clone()); val_encoder = val_encoder.with_kafka_connect(kafka_connect); } - "false" => {} - other => { + Ok(false) => {} + _ => { return Err(SinkError::Config(anyhow!( "schemas.enable is expected to be `true` or `false`, got {}", - other - ))) + s + ))); } } };