diff --git a/docs/en/concept/schema-evolution.md b/docs/en/concept/schema-evolution.md index b1db0573870..5de26ea94e2 100644 --- a/docs/en/concept/schema-evolution.md +++ b/docs/en/concept/schema-evolution.md @@ -1,7 +1,16 @@ # Schema evolution Schema Evolution means that the schema of a data table can be changed and the data synchronization task can automatically adapt to the changes of the new table structure without any other operations. -Now we only support the operation about `add column`、`drop column`、`rename column` and `modify column` of the table in CDC source. This feature is only support zeta engine at now. +## Supported engines + +- Zeta + +## Supported schema change event types + +- `ADD COLUMN` +- `DROP COLUMN` +- `RENAME COLUMN` +- `MODIFY COLUMN` ## Supported connectors @@ -21,7 +30,7 @@ When you use the Oracle-CDC,you can not use the username named `SYS` or `SYSTE Otherwise, If your table name start with `ORA_TEMP_` will also has the same problem. ## Enable schema evolution -Schema evolution is disabled by default in CDC source. You need configure `debezium.include.schema.changes = true` which is only supported in CDC to enable it. When you use Oracle-CDC with schema-evolution enabled, you must specify `redo_log_catalog` as `log.mining.strategy` in the `debezium` attribute. +Schema evolution is disabled by default in CDC source. You need configure `schema-changes.enabled = true` which is only supported in CDC to enable it. ## Examples @@ -43,9 +52,8 @@ source { password = "mysqlpw" table-names = ["shop.products"] base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" - debezium = { - include.schema.changes = true - } + + schema-changes.enabled = true } } @@ -86,10 +94,8 @@ source { base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB" source.reader.close.timeout = 120000 connection.pool.size = 1 - debezium { - include.schema.changes = true - log.mining.strategy = redo_log_catalog - } + + schema-changes.enabled = true } } @@ -131,10 +137,8 @@ source { base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB" source.reader.close.timeout = 120000 connection.pool.size = 1 - debezium { - include.schema.changes = true - log.mining.strategy = redo_log_catalog - } + + schema-changes.enabled = true } } @@ -169,9 +173,8 @@ source { password = "mysqlpw" table-names = ["shop.products"] base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" - debezium = { - include.schema.changes = true - } + + schema-changes.enabled = true } } diff --git a/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md b/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md index 564eb2356ce..59f9981d718 100644 --- a/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md +++ b/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md @@ -33,8 +33,6 @@ source { # include schema into kafka message key.converter.schemas.enable = false value.converter.schemas.enable = false - # include ddl - include.schema.changes = true # topic prefix database.server.name = "mysql_cdc_1" } diff --git a/docs/en/connector-v2/sink/Paimon.md b/docs/en/connector-v2/sink/Paimon.md index 29598551207..f2a68ae3b81 100644 --- a/docs/en/connector-v2/sink/Paimon.md +++ b/docs/en/connector-v2/sink/Paimon.md @@ -107,9 +107,8 @@ source { password = "mysqlpw" table-names = ["shop.products"] base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" - debezium = { - include.schema.changes = true - } + + schema-changes.enabled = true } } diff --git a/docs/en/connector-v2/source/MySQL-CDC.md b/docs/en/connector-v2/source/MySQL-CDC.md index cc58ec44596..0114d5c1d5b 100644 --- a/docs/en/connector-v2/source/MySQL-CDC.md +++ b/docs/en/connector-v2/source/MySQL-CDC.md @@ -196,7 +196,8 @@ When an initial consistent snapshot is made for large databases, your establishe | inverse-sampling.rate | Integer | No | 1000 | The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000. | | exactly_once | Boolean | No | false | Enable exactly once semantic. | | format | Enum | No | DEFAULT | Optional output format for MySQL CDC, valid enumerations are `DEFAULT`、`COMPATIBLE_DEBEZIUM_JSON`. | -| debezium | Config | No | - | Pass-through [Debezium's properties](https://github.com/debezium/debezium/blob/v1.9.8.Final/documentation/modules/ROOT/pages/connectors/mysql.adoc#connector-properties) to Debezium Embedded Engine which is used to capture data changes from MySQL server. Schema evolution is disabled by default. You need configure `debezium.include.schema.changes = true` to enable it. Now we only support `add column`、`drop column`、`rename column` and `modify column`. | +| schema-changes.enabled | Boolean | No | false | Schema evolution is disabled by default. Now we only support `add column`、`drop column`、`rename column` and `modify column`. | +| debezium | Config | No | - | Pass-through [Debezium's properties](https://github.com/debezium/debezium/blob/v1.9.8.Final/documentation/modules/ROOT/pages/connectors/mysql.adoc#connector-properties) to Debezium Embedded Engine which is used to capture data changes from MySQL server. | | common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details | ## Task Example @@ -281,9 +282,8 @@ source { password = "mysqlpw" table-names = ["shop.products"] base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" - debezium = { - include.schema.changes = true - } + + schema-changes.enabled = true } } diff --git a/docs/en/connector-v2/source/Oracle-CDC.md b/docs/en/connector-v2/source/Oracle-CDC.md index 8e5c332bef0..28aeef2a50d 100644 --- a/docs/en/connector-v2/source/Oracle-CDC.md +++ b/docs/en/connector-v2/source/Oracle-CDC.md @@ -249,6 +249,7 @@ exit; | use_select_count | Boolean | No | false | Use select count for table count rather then other methods in full stage.In this scenario, select count directly is used when it is faster to update statistics using sql from analysis table | | skip_analyze | Boolean | No | false | Skip the analysis of table count in full stage.In this scenario, you schedule analysis table sql to update related table statistics periodically or your table data does not change frequently | | format | Enum | No | DEFAULT | Optional output format for Oracle CDC, valid enumerations are `DEFAULT`、`COMPATIBLE_DEBEZIUM_JSON`. | +| schema-changes.enabled | Boolean | No | false | Schema evolution is disabled by default. Now we only support `add column`、`drop column`、`rename column` and `modify column`. | | debezium | Config | No | - | Pass-through [Debezium's properties](https://github.com/debezium/debezium/blob/v1.9.8.Final/documentation/modules/ROOT/pages/connectors/oracle.adoc#connector-properties) to Debezium Embedded Engine which is used to capture data changes from Oracle server. | | common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details | diff --git a/docs/zh/concept/schema-evolution.md b/docs/zh/concept/schema-evolution.md index f8770abed56..200259f518a 100644 --- a/docs/zh/concept/schema-evolution.md +++ b/docs/zh/concept/schema-evolution.md @@ -1,6 +1,16 @@ # 模式演进 模式演进是指数据表的Schema可以改变,数据同步任务可以自动适应新的表结构的变化而无需其他操作。 -现在我们只支持对CDC源中的表进行“添加列”、“删除列”、“重命名列”和“修改列”的操作。目前这个功能只支持zeta引擎。 + +## 已支持的引擎 + +- Zeta + +## 已支持的模式变更事件类型 + +- `ADD COLUMN` +- `DROP COLUMN` +- `RENAME COLUMN` +- `MODIFY COLUMN` ## 已支持的连接器 @@ -20,7 +30,7 @@ 另外,如果你的表名以`ORA_TEMP_`开头,也会有相同的问题。 ## 启用Schema evolution功能 -在CDC源连接器中模式演进默认是关闭的。你需要在CDC连接器中配置`debezium.include.schema.changes = true`来启用它。当你使用Oracle-CDC并且启用schema-evolution时,你必须将`debezium`属性中的`log.mining.strategy`指定为`redo_log_catalog`。 +在CDC源连接器中模式演进默认是关闭的。你需要在CDC连接器中配置`schema-changes.enabled = true`来启用它。 ## 示例 @@ -42,9 +52,8 @@ source { password = "mysqlpw" table-names = ["shop.products"] base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" - debezium = { - include.schema.changes = true - } + + schema-changes.enabled = true } } @@ -85,10 +94,8 @@ source { base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB" source.reader.close.timeout = 120000 connection.pool.size = 1 - debezium { - include.schema.changes = true - log.mining.strategy = redo_log_catalog - } + + schema-changes.enabled = true } } @@ -130,10 +137,8 @@ source { base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB" source.reader.close.timeout = 120000 connection.pool.size = 1 - debezium { - include.schema.changes = true - log.mining.strategy = redo_log_catalog - } + + schema-changes.enabled = true } } @@ -168,9 +173,8 @@ source { password = "mysqlpw" table-names = ["shop.products"] base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" - debezium = { - include.schema.changes = true - } + + schema-changes.enabled = true } } diff --git a/docs/zh/connector-v2/formats/cdc-compatible-debezium-json.md b/docs/zh/connector-v2/formats/cdc-compatible-debezium-json.md index 8febab18fb0..6c5b57b2787 100644 --- a/docs/zh/connector-v2/formats/cdc-compatible-debezium-json.md +++ b/docs/zh/connector-v2/formats/cdc-compatible-debezium-json.md @@ -33,8 +33,6 @@ source { # include schema into kafka message key.converter.schemas.enable = false value.converter.schemas.enable = false - # include ddl - include.schema.changes = true # topic prefix database.server.name = "mysql_cdc_1" } diff --git a/docs/zh/connector-v2/sink/Paimon.md b/docs/zh/connector-v2/sink/Paimon.md index 4d83dcb6c76..1faa5dc9b01 100644 --- a/docs/zh/connector-v2/sink/Paimon.md +++ b/docs/zh/connector-v2/sink/Paimon.md @@ -105,9 +105,8 @@ source { password = "mysqlpw" table-names = ["shop.products"] base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" - debezium = { - include.schema.changes = true - } + + schema-changes.enabled = true } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java index 99ddb3bd175..87dd7d3a8f7 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java @@ -44,7 +44,6 @@ public abstract class JdbcSourceConfigFactory implements SourceConfig.Factory tableList; protected StartupConfig startupConfig; protected StopConfig stopConfig; - protected boolean includeSchemaChanges = false; protected double distributionFactorUpper = JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(); protected double distributionFactorLower = @@ -60,6 +59,10 @@ public abstract class JdbcSourceConfigFactory implements SourceConfig.Factory dbzProperties.putAll(map)); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java index 6c83088ef28..7fcd4d3448e 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java @@ -107,6 +107,13 @@ public class SourceOptions { .defaultValue(false) .withDescription("Enable exactly once semantic."); + public static final Option SCHEMA_CHANGES_ENABLED = + Options.key("schema-changes.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Enable send schema change events, by default is false. If set to true, the schema changes will be sent to downstream."); + public static OptionRule.Builder getBaseRule() { return OptionRule.builder() .optional(FORMAT) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java index fd5d7deadfb..db63e4e4dce 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java @@ -31,7 +31,6 @@ /** A factory to initialize {@link MySqlSourceConfig}. */ public class MySqlSourceConfigFactory extends JdbcSourceConfigFactory { public static final String SCHEMA_CHANGE_KEY = "include.schema.changes"; - public static final Boolean SCHEMA_CHANGE_DEFAULT = true; private ServerIdRange serverIdRange; @@ -78,9 +77,8 @@ public MySqlSourceConfig create(int subtaskId) { // Note: the includeSchemaChanges parameter is used to control emitting the schema record, // only DataStream API program need to emit the schema record, the Table API need not - // Some scenarios do not require automatic capture of table structure changes, so the - // default setting is true. - props.setProperty(SCHEMA_CHANGE_KEY, SCHEMA_CHANGE_DEFAULT.toString()); + // setting debezium capture mysql ddl + props.setProperty(SCHEMA_CHANGE_KEY, String.valueOf(schemaChangeEnabled)); // disable the offset flush totally props.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE)); // disable tombstones diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java index 8de399b587a..c11f9e72d4d 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java @@ -69,7 +69,8 @@ public OptionRule optionRule() { JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND, JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD, JdbcSourceOptions.INVERSE_SAMPLING_RATE, - JdbcSourceOptions.TABLE_NAMES_CONFIG) + JdbcSourceOptions.TABLE_NAMES_CONFIG, + JdbcSourceOptions.SCHEMA_CHANGES_ENABLED) .optional(MySqlSourceOptions.STARTUP_MODE, MySqlSourceOptions.STOP_MODE) .conditional( MySqlSourceOptions.STARTUP_MODE, @@ -103,15 +104,25 @@ TableSource restoreSource( context.getOptions(), context.getClassLoader()); boolean enableSchemaChange = context.getOptions() - .getOptional(SourceOptions.DEBEZIUM_PROPERTIES) - .map( - e -> - e.getOrDefault( - MySqlSourceConfigFactory.SCHEMA_CHANGE_KEY, - MySqlSourceConfigFactory.SCHEMA_CHANGE_DEFAULT - .toString())) - .map(Boolean::parseBoolean) - .orElse(MySqlSourceConfigFactory.SCHEMA_CHANGE_DEFAULT); + .getOptional(SourceOptions.SCHEMA_CHANGES_ENABLED) + .orElse( + // TODO remove this after all users used the new schema change + // option + context.getOptions() + .getOptional(SourceOptions.DEBEZIUM_PROPERTIES) + .map( + e -> + e.getOrDefault( + MySqlSourceConfigFactory + .SCHEMA_CHANGE_KEY, + SourceOptions + .SCHEMA_CHANGES_ENABLED + .defaultValue() + .toString())) + .map(Boolean::parseBoolean) + .orElse( + SourceOptions.SCHEMA_CHANGES_ENABLED + .defaultValue())); if (!restoreTables.isEmpty() && enableSchemaChange) { catalogTables = mergeTableStruct(catalogTables, restoreTables); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java index b08d4e4dad7..3786fb937c9 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java @@ -37,7 +37,8 @@ public class OracleSourceConfigFactory extends JdbcSourceConfigFactory { private static final String DRIVER_CLASS_NAME = "oracle.jdbc.driver.OracleDriver"; public static final String SCHEMA_CHANGE_KEY = "include.schema.changes"; - public static final Boolean SCHEMA_CHANGE_DEFAULT = true; + public static final String LOG_MINING_STRATEGY_KEY = "log.mining.strategy"; + public static final String LOG_MINING_STRATEGY_DEFAULT = "online_catalog"; private List schemaList; @@ -94,17 +95,16 @@ public OracleSourceConfig create(int subtask) { props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true)); props.setProperty("database.history.refer.ddl", String.valueOf(true)); - // Some scenarios do not require automatic capture of table structure changes, so the - // default setting is true. - props.setProperty(SCHEMA_CHANGE_KEY, SCHEMA_CHANGE_DEFAULT.toString()); + // setting debezium capture oracle ddl + props.setProperty(SCHEMA_CHANGE_KEY, String.valueOf(schemaChangeEnabled)); + props.setProperty( + LOG_MINING_STRATEGY_KEY, + schemaChangeEnabled ? "redo_log_catalog" : LOG_MINING_STRATEGY_DEFAULT); props.setProperty("connect.timeout.ms", String.valueOf(connectTimeoutMillis)); // disable tombstones props.setProperty("tombstones.on.delete", String.valueOf(false)); - // Optimize logminer latency - props.setProperty("log.mining.strategy", "online_catalog"); - if (originUrl != null) { props.setProperty("database.url", originUrl); } else { @@ -139,6 +139,16 @@ public OracleSourceConfig create(int subtask) { // override the user-defined debezium properties if (dbzProperties != null) { + String debeziumSchemaChanges = + dbzProperties.getProperty( + SCHEMA_CHANGE_KEY, String.valueOf(schemaChangeEnabled)); + String debeziumLogMiningStrategy = + dbzProperties.getProperty(LOG_MINING_STRATEGY_KEY, LOG_MINING_STRATEGY_DEFAULT); + if (Boolean.parseBoolean(debeziumSchemaChanges) + && LOG_MINING_STRATEGY_DEFAULT.equals(debeziumLogMiningStrategy)) { + throw new IllegalArgumentException( + "Debezium log mining strategy must be set to redo_log_catalog when schema changes are enabled"); + } props.putAll(dbzProperties); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java index 80b4a0b3c00..eb602aa418c 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java @@ -105,22 +105,12 @@ public DebeziumDeserializationSchema createDebeziumDeserializationSchema( } String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE); - - boolean enableDDL = - Boolean.parseBoolean( - debeziumProperties.getOrDefault( - OracleSourceConfigFactory.SCHEMA_CHANGE_KEY, - OracleSourceConfigFactory.SCHEMA_CHANGE_DEFAULT.toString())); - return (DebeziumDeserializationSchema) SeaTunnelRowDebeziumDeserializeSchema.builder() .setTables(catalogTables) .setServerTimeZone(ZoneId.of(zoneId)) .setSchemaChangeResolver( - enableDDL - ? new OracleSchemaChangeResolver( - createSourceConfigFactory(config)) - : null) + new OracleSchemaChangeResolver(createSourceConfigFactory(config))) .setTableIdTableChangeMap(tableIdStructMap) .build(); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java index d790107cf1f..01690bc3dc9 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java @@ -69,7 +69,8 @@ public OptionRule optionRule() { JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND, JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND, JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD, - JdbcSourceOptions.TABLE_NAMES_CONFIG) + JdbcSourceOptions.TABLE_NAMES_CONFIG, + JdbcSourceOptions.SCHEMA_CHANGES_ENABLED) .optional(OracleSourceOptions.STARTUP_MODE, OracleSourceOptions.STOP_MODE) .conditional( OracleSourceOptions.STARTUP_MODE, @@ -109,15 +110,25 @@ TableSource restoreSource( context.getOptions(), context.getClassLoader()); boolean enableSchemaChange = context.getOptions() - .getOptional(SourceOptions.DEBEZIUM_PROPERTIES) - .map( - e -> - e.getOrDefault( - OracleSourceConfigFactory.SCHEMA_CHANGE_KEY, - OracleSourceConfigFactory.SCHEMA_CHANGE_DEFAULT - .toString())) - .map(Boolean::parseBoolean) - .orElse(OracleSourceConfigFactory.SCHEMA_CHANGE_DEFAULT); + .getOptional(SourceOptions.SCHEMA_CHANGES_ENABLED) + .orElse( + // TODO remove this after all users used the new schema change + // option + context.getOptions() + .getOptional(SourceOptions.DEBEZIUM_PROPERTIES) + .map( + e -> + e.getOrDefault( + OracleSourceConfigFactory + .SCHEMA_CHANGE_KEY, + SourceOptions + .SCHEMA_CHANGES_ENABLED + .defaultValue() + .toString())) + .map(Boolean::parseBoolean) + .orElse( + SourceOptions.SCHEMA_CHANGES_ENABLED + .defaultValue())); if (!restoreTables.isEmpty() && enableSchemaChange) { catalogTables = mergeTableStruct(catalogTables, restoreTables); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf index 632b643bb21..7e93474d5e7 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf @@ -34,9 +34,8 @@ source { password = "mysqlpw" table-names = ["shop.products"] base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" - debezium = { - include.schema.changes = true - } + + schema-changes.enabled = true } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change_exactly_once.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change_exactly_once.conf index 8aa06c85bd9..275ecf4464d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change_exactly_once.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change_exactly_once.conf @@ -34,9 +34,8 @@ source { password = "mysqlpw" table-names = ["shop.products"] base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" - debezium = { - include.schema.changes = true - } + + schema-changes.enabled = true } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_mysql_with_schema_change.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_mysql_with_schema_change.conf index 58acb86f83f..70c9aedb4f6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_mysql_with_schema_change.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_mysql_with_schema_change.conf @@ -37,10 +37,11 @@ source { base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB" source.reader.close.timeout = 120000 connection.pool.size = 1 + + schema-changes.enabled = true debezium { database.oracle.jdbc.timezoneAsRegion = false - include.schema.changes = true - log.mining.strategy = redo_log_catalog + } } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change.conf index 80fcc8c7961..76903a6e007 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change.conf @@ -37,10 +37,11 @@ source { base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB" source.reader.close.timeout = 120000 connection.pool.size = 1 + + schema-changes.enabled = true debezium { database.oracle.jdbc.timezoneAsRegion = false - include.schema.changes = true - log.mining.strategy = redo_log_catalog + } } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change_exactly_once.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change_exactly_once.conf index 949e62ef710..9554a4fd49b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change_exactly_once.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change_exactly_once.conf @@ -37,9 +37,11 @@ source { base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB" source.reader.close.timeout = 120000 connection.pool.size = 1 + + schema-changes.enabled = true debezium { database.oracle.jdbc.timezoneAsRegion = false - include.schema.changes = true + } } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf index 68102192a0a..c8353c9e4c6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf @@ -27,13 +27,9 @@ env { source { MySQL-CDC { plugin_output="customer_result_table" - catalog { - factory = Mysql - } - debezium = { - # include ddl - "include.schema.changes" = true - } + + schema-changes.enabled = true + database-names=["mysql_cdc"] table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"] format=DEFAULT diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf index 714c4be81c0..a214430dd02 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf @@ -34,9 +34,8 @@ source { password = "mysqlpw" table-names = ["shop.products"] base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" - debezium = { - include.schema.changes = true - } + + schema-changes.enabled = true } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf index ba3c03db1e1..76d86a4e8ca 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf @@ -32,9 +32,8 @@ source { password = "mysqlpw" table-names = ["shop.products"] base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" - debezium = { - include.schema.changes = true - } + + schema-changes.enabled = true } }