Skip to content

Commit

Permalink
fix(connector): warn on undefined nested field in struct when parsing…
Browse files Browse the repository at this point in the history
… JSON (#13384)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Nov 14, 2023
1 parent b569b92 commit 3fd8140
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 14 deletions.
10 changes: 5 additions & 5 deletions e2e_test/udf/udf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,19 @@ query TTTTT rowsort
show functions
----
array_access character varying[], integer character varying (empty) http://localhost:8815
extract_tcp_info bytea struct<src_ip character varying,dst_ip character varying,src_port smallint,dst_port smallint> (empty) http://localhost:8815
extract_tcp_info bytea struct<src_ip character varying, dst_ip character varying, src_port smallint, dst_port smallint> (empty) http://localhost:8815
gcd integer, integer integer (empty) http://localhost:8815
gcd integer, integer, integer integer (empty) http://localhost:8815
hex_to_dec character varying numeric (empty) http://localhost:8815
int_42 (empty) integer (empty) http://localhost:8815
jsonb_access jsonb, integer jsonb (empty) http://localhost:8815
jsonb_array_identity jsonb[] jsonb[] (empty) http://localhost:8815
jsonb_array_struct_identity struct<v jsonb[],len integer> struct<v jsonb[],len integer> (empty) http://localhost:8815
jsonb_array_struct_identity struct<v jsonb[], len integer> struct<v jsonb[], len integer> (empty) http://localhost:8815
jsonb_concat jsonb[] jsonb (empty) http://localhost:8815
return_all boolean, smallint, integer, bigint, real, double precision, numeric, date, time without time zone, timestamp without time zone, interval, character varying, bytea, jsonb struct<bool boolean,i16 smallint,i32 integer,i64 bigint,f32 real,f64 double precision,decimal numeric,date date,time time without time zone,timestamp timestamp without time zone,interval interval,varchar character varying,bytea bytea,jsonb jsonb> (empty) http://localhost:8815
return_all_arrays boolean[], smallint[], integer[], bigint[], real[], double precision[], numeric[], date[], time without time zone[], timestamp without time zone[], interval[], character varying[], bytea[], jsonb[] struct<bool boolean[],i16 smallint[],i32 integer[],i64 bigint[],f32 real[],f64 double precision[],decimal numeric[],date date[],time time without time zone[],timestamp timestamp without time zone[],interval interval[],varchar character varying[],bytea bytea[],jsonb jsonb[]> (empty) http://localhost:8815
return_all boolean, smallint, integer, bigint, real, double precision, numeric, date, time without time zone, timestamp without time zone, interval, character varying, bytea, jsonb struct<bool boolean, i16 smallint, i32 integer, i64 bigint, f32 real, f64 double precision, decimal numeric, date date, time time without time zone, timestamp timestamp without time zone, interval interval, varchar character varying, bytea bytea, jsonb jsonb> (empty) http://localhost:8815
return_all_arrays boolean[], smallint[], integer[], bigint[], real[], double precision[], numeric[], date[], time without time zone[], timestamp without time zone[], interval[], character varying[], bytea[], jsonb[] struct<bool boolean[], i16 smallint[], i32 integer[], i64 bigint[], f32 real[], f64 double precision[], decimal numeric[], date date[], time time without time zone[], timestamp timestamp without time zone[], interval interval[], varchar character varying[], bytea bytea[], jsonb jsonb[]> (empty) http://localhost:8815
series integer integer (empty) http://localhost:8815
split character varying struct<word character varying,length integer> (empty) http://localhost:8815
split character varying struct<word character varying, length integer> (empty) http://localhost:8815

query I
select int_42();
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1220,7 +1220,7 @@ mod tests {
);
assert_eq!(
format!("{}", d),
"struct<i integer,j character varying>".to_string()
"struct<i integer, j character varying>".to_string()
);
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/src/types/struct_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl Display for StructType {
(self.0.field_types.iter())
.zip_eq_fast(self.0.field_names.iter())
.map(|(d, s)| format!("{} {}", s, d))
.join(",")
.join(", ")
)
}
}
Expand Down
50 changes: 50 additions & 0 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,56 @@ mod tests {
assert_eq!(row, expected.into());
}

#[cfg(not(madsim))] // Traced test does not work with madsim
#[tokio::test]
#[tracing_test::traced_test]
async fn test_json_parse_struct_missing_field_warning() {
let descs = vec![ColumnDesc::new_struct(
"struct",
0,
"",
vec![
ColumnDesc::new_atomic(DataType::Varchar, "varchar", 1),
ColumnDesc::new_atomic(DataType::Boolean, "boolean", 2),
],
)]
.iter()
.map(SourceColumnDesc::from)
.collect_vec();

let parser = JsonParser::new(
SpecificParserConfig::DEFAULT_PLAIN_JSON,
descs.clone(),
Default::default(),
)
.unwrap();
let payload = br#"
{
"struct": {
"varchar": "varchar"
}
}
"#
.to_vec();
let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 1);
{
let writer = builder.row_writer();
parser.parse_inner(payload, writer).await.unwrap();
}
let chunk = builder.finish();
let (op, row) = chunk.rows().next().unwrap();
assert_eq!(op, Op::Insert);
let row = row.into_owned_row().into_inner();

let expected = vec![Some(ScalarImpl::Struct(StructValue::new(vec![
Some(ScalarImpl::Utf8("varchar".into())),
None,
])))];
assert_eq!(row, expected.into());

assert!(logs_contain("undefined nested field, padding with `NULL`"));
}

#[tokio::test]
async fn test_json_upsert_parser() {
let items = [
Expand Down
16 changes: 11 additions & 5 deletions src/connector/src/parser/unified/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,11 +457,17 @@ impl JsonParseOptions {
.names()
.zip_eq_fast(struct_type_info.types())
.map(|(field_name, field_type)| {
self.parse(
json_object_get_case_insensitive(value, field_name)
.unwrap_or(&BorrowedValue::Static(simd_json::StaticNode::Null)),
Some(field_type),
)
let field_value = json_object_get_case_insensitive(value, field_name)
.unwrap_or_else(|| {
let error = AccessError::Undefined {
name: field_name.to_owned(),
path: struct_type_info.to_string(), // TODO: this is not good, we should maintain a path stack
};
// TODO: is it possible to unify the logging with the one in `do_action`?
tracing::warn!(%error, "undefined nested field, padding with `NULL`");
&BorrowedValue::Static(simd_json::StaticNode::Null)
});
self.parse(field_value, Some(field_type))
})
.collect::<Result<_, _>>()?,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@
Failed to bind expression: avg(country)
Caused by:
Invalid input syntax: Invalid aggregation: avg(struct<address character varying,zipcode character varying>)
Invalid input syntax: Invalid aggregation: avg(struct<address character varying, zipcode character varying>)
create_source:
format: plain
encode: protobuf
Expand Down Expand Up @@ -422,7 +422,7 @@
- sql: |
CREATE TABLE a (c STRUCT<i STRUCT<a INTEGER>, j INTEGER>);
INSERT INTO a VALUES (1);
binder_error: 'Bind error: cannot cast type "integer" to "struct<i struct<a integer>,j integer>" in Assign context'
binder_error: 'Bind error: cannot cast type "integer" to "struct<i struct<a integer>, j integer>" in Assign context'
- name: test struct type alignment in CASE expression
sql: |
select CASE WHEN false THEN ROW(0, INTERVAL '1') WHEN true THEN ROW(1.1, INTERVAL '1') ELSE ROW(1, INTERVAL '1') END;
Expand Down

0 comments on commit 3fd8140

Please sign in to comment.