From 43730aa0734f54aef96888af03c60fe72c9ca2dd Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Sun, 17 Nov 2024 14:41:12 -0600 Subject: [PATCH] fix(direct-cdc): improve type match for pg-cdc (#19409) --- .../cdc_inline/postgres_create_drop.slt | 2 +- .../source/common/MySqlValidator.java | 7 ++++++- .../source/common/PostgresValidator.java | 18 ++++++++++++++++++ 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/e2e_test/source_legacy/cdc_inline/postgres_create_drop.slt b/e2e_test/source_legacy/cdc_inline/postgres_create_drop.slt index f23ff824f249..1a313a761b68 100644 --- a/e2e_test/source_legacy/cdc_inline/postgres_create_drop.slt +++ b/e2e_test/source_legacy/cdc_inline/postgres_create_drop.slt @@ -4,7 +4,7 @@ control substitution on system ok psql -c " DROP TABLE IF EXISTS tt1; - CREATE TABLE tt1 (v1 int primary key, v2 timestamp); + CREATE TABLE tt1 (v1 int primary key, v2 timestamptz); INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00');" statement ok diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java index 00a5f9fc03ad..b74654b5f348 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java @@ -292,7 +292,6 @@ private boolean isDataTypeCompatible(String mysqlDataType, Data.DataType.TypeNam && val <= Data.DataType.TypeName.INT64_VALUE; case "bigint": return val == Data.DataType.TypeName.INT64_VALUE; - case "float": case "real": return val == Data.DataType.TypeName.FLOAT_VALUE @@ -303,6 +302,12 @@ private boolean isDataTypeCompatible(String mysqlDataType, Data.DataType.TypeNam return val == Data.DataType.TypeName.DECIMAL_VALUE; case "varchar": return val == Data.DataType.TypeName.VARCHAR_VALUE; + case "date": + return val == Data.DataType.TypeName.DATE_VALUE; + case "time": + return val == Data.DataType.TypeName.TIME_VALUE; + case "datetime": + return val == Data.DataType.TypeName.TIMESTAMP_VALUE; case "timestamp": return val == Data.DataType.TypeName.TIMESTAMPTZ_VALUE; default: diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index b3da549798f4..2199dd6168d6 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -679,7 +679,25 @@ private boolean isDataTypeCompatible(String pgDataType, Data.DataType.TypeName t || val == Data.DataType.TypeName.VARCHAR_VALUE; case "varchar": case "character varying": + case "uuid": + case "enum": return val == Data.DataType.TypeName.VARCHAR_VALUE; + case "bytea": + return val == Data.DataType.TypeName.BYTEA_VALUE; + case "date": + return val == Data.DataType.TypeName.DATE_VALUE; + case "time": + return val == Data.DataType.TypeName.TIME_VALUE; + case "timestamp": + case "timestamp without time zone": + return val == Data.DataType.TypeName.TIMESTAMP_VALUE; + case "timestamptz": + case "timestamp with time zone": + return val == Data.DataType.TypeName.TIMESTAMPTZ_VALUE; + case "interval": + return val == Data.DataType.TypeName.INTERVAL_VALUE; + case "jsonb": + return val == Data.DataType.TypeName.JSONB_VALUE; default: return true; // true for other uncovered types }