diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index 084843672cb6f..31b016146e000 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -79,14 +79,19 @@ public PostgresValidator( @Override public void validateDbConfig() { - // TODO: check database server version - try (var stmt = jdbcConnection.createStatement()) { - // check whether wal has been enabled - var res = stmt.executeQuery(ValidatorUtils.getSql("postgres.wal")); - while (res.next()) { - if (!res.getString(1).equals("logical")) { - throw ValidatorUtils.invalidArgument( - "Postgres wal_level should be 'logical'.\nPlease modify the config and restart your Postgres server."); + try { + if (jdbcConnection.getMetaData().getDatabaseMajorVersion() > 16) { + throw ValidatorUtils.failedPrecondition("Postgres version should be less than 16."); + } + + try (var stmt = jdbcConnection.createStatement()) { + // check whether wal has been enabled + var res = stmt.executeQuery(ValidatorUtils.getSql("postgres.wal")); + while (res.next()) { + if (!res.getString(1).equals("logical")) { + throw ValidatorUtils.invalidArgument( + "Postgres wal_level should be 'logical'.\nPlease modify the config and restart your Postgres server."); + } } } } catch (SQLException e) { diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties b/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties index 326138403d3b2..06c4210fcf468 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties @@ -17,6 +17,9 @@ publication.autocreate.mode=disabled publication.name=${publication.name:-rw_publication} # default heartbeat interval 5 mins heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-300000} +# emit a WAL message to the replication stream +# see https://github.com/risingwavelabs/risingwave/issues/16697 for more details +heartbeat.action.query=SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar) # In sharing cdc source mode, we will subscribe to multiple tables in the given database, # so here we set ${table.name} to a default value `RW_CDC_Sharing` just for display. name=${hostname}:${port}:${database.name}.${schema.name}.${table.name:-RW_CDC_Sharing} diff --git a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java index 477ae42899f16..5ccb24cef3ffc 100644 --- a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java +++ b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java @@ -145,7 +145,8 @@ public void testLines() throws Exception { int count = countResult.get(); LOG.info("number of cdc messages received: {}", count); try { - assertEquals(10000, count); + // 10000 rows plus one heartbeat message + assertTrue(count >= 10000); } catch (Exception e) { Assert.fail("validate rpc fail: " + e.getMessage()); } finally {