From fdc57908b8622672f9f4f9ae68f3cf8feb56eb51 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Wed, 25 Sep 2024 09:56:37 -0500 Subject: [PATCH] fix(pg-cdc): fix compatibility issue of publish_via_partition_root (#18691) --- .../source/common/DbzSourceUtils.java | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java index 6ab10bf18eb0..d333b00f0fdb 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java @@ -78,18 +78,30 @@ public static void createPostgresPublicationIfNeeded( Optional.of(quotePostgres(schemaName) + "." + quotePostgres(tableName)); } + String withClause = ""; + try { + // Always create with `WITH ( publish_via_partition_root = true )` to ensure the + // CDC of partitioned tables can work correctly. + // `publish_via_partition_root` is not supported before v13. Luckily, before + // v13, user cannot add a partitioned table into a publication either. + if (jdbcConnection.getMetaData().getDatabaseMajorVersion() >= 13) { + withClause = " WITH ( publish_via_partition_root = true )"; + } + } catch (SQLException e) { + throw ValidatorUtils.internalError(e.getMessage()); + } + // create the publication if it doesn't exist String createPublicationSql; if (schemaTableName.isPresent()) { createPublicationSql = String.format( - "CREATE PUBLICATION %s FOR TABLE %s WITH ( publish_via_partition_root = true );", - quotePostgres(pubName), schemaTableName.get()); + "CREATE PUBLICATION %s FOR TABLE %s%s;", + quotePostgres(pubName), schemaTableName.get(), withClause); } else { createPublicationSql = String.format( - "CREATE PUBLICATION %s WITH ( publish_via_partition_root = true );", - quotePostgres(pubName)); + "CREATE PUBLICATION %s%s;", quotePostgres(pubName), withClause); } try (var stmt = jdbcConnection.createStatement()) { LOG.info(