Skip to content

Commit

Permalink
refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
Rossil2012 committed Oct 18, 2023
1 parent 9d1fcde commit ffb0765
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 30 deletions.
5 changes: 0 additions & 5 deletions src/common/src/types/struct_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,6 @@ impl StructType {
self.0.field_types.is_empty()
}

/// Returns `true` if the struct field is unnamed.
pub fn is_unnamed(&self) -> bool {
self.0.field_names.is_empty()
}

/// Gets an iterator over the names of the fields.
///
/// If the struct field is unnamed, the iterator returns **no names**.
Expand Down
158 changes: 139 additions & 19 deletions src/connector/src/sink/encoder/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,25 +337,14 @@ fn type_as_json_schema(rw_type: &DataType) -> Map<String, Value> {
mapping.insert("optional".to_string(), json!("true"));
match rw_type {
DataType::Struct(struct_type) => {
let mut sub_fields = Vec::new();
let map_sub_struct = |(sub_name, sub_type): (String, &DataType)| {
let mut sub_mapping = type_as_json_schema(sub_type);
sub_mapping.insert("field".to_string(), json!(sub_name));
sub_fields.push(sub_mapping);
};
if struct_type.is_unnamed() {
// unnamed struct, rename with `f{idx+1}`
struct_type
.types()
.enumerate()
.map(|(idx, sub_type)| (format!("f{}", idx + 1), sub_type))
.for_each(map_sub_struct);
} else {
struct_type
.iter()
.map(|(sub_name, sub_type)| (sub_name.to_string(), sub_type))
.for_each(map_sub_struct);
}
let sub_fields = struct_type
.iter()
.map(|(sub_name, sub_type)| {
let mut sub_mapping = type_as_json_schema(sub_type);
sub_mapping.insert("field".to_string(), json!(sub_name));
sub_mapping
})
.collect_vec();
mapping.insert("fields".to_string(), json!(sub_fields));
}
DataType::List(sub_type) => {
Expand Down Expand Up @@ -553,4 +542,135 @@ mod tests {
.unwrap();
assert_eq!(interval_value, json!("{\"v3\":3,\"v2\":2,\"v1\":1}"));
}

#[test]
fn test_generate_json_converter_schema() {
let mock_field = Field {
data_type: DataType::Boolean,
name: Default::default(),
sub_fields: Default::default(),
type_name: Default::default(),
};
let fields = vec![
Field {
data_type: DataType::Boolean,
name: "v1".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Int16,
name: "v2".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Int32,
name: "v3".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Float32,
name: "v4".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Decimal,
name: "v5".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Date,
name: "v6".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Varchar,
name: "v7".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Time,
name: "v8".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Interval,
name: "v9".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Struct(StructType::new(vec![
("a", DataType::Timestamp),
("b", DataType::Timestamptz),
(
"c",
DataType::Struct(StructType::new(vec![
("aa", DataType::Int64),
("bb", DataType::Float64),
])),
),
])),
name: "v10".into(),
sub_fields: vec![
Field {
data_type: DataType::Timestamp,
name: "a".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Timestamptz,
name: "b".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Struct(StructType::new(vec![
("aa", DataType::Int64),
("bb", DataType::Float64),
])),
name: "c".into(),
sub_fields: vec![
Field {
data_type: DataType::Int64,
name: "aa".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Float64,
name: "bb".into(),
..mock_field.clone()
},
],
..mock_field.clone()
},
],
..mock_field.clone()
},
Field {
data_type: DataType::List(Box::new(DataType::List(Box::new(DataType::Struct(
StructType::new(vec![("aa", DataType::Int64), ("bb", DataType::Float64)]),
))))),
name: "v11".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Jsonb,
name: "12".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Serial,
name: "13".into(),
..mock_field.clone()
},
Field {
data_type: DataType::Int256,
name: "14".into(),
..mock_field.clone()
},
];
let schema = json_converter_with_schema(json!({}), "test".to_owned(), fields.iter())
["schema"]
.to_string();
let ans = r#"{"fields":[{"field":"v1","optional":"true","type":"boolean"},{"field":"v2","optional":"true","type":"int16"},{"field":"v3","optional":"true","type":"int32"},{"field":"v4","optional":"true","type":"float"},{"field":"v5","optional":"true","type":"string"},{"field":"v6","optional":"true","type":"int32"},{"field":"v7","optional":"true","type":"string"},{"field":"v8","optional":"true","type":"int64"},{"field":"v9","optional":"true","type":"string"},{"field":"v10","fields":[{"field":"a","optional":"true","type":"int64"},{"field":"b","optional":"true","type":"string"},{"field":"c","fields":[{"field":"aa","optional":"true","type":"int64"},{"field":"bb","optional":"true","type":"double"}],"optional":"true","type":"struct"}],"optional":"true","type":"struct"},{"field":"v11","items":{"items":{"fields":[{"field":"aa","optional":"true","type":"int64"},{"field":"bb","optional":"true","type":"double"}],"optional":"true","type":"struct"},"optional":"true","type":"array"},"optional":"true","type":"array"},{"field":"12","optional":"true","type":"string"},{"field":"13","optional":"true","type":"int32"},{"field":"14","optional":"true","type":"string"}],"name":"test","optional":"false","type":"struct"}"#;
assert_eq!(schema, ans);
}
}
12 changes: 6 additions & 6 deletions src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,20 +115,20 @@ impl SinkFormatterImpl {
let mut val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);

if let Some(s) = format_desc.options.get("schemas.enable") {
match s.to_lowercase().as_str() {
"true" => {
match s.to_lowercase().parse::<bool>() {
Ok(true) => {
let kafka_connect = KafkaConnectParams {
schema_name: format!("{}.{}", db_name, sink_from_name),
};
key_encoder = key_encoder.with_kafka_connect(kafka_connect.clone());
val_encoder = val_encoder.with_kafka_connect(kafka_connect);
}
"false" => {}
other => {
Ok(false) => {}
_ => {
return Err(SinkError::Config(anyhow!(
"schemas.enable is expected to be `true` or `false`, got {}",
other
)))
s
)));
}
}
};
Expand Down

0 comments on commit ffb0765

Please sign in to comment.