Skip to content

Commit

Permalink
fix(pg-cdc): check ancestors and descendants for pg partitions (#18648)
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang authored Sep 24, 2024
1 parent c467dcd commit c7975e0
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 20 deletions.
60 changes: 60 additions & 0 deletions e2e_test/source/cdc/cdc.load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,66 @@ create table upper_orders (
slot.name = 'orders'
);

statement ok
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table_2023(
c_int int,
c_boolean boolean,
c_timestamp timestamp,
PRIMARY KEY (c_int, c_timestamp)
) WITH (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
schema.name = 'public',
table.name = 'partitioned_timestamp_table_2023',
publication.name = 'rw_publication_partition_2023',
slot.name = 'my_slot_partition'
);

statement error The ancestor or descendant partition 'partitioned_timestamp_table_2023' of the table partition 'partitioned_timestamp_table' is already covered in the publication 'rw_publication_partition_2023'.
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table(
c_int int,
c_boolean boolean,
c_timestamp timestamp,
PRIMARY KEY (c_int, c_timestamp)
) WITH (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
schema.name = 'public',
table.name = 'partitioned_timestamp_table',
publication.name = 'rw_publication_partition_2023',
slot.name = 'my_slot_partition'
);

statement error The ancestor or descendant partition 'partitioned_timestamp_table_2023' of the table partition 'partitioned_timestamp_table_2023_h1' is already covered in the publication 'rw_publication_partition_2023'.
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table_2023_h1(
c_int int,
c_boolean boolean,
c_timestamp timestamp,
PRIMARY KEY (c_int, c_timestamp)
) WITH (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
schema.name = 'public',
table.name = 'partitioned_timestamp_table_2023_h1',
publication.name = 'rw_publication_partition_2023',
slot.name = 'my_slot_partition'
);

statement ok
DROP TABLE partitioned_timestamp_table_2023;

# for the partitioned table
statement ok
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table(
Expand Down
8 changes: 8 additions & 0 deletions e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,14 @@ CREATE TABLE partitioned_timestamp_table_shared(
PRIMARY KEY (c_int, c_timestamp)
) from pg_source table 'public.partitioned_timestamp_table';

statement error The ancestor or descendant partition 'partitioned_timestamp_table' of the table partition 'partitioned_timestamp_table_2023' is already covered in the publication 'rw_publication'.
CREATE TABLE partitioned_timestamp_table_2023_shared(
c_int int,
c_boolean boolean,
c_timestamp timestamp,
PRIMARY KEY (c_int, c_timestamp)
) from pg_source table 'public.partitioned_timestamp_table_2023';

statement ok
CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_new;

Expand Down
8 changes: 7 additions & 1 deletion e2e_test/source/cdc/postgres_cdc.sql
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,20 @@ CREATE TABLE IF NOT EXISTS partitioned_timestamp_table(
) PARTITION BY RANGE (c_timestamp);

CREATE TABLE partitioned_timestamp_table_2023 PARTITION OF partitioned_timestamp_table
FOR VALUES FROM ('2023-01-01') TO ('2023-12-31');
FOR VALUES FROM ('2023-01-01') TO ('2023-12-31') PARTITION BY RANGE (c_timestamp);

CREATE TABLE partitioned_timestamp_table_2024 PARTITION OF partitioned_timestamp_table
FOR VALUES FROM ('2024-01-01') TO ('2024-12-31');

CREATE TABLE partitioned_timestamp_table_2025 PARTITION OF partitioned_timestamp_table
FOR VALUES FROM ('2025-01-01') TO ('2025-12-31');

CREATE TABLE partitioned_timestamp_table_2023_h1 PARTITION OF partitioned_timestamp_table_2023
FOR VALUES FROM ('2023-01-01') TO ('2023-06-30');

CREATE TABLE partitioned_timestamp_table_2023_h2 PARTITION OF partitioned_timestamp_table_2023
FOR VALUES FROM ('2023-07-01') TO ('2024-12-31');

INSERT INTO partitioned_timestamp_table (c_int, c_boolean, c_timestamp) VALUES
(1, false, '2023-02-01 10:30:00'),
(2, false, '2023-05-15 11:45:00'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,8 @@ public static void createPostgresPublicationIfNeeded(
ValidatorUtils.getSql("postgres.publication_exist"))) {
stmt.setString(1, pubName);
var res = stmt.executeQuery();
// Note: the value returned here is `pubviaroot`, If there's more than one row, the
// publication exists
if (res.next()) {
isPubExist = true;
isPubExist = res.getBoolean(1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class PostgresValidator extends DatabaseValidator implements AutoCloseabl
// Whether the properties to validate is shared by multiple tables.
// If true, we will skip validation check for table
private final boolean isCdcSourceJob;
private final int pgVersion;

public PostgresValidator(
Map<String, String> userProps, TableSchema tableSchema, boolean isCdcSourceJob)
Expand Down Expand Up @@ -75,12 +76,17 @@ public PostgresValidator(
this.pubAutoCreate =
userProps.get(DbzConnectorConfig.PG_PUB_CREATE).equalsIgnoreCase("true");
this.isCdcSourceJob = isCdcSourceJob;
try {
this.pgVersion = jdbcConnection.getMetaData().getDatabaseMajorVersion();
} catch (SQLException e) {
throw ValidatorUtils.internalError(e.getMessage());
}
}

@Override
public void validateDbConfig() {
try {
if (jdbcConnection.getMetaData().getDatabaseMajorVersion() > 16) {
if (pgVersion > 16) {
throw ValidatorUtils.failedPrecondition("Postgres version should be less than 16.");
}

Expand Down Expand Up @@ -331,7 +337,6 @@ private void validateTablePrivileges(boolean isSuperUser) throws SQLException {
private void validatePublicationConfig(boolean isSuperUser) throws SQLException {
boolean isPublicationCoversTable = false;
boolean isPublicationExists = false;
boolean isPublicationViaRoot = false;
boolean isPartialPublicationEnabled = false;

// Check whether publication exists
Expand All @@ -341,8 +346,7 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
stmt.setString(1, pubName);
var res = stmt.executeQuery();
while (res.next()) {
isPublicationViaRoot = res.getBoolean(1);
isPublicationExists = true;
isPublicationExists = res.getBoolean(1);
}
}

Expand Down Expand Up @@ -384,17 +388,97 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
partitions.add(res.getString(1));
}
}
if (!partitions.isEmpty() && isPublicationExists && !isPublicationViaRoot) {
// make sure the publication are created with `publish_via_partition_root = true`, which
// is required by partitioned tables.
throw ValidatorUtils.invalidArgument(
"Table '"
+ tableName
+ "' has partitions, which requires publication '"
+ pubName
+ "' to be created with `publish_via_partition_root = true`. \nHint: you can run `SELECT pubviaroot from pg_publication WHERE pubname = '"
+ pubName
+ "'` in the upstream Postgres to check.");

if (!partitions.isEmpty() && isPublicationExists) {
// `pubviaroot` in `pg_publication` is added after PG v13, before which PG does not
// allow adding partitioned table to a publication. So here, if partitions.isEmpty() is
// false, which means the PG version is >= v13, we can safely check the value of
// `pubviaroot` of the publication here.
boolean isPublicationViaRoot = false;
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.publication_pubviaroot"))) {
stmt.setString(1, pubName);
var res = stmt.executeQuery();
if (res.next()) {
isPublicationViaRoot = res.getBoolean(1);
}
}
if (!isPublicationViaRoot) {
// Make sure the publication are created with `publish_via_partition_root = true`,
// which is required by partitioned tables.
throw ValidatorUtils.invalidArgument(
"Table '"
+ tableName
+ "' has partitions, which requires publication '"
+ pubName
+ "' to be created with `publish_via_partition_root = true`. \nHint: you can run `SELECT pubviaroot from pg_publication WHERE pubname = '"
+ pubName
+ "'` in the upstream Postgres to check.");
}
}
// Only after v13, PG allows adding a partitioned table to a publication. So, if the
// version is before v13, the tables in a publication are always partition leaves, we don't
// check their ancestors and descendants anymore.
if (isPublicationExists && pgVersion >= 13) {
List<String> family = new ArrayList<>();
boolean findRoot = false;
String currentPartition = tableName;
while (!findRoot) {
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.partition_parent"))) {
String schemaPartitionName =
String.format("\"%s\".\"%s\"", this.schemaName, currentPartition);
stmt.setString(1, schemaPartitionName);
stmt.setString(2, schemaPartitionName);
stmt.setString(3, schemaPartitionName);
var res = stmt.executeQuery();
if (res.next()) {
String parent = res.getString(1);
family.add(parent);
currentPartition = parent;
} else {
findRoot = true;
}
}
}
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.partition_descendants"))) {
String schemaTableName =
String.format("\"%s\".\"%s\"", this.schemaName, this.tableName);
stmt.setString(1, schemaTableName);
stmt.setString(2, schemaTableName);
var res = stmt.executeQuery();
while (res.next()) {
String descendant = res.getString(1);
family.add(descendant);
}
}
// The check here was added based on experimental observations. We found that if a table
// is added to a publication where its ancestor or descendant is already included, the
// table cannot be read data from the slot correctly. Therefore, we must verify whether
// its ancestors or descendants are already in the publication. If yes, we deny the
// request.
for (String relative : family) {
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.partition_in_publication.check"))) {
stmt.setString(1, schemaName);
stmt.setString(2, relative);
stmt.setString(3, pubName);
var res = stmt.executeQuery();
while (res.next()) {
if (res.getBoolean(1)) {
throw ValidatorUtils.invalidArgument(
String.format(
"The ancestor or descendant partition '%s' of the table partition '%s' is already covered in the publication '%s'. Please use a new publication for '%s'",
relative, tableName, pubName, tableName));
}
}
}
}
}

// PG 15 and up supports partial publication of table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ postgres.table_read_privilege.check=SELECT (COUNT(*) = 1) FROM information_schem
postgres.table_owner=SELECT tableowner FROM pg_tables WHERE schemaname = ? and tablename = ?
postgres.publication_att_exists=SELECT count(*) > 0 FROM information_schema.columns WHERE table_name = 'pg_publication_tables' AND column_name = 'attnames'
postgres.publication_attnames=SELECT attnames FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ?
postgres.publication_exist=SELECT pubviaroot from pg_publication WHERE pubname = ?
postgres.publication_exist=SELECT count(*) > 0 from pg_publication WHERE pubname = ?
postgres.publication_pubviaroot=SELECT pubviaroot from pg_publication WHERE pubname = ?
postgres.publication_has_table=SELECT COUNT(*) > 0 AS count FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ?
postgres.partition_names=SELECT c.relname AS partition_name FROM pg_inherits AS i JOIN pg_class AS c ON i.inhrelid = c.oid JOIN pg_class AS p ON i.inhparent = p.oid JOIN pg_namespace AS n ON p.relnamespace = n.oid WHERE n.nspname = ? AND p.relname = ?;
postgres.partition_parent=SELECT parentrelid FROM pg_partition_tree(?) WHERE relid = ?::regclass AND parentrelid <> ?::regclass
postgres.partition_descendants=SELECT relid FROM pg_partition_tree(?) WHERE relid <> ?::regclass
postgres.partition_in_publication.check=SELECT count(*) > 0 FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ?
postgres.users_of_group=WITH RECURSIVE base (g, m) AS (( \
SELECT r1.rolname as group, ARRAY_AGG(DISTINCT(r2.rolname)) as members FROM pg_auth_members am \
INNER JOIN pg_roles r1 ON r1.oid = am.roleid \
Expand Down

0 comments on commit c7975e0

Please sign in to comment.