diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index a5804974fb29c..432fc13097538 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -146,21 +146,21 @@ public DbzConnectorConfig( // If cdc backfill enabled, the source only emit incremental changes, so we must // rewind to the given offset and continue binlog reading from there if (null != startOffset && !startOffset.isBlank()) { - mysqlProps.setProperty("snapshot.mode", "schema_only_recovery"); + mysqlProps.setProperty("snapshot.mode", "recovery"); mysqlProps.setProperty( ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); } else { // read upstream table schemas and emit incremental changes only - mysqlProps.setProperty("snapshot.mode", "schema_only"); + mysqlProps.setProperty("snapshot.mode", "never"); } } else { // if snapshot phase is finished and offset is specified, we will continue binlog // reading from the given offset if (snapshotDone && null != startOffset && !startOffset.isBlank()) { - // 'snapshot.mode=schema_only_recovery' must be configured if binlog offset is + // 'snapshot.mode=recovery' must be configured if binlog offset is // specified. It only snapshots the schemas, not the data, and continue binlog // reading from the specified offset - mysqlProps.setProperty("snapshot.mode", "schema_only_recovery"); + mysqlProps.setProperty("snapshot.mode", "recovery"); mysqlProps.setProperty( ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); } @@ -180,7 +180,7 @@ public DbzConnectorConfig( } if (isCdcBackfill) { // skip the initial snapshot for cdc backfill - postgresProps.setProperty("snapshot.mode", "never"); + postgresProps.setProperty("snapshot.mode", "no_data"); // if startOffset is specified, we should continue // reading changes from the given offset @@ -193,7 +193,7 @@ public DbzConnectorConfig( // if snapshot phase is finished and offset is specified, we will continue reading // changes from the given offset if (snapshotDone && null != startOffset && !startOffset.isBlank()) { - postgresProps.setProperty("snapshot.mode", "never"); + postgresProps.setProperty("snapshot.mode", "no_data"); postgresProps.setProperty( ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); } @@ -224,7 +224,7 @@ public DbzConnectorConfig( // if snapshot phase is finished and offset is specified, we will continue reading // changes from the given offset if (snapshotDone && null != startOffset && !startOffset.isBlank()) { - postgresProps.setProperty("snapshot.mode", "never"); + postgresProps.setProperty("snapshot.mode", "no_data"); postgresProps.setProperty( ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); } @@ -235,7 +235,7 @@ public DbzConnectorConfig( // if snapshot phase is finished and offset is specified, we will continue reading // changes from the given offset if (snapshotDone && null != startOffset && !startOffset.isBlank()) { - mongodbProps.setProperty("snapshot.mode", "never"); + mongodbProps.setProperty("snapshot.mode", "no_data"); mongodbProps.setProperty( ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); }