Skip to content

Commit

Permalink
[Feature][CDC] Add 'schema-changes.enabled' options (apache#8252)
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 authored Dec 10, 2024
1 parent 2f3c54c commit d783f94
Show file tree
Hide file tree
Showing 23 changed files with 144 additions and 115 deletions.
35 changes: 19 additions & 16 deletions docs/en/concept/schema-evolution.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
# Schema evolution
Schema Evolution means that the schema of a data table can be changed and the data synchronization task can automatically adapt to the changes of the new table structure without any other operations.
Now we only support the operation about `add column``drop column``rename column` and `modify column` of the table in CDC source. This feature is only support zeta engine at now.

## Supported engines

- Zeta

## Supported schema change event types

- `ADD COLUMN`
- `DROP COLUMN`
- `RENAME COLUMN`
- `MODIFY COLUMN`

## Supported connectors

Expand All @@ -21,7 +30,7 @@ When you use the Oracle-CDC,you can not use the username named `SYS` or `SYSTE
Otherwise, If your table name start with `ORA_TEMP_` will also has the same problem.

## Enable schema evolution
Schema evolution is disabled by default in CDC source. You need configure `debezium.include.schema.changes = true` which is only supported in CDC to enable it. When you use Oracle-CDC with schema-evolution enabled, you must specify `redo_log_catalog` as `log.mining.strategy` in the `debezium` attribute.
Schema evolution is disabled by default in CDC source. You need configure `schema-changes.enabled = true` which is only supported in CDC to enable it.

## Examples

Expand All @@ -43,9 +52,8 @@ source {
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
debezium = {
include.schema.changes = true
}
schema-changes.enabled = true
}
}
Expand Down Expand Up @@ -86,10 +94,8 @@ source {
base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
source.reader.close.timeout = 120000
connection.pool.size = 1
debezium {
include.schema.changes = true
log.mining.strategy = redo_log_catalog
}
schema-changes.enabled = true
}
}
Expand Down Expand Up @@ -131,10 +137,8 @@ source {
base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
source.reader.close.timeout = 120000
connection.pool.size = 1
debezium {
include.schema.changes = true
log.mining.strategy = redo_log_catalog
}
schema-changes.enabled = true
}
}
Expand Down Expand Up @@ -169,9 +173,8 @@ source {
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
debezium = {
include.schema.changes = true
}
schema-changes.enabled = true
}
}
Expand Down
2 changes: 0 additions & 2 deletions docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ source {
# include schema into kafka message
key.converter.schemas.enable = false
value.converter.schemas.enable = false
# include ddl
include.schema.changes = true
# topic prefix
database.server.name = "mysql_cdc_1"
}
Expand Down
5 changes: 2 additions & 3 deletions docs/en/connector-v2/sink/Paimon.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,8 @@ source {
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
debezium = {
include.schema.changes = true
}
schema-changes.enabled = true
}
}
Expand Down
8 changes: 4 additions & 4 deletions docs/en/connector-v2/source/MySQL-CDC.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ When an initial consistent snapshot is made for large databases, your establishe
| inverse-sampling.rate | Integer | No | 1000 | The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000. |
| exactly_once | Boolean | No | false | Enable exactly once semantic. |
| format | Enum | No | DEFAULT | Optional output format for MySQL CDC, valid enumerations are `DEFAULT``COMPATIBLE_DEBEZIUM_JSON`. |
| debezium | Config | No | - | Pass-through [Debezium's properties](https://github.com/debezium/debezium/blob/v1.9.8.Final/documentation/modules/ROOT/pages/connectors/mysql.adoc#connector-properties) to Debezium Embedded Engine which is used to capture data changes from MySQL server. Schema evolution is disabled by default. You need configure `debezium.include.schema.changes = true` to enable it. Now we only support `add column``drop column``rename column` and `modify column`. |
| schema-changes.enabled | Boolean | No | false | Schema evolution is disabled by default. Now we only support `add column``drop column``rename column` and `modify column`. |
| debezium | Config | No | - | Pass-through [Debezium's properties](https://github.com/debezium/debezium/blob/v1.9.8.Final/documentation/modules/ROOT/pages/connectors/mysql.adoc#connector-properties) to Debezium Embedded Engine which is used to capture data changes from MySQL server. |
| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details |

## Task Example
Expand Down Expand Up @@ -281,9 +282,8 @@ source {
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
debezium = {
include.schema.changes = true
}
schema-changes.enabled = true
}
}
Expand Down
1 change: 1 addition & 0 deletions docs/en/connector-v2/source/Oracle-CDC.md
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ exit;
| use_select_count | Boolean | No | false | Use select count for table count rather then other methods in full stage.In this scenario, select count directly is used when it is faster to update statistics using sql from analysis table |
| skip_analyze | Boolean | No | false | Skip the analysis of table count in full stage.In this scenario, you schedule analysis table sql to update related table statistics periodically or your table data does not change frequently |
| format | Enum | No | DEFAULT | Optional output format for Oracle CDC, valid enumerations are `DEFAULT``COMPATIBLE_DEBEZIUM_JSON`. |
| schema-changes.enabled | Boolean | No | false | Schema evolution is disabled by default. Now we only support `add column``drop column``rename column` and `modify column`. |
| debezium | Config | No | - | Pass-through [Debezium's properties](https://github.com/debezium/debezium/blob/v1.9.8.Final/documentation/modules/ROOT/pages/connectors/oracle.adoc#connector-properties) to Debezium Embedded Engine which is used to capture data changes from Oracle server. |
| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details |

Expand Down
36 changes: 20 additions & 16 deletions docs/zh/concept/schema-evolution.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
# 模式演进
模式演进是指数据表的Schema可以改变,数据同步任务可以自动适应新的表结构的变化而无需其他操作。
现在我们只支持对CDC源中的表进行“添加列”、“删除列”、“重命名列”和“修改列”的操作。目前这个功能只支持zeta引擎。

## 已支持的引擎

- Zeta

## 已支持的模式变更事件类型

- `ADD COLUMN`
- `DROP COLUMN`
- `RENAME COLUMN`
- `MODIFY COLUMN`

## 已支持的连接器

Expand All @@ -20,7 +30,7 @@
另外,如果你的表名以`ORA_TEMP_`开头,也会有相同的问题。

## 启用Schema evolution功能
在CDC源连接器中模式演进默认是关闭的。你需要在CDC连接器中配置`debezium.include.schema.changes = true`来启用它。当你使用Oracle-CDC并且启用schema-evolution时,你必须将`debezium`属性中的`log.mining.strategy`指定为`redo_log_catalog`
在CDC源连接器中模式演进默认是关闭的。你需要在CDC连接器中配置`schema-changes.enabled = true`来启用它。

## 示例

Expand All @@ -42,9 +52,8 @@ source {
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
debezium = {
include.schema.changes = true
}
schema-changes.enabled = true
}
}
Expand Down Expand Up @@ -85,10 +94,8 @@ source {
base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
source.reader.close.timeout = 120000
connection.pool.size = 1
debezium {
include.schema.changes = true
log.mining.strategy = redo_log_catalog
}
schema-changes.enabled = true
}
}
Expand Down Expand Up @@ -130,10 +137,8 @@ source {
base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
source.reader.close.timeout = 120000
connection.pool.size = 1
debezium {
include.schema.changes = true
log.mining.strategy = redo_log_catalog
}
schema-changes.enabled = true
}
}
Expand Down Expand Up @@ -168,9 +173,8 @@ source {
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
debezium = {
include.schema.changes = true
}
schema-changes.enabled = true
}
}
Expand Down
2 changes: 0 additions & 2 deletions docs/zh/connector-v2/formats/cdc-compatible-debezium-json.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ source {
# include schema into kafka message
key.converter.schemas.enable = false
value.converter.schemas.enable = false
# include ddl
include.schema.changes = true
# topic prefix
database.server.name = "mysql_cdc_1"
}
Expand Down
5 changes: 2 additions & 3 deletions docs/zh/connector-v2/sink/Paimon.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,8 @@ source {
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
debezium = {
include.schema.changes = true
}
schema-changes.enabled = true
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public abstract class JdbcSourceConfigFactory implements SourceConfig.Factory<Jd
protected List<String> tableList;
protected StartupConfig startupConfig;
protected StopConfig stopConfig;
protected boolean includeSchemaChanges = false;
protected double distributionFactorUpper =
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue();
protected double distributionFactorLower =
Expand All @@ -60,6 +59,10 @@ public abstract class JdbcSourceConfigFactory implements SourceConfig.Factory<Jd
protected int connectMaxRetries = JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue();
protected int connectionPoolSize = JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue();
@Setter protected boolean exactlyOnce = JdbcSourceOptions.EXACTLY_ONCE.defaultValue();

@Setter
protected boolean schemaChangeEnabled = JdbcSourceOptions.SCHEMA_CHANGES_ENABLED.defaultValue();

protected Properties dbzProperties;

/** String hostname of the database server. */
Expand Down Expand Up @@ -210,8 +213,8 @@ public JdbcSourceConfigFactory connectMaxRetries(int connectMaxRetries) {
}

/** Whether the {@link SourceConfig} should output the schema changes or not. */
public JdbcSourceConfigFactory includeSchemaChanges(boolean includeSchemaChanges) {
this.includeSchemaChanges = includeSchemaChanges;
public JdbcSourceConfigFactory schemaChangeEnabled(boolean schemaChangeEnabled) {
this.schemaChangeEnabled = schemaChangeEnabled;
return this;
}

Expand Down Expand Up @@ -264,6 +267,7 @@ public JdbcSourceConfigFactory fromReadonlyConfig(ReadonlyConfig config) {
this.connectMaxRetries = config.get(JdbcSourceOptions.CONNECT_MAX_RETRIES);
this.connectionPoolSize = config.get(JdbcSourceOptions.CONNECTION_POOL_SIZE);
this.exactlyOnce = config.get(JdbcSourceOptions.EXACTLY_ONCE);
this.schemaChangeEnabled = config.get(JdbcSourceOptions.SCHEMA_CHANGES_ENABLED);
this.dbzProperties = new Properties();
config.getOptional(SourceOptions.DEBEZIUM_PROPERTIES)
.ifPresent(map -> dbzProperties.putAll(map));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ public class SourceOptions {
.defaultValue(false)
.withDescription("Enable exactly once semantic.");

public static final Option<Boolean> SCHEMA_CHANGES_ENABLED =
Options.key("schema-changes.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Enable send schema change events, by default is false. If set to true, the schema changes will be sent to downstream.");

public static OptionRule.Builder getBaseRule() {
return OptionRule.builder()
.optional(FORMAT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
/** A factory to initialize {@link MySqlSourceConfig}. */
public class MySqlSourceConfigFactory extends JdbcSourceConfigFactory {
public static final String SCHEMA_CHANGE_KEY = "include.schema.changes";
public static final Boolean SCHEMA_CHANGE_DEFAULT = true;

private ServerIdRange serverIdRange;

Expand Down Expand Up @@ -78,9 +77,8 @@ public MySqlSourceConfig create(int subtaskId) {
// Note: the includeSchemaChanges parameter is used to control emitting the schema record,
// only DataStream API program need to emit the schema record, the Table API need not

// Some scenarios do not require automatic capture of table structure changes, so the
// default setting is true.
props.setProperty(SCHEMA_CHANGE_KEY, SCHEMA_CHANGE_DEFAULT.toString());
// setting debezium capture mysql ddl
props.setProperty(SCHEMA_CHANGE_KEY, String.valueOf(schemaChangeEnabled));
// disable the offset flush totally
props.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
// disable tombstones
Expand Down
Loading

0 comments on commit d783f94

Please sign in to comment.