Skip to content

Commit

Permalink
fix(connector): warn on undefined nested field in struct
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Nov 13, 2023
1 parent b10238c commit e723f6a
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/common/src/types/struct_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl Display for StructType {
(self.0.field_types.iter())
.zip_eq_fast(self.0.field_names.iter())
.map(|(d, s)| format!("{} {}", s, d))
.join(",")
.join(", ")
)
}
}
Expand Down
49 changes: 49 additions & 0 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,55 @@ mod tests {
assert_eq!(row, expected.into());
}

/// Parses a JSON payload whose nested `struct` object omits the declared
/// `boolean` field, then checks that the resulting row carries `NULL` in
/// that position and that a warning about the undefined nested field was
/// logged.
#[tokio::test]
#[tracing_test::traced_test]
async fn test_json_parse_struct_missing_field_warning() {
    // Schema: a single struct column with two nested fields, `varchar` and `boolean`.
    let struct_column = ColumnDesc::new_struct(
        "struct",
        0,
        "",
        vec![
            ColumnDesc::new_atomic(DataType::Varchar, "varchar", 1),
            ColumnDesc::new_atomic(DataType::Boolean, "boolean", 2),
        ],
    );
    let columns = [struct_column]
        .iter()
        .map(SourceColumnDesc::from)
        .collect_vec();

    let json_parser = JsonParser::new(
        SpecificParserConfig::DEFAULT_PLAIN_JSON,
        columns.clone(),
        Default::default(),
    )
    .unwrap();

    // The payload only provides `varchar`; `boolean` is intentionally absent.
    let payload = br#"
{
"struct": {
"varchar": "varchar"
}
}
"#
    .to_vec();

    let mut chunk_builder = SourceStreamChunkBuilder::with_capacity(columns, 1);
    // The row writer is a temporary here, so its borrow of the builder ends
    // with this statement, before `finish` is called.
    json_parser
        .parse_inner(payload, chunk_builder.row_writer())
        .await
        .unwrap();

    let chunk = chunk_builder.finish();
    let (op, first_row) = chunk.rows().next().unwrap();
    assert_eq!(op, Op::Insert);
    let actual = first_row.into_owned_row().into_inner();

    // The missing nested field is expected to be padded with `None` (NULL).
    let struct_fields = vec![Some(ScalarImpl::Utf8("varchar".into())), None];
    let expected = vec![Some(ScalarImpl::Struct(StructValue::new(struct_fields)))];
    assert_eq!(actual, expected.into());

    // `logs_contain` is provided by the `traced_test` attribute above.
    assert!(logs_contain("undefined nested field, padding with `NULL`"));
}

#[tokio::test]
async fn test_json_upsert_parser() {
let items = [
Expand Down
16 changes: 11 additions & 5 deletions src/connector/src/parser/unified/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,11 +457,17 @@ impl JsonParseOptions {
.names()
.zip_eq_fast(struct_type_info.types())
.map(|(field_name, field_type)| {
self.parse(
json_object_get_case_insensitive(value, field_name)
.unwrap_or(&BorrowedValue::Static(simd_json::StaticNode::Null)),
Some(field_type),
)
let field_value = json_object_get_case_insensitive(value, field_name)
.unwrap_or_else(|| {
let error = AccessError::Undefined {
name: field_name.to_owned(),
path: struct_type_info.to_string(), // TODO: this is not good, we should maintain a path stack
};
// TODO: is it possible to unify the logging with the one in `do_action`?
tracing::warn!(%error, "undefined nested field, padding with `NULL`");
&BorrowedValue::Static(simd_json::StaticNode::Null)
});
self.parse(field_value, Some(field_type))
})
.collect::<Result<_, _>>()?,
)
Expand Down

0 comments on commit e723f6a

Please sign in to comment.