From e6f9aec4a12ef3fbc9df8c67bbf5b7abb236f9a9 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Sun, 22 Sep 2024 23:20:32 -0400 Subject: [PATCH] fix(pg-cdc): check ancestors and descendants for pg partitions --- e2e_test/source/cdc/cdc.load.slt | 60 ++++++++++++++++++ e2e_test/source/cdc/cdc.share_stream.slt | 8 +++ e2e_test/source/cdc/postgres_cdc.sql | 8 ++- .../source/common/DbzSourceUtils.java | 4 +- .../source/common/PostgresValidator.java | 62 +++++++++++++++++++ .../main/resources/validate_sql.properties | 3 + 6 files changed, 143 insertions(+), 2 deletions(-) diff --git a/e2e_test/source/cdc/cdc.load.slt b/e2e_test/source/cdc/cdc.load.slt index a50a0b7c9458d..71b3ff05694e8 100644 --- a/e2e_test/source/cdc/cdc.load.slt +++ b/e2e_test/source/cdc/cdc.load.slt @@ -259,6 +259,66 @@ create table upper_orders ( slot.name = 'orders' ); +statement ok +CREATE TABLE IF NOT EXISTS partitioned_timestamp_table_2023( + c_int int, + c_boolean boolean, + c_timestamp timestamp, + PRIMARY KEY (c_int, c_timestamp) +) WITH ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + schema.name = 'public', + table.name = 'partitioned_timestamp_table_2023', + publication.name = 'rw_publication_partition_2023', + slot.name = 'my_slot_partition' +); + +statement error The ancestor or descendant partition 'partitioned_timestamp_table_2023' of the table partition 'partitioned_timestamp_table' is already covered in the publication 'rw_publication_partition_2023'. +CREATE TABLE IF NOT EXISTS partitioned_timestamp_table( + c_int int, + c_boolean boolean, + c_timestamp timestamp, + PRIMARY KEY (c_int, c_timestamp) +) WITH ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + schema.name = 'public', + table.name = 'partitioned_timestamp_table', + publication.name = 'rw_publication_partition_2023', + slot.name = 'my_slot_partition' +); + +statement error The ancestor or descendant partition 'partitioned_timestamp_table_2023' of the table partition 'partitioned_timestamp_table_2023_h1' is already covered in the publication 'rw_publication_partition_2023'. +CREATE TABLE IF NOT EXISTS partitioned_timestamp_table_2023_h1( + c_int int, + c_boolean boolean, + c_timestamp timestamp, + PRIMARY KEY (c_int, c_timestamp) +) WITH ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + schema.name = 'public', + table.name = 'partitioned_timestamp_table_2023_h1', + publication.name = 'rw_publication_partition_2023', + slot.name = 'my_slot_partition' +); + +statement ok +DROP TABLE partitioned_timestamp_table_2023; + # for the partitioned table statement ok CREATE TABLE IF NOT EXISTS partitioned_timestamp_table( diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index 83840239b623a..c8fcfecb5a8b0 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -296,6 +296,14 @@ CREATE TABLE partitioned_timestamp_table_shared( PRIMARY KEY (c_int, c_timestamp) ) from pg_source table 'public.partitioned_timestamp_table'; +statement error The ancestor or descendant partition 'partitioned_timestamp_table' of the table partition 'partitioned_timestamp_table_2023' is already covered in the publication 'rw_publication'. +CREATE TABLE partitioned_timestamp_table_2023_shared( + c_int int, + c_boolean boolean, + c_timestamp timestamp, + PRIMARY KEY (c_int, c_timestamp) +) from pg_source table 'public.partitioned_timestamp_table_2023'; + statement ok CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_new; diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql index f80dfa267432b..e3e40ab6514c9 100644 --- a/e2e_test/source/cdc/postgres_cdc.sql +++ b/e2e_test/source/cdc/postgres_cdc.sql @@ -122,7 +122,7 @@ CREATE TABLE IF NOT EXISTS partitioned_timestamp_table( ) PARTITION BY RANGE (c_timestamp); CREATE TABLE partitioned_timestamp_table_2023 PARTITION OF partitioned_timestamp_table - FOR VALUES FROM ('2023-01-01') TO ('2023-12-31'); + FOR VALUES FROM ('2023-01-01') TO ('2023-12-31') PARTITION BY RANGE (c_timestamp); CREATE TABLE partitioned_timestamp_table_2024 PARTITION OF partitioned_timestamp_table FOR VALUES FROM ('2024-01-01') TO ('2024-12-31'); @@ -130,6 +130,12 @@ CREATE TABLE partitioned_timestamp_table_2024 PARTITION OF partitioned_timestamp CREATE TABLE partitioned_timestamp_table_2025 PARTITION OF partitioned_timestamp_table FOR VALUES FROM ('2025-01-01') TO ('2025-12-31'); +CREATE TABLE partitioned_timestamp_table_2023_h1 PARTITION OF partitioned_timestamp_table_2023 + FOR VALUES FROM ('2023-01-01') TO ('2023-06-30'); + +CREATE TABLE partitioned_timestamp_table_2023_h2 PARTITION OF partitioned_timestamp_table_2023 + FOR VALUES FROM ('2023-07-01') TO ('2024-12-31'); + INSERT INTO partitioned_timestamp_table (c_int, c_boolean, c_timestamp) VALUES (1, false, '2023-02-01 10:30:00'), (2, false, '2023-05-15 11:45:00'), diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java index 6ab10bf18eb04..e0670cb7cb8ef 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java @@ -62,8 +62,10 @@ public static void createPostgresPublicationIfNeeded( ValidatorUtils.getSql("postgres.publication_exist"))) { stmt.setString(1, pubName); var res = stmt.executeQuery(); + // Note: the value returned here is `pubviaroot`, If there's more than one row, the + // publication exists if (res.next()) { - isPubExist = res.getBoolean(1); + isPubExist = true; } } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index 9d5b65c1d73af..5404ec6804d67 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -397,6 +397,68 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException + "'` in the upstream Postgres to check."); } + if (isPublicationExists) { + List family = new ArrayList<>(); + boolean findRoot = false; + String currentPartition = tableName; + System.out.println("WKXLOG before find root: " + family); + + while (!findRoot) { + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("postgres.partition_parent"))) { + stmt.setString( + 1, String.format("\"%s\".\"%s\"", this.schemaName, currentPartition)); + stmt.setString( + 2, String.format("\"%s\".\"%s\"", this.schemaName, currentPartition)); + stmt.setString( + 3, String.format("\"%s\".\"%s\"", this.schemaName, currentPartition)); + var res = stmt.executeQuery(); + if (res.next()) { + String parent = res.getString(1); + family.add(parent); + currentPartition = parent; + } else { + findRoot = true; + } + } + } + System.out.println("WKXLOG after find root: " + family); + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("postgres.partition_descendants"))) { + stmt.setString(1, String.format("\"%s\".\"%s\"", this.schemaName, this.tableName)); + stmt.setString(2, String.format("\"%s\".\"%s\"", this.schemaName, this.tableName)); + var res = stmt.executeQuery(); + while (res.next()) { + String descendant = res.getString(1); + family.add(descendant); + } + } + + System.out.println("WKXLOG after find descendants: " + family); + + for (String relative : family) { + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("postgres.partition_in_publication.check"))) { + stmt.setString(1, schemaName); + stmt.setString(2, relative); + stmt.setString(3, pubName); + var res = stmt.executeQuery(); + while (res.next()) { + if (res.getBoolean(1)) { + throw ValidatorUtils.invalidArgument( + String.format( + "The ancestor or descendant partition '%s' of the table partition '%s' is already covered in the publication '%s'. Please use a new publication for '%s'", + relative, tableName, pubName, tableName)); + } + } + } + } + System.out.println("WKXLOG after publication.check: " + family); + } + // PG 15 and up supports partial publication of table // check whether publication covers all columns of the table schema if (isPartialPublicationEnabled) { diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties index 0288b7b37fcac..209da1f0ed56d 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties @@ -20,6 +20,9 @@ postgres.publication_attnames=SELECT attnames FROM pg_publication_tables WHERE s postgres.publication_exist=SELECT pubviaroot from pg_publication WHERE pubname = ? postgres.publication_has_table=SELECT COUNT(*) > 0 AS count FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ? postgres.partition_names=SELECT c.relname AS partition_name FROM pg_inherits AS i JOIN pg_class AS c ON i.inhrelid = c.oid JOIN pg_class AS p ON i.inhparent = p.oid JOIN pg_namespace AS n ON p.relnamespace = n.oid WHERE n.nspname = ? AND p.relname = ?; +postgres.partition_parent=SELECT parentrelid FROM pg_partition_tree(?) WHERE relid = ?::regclass AND parentrelid <> ?::regclass +postgres.partition_descendants=SELECT relid FROM pg_partition_tree(?) WHERE relid <> ?::regclass +postgres.partition_in_publication.check=SELECT count(*) > 0 FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ? postgres.users_of_group=WITH RECURSIVE base (g, m) AS (( \ SELECT r1.rolname as group, ARRAY_AGG(DISTINCT(r2.rolname)) as members FROM pg_auth_members am \ INNER JOIN pg_roles r1 ON r1.oid = am.roleid \