diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index c4b4713af81cc..154d9b7f0509f 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -59,6 +59,22 @@ export PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_ createdb psql < ./e2e_test/source/cdc/postgres_cdc.sql +# to test upper case database +psql -c "CREATE DATABASE \"UpperDb\";" + +export PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=UpperDb +psql -c " +CREATE SCHEMA \"UpperSchema\"; +CREATE TABLE \"UpperSchema\".\"Orders\" ( + id int PRIMARY KEY, + name varchar +); +INSERT INTO \"UpperSchema\".\"Orders\" VALUES (1, 'happy'); +" +risedev slt './e2e_test/source/cdc/cdc.upper_case.postgres.slt' + +export PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=UpperDb + echo "--- starting risingwave cluster" RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ risedev ci-start ci-1cn-1fe-with-recovery diff --git a/e2e_test/source/cdc/cdc.check.slt b/e2e_test/source/cdc/cdc.check.slt index 446fc7f04b092..3d6ea0e179391 100644 --- a/e2e_test/source/cdc/cdc.check.slt +++ b/e2e_test/source/cdc/cdc.check.slt @@ -75,8 +75,3 @@ select * from enum_to_varchar order by id; ---- 1 happy 2 ok - -query II -select * from orders order by id; ----- -1 happy diff --git a/e2e_test/source/cdc/cdc.load.slt b/e2e_test/source/cdc/cdc.load.slt index fb3d1b1d0d0ce..2c372cbd3ffdd 100644 --- a/e2e_test/source/cdc/cdc.load.slt +++ b/e2e_test/source/cdc/cdc.load.slt @@ -242,20 +242,3 @@ create table enum_to_varchar ( table.name = 'enum_table', slot.name = 'enum_to_varchar' ); - - -statement ok -create table orders ( - id int, - name varchar, - PRIMARY KEY (id) -) with ( - connector = 'postgres-cdc', - hostname = '${PGHOST:localhost}', - port = '${PGPORT:5432}', - username = '${PGUSER:$USER}', - password = '${PGPASSWORD:}', - database.name = '${PGDATABASE:postgres}', - table.name = 'Order', - slot.name = 'orders' -); diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index 6091d8b3bc488..3dc26d98c6282 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -250,11 +250,6 @@ CREATE TABLE person_new ( city varchar ) FROM pg_source TABLE 'person'; -CREATE TABLE orders_shared ( - id int PRIMARY KEY, - name varchar, -) FROM pg_source TABLE 'Orders'; - statement ok CREATE TABLE person_new ( id int, @@ -496,8 +491,3 @@ select * from enum_to_varchar_shared order by id; ---- 1 happy 2 ok - -query II -select * from orders_shared order by id; ----- -1 happy diff --git a/e2e_test/source/cdc/cdc.upper_case.postgres.slt b/e2e_test/source/cdc/cdc.upper_case.postgres.slt new file mode 100644 index 0000000000000..88b059939ec74 --- /dev/null +++ b/e2e_test/source/cdc/cdc.upper_case.postgres.slt @@ -0,0 +1,44 @@ +# For upper case UpperDb +statement ok +create source pg_source_upper with ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + slot.name = 'pg_slot' +); + +statement ok +CREATE TABLE orders_shared ( + id int PRIMARY KEY, + name varchar, +) FROM pg_source_upper TABLE 'UpperSchema.Orders'; + +query II +select * from orders_shared order by id; +---- +1 happy + +statement ok +create table orders ( + id int, + name varchar, + PRIMARY KEY (id) +) with ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + schema.name = 'UpperSchema', + table.name = 'Order', + slot.name = 'orders' +); + +query II +select * from orders order by id; +---- +1 happy diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql index 2831dac183da8..43a120a19d50f 100644 --- a/e2e_test/source/cdc/postgres_cdc.sql +++ b/e2e_test/source/cdc/postgres_cdc.sql @@ -106,9 +106,3 @@ CREATE TABLE list_with_null(id int primary key, my_int int[], my_num numeric[], INSERT INTO list_with_null VALUES (1, '{1,2,NULL}', '{1.1,inf,NULL}', '{1.1,inf,NULL}', '{1.1,inf,NULL}', '{happy,ok,NULL}', '{bb488f9b-330d-4012-b849-12adeb49e57e,bb488f9b-330d-4012-b849-12adeb49e57f, NULL}', '{\\x00,\\x01,NULL}'); INSERT INTO list_with_null VALUES (2, '{NULL,3,4}', '{2.2,0,NULL}' , '{2.2,0,NULL}', '{2.2,0,NULL}', '{happy,ok,sad}', '{2de296df-eda7-4202-a81f-1036100ef4f6,2977afbc-0b12-459c-a36f-f623fc9e9840}', '{\\x00,\\x01,\\x02}'); INSERT INTO list_with_null VALUES (5, NULL, NULL, NULL, NULL, NULL, NULL, NULL); - -CREATE TABLE "Orders" ( - id int PRIMARY KEY, - name varchar, -); -INSERT INTO "Orders" VALUES (1, 'happy'); \ No newline at end of file diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index a0f59bbcf15e4..d642273b3d35f 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -180,8 +180,7 @@ private void validateTableSchema() throws SQLException { // check primary key // reference: https://wiki.postgresql.org/wiki/Retrieve_primary_key_columns try (var stmt = jdbcConnection.prepareStatement(ValidatorUtils.getSql("postgres.pk"))) { - stmt.setString(1, schemaName); - stmt.setString(2, tableName); + stmt.setString(1, String.format("\"%s\".\"%s\"", this.schemaName, this.tableName)); var res = stmt.executeQuery(); var pkFields = new HashSet(); while (res.next()) { diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties index 61f0868a2dbb8..769c3cb1c8fb0 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties @@ -6,7 +6,7 @@ mysql.table_schema=SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM information_sc mysql.grants=SHOW GRANTS FOR CURRENT_USER() postgres.wal=show wal_level postgres.table=SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname = ? AND tablename = ?) -postgres.pk=SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) JOIN pg_class c ON c.oid = i.indrelid JOIN pg_namespace n ON n.oid = c.relnamespace WHERE n.nspname = ? AND c.relname = ? AND i.indisprimary +postgres.pk=SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) WHERE i.indrelid = ?::regclass AND i.indisprimary postgres.table_schema=SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = ? AND table_name = ? ORDER BY ordinal_position postgres.slot.check=SELECT slot_name FROM pg_replication_slots WHERE slot_name = ? and database = ? postgres.slot_limit.check=SELECT CASE WHEN (SELECT count(*) FROM pg_replication_slots) = (SELECT setting FROM pg_settings WHERE name='max_replication_slots')::int THEN 'true' ELSE 'false' END AS result;