diff --git a/e2e_test/source_legacy/cdc_inline/postgres_create_drop.slt b/e2e_test/source_legacy/cdc_inline/postgres_create_drop.slt index f23ff824f249..1a313a761b68 100644 --- a/e2e_test/source_legacy/cdc_inline/postgres_create_drop.slt +++ b/e2e_test/source_legacy/cdc_inline/postgres_create_drop.slt @@ -4,7 +4,7 @@ control substitution on system ok psql -c " DROP TABLE IF EXISTS tt1; - CREATE TABLE tt1 (v1 int primary key, v2 timestamp); + CREATE TABLE tt1 (v1 int primary key, v2 timestamptz); INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00');" statement ok diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java index 00a5f9fc03ad..b74654b5f348 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java @@ -292,7 +292,6 @@ private boolean isDataTypeCompatible(String mysqlDataType, Data.DataType.TypeNam && val <= Data.DataType.TypeName.INT64_VALUE; case "bigint": return val == Data.DataType.TypeName.INT64_VALUE; - case "float": case "real": return val == Data.DataType.TypeName.FLOAT_VALUE @@ -303,6 +302,12 @@ private boolean isDataTypeCompatible(String mysqlDataType, Data.DataType.TypeNam return val == Data.DataType.TypeName.DECIMAL_VALUE; case "varchar": return val == Data.DataType.TypeName.VARCHAR_VALUE; + case "date": + return val == Data.DataType.TypeName.DATE_VALUE; + case "time": + return val == Data.DataType.TypeName.TIME_VALUE; + case "datetime": + return val == Data.DataType.TypeName.TIMESTAMP_VALUE; case "timestamp": return val == Data.DataType.TypeName.TIMESTAMPTZ_VALUE; default: diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index b3da549798f4..2199dd6168d6 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -679,7 +679,25 @@ private boolean isDataTypeCompatible(String pgDataType, Data.DataType.TypeName t || val == Data.DataType.TypeName.VARCHAR_VALUE; case "varchar": case "character varying": + case "uuid": + case "enum": return val == Data.DataType.TypeName.VARCHAR_VALUE; + case "bytea": + return val == Data.DataType.TypeName.BYTEA_VALUE; + case "date": + return val == Data.DataType.TypeName.DATE_VALUE; + case "time": + return val == Data.DataType.TypeName.TIME_VALUE; + case "timestamp": + case "timestamp without time zone": + return val == Data.DataType.TypeName.TIMESTAMP_VALUE; + case "timestamptz": + case "timestamp with time zone": + return val == Data.DataType.TypeName.TIMESTAMPTZ_VALUE; + case "interval": + return val == Data.DataType.TypeName.INTERVAL_VALUE; + case "jsonb": + return val == Data.DataType.TypeName.JSONB_VALUE; default: return true; // true for other uncovered types }