Skip to content

Commit

Permalink
fix(pg-cdc): fix compatibility issue of publish_via_partition_root (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang authored Sep 25, 2024
1 parent 45ef6c4 commit fdc5790
Showing 1 changed file with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,30 @@ public static void createPostgresPublicationIfNeeded(
Optional.of(quotePostgres(schemaName) + "." + quotePostgres(tableName));
}

String withClause = "";
try {
// Always create with `WITH ( publish_via_partition_root = true )` to ensure the
// CDC of partitioned tables can work correctly.
// `publish_via_partition_root` is not supported before v13. Luckily, before
// v13, user cannot add a partitioned table into a publication either.
if (jdbcConnection.getMetaData().getDatabaseMajorVersion() >= 13) {
withClause = " WITH ( publish_via_partition_root = true )";
}
} catch (SQLException e) {
throw ValidatorUtils.internalError(e.getMessage());
}

// create the publication if it doesn't exist
String createPublicationSql;
if (schemaTableName.isPresent()) {
createPublicationSql =
String.format(
"CREATE PUBLICATION %s FOR TABLE %s WITH ( publish_via_partition_root = true );",
quotePostgres(pubName), schemaTableName.get());
"CREATE PUBLICATION %s FOR TABLE %s%s;",
quotePostgres(pubName), schemaTableName.get(), withClause);
} else {
createPublicationSql =
String.format(
"CREATE PUBLICATION %s WITH ( publish_via_partition_root = true );",
quotePostgres(pubName));
"CREATE PUBLICATION %s%s;", quotePostgres(pubName), withClause);
}
try (var stmt = jdbcConnection.createStatement()) {
LOG.info(
Expand Down

0 comments on commit fdc5790

Please sign in to comment.