Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(connector): warn on undefined nested field in struct when parsing JSON #13384

Merged
merged 4 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1220,7 +1220,7 @@ mod tests {
);
assert_eq!(
format!("{}", d),
"struct<i integer,j character varying>".to_string()
"struct<i integer, j character varying>".to_string()
);
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/src/types/struct_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl Display for StructType {
(self.0.field_types.iter())
.zip_eq_fast(self.0.field_names.iter())
.map(|(d, s)| format!("{} {}", s, d))
.join(",")
.join(", ")
)
}
}
Expand Down
50 changes: 50 additions & 0 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,56 @@ mod tests {
assert_eq!(row, expected.into());
}

#[cfg(not(madsim))] // Traced test does not work with madsim
#[tokio::test]
#[tracing_test::traced_test]
async fn test_json_parse_struct_missing_field_warning() {
let descs = vec![ColumnDesc::new_struct(
"struct",
0,
"",
vec![
ColumnDesc::new_atomic(DataType::Varchar, "varchar", 1),
ColumnDesc::new_atomic(DataType::Boolean, "boolean", 2),
],
)]
.iter()
.map(SourceColumnDesc::from)
.collect_vec();

let parser = JsonParser::new(
SpecificParserConfig::DEFAULT_PLAIN_JSON,
descs.clone(),
Default::default(),
)
.unwrap();
let payload = br#"
{
"struct": {
"varchar": "varchar"
}
}
"#
.to_vec();
let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 1);
{
let writer = builder.row_writer();
parser.parse_inner(payload, writer).await.unwrap();
}
let chunk = builder.finish();
let (op, row) = chunk.rows().next().unwrap();
assert_eq!(op, Op::Insert);
let row = row.into_owned_row().into_inner();

let expected = vec![Some(ScalarImpl::Struct(StructValue::new(vec![
Some(ScalarImpl::Utf8("varchar".into())),
None,
])))];
assert_eq!(row, expected.into());

assert!(logs_contain("undefined nested field, padding with `NULL`"));
}

#[tokio::test]
async fn test_json_upsert_parser() {
let items = [
Expand Down
16 changes: 11 additions & 5 deletions src/connector/src/parser/unified/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,11 +457,17 @@ impl JsonParseOptions {
.names()
.zip_eq_fast(struct_type_info.types())
.map(|(field_name, field_type)| {
self.parse(
json_object_get_case_insensitive(value, field_name)
.unwrap_or(&BorrowedValue::Static(simd_json::StaticNode::Null)),
Some(field_type),
)
let field_value = json_object_get_case_insensitive(value, field_name)
.unwrap_or_else(|| {
let error = AccessError::Undefined {
name: field_name.to_owned(),
path: struct_type_info.to_string(), // TODO: this is not good, we should maintain a path stack
};
// TODO: is it possible to unify the logging with the one in `do_action`?
tracing::warn!(%error, "undefined nested field, padding with `NULL`");
&BorrowedValue::Static(simd_json::StaticNode::Null)
});
self.parse(field_value, Some(field_type))
})
.collect::<Result<_, _>>()?,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@
Failed to bind expression: avg(country)

Caused by:
Invalid input syntax: Invalid aggregation: avg(struct<address character varying,zipcode character varying>)
Invalid input syntax: Invalid aggregation: avg(struct<address character varying, zipcode character varying>)
create_source:
format: plain
encode: protobuf
Expand Down Expand Up @@ -422,7 +422,7 @@
- sql: |
CREATE TABLE a (c STRUCT<i STRUCT<a INTEGER>, j INTEGER>);
INSERT INTO a VALUES (1);
binder_error: 'Bind error: cannot cast type "integer" to "struct<i struct<a integer>,j integer>" in Assign context'
binder_error: 'Bind error: cannot cast type "integer" to "struct<i struct<a integer>, j integer>" in Assign context'
- name: test struct type alignment in CASE expression
sql: |
select CASE WHEN false THEN ROW(0, INTERVAL '1') WHEN true THEN ROW(1.1, INTERVAL '1') ELSE ROW(1, INTERVAL '1') END;
Expand Down
Loading