Skip to content

Commit

Permalink
fix(cdc): allow null in list in pg-cdc backfill (#16457)
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang authored Apr 29, 2024
1 parent 82d847c commit 1e55914
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 24 deletions.
32 changes: 32 additions & 0 deletions e2e_test/source/cdc/cdc.check_new_rows.slt
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,35 @@ select * from enum_to_varchar order by id;
1 happy
2 ok
3 sad

query II
select id, my_int from list_with_null_shared order by id;
----
1 {1,2,NULL}
2 {NULL,-1,-2}

# will fix in https://github.com/risingwavelabs/risingwave/pull/16416
query II
select id, my_num from list_with_null_shared order by id;
----
1 {1.1,POSITIVE_INFINITY,NULL}
2 NULL

# Due to a bug in Debezium, if an enum list contains `NULL`, the whole list is converted to `NULL`.
query II
select id, my_mood from list_with_null_shared order by id;
----
1 NULL
2 NULL

query II
select id, my_uuid from list_with_null_shared order by id;
----
1 {bb488f9b-330d-4012-b849-12adeb49e57e,bb488f9b-330d-4012-b849-12adeb49e57f,NULL}
2 {NULL,471acecf-a4b4-4ed3-a211-7fb2291f159f,9bc35adf-fb11-4130-944c-e7eadb96b829}

query II
select id, my_bytea from list_with_null_shared order by id;
----
1 {"\\x00","\\x01",NULL}
2 {NULL,"\\x99","\\xaa"}
12 changes: 12 additions & 0 deletions e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,18 @@ CREATE TABLE enum_to_varchar_shared (
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.enum_table';


statement ok
CREATE TABLE list_with_null_shared (
id int,
my_int int[],
my_num varchar[],
my_mood varchar[],
my_uuid varchar[],
my_bytea bytea[],
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.list_with_null';

system ok
psql -c "
insert into numeric_table values(102, 57896044618658097711785492504343953926634992332820282019728792003956564819967);
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/source/cdc/postgres_cdc.sql
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,6 @@ CREATE TABLE enum_table (
current_mood mood
);
INSERT INTO enum_table VALUES (1, 'happy');

CREATE TABLE list_with_null(id int primary key, my_int int[], my_num numeric[], my_mood mood[], my_uuid uuid[], my_bytea bytea[]);
INSERT INTO list_with_null VALUES (1, '{1,2,NULL}', '{1.1,inf,NULL}', '{happy,ok,NULL}', '{bb488f9b-330d-4012-b849-12adeb49e57e,bb488f9b-330d-4012-b849-12adeb49e57f, NULL}', '{\\x00,\\x01,NULL}');
2 changes: 2 additions & 0 deletions e2e_test/source/cdc/postgres_cdc_insert.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ insert into numeric_table values(106, 'NaN'::numeric);
insert into numeric_table values(107, 'Infinity'::numeric);

INSERT INTO enum_table VALUES (3, 'sad');
--- To avoid the escaping issues of `psql -c ""`, we insert this row here and check the result in cdc.check_new_rows.slt
INSERT INTO list_with_null VALUES (2, '{NULL,-1,-2}', '{NULL,nan,-inf}', '{NULL,sad,ok}', '{NULL,471acecf-a4b4-4ed3-a211-7fb2291f159f,9bc35adf-fb11-4130-944c-e7eadb96b829}', '{NULL,\\x99,\\xAA}');
2 changes: 1 addition & 1 deletion e2e_test/source_inline/kafka/shared_source.slt
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ cat <<EOF | rpk topic produce shared_source -f "%p %v\\n" -p 0
EOF
done'

sleep 2s
sleep 3s

query IT rowsort
select v1, count(*) from s0 group by v1;
Expand Down
49 changes: 26 additions & 23 deletions src/connector/src/parser/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::de

macro_rules! handle_list_data_type {
($row:expr, $i:expr, $name:expr, $type:ty, $builder:expr) => {
let res = $row.try_get::<_, Option<Vec<$type>>>($i);
let res = $row.try_get::<_, Option<Vec<Option<$type>>>>($i);
match res {
Ok(val) => {
if let Some(v) = val {
v.into_iter()
.for_each(|val| $builder.append(Some(ScalarImpl::from(val))))
.for_each(|val| $builder.append(val.map(ScalarImpl::from)))
}
}
Err(err) => {
Expand All @@ -49,12 +49,12 @@ macro_rules! handle_list_data_type {
}
};
($row:expr, $i:expr, $name:expr, $type:ty, $builder:expr, $rw_type:ty) => {
let res = $row.try_get::<_, Option<Vec<$type>>>($i);
let res = $row.try_get::<_, Option<Vec<Option<$type>>>>($i);
match res {
Ok(val) => {
if let Some(v) = val {
v.into_iter().for_each(|val| {
$builder.append(Some(ScalarImpl::from(<$rw_type>::from(val))))
$builder.append(val.map(|v| ScalarImpl::from(<$rw_type>::from(v))))
})
}
}
Expand Down Expand Up @@ -212,17 +212,23 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
if let Kind::Array(item_type) = row.columns()[i].type_().kind()
&& let Kind::Enum(_) = item_type.kind()
{
// FIXME(Kexiang): The null of enum list is not supported in Debezium.
// Debezium does not support `NULL` elements in an enum list, so we read the elements as
// `EnumString` rather than `Option<EnumString>` to keep the two behaviors aligned.
// An enum list that contains `NULL` will be converted to `NULL`.
let res = row.try_get::<_, Option<Vec<EnumString>>>(i);
match res {
Ok(val) => {
if let Some(v) = val {
v.into_iter().for_each(|val| {
builder.append(Some(ScalarImpl::from(val.0)))
builder.append(Some(ScalarImpl::from(val.0)));
});
}
Some(ScalarImpl::from(ListValue::new(builder.finish())))
}
Err(err) => {
log_error!(name, err, "parse enum column failed");
None
}
}
} else {
Expand Down Expand Up @@ -255,14 +261,15 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
match *row.columns()[i].type_() {
// Since we don't support UUID natively, adapt it to a VARCHAR column
Type::UUID_ARRAY => {
let res = row.try_get::<_, Option<Vec<uuid::Uuid>>>(i);
let res =
row.try_get::<_, Option<Vec<Option<uuid::Uuid>>>>(i);
match res {
Ok(val) => {
if let Some(v) = val {
v.into_iter().for_each(|val| {
builder.append(Some(ScalarImpl::from(
val.to_string(),
)))
builder.append(val.map(|v| {
ScalarImpl::from(v.to_string())
}))
});
}
}
Expand All @@ -272,14 +279,13 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
};
}
Type::NUMERIC_ARRAY => {
let res = row.try_get::<_, Option<Vec<PgNumeric>>>(i);
let res =
row.try_get::<_, Option<Vec<Option<PgNumeric>>>>(i);
match res {
Ok(val) => {
if let Some(v) = val {
v.into_iter().for_each(|val| {
builder.append(pg_numeric_to_varchar(Some(
val,
)))
builder.append(pg_numeric_to_varchar(val))
});
}
}
Expand Down Expand Up @@ -341,14 +347,14 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
);
}
DataType::Bytea => {
let res = row.try_get::<_, Option<Vec<Vec<u8>>>>(i);
let res = row.try_get::<_, Option<Vec<Option<Vec<u8>>>>>(i);
match res {
Ok(val) => {
if let Some(v) = val {
v.into_iter().for_each(|val| {
builder.append(Some(ScalarImpl::from(
val.into_boxed_slice(),
)))
builder.append(val.map(|v| {
ScalarImpl::from(v.into_boxed_slice())
}))
})
}
}
Expand All @@ -358,15 +364,12 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
}
}
DataType::Int256 => {
let res = row.try_get::<_, Option<Vec<PgNumeric>>>(i);
let res = row.try_get::<_, Option<Vec<Option<PgNumeric>>>>(i);
match res {
Ok(val) => {
if let Some(v) = val {
v.into_iter().for_each(|val| {
builder.append(pg_numeric_to_rw_int256(
Some(val),
name,
))
builder.append(pg_numeric_to_rw_int256(val, name))
});
}
}
Expand All @@ -386,8 +389,8 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
);
}
};
Some(ScalarImpl::from(ListValue::new(builder.finish())))
}
Some(ScalarImpl::from(ListValue::new(builder.finish())))
}
DataType::Struct(_) | DataType::Serial => {
// Interval, Struct, List are not supported
Expand Down

0 comments on commit 1e55914

Please sign in to comment.