diff --git a/e2e_test/source/cdc/cdc.check_new_rows.slt b/e2e_test/source/cdc/cdc.check_new_rows.slt index 0997f6efc71a..a9daaa5f458f 100644 --- a/e2e_test/source/cdc/cdc.check_new_rows.slt +++ b/e2e_test/source/cdc/cdc.check_new_rows.slt @@ -120,3 +120,35 @@ select * from enum_to_varchar order by id; 1 happy 2 ok 3 sad + +query II +select id, my_int from list_with_null_shared order by id; +---- +1 {1,2,NULL} +2 {NULL,-1,-2} + +# will fix in https://github.com/risingwavelabs/risingwave/pull/16416 +query II +select id, my_num from list_with_null_shared order by id; +---- +1 {1.1,POSITIVE_INFINITY,NULL} +2 NULL + +# Due to a bug in Debezium, if an enum list contains `NULL`, the list will be converted to `NULL` +query II +select id, my_mood from list_with_null_shared order by id; +---- +1 NULL +2 NULL + +query II +select id, my_uuid from list_with_null_shared order by id; +---- +1 {bb488f9b-330d-4012-b849-12adeb49e57e,bb488f9b-330d-4012-b849-12adeb49e57f,NULL} +2 {NULL,471acecf-a4b4-4ed3-a211-7fb2291f159f,9bc35adf-fb11-4130-944c-e7eadb96b829} + +query II +select id, my_bytea from list_with_null_shared order by id; +---- +1 {"\\x00","\\x01",NULL} +2 {NULL,"\\x99","\\xaa"} diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index 2258be2fbb7f..8eb48a8c81dc 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -304,6 +304,18 @@ CREATE TABLE enum_to_varchar_shared ( PRIMARY KEY (id) ) FROM pg_source TABLE 'public.enum_table'; + +statement ok +CREATE TABLE list_with_null_shared ( + id int, + my_int int[], + my_num varchar[], + my_mood varchar[], + my_uuid varchar[], + my_bytea bytea[], + PRIMARY KEY (id) +) FROM pg_source TABLE 'public.list_with_null'; + system ok psql -c " insert into numeric_table values(102, 57896044618658097711785492504343953926634992332820282019728792003956564819967); diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql 
index 9344035096cf..2a34a5205109 100644 --- a/e2e_test/source/cdc/postgres_cdc.sql +++ b/e2e_test/source/cdc/postgres_cdc.sql @@ -96,3 +96,6 @@ CREATE TABLE enum_table ( current_mood mood ); INSERT INTO enum_table VALUES (1, 'happy'); + +CREATE TABLE list_with_null(id int primary key, my_int int[], my_num numeric[], my_mood mood[], my_uuid uuid[], my_bytea bytea[]); +INSERT INTO list_with_null VALUES (1, '{1,2,NULL}', '{1.1,inf,NULL}', '{happy,ok,NULL}', '{bb488f9b-330d-4012-b849-12adeb49e57e,bb488f9b-330d-4012-b849-12adeb49e57f, NULL}', '{\\x00,\\x01,NULL}'); diff --git a/e2e_test/source/cdc/postgres_cdc_insert.sql b/e2e_test/source/cdc/postgres_cdc_insert.sql index 5f98b6c015f3..6b4fde2e7124 100644 --- a/e2e_test/source/cdc/postgres_cdc_insert.sql +++ b/e2e_test/source/cdc/postgres_cdc_insert.sql @@ -22,3 +22,5 @@ insert into numeric_table values(106, 'NaN'::numeric); insert into numeric_table values(107, 'Infinity'::numeric); INSERT INTO enum_table VALUES (3, 'sad'); +--- to avoid escaping issues of psql -c "", we insert this row here and check the result in check_new_rows.slt +INSERT INTO list_with_null VALUES (2, '{NULL,-1,-2}', '{NULL,nan,-inf}', '{NULL,sad,ok}', '{NULL,471acecf-a4b4-4ed3-a211-7fb2291f159f,9bc35adf-fb11-4130-944c-e7eadb96b829}', '{NULL,\\x99,\\xAA}'); diff --git a/e2e_test/source_inline/kafka/shared_source.slt b/e2e_test/source_inline/kafka/shared_source.slt index 806369f2d630..164d983e7538 100644 --- a/e2e_test/source_inline/kafka/shared_source.slt +++ b/e2e_test/source_inline/kafka/shared_source.slt @@ -166,7 +166,7 @@ cat < = LazyLock::new(LogSuppresser::de macro_rules! 
handle_list_data_type { ($row:expr, $i:expr, $name:expr, $type:ty, $builder:expr) => { - let res = $row.try_get::<_, Option>>($i); + let res = $row.try_get::<_, Option>>>($i); match res { Ok(val) => { if let Some(v) = val { v.into_iter() - .for_each(|val| $builder.append(Some(ScalarImpl::from(val)))) + .for_each(|val| $builder.append(val.map(ScalarImpl::from))) } } Err(err) => { @@ -49,12 +49,12 @@ macro_rules! handle_list_data_type { } }; ($row:expr, $i:expr, $name:expr, $type:ty, $builder:expr, $rw_type:ty) => { - let res = $row.try_get::<_, Option>>($i); + let res = $row.try_get::<_, Option>>>($i); match res { Ok(val) => { if let Some(v) = val { v.into_iter().for_each(|val| { - $builder.append(Some(ScalarImpl::from(<$rw_type>::from(val)))) + $builder.append(val.map(|v| ScalarImpl::from(<$rw_type>::from(v)))) }) } } @@ -212,17 +212,23 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O if let Kind::Array(item_type) = row.columns()[i].type_().kind() && let Kind::Enum(_) = item_type.kind() { + // FIXME(Kexiang): The null of enum list is not supported in Debezium. + // As `NULL` in enum list is not supported in Debezium, we use `EnumString` + // instead of `Option` to handle enum to keep the behaviors aligned. + // An enum list that contains `NULL` will be converted to `NULL`. 
let res = row.try_get::<_, Option>>(i); match res { Ok(val) => { if let Some(v) = val { v.into_iter().for_each(|val| { - builder.append(Some(ScalarImpl::from(val.0))) + builder.append(Some(ScalarImpl::from(val.0))); }); } + Some(ScalarImpl::from(ListValue::new(builder.finish()))) } Err(err) => { log_error!(name, err, "parse enum column failed"); + None } } } else { @@ -255,14 +261,15 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O match *row.columns()[i].type_() { // Since we don't support UUID natively, adapt it to a VARCHAR column Type::UUID_ARRAY => { - let res = row.try_get::<_, Option>>(i); + let res = + row.try_get::<_, Option>>>(i); match res { Ok(val) => { if let Some(v) = val { v.into_iter().for_each(|val| { - builder.append(Some(ScalarImpl::from( - val.to_string(), - ))) + builder.append(val.map(|v| { + ScalarImpl::from(v.to_string()) + })) }); } } @@ -272,14 +279,13 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O }; } Type::NUMERIC_ARRAY => { - let res = row.try_get::<_, Option>>(i); + let res = + row.try_get::<_, Option>>>(i); match res { Ok(val) => { if let Some(v) = val { v.into_iter().for_each(|val| { - builder.append(pg_numeric_to_varchar(Some( - val, - ))) + builder.append(pg_numeric_to_varchar(val)) }); } } @@ -341,14 +347,14 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O ); } DataType::Bytea => { - let res = row.try_get::<_, Option>>>(i); + let res = row.try_get::<_, Option>>>>(i); match res { Ok(val) => { if let Some(v) = val { v.into_iter().for_each(|val| { - builder.append(Some(ScalarImpl::from( - val.into_boxed_slice(), - ))) + builder.append(val.map(|v| { + ScalarImpl::from(v.into_boxed_slice()) + })) }) } } @@ -358,15 +364,12 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O } } DataType::Int256 => { - let res = row.try_get::<_, Option>>(i); + let res = row.try_get::<_, Option>>>(i); match res { 
Ok(val) => { if let Some(v) = val { v.into_iter().for_each(|val| { - builder.append(pg_numeric_to_rw_int256( - Some(val), - name, - )) + builder.append(pg_numeric_to_rw_int256(val, name)) }); } } @@ -386,8 +389,8 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O ); } }; + Some(ScalarImpl::from(ListValue::new(builder.finish()))) } - Some(ScalarImpl::from(ListValue::new(builder.finish()))) } DataType::Struct(_) | DataType::Serial => { // Interval, Struct, List are not supported