diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index a5804974fb29..faae0048649b 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -146,21 +146,21 @@ public DbzConnectorConfig( // If cdc backfill enabled, the source only emit incremental changes, so we must // rewind to the given offset and continue binlog reading from there if (null != startOffset && !startOffset.isBlank()) { - mysqlProps.setProperty("snapshot.mode", "schema_only_recovery"); + mysqlProps.setProperty("snapshot.mode", "recovery"); mysqlProps.setProperty( ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); } else { // read upstream table schemas and emit incremental changes only - mysqlProps.setProperty("snapshot.mode", "schema_only"); + mysqlProps.setProperty("snapshot.mode", "no_data"); } } else { // if snapshot phase is finished and offset is specified, we will continue binlog // reading from the given offset if (snapshotDone && null != startOffset && !startOffset.isBlank()) { - // 'snapshot.mode=schema_only_recovery' must be configured if binlog offset is + // 'snapshot.mode=recovery' must be configured if binlog offset is // specified. It only snapshots the schemas, not the data, and continue binlog // reading from the specified offset - mysqlProps.setProperty("snapshot.mode", "schema_only_recovery"); + mysqlProps.setProperty("snapshot.mode", "recovery"); mysqlProps.setProperty( ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); } @@ -180,7 +180,7 @@ public DbzConnectorConfig( } if (isCdcBackfill) { // skip the initial snapshot for cdc backfill - postgresProps.setProperty("snapshot.mode", "never"); + postgresProps.setProperty("snapshot.mode", "no_data"); // if startOffset is specified, we should continue // reading changes from the given offset @@ -193,7 +193,7 @@ public DbzConnectorConfig( // if snapshot phase is finished and offset is specified, we will continue reading // changes from the given offset if (snapshotDone && null != startOffset && !startOffset.isBlank()) { - postgresProps.setProperty("snapshot.mode", "never"); + postgresProps.setProperty("snapshot.mode", "no_data"); postgresProps.setProperty( ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); } @@ -224,7 +224,7 @@ public DbzConnectorConfig( // if snapshot phase is finished and offset is specified, we will continue reading // changes from the given offset if (snapshotDone && null != startOffset && !startOffset.isBlank()) { - postgresProps.setProperty("snapshot.mode", "never"); + postgresProps.setProperty("snapshot.mode", "no_data"); postgresProps.setProperty( ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); } @@ -235,7 +235,7 @@ public DbzConnectorConfig( // if snapshot phase is finished and offset is specified, we will continue reading // changes from the given offset if (snapshotDone && null != startOffset && !startOffset.isBlank()) { - mongodbProps.setProperty("snapshot.mode", "never"); + mongodbProps.setProperty("snapshot.mode", "no_data"); mongodbProps.setProperty( ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); } diff --git a/java/connector-node/risingwave-source-cdc/pom.xml b/java/connector-node/risingwave-source-cdc/pom.xml index 5ee531ef805e..839de9c1f319 100644 --- a/java/connector-node/risingwave-source-cdc/pom.xml +++ b/java/connector-node/risingwave-source-cdc/pom.xml @@ -56,7 +56,8 @@ com.zendesk mysql-binlog-connector-java - 0.27.2 + + 0.29.0 org.postgresql diff --git a/java/pom.xml b/java/pom.xml index 644588c9d6b4..58f33e9da56d 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -74,7 +74,7 @@ 2.11.0 1.10.0 3.12.0 - 2.4.2.Final + 2.6.2.Final 2.15.0 3.3.1 3.4.0