diff --git a/docs/content/docs/connectors/table/formats/debezium.md b/docs/content/docs/connectors/table/formats/debezium.md
index 571992a214885..790196e258800 100644
--- a/docs/content/docs/connectors/table/formats/debezium.md
+++ b/docs/content/docs/connectors/table/formats/debezium.md
@@ -44,7 +44,7 @@ However, currently Flink can't combine UPDATE_BEFORE and UPDATE_AFTER into a sin
 Dependencies
 ------------
 
-#### Debezium Avro
+#### Debezium Confluent Avro
 
 {{< sql_download_table "debezium-avro-confluent" >}}
 
@@ -85,7 +85,9 @@ Debezium provides a unified format for changelog, here is a simple example for a
 *Note: please refer to [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium) about the meaning of each fields.*
 
 The MySQL `products` table has 4 columns (`id`, `name`, `description` and `weight`). The above JSON message is an update change event on the `products` table where the `weight` value of the row with `id = 111` is changed from `5.18` to `5.15`.
-Assuming this messages is synchronized to Kafka topic `products_binlog`, then we can use the following DDL to consume this topic and interpret the change events.
+Assuming this messages is synchronized to Kafka topic `products_binlog`, then we can use the following DDLs (for Debezium JSON and Debezium Confluent Avro) to consume this topic and interpret the change events.
+
+#### Debezium JSON DDL
 
 ```sql
 CREATE TABLE topic_products (
@@ -100,7 +102,6 @@ CREATE TABLE topic_products (
   'properties.bootstrap.servers' = 'localhost:9092',
   'properties.group.id' = 'testGroup',
   -- using 'debezium-json' as the format to interpret Debezium JSON messages
-  -- please use 'debezium-avro-confluent' if Debezium encodes messages in Avro format
   'format' = 'debezium-json'
 )
 ```
@@ -133,7 +134,30 @@ In some cases, users may setup the Debezium Kafka Connect with the Kafka configu
 In order to interpret such messages, you need to add the option `'debezium-json.schema-include' = 'true'` into above DDL WITH clause (`false` by default).
 Usually, this is not recommended to include schema because this makes the messages very verbose and reduces parsing performance.
 
-After registering the topic as a Flink table, then you can consume the Debezium messages as a changelog source.
+#### Debezium Confluent Avro DDL
+
+```sql
+CREATE TABLE topic_products (
+  -- schema is totally the same to the MySQL "products" table
+  id BIGINT,
+  name STRING,
+  description STRING,
+  weight DECIMAL(10, 2)
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'products_binlog',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'properties.group.id' = 'testGroup',
+  -- using 'debezium-avro-confluent' as the format to interpret Debezium Avro messages
+  'format' = 'debezium-avro-confluent',
+  -- the URL to the schema registry for Kafka
+  'debezium-avro-confluent.url' = 'http://localhost:8081'
+)
+```
+
+#### Producing Results
+
+For every data format, after registering the topic as a Flink table, you can consume the Debezium messages as a changelog source.
 
 ```sql
 -- a real-time materialized view on the MySQL "products"
@@ -234,8 +258,8 @@ CREATE TABLE KafkaTable (
 Format Options
 ----------------
 
-Flink provides `debezium-avro-confluent` and `debezium-json` formats to interpret Avro or Json messages produced by Debezium.
-Use format `debezium-avro-confluent` to interpret Debezium Avro messages and format `debezium-json` to interpret Debezium Json messages.
+Flink provides `debezium-avro-confluent` and `debezium-json` formats to interpret Avro or JSON messages produced by Debezium.
+Use format `debezium-avro-confluent` to interpret Debezium Avro messages and format `debezium-json` to interpret Debezium JSON messages.
 
 {{< tabs "a8edce02-58d5-4e0b-bc4b-75d05a98a0f9" >}}
 {{< tab "Debezium Avro" >}}