Skip to content

Commit

Permalink
fix(pg-cdc): check ancestors and descendants for pg partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Sep 23, 2024
1 parent bc65ffb commit e6f9aec
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 2 deletions.
60 changes: 60 additions & 0 deletions e2e_test/source/cdc/cdc.load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,66 @@ create table upper_orders (
slot.name = 'orders'
);

statement ok
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table_2023(
c_int int,
c_boolean boolean,
c_timestamp timestamp,
PRIMARY KEY (c_int, c_timestamp)
) WITH (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
schema.name = 'public',
table.name = 'partitioned_timestamp_table_2023',
publication.name = 'rw_publication_partition_2023',
slot.name = 'my_slot_partition'
);

statement error The ancestor or descendant partition 'partitioned_timestamp_table_2023' of the table partition 'partitioned_timestamp_table' is already covered in the publication 'rw_publication_partition_2023'.
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table(
c_int int,
c_boolean boolean,
c_timestamp timestamp,
PRIMARY KEY (c_int, c_timestamp)
) WITH (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
schema.name = 'public',
table.name = 'partitioned_timestamp_table',
publication.name = 'rw_publication_partition_2023',
slot.name = 'my_slot_partition'
);

statement error The ancestor or descendant partition 'partitioned_timestamp_table_2023' of the table partition 'partitioned_timestamp_table_2023_h1' is already covered in the publication 'rw_publication_partition_2023'.
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table_2023_h1(
c_int int,
c_boolean boolean,
c_timestamp timestamp,
PRIMARY KEY (c_int, c_timestamp)
) WITH (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
schema.name = 'public',
table.name = 'partitioned_timestamp_table_2023_h1',
publication.name = 'rw_publication_partition_2023',
slot.name = 'my_slot_partition'
);

statement ok
DROP TABLE partitioned_timestamp_table_2023;

# for the partitioned table
statement ok
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table(
Expand Down
8 changes: 8 additions & 0 deletions e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,14 @@ CREATE TABLE partitioned_timestamp_table_shared(
PRIMARY KEY (c_int, c_timestamp)
) from pg_source table 'public.partitioned_timestamp_table';

statement error The ancestor or descendant partition 'partitioned_timestamp_table' of the table partition 'partitioned_timestamp_table_2023' is already covered in the publication 'rw_publication'.
CREATE TABLE partitioned_timestamp_table_2023_shared(
c_int int,
c_boolean boolean,
c_timestamp timestamp,
PRIMARY KEY (c_int, c_timestamp)
) from pg_source table 'public.partitioned_timestamp_table_2023';

statement ok
CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_new;

Expand Down
8 changes: 7 additions & 1 deletion e2e_test/source/cdc/postgres_cdc.sql
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,20 @@ CREATE TABLE IF NOT EXISTS partitioned_timestamp_table(
) PARTITION BY RANGE (c_timestamp);

CREATE TABLE partitioned_timestamp_table_2023 PARTITION OF partitioned_timestamp_table
FOR VALUES FROM ('2023-01-01') TO ('2023-12-31');
FOR VALUES FROM ('2023-01-01') TO ('2023-12-31') PARTITION BY RANGE (c_timestamp);

CREATE TABLE partitioned_timestamp_table_2024 PARTITION OF partitioned_timestamp_table
FOR VALUES FROM ('2024-01-01') TO ('2024-12-31');

CREATE TABLE partitioned_timestamp_table_2025 PARTITION OF partitioned_timestamp_table
FOR VALUES FROM ('2025-01-01') TO ('2025-12-31');

CREATE TABLE partitioned_timestamp_table_2023_h1 PARTITION OF partitioned_timestamp_table_2023
FOR VALUES FROM ('2023-01-01') TO ('2023-06-30');

CREATE TABLE partitioned_timestamp_table_2023_h2 PARTITION OF partitioned_timestamp_table_2023
FOR VALUES FROM ('2023-07-01') TO ('2024-12-31');

INSERT INTO partitioned_timestamp_table (c_int, c_boolean, c_timestamp) VALUES
(1, false, '2023-02-01 10:30:00'),
(2, false, '2023-05-15 11:45:00'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ public static void createPostgresPublicationIfNeeded(
ValidatorUtils.getSql("postgres.publication_exist"))) {
stmt.setString(1, pubName);
var res = stmt.executeQuery();
// Note: the value returned here is `pubviaroot`, If there's more than one row, the
// publication exists
if (res.next()) {
isPubExist = res.getBoolean(1);
isPubExist = true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,68 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
+ "'` in the upstream Postgres to check.");
}

if (isPublicationExists) {
List<String> family = new ArrayList<>();
boolean findRoot = false;
String currentPartition = tableName;
System.out.println("WKXLOG before find root: " + family);

while (!findRoot) {
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.partition_parent"))) {
stmt.setString(
1, String.format("\"%s\".\"%s\"", this.schemaName, currentPartition));
stmt.setString(
2, String.format("\"%s\".\"%s\"", this.schemaName, currentPartition));
stmt.setString(
3, String.format("\"%s\".\"%s\"", this.schemaName, currentPartition));
var res = stmt.executeQuery();
if (res.next()) {
String parent = res.getString(1);
family.add(parent);
currentPartition = parent;
} else {
findRoot = true;
}
}
}
System.out.println("WKXLOG after find root: " + family);
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.partition_descendants"))) {
stmt.setString(1, String.format("\"%s\".\"%s\"", this.schemaName, this.tableName));
stmt.setString(2, String.format("\"%s\".\"%s\"", this.schemaName, this.tableName));
var res = stmt.executeQuery();
while (res.next()) {
String descendant = res.getString(1);
family.add(descendant);
}
}

System.out.println("WKXLOG after find descendants: " + family);

for (String relative : family) {
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.partition_in_publication.check"))) {
stmt.setString(1, schemaName);
stmt.setString(2, relative);
stmt.setString(3, pubName);
var res = stmt.executeQuery();
while (res.next()) {
if (res.getBoolean(1)) {
throw ValidatorUtils.invalidArgument(
String.format(
"The ancestor or descendant partition '%s' of the table partition '%s' is already covered in the publication '%s'. Please use a new publication for '%s'",
relative, tableName, pubName, tableName));
}
}
}
}
System.out.println("WKXLOG after publication.check: " + family);
}

// PG 15 and up supports partial publication of table
// check whether publication covers all columns of the table schema
if (isPartialPublicationEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ postgres.publication_attnames=SELECT attnames FROM pg_publication_tables WHERE s
postgres.publication_exist=SELECT pubviaroot from pg_publication WHERE pubname = ?
postgres.publication_has_table=SELECT COUNT(*) > 0 AS count FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ?
postgres.partition_names=SELECT c.relname AS partition_name FROM pg_inherits AS i JOIN pg_class AS c ON i.inhrelid = c.oid JOIN pg_class AS p ON i.inhparent = p.oid JOIN pg_namespace AS n ON p.relnamespace = n.oid WHERE n.nspname = ? AND p.relname = ?;
postgres.partition_parent=SELECT parentrelid FROM pg_partition_tree(?) WHERE relid = ?::regclass AND parentrelid <> ?::regclass
postgres.partition_descendants=SELECT relid FROM pg_partition_tree(?) WHERE relid <> ?::regclass
postgres.partition_in_publication.check=SELECT count(*) > 0 FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ?
postgres.users_of_group=WITH RECURSIVE base (g, m) AS (( \
SELECT r1.rolname as group, ARRAY_AGG(DISTINCT(r2.rolname)) as members FROM pg_auth_members am \
INNER JOIN pg_roles r1 ON r1.oid = am.roleid \
Expand Down

0 comments on commit e6f9aec

Please sign in to comment.