diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 66b3ea3eb4bc6..3ae6995514d26 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -12,7 +12,7 @@ services: interval: 5s timeout: 5s retries: 5 - command: [ "postgres", "-c", "wal_level=logical" ] + command: [ "postgres", "-c", "wal_level=logical", "-c", "max_replication_slots=30" ] mysql: image: mysql:8.0 diff --git a/e2e_test/source/cdc/cdc.check.slt b/e2e_test/source/cdc/cdc.check.slt index 3d6ea0e179391..b0559d948c0ef 100644 --- a/e2e_test/source/cdc/cdc.check.slt +++ b/e2e_test/source/cdc/cdc.check.slt @@ -75,3 +75,8 @@ select * from enum_to_varchar order by id; ---- 1 happy 2 ok + +query II +select * from upper_orders order by id; +---- +1 happy diff --git a/e2e_test/source/cdc/cdc.load.slt b/e2e_test/source/cdc/cdc.load.slt index 2c372cbd3ffdd..599394c8a0047 100644 --- a/e2e_test/source/cdc/cdc.load.slt +++ b/e2e_test/source/cdc/cdc.load.slt @@ -242,3 +242,19 @@ create table enum_to_varchar ( table.name = 'enum_table', slot.name = 'enum_to_varchar' ); + +statement ok +create table upper_orders ( + id int, + name varchar, + PRIMARY KEY (id) +) with ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + table.name = 'Orders', + slot.name = 'orders' +); diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index 33c9c16a776c0..b180a67c2cb2a 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -268,6 +268,12 @@ CREATE TABLE person_new ( city varchar ) FROM pg_source TABLE 'person'; +statement ok +CREATE TABLE upper_orders_shared ( + id int PRIMARY KEY, + name varchar +) FROM pg_source TABLE 'public.Orders'; + statement ok CREATE TABLE person_new ( id int, @@ -509,3 +515,8 @@ select * from enum_to_varchar_shared order by id; ---- 1 happy 2 ok + +query II +select * from upper_orders_shared order by id; +---- +1 happy diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql index 43a120a19d50f..20b6e7b414469 100644 --- a/e2e_test/source/cdc/postgres_cdc.sql +++ b/e2e_test/source/cdc/postgres_cdc.sql @@ -106,3 +106,9 @@ CREATE TABLE list_with_null(id int primary key, my_int int[], my_num numeric[], INSERT INTO list_with_null VALUES (1, '{1,2,NULL}', '{1.1,inf,NULL}', '{1.1,inf,NULL}', '{1.1,inf,NULL}', '{happy,ok,NULL}', '{bb488f9b-330d-4012-b849-12adeb49e57e,bb488f9b-330d-4012-b849-12adeb49e57f, NULL}', '{\\x00,\\x01,NULL}'); INSERT INTO list_with_null VALUES (2, '{NULL,3,4}', '{2.2,0,NULL}' , '{2.2,0,NULL}', '{2.2,0,NULL}', '{happy,ok,sad}', '{2de296df-eda7-4202-a81f-1036100ef4f6,2977afbc-0b12-459c-a36f-f623fc9e9840}', '{\\x00,\\x01,\\x02}'); INSERT INTO list_with_null VALUES (5, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + +CREATE TABLE "Orders" ( + id int PRIMARY KEY, + name varchar +); +INSERT INTO "Orders" VALUES (1, 'happy'); diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index 93d4fdee0bcd4..6e50cb0772ae7 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -180,7 +180,7 @@ private void validateTableSchema() throws SQLException { // check primary key // reference: https://wiki.postgresql.org/wiki/Retrieve_primary_key_columns try (var stmt = jdbcConnection.prepareStatement(ValidatorUtils.getSql("postgres.pk"))) { - stmt.setString(1, this.schemaName + "." + this.tableName); + stmt.setString(1, String.format("\"%s\".\"%s\"", this.schemaName, this.tableName)); var res = stmt.executeQuery(); var pkFields = new HashSet(); while (res.next()) { @@ -521,7 +521,8 @@ protected void alterPublicationIfNeeded() throws SQLException { String alterPublicationSql = String.format( - "ALTER PUBLICATION %s ADD TABLE %s", pubName, schemaName + "." + tableName); + "ALTER PUBLICATION %s ADD TABLE %s", + pubName, String.format("\"%s\".\"%s\"", this.schemaName, this.tableName)); try (var stmt = jdbcConnection.createStatement()) { LOG.info("Altered publication with statement: {}", alterPublicationSql); stmt.execute(alterPublicationSql);