Skip to content

Commit

Permalink
fix(cdc): add heartbeat action query to solve WAL accumulation on AWS (
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored May 24, 2024
1 parent e7c1035 commit 1ba0d5c
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,19 @@ public PostgresValidator(

@Override
public void validateDbConfig() {
// TODO: check database server version
try (var stmt = jdbcConnection.createStatement()) {
// check whether wal has been enabled
var res = stmt.executeQuery(ValidatorUtils.getSql("postgres.wal"));
while (res.next()) {
if (!res.getString(1).equals("logical")) {
throw ValidatorUtils.invalidArgument(
"Postgres wal_level should be 'logical'.\nPlease modify the config and restart your Postgres server.");
try {
if (jdbcConnection.getMetaData().getDatabaseMajorVersion() > 16) {
throw ValidatorUtils.failedPrecondition("Postgres version should be less than 16.");
}

try (var stmt = jdbcConnection.createStatement()) {
// check whether wal has been enabled
var res = stmt.executeQuery(ValidatorUtils.getSql("postgres.wal"));
while (res.next()) {
if (!res.getString(1).equals("logical")) {
throw ValidatorUtils.invalidArgument(
"Postgres wal_level should be 'logical'.\nPlease modify the config and restart your Postgres server.");
}
}
}
} catch (SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ publication.autocreate.mode=disabled
publication.name=${publication.name:-rw_publication}
# default heartbeat interval 5 mins
heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-300000}
# emit a WAL message to the replication stream
# see https://github.com/risingwavelabs/risingwave/issues/16697 for more details
heartbeat.action.query=SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)
# In sharing cdc source mode, we will subscribe to multiple tables in the given database,
# so here we set ${table.name} to a default value `RW_CDC_Sharing` just for display.
name=${hostname}:${port}:${database.name}.${schema.name}.${table.name:-RW_CDC_Sharing}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ public void testLines() throws Exception {
int count = countResult.get();
LOG.info("number of cdc messages received: {}", count);
try {
assertEquals(10000, count);
// 10000 rows plus one heartbeat message
assertTrue(count >= 10000);
} catch (Exception e) {
Assert.fail("validate rpc fail: " + e.getMessage());
} finally {
Expand Down

0 comments on commit 1ba0d5c

Please sign in to comment.