From 167e938f9a3817870a1688ac972fb17c2a2d4a57 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Thu, 18 Jul 2024 22:26:31 -0400 Subject: [PATCH 1/8] fix(cdc): fix uppercase-identifier of pgcdc --- .../risingwave/connector/source/common/PostgresValidator.java | 3 ++- .../src/main/resources/validate_sql.properties | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index 93d4fdee0bcd4..a0f59bbcf15e4 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -180,7 +180,8 @@ private void validateTableSchema() throws SQLException { // check primary key // reference: https://wiki.postgresql.org/wiki/Retrieve_primary_key_columns try (var stmt = jdbcConnection.prepareStatement(ValidatorUtils.getSql("postgres.pk"))) { - stmt.setString(1, this.schemaName + "." + this.tableName); + stmt.setString(1, schemaName); + stmt.setString(2, tableName); var res = stmt.executeQuery(); var pkFields = new HashSet(); while (res.next()) { diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties index 769c3cb1c8fb0..61f0868a2dbb8 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties @@ -6,7 +6,7 @@ mysql.table_schema=SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM information_sc mysql.grants=SHOW GRANTS FOR CURRENT_USER() postgres.wal=show wal_level postgres.table=SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname = ? AND tablename = ?) -postgres.pk=SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) WHERE i.indrelid = ?::regclass AND i.indisprimary +postgres.pk=SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) JOIN pg_class c ON c.oid = i.indrelid JOIN pg_namespace n ON n.oid = c.relnamespace WHERE n.nspname = ? AND c.relname = ? AND i.indisprimary postgres.table_schema=SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = ? AND table_name = ? ORDER BY ordinal_position postgres.slot.check=SELECT slot_name FROM pg_replication_slots WHERE slot_name = ? and database = ? postgres.slot_limit.check=SELECT CASE WHEN (SELECT count(*) FROM pg_replication_slots) = (SELECT setting FROM pg_settings WHERE name='max_replication_slots')::int THEN 'true' ELSE 'false' END AS result; From 3ea9d5b3a4fb0377a4062a9aa3dabbfa2dc3b51c Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Thu, 18 Jul 2024 22:35:26 -0400 Subject: [PATCH 2/8] add test --- e2e_test/source/cdc/cdc.check.slt | 5 +++++ e2e_test/source/cdc/cdc.load.slt | 17 +++++++++++++++++ e2e_test/source/cdc/cdc.share_stream.slt | 10 ++++++++++ e2e_test/source/cdc/postgres_cdc.sql | 6 ++++++ 4 files changed, 38 insertions(+) diff --git a/e2e_test/source/cdc/cdc.check.slt b/e2e_test/source/cdc/cdc.check.slt index 3d6ea0e179391..446fc7f04b092 100644 --- a/e2e_test/source/cdc/cdc.check.slt +++ b/e2e_test/source/cdc/cdc.check.slt @@ -75,3 +75,8 @@ select * from enum_to_varchar order by id; ---- 1 happy 2 ok + +query II +select * from orders order by id; +---- +1 happy diff --git a/e2e_test/source/cdc/cdc.load.slt b/e2e_test/source/cdc/cdc.load.slt index 2c372cbd3ffdd..fb3d1b1d0d0ce 100644 --- a/e2e_test/source/cdc/cdc.load.slt +++ b/e2e_test/source/cdc/cdc.load.slt @@ -242,3 +242,20 @@ create table enum_to_varchar ( table.name = 'enum_table', slot.name = 'enum_to_varchar' ); + + +statement ok +create table orders ( + id int, + name varchar, + PRIMARY KEY (id) +) with ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + table.name = 'Order', + slot.name = 'orders' +); diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index 3dc26d98c6282..6091d8b3bc488 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -250,6 +250,11 @@ CREATE TABLE person_new ( city varchar ) FROM pg_source TABLE 'person'; +CREATE TABLE orders_shared ( + id int PRIMARY KEY, + name varchar, +) FROM pg_source TABLE 'Orders'; + statement ok CREATE TABLE person_new ( id int, @@ -491,3 +496,8 @@ select * from enum_to_varchar_shared order by id; ---- 1 happy 2 ok + +query II +select * from orders_shared order by id; +---- +1 happy diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql index 43a120a19d50f..2831dac183da8 100644 --- a/e2e_test/source/cdc/postgres_cdc.sql +++ b/e2e_test/source/cdc/postgres_cdc.sql @@ -106,3 +106,9 @@ CREATE TABLE list_with_null(id int primary key, my_int int[], my_num numeric[], INSERT INTO list_with_null VALUES (1, '{1,2,NULL}', '{1.1,inf,NULL}', '{1.1,inf,NULL}', '{1.1,inf,NULL}', '{happy,ok,NULL}', '{bb488f9b-330d-4012-b849-12adeb49e57e,bb488f9b-330d-4012-b849-12adeb49e57f, NULL}', '{\\x00,\\x01,NULL}'); INSERT INTO list_with_null VALUES (2, '{NULL,3,4}', '{2.2,0,NULL}' , '{2.2,0,NULL}', '{2.2,0,NULL}', '{happy,ok,sad}', '{2de296df-eda7-4202-a81f-1036100ef4f6,2977afbc-0b12-459c-a36f-f623fc9e9840}', '{\\x00,\\x01,\\x02}'); INSERT INTO list_with_null VALUES (5, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + +CREATE TABLE "Orders" ( + id int PRIMARY KEY, + name varchar, +); +INSERT INTO "Orders" VALUES (1, 'happy'); \ No newline at end of file From 06705701e03e323be49f275dc833c7bb8dca50b9 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Thu, 18 Jul 2024 23:12:39 -0400 Subject: [PATCH 3/8] use uppercase identifier --- ci/scripts/e2e-source-test.sh | 16 +++++++ e2e_test/source/cdc/cdc.check.slt | 5 --- e2e_test/source/cdc/cdc.load.slt | 17 ------- e2e_test/source/cdc/cdc.share_stream.slt | 10 ----- .../source/cdc/cdc.upper_case.postgres.slt | 44 +++++++++++++++++++ e2e_test/source/cdc/postgres_cdc.sql | 6 --- .../source/common/PostgresValidator.java | 3 +- .../main/resources/validate_sql.properties | 2 +- 8 files changed, 62 insertions(+), 41 deletions(-) create mode 100644 e2e_test/source/cdc/cdc.upper_case.postgres.slt diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index c4b4713af81cc..d26365cf3053d 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -59,6 +59,22 @@ export PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_ createdb psql < ./e2e_test/source/cdc/postgres_cdc.sql +# to test upper case database +psql -c "CREATE DATABASE \"UpperDb\";" + +export PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=UpperDb +psql -c " +CREATE SCHEMA \"UpperSchema\"; +CREATE TABLE \"UpperSchema\".\"Orders\" ( + id int PRIMARY KEY, + name varchar +); +INSERT INTO \"UpperSchema\".\"Orders\" VALUES (1, 'happy'); +" +risedev slt './e2e_test/source/cdc/cdc.upper_case.postgres.slt' + +export PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test + echo "--- starting risingwave cluster" RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ risedev ci-start ci-1cn-1fe-with-recovery diff --git a/e2e_test/source/cdc/cdc.check.slt b/e2e_test/source/cdc/cdc.check.slt index 446fc7f04b092..3d6ea0e179391 100644 --- a/e2e_test/source/cdc/cdc.check.slt +++ b/e2e_test/source/cdc/cdc.check.slt @@ -75,8 +75,3 @@ select * from enum_to_varchar order by id; ---- 1 happy 2 ok - -query II -select * from orders order by id; ----- -1 happy diff --git a/e2e_test/source/cdc/cdc.load.slt b/e2e_test/source/cdc/cdc.load.slt index fb3d1b1d0d0ce..2c372cbd3ffdd 100644 --- a/e2e_test/source/cdc/cdc.load.slt +++ b/e2e_test/source/cdc/cdc.load.slt @@ -242,20 +242,3 @@ create table enum_to_varchar ( table.name = 'enum_table', slot.name = 'enum_to_varchar' ); - - -statement ok -create table orders ( - id int, - name varchar, - PRIMARY KEY (id) -) with ( - connector = 'postgres-cdc', - hostname = '${PGHOST:localhost}', - port = '${PGPORT:5432}', - username = '${PGUSER:$USER}', - password = '${PGPASSWORD:}', - database.name = '${PGDATABASE:postgres}', - table.name = 'Order', - slot.name = 'orders' -); diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index 6091d8b3bc488..3dc26d98c6282 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -250,11 +250,6 @@ CREATE TABLE person_new ( city varchar ) FROM pg_source TABLE 'person'; -CREATE TABLE orders_shared ( - id int PRIMARY KEY, - name varchar, -) FROM pg_source TABLE 'Orders'; - statement ok CREATE TABLE person_new ( id int, @@ -496,8 +491,3 @@ select * from enum_to_varchar_shared order by id; ---- 1 happy 2 ok - -query II -select * from orders_shared order by id; ----- -1 happy diff --git a/e2e_test/source/cdc/cdc.upper_case.postgres.slt b/e2e_test/source/cdc/cdc.upper_case.postgres.slt new file mode 100644 index 0000000000000..88b059939ec74 --- /dev/null +++ b/e2e_test/source/cdc/cdc.upper_case.postgres.slt @@ -0,0 +1,44 @@ +# For upper case UpperDb +statement ok +create source pg_source_upper with ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + slot.name = 'pg_slot' +); + +statement ok +CREATE TABLE orders_shared ( + id int PRIMARY KEY, + name varchar, +) FROM pg_source_upper TABLE 'UpperSchema.Orders'; + +query II +select * from orders_shared order by id; +---- +1 happy + +statement ok +create table orders ( + id int, + name varchar, + PRIMARY KEY (id) +) with ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + schema.name = 'UpperSchema', + table.name = 'Order', + slot.name = 'orders' +); + +query II +select * from orders order by id; +---- +1 happy diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql index 2831dac183da8..43a120a19d50f 100644 --- a/e2e_test/source/cdc/postgres_cdc.sql +++ b/e2e_test/source/cdc/postgres_cdc.sql @@ -106,9 +106,3 @@ CREATE TABLE list_with_null(id int primary key, my_int int[], my_num numeric[], INSERT INTO list_with_null VALUES (1, '{1,2,NULL}', '{1.1,inf,NULL}', '{1.1,inf,NULL}', '{1.1,inf,NULL}', '{happy,ok,NULL}', '{bb488f9b-330d-4012-b849-12adeb49e57e,bb488f9b-330d-4012-b849-12adeb49e57f, NULL}', '{\\x00,\\x01,NULL}'); INSERT INTO list_with_null VALUES (2, '{NULL,3,4}', '{2.2,0,NULL}' , '{2.2,0,NULL}', '{2.2,0,NULL}', '{happy,ok,sad}', '{2de296df-eda7-4202-a81f-1036100ef4f6,2977afbc-0b12-459c-a36f-f623fc9e9840}', '{\\x00,\\x01,\\x02}'); INSERT INTO list_with_null VALUES (5, NULL, NULL, NULL, NULL, NULL, NULL, NULL); - -CREATE TABLE "Orders" ( - id int PRIMARY KEY, - name varchar, -); -INSERT INTO "Orders" VALUES (1, 'happy'); \ No newline at end of file diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index a0f59bbcf15e4..d642273b3d35f 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -180,8 +180,7 @@ private void validateTableSchema() throws SQLException { // check primary key // reference: https://wiki.postgresql.org/wiki/Retrieve_primary_key_columns try (var stmt = jdbcConnection.prepareStatement(ValidatorUtils.getSql("postgres.pk"))) { - stmt.setString(1, schemaName); - stmt.setString(2, tableName); + stmt.setString(1, String.format("\"%s\".\"%s\"", this.schemaName, this.tableName)); var res = stmt.executeQuery(); var pkFields = new HashSet(); while (res.next()) { diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties index 61f0868a2dbb8..769c3cb1c8fb0 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties @@ -6,7 +6,7 @@ mysql.table_schema=SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM information_sc mysql.grants=SHOW GRANTS FOR CURRENT_USER() postgres.wal=show wal_level postgres.table=SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname = ? AND tablename = ?) -postgres.pk=SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) JOIN pg_class c ON c.oid = i.indrelid JOIN pg_namespace n ON n.oid = c.relnamespace WHERE n.nspname = ? AND c.relname = ? AND i.indisprimary +postgres.pk=SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) WHERE i.indrelid = ?::regclass AND i.indisprimary postgres.table_schema=SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = ? AND table_name = ? ORDER BY ordinal_position postgres.slot.check=SELECT slot_name FROM pg_replication_slots WHERE slot_name = ? and database = ? postgres.slot_limit.check=SELECT CASE WHEN (SELECT count(*) FROM pg_replication_slots) = (SELECT setting FROM pg_settings WHERE name='max_replication_slots')::int THEN 'true' ELSE 'false' END AS result; From 7cf41c927bb2c7eb907565560ec73c7d43152eab Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Fri, 19 Jul 2024 16:09:30 -0400 Subject: [PATCH 4/8] fix test --- ci/scripts/e2e-source-test.sh | 3 ++- e2e_test/source/cdc/cdc.upper_case.postgres.slt | 2 +- .../risingwave/connector/source/common/PostgresValidator.java | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index d26365cf3053d..69f3a7088570f 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -63,7 +63,7 @@ psql < ./e2e_test/source/cdc/postgres_cdc.sql psql -c "CREATE DATABASE \"UpperDb\";" export PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=UpperDb -psql -c " +psql -d UpperDb -c " CREATE SCHEMA \"UpperSchema\"; CREATE TABLE \"UpperSchema\".\"Orders\" ( id int PRIMARY KEY, @@ -71,6 +71,7 @@ CREATE TABLE \"UpperSchema\".\"Orders\" ( ); INSERT INTO \"UpperSchema\".\"Orders\" VALUES (1, 'happy'); " + risedev slt './e2e_test/source/cdc/cdc.upper_case.postgres.slt' export PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test diff --git a/e2e_test/source/cdc/cdc.upper_case.postgres.slt b/e2e_test/source/cdc/cdc.upper_case.postgres.slt index 88b059939ec74..c471552421e80 100644 --- a/e2e_test/source/cdc/cdc.upper_case.postgres.slt +++ b/e2e_test/source/cdc/cdc.upper_case.postgres.slt @@ -7,7 +7,7 @@ create source pg_source_upper with ( username = '${PGUSER:$USER}', password = '${PGPASSWORD:}', database.name = '${PGDATABASE:postgres}', - slot.name = 'pg_slot' + slot.name = 'pg_slot_upper' ); statement ok diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index d642273b3d35f..6e50cb0772ae7 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -521,7 +521,8 @@ protected void alterPublicationIfNeeded() throws SQLException { String alterPublicationSql = String.format( - "ALTER PUBLICATION %s ADD TABLE %s", pubName, schemaName + "." + tableName); + "ALTER PUBLICATION %s ADD TABLE %s", + pubName, String.format("\"%s\".\"%s\"", this.schemaName, this.tableName)); try (var stmt = jdbcConnection.createStatement()) { LOG.info("Altered publication with statement: {}", alterPublicationSql); stmt.execute(alterPublicationSql); From 938f610c3a23183c266e65c550764bf16c9316fa Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Mon, 22 Jul 2024 17:55:31 -0400 Subject: [PATCH 5/8] fix test --- ci/scripts/e2e-source-test.sh | 17 ------- e2e_test/source/cdc/cdc.check.slt | 5 +++ e2e_test/source/cdc/cdc.load.slt | 16 +++++++ e2e_test/source/cdc/cdc.share_stream.slt | 10 +++++ .../source/cdc/cdc.upper_case.postgres.slt | 44 ------------------- e2e_test/source/cdc/postgres_cdc.sql | 6 +++ 6 files changed, 37 insertions(+), 61 deletions(-) delete mode 100644 e2e_test/source/cdc/cdc.upper_case.postgres.slt diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 69f3a7088570f..c4b4713af81cc 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -59,23 +59,6 @@ export PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_ createdb psql < ./e2e_test/source/cdc/postgres_cdc.sql -# to test upper case database -psql -c "CREATE DATABASE \"UpperDb\";" - -export PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=UpperDb -psql -d UpperDb -c " -CREATE SCHEMA \"UpperSchema\"; -CREATE TABLE \"UpperSchema\".\"Orders\" ( - id int PRIMARY KEY, - name varchar -); -INSERT INTO \"UpperSchema\".\"Orders\" VALUES (1, 'happy'); -" - -risedev slt './e2e_test/source/cdc/cdc.upper_case.postgres.slt' - -export PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test - echo "--- starting risingwave cluster" RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ risedev ci-start ci-1cn-1fe-with-recovery diff --git a/e2e_test/source/cdc/cdc.check.slt b/e2e_test/source/cdc/cdc.check.slt index 3d6ea0e179391..446fc7f04b092 100644 --- a/e2e_test/source/cdc/cdc.check.slt +++ b/e2e_test/source/cdc/cdc.check.slt @@ -75,3 +75,8 @@ select * from enum_to_varchar order by id; ---- 1 happy 2 ok + +query II +select * from orders order by id; +---- +1 happy diff --git a/e2e_test/source/cdc/cdc.load.slt b/e2e_test/source/cdc/cdc.load.slt index 2c372cbd3ffdd..13c9c69ed7fa7 100644 --- a/e2e_test/source/cdc/cdc.load.slt +++ b/e2e_test/source/cdc/cdc.load.slt @@ -242,3 +242,19 @@ create table enum_to_varchar ( table.name = 'enum_table', slot.name = 'enum_to_varchar' ); + +statement ok +create table orders ( + id int, + name varchar, + PRIMARY KEY (id) +) with ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + table.name = 'Order', + slot.name = 'orders' +); diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index 33c9c16a776c0..72c6ffda0d9d5 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -268,6 +268,11 @@ CREATE TABLE person_new ( city varchar ) FROM pg_source TABLE 'person'; +CREATE TABLE orders_shared ( + id int PRIMARY KEY, + name varchar, +) FROM pg_source TABLE 'Orders'; + statement ok CREATE TABLE person_new ( id int, @@ -509,3 +514,8 @@ select * from enum_to_varchar_shared order by id; ---- 1 happy 2 ok + +query II +select * from orders_shared order by id; +---- +1 happy diff --git a/e2e_test/source/cdc/cdc.upper_case.postgres.slt b/e2e_test/source/cdc/cdc.upper_case.postgres.slt deleted file mode 100644 index c471552421e80..0000000000000 --- a/e2e_test/source/cdc/cdc.upper_case.postgres.slt +++ /dev/null @@ -1,44 +0,0 @@ -# For upper case UpperDb -statement ok -create source pg_source_upper with ( - connector = 'postgres-cdc', - hostname = '${PGHOST:localhost}', - port = '${PGPORT:5432}', - username = '${PGUSER:$USER}', - password = '${PGPASSWORD:}', - database.name = '${PGDATABASE:postgres}', - slot.name = 'pg_slot_upper' -); - -statement ok -CREATE TABLE orders_shared ( - id int PRIMARY KEY, - name varchar, -) FROM pg_source_upper TABLE 'UpperSchema.Orders'; - -query II -select * from orders_shared order by id; ----- -1 happy - -statement ok -create table orders ( - id int, - name varchar, - PRIMARY KEY (id) -) with ( - connector = 'postgres-cdc', - hostname = '${PGHOST:localhost}', - port = '${PGPORT:5432}', - username = '${PGUSER:$USER}', - password = '${PGPASSWORD:}', - database.name = '${PGDATABASE:postgres}', - schema.name = 'UpperSchema', - table.name = 'Order', - slot.name = 'orders' -); - -query II -select * from orders order by id; ----- -1 happy diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql index 43a120a19d50f..9dff91c179acc 100644 --- a/e2e_test/source/cdc/postgres_cdc.sql +++ b/e2e_test/source/cdc/postgres_cdc.sql @@ -106,3 +106,9 @@ CREATE TABLE list_with_null(id int primary key, my_int int[], my_num numeric[], INSERT INTO list_with_null VALUES (1, '{1,2,NULL}', '{1.1,inf,NULL}', '{1.1,inf,NULL}', '{1.1,inf,NULL}', '{happy,ok,NULL}', '{bb488f9b-330d-4012-b849-12adeb49e57e,bb488f9b-330d-4012-b849-12adeb49e57f, NULL}', '{\\x00,\\x01,NULL}'); INSERT INTO list_with_null VALUES (2, '{NULL,3,4}', '{2.2,0,NULL}' , '{2.2,0,NULL}', '{2.2,0,NULL}', '{happy,ok,sad}', '{2de296df-eda7-4202-a81f-1036100ef4f6,2977afbc-0b12-459c-a36f-f623fc9e9840}', '{\\x00,\\x01,\\x02}'); INSERT INTO list_with_null VALUES (5, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + +CREATE TABLE "Orders" ( + id int PRIMARY KEY, + name varchar, +); +INSERT INTO "Orders" VALUES (1, 'happy'); From 4c3b194c65c35d3e635bccb0930f8f3ac2c2fc1f Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Mon, 22 Jul 2024 19:30:16 -0400 Subject: [PATCH 6/8] fix test --- e2e_test/source/cdc/cdc.share_stream.slt | 1 + 1 file changed, 1 insertion(+) diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index 72c6ffda0d9d5..a0ae6510609ff 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -268,6 +268,7 @@ CREATE TABLE person_new ( city varchar ) FROM pg_source TABLE 'person'; +statement ok CREATE TABLE orders_shared ( id int PRIMARY KEY, name varchar, From c20713b2c754cdb7d2dc10fc4de33ed7197eff41 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Mon, 22 Jul 2024 19:55:55 -0400 Subject: [PATCH 7/8] fix test --- e2e_test/source/cdc/cdc.share_stream.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index a0ae6510609ff..314555e67a3fe 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -272,7 +272,7 @@ statement ok CREATE TABLE orders_shared ( id int PRIMARY KEY, name varchar, -) FROM pg_source TABLE 'Orders'; +) FROM pg_source TABLE 'public.Orders'; statement ok CREATE TABLE person_new ( From 8aa28b08ed3ef561190199cbef367e378490d14b Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Mon, 22 Jul 2024 20:26:24 -0400 Subject: [PATCH 8/8] fix test --- ci/docker-compose.yml | 2 +- e2e_test/source/cdc/cdc.check.slt | 2 +- e2e_test/source/cdc/cdc.load.slt | 4 ++-- e2e_test/source/cdc/cdc.share_stream.slt | 6 +++--- e2e_test/source/cdc/postgres_cdc.sql | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 66b3ea3eb4bc6..3ae6995514d26 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -12,7 +12,7 @@ services: interval: 5s timeout: 5s retries: 5 - command: [ "postgres", "-c", "wal_level=logical" ] + command: [ "postgres", "-c", "wal_level=logical", "-c", "max_replication_slots=30" ] mysql: image: mysql:8.0 diff --git a/e2e_test/source/cdc/cdc.check.slt b/e2e_test/source/cdc/cdc.check.slt index 446fc7f04b092..b0559d948c0ef 100644 --- a/e2e_test/source/cdc/cdc.check.slt +++ b/e2e_test/source/cdc/cdc.check.slt @@ -77,6 +77,6 @@ select * from enum_to_varchar order by id; 2 ok query II -select * from orders order by id; +select * from upper_orders order by id; ---- 1 happy diff --git a/e2e_test/source/cdc/cdc.load.slt b/e2e_test/source/cdc/cdc.load.slt index 13c9c69ed7fa7..599394c8a0047 100644 --- a/e2e_test/source/cdc/cdc.load.slt +++ b/e2e_test/source/cdc/cdc.load.slt @@ -244,7 +244,7 @@ create table enum_to_varchar ( ); statement ok -create table orders ( +create table upper_orders ( id int, name varchar, PRIMARY KEY (id) @@ -255,6 +255,6 @@ create table orders ( username = '${PGUSER:$USER}', password = '${PGPASSWORD:}', database.name = '${PGDATABASE:postgres}', - table.name = 'Order', + table.name = 'Orders', slot.name = 'orders' ); diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index 314555e67a3fe..b180a67c2cb2a 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -269,9 +269,9 @@ CREATE TABLE person_new ( ) FROM pg_source TABLE 'person'; statement ok -CREATE TABLE orders_shared ( +CREATE TABLE upper_orders_shared ( id int PRIMARY KEY, - name varchar, + name varchar ) FROM pg_source TABLE 'public.Orders'; statement ok @@ -517,6 +517,6 @@ select * from enum_to_varchar_shared order by id; 2 ok query II -select * from orders_shared order by id; +select * from upper_orders_shared order by id; ---- 1 happy diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql index 9dff91c179acc..20b6e7b414469 100644 --- a/e2e_test/source/cdc/postgres_cdc.sql +++ b/e2e_test/source/cdc/postgres_cdc.sql @@ -109,6 +109,6 @@ INSERT INTO list_with_null VALUES (5, NULL, NULL, NULL, NULL, NULL, NULL, NULL); CREATE TABLE "Orders" ( id int PRIMARY KEY, - name varchar, + name varchar ); INSERT INTO "Orders" VALUES (1, 'happy');