Skip to content

Latest commit

 

History

History
370 lines (330 loc) · 11.9 KB

sqlserver-cdc.md

File metadata and controls

370 lines (330 loc) · 11.9 KB

SQLServer CDC Connector

The SQLServer CDC connector allows for reading snapshot data and incremental data from SQLServer database. This document describes how to setup the SQLServer CDC connector to run SQL queries against SQLServer databases.

Dependencies

In order to setup the SQLServer CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependency

<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-sqlserver-cdc</artifactId>
  <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
  <version>2.3-SNAPSHOT</version>
</dependency>

SQL Client JAR

Download link is available only for stable releases.

Download flink-sql-connector-sqlserver-cdc-2.3-SNAPSHOT.jar and put it under <FLINK_HOME>/lib/.

Note: flink-sql-connector-sqlserver-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as flink-sql-connector-sqlserver-cdc-XXX.jar, the released version will be available in the Maven central warehouse.

How to create a SQLServer CDC table

The SqlServer CDC table can be defined as following:

-- register a SqlServer table 'orders' in Flink SQL
CREATE TABLE orders (
    id INT,
    order_date DATE,
    purchaser INT,
    quantity INT,
    product_id INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'localhost',
    'port' = '1433',
    'username' = 'sa',
    'password' = 'Password!',
    'database-name' = 'inventory',
    'schema-name' = 'dbo',
    'table-name' = 'orders'
);

-- read snapshot and binlogs from orders table
SELECT * FROM orders;

Connector Options

Option Required Default Type Description
connector required (none) String Specify what connector to use, here should be 'sqlserver-cdc'.
hostname required (none) String IP address or hostname of the SQLServer database.
username required (none) String Username to use when connecting to the SQLServer database.
password required (none) String Password to use when connecting to the SQLServer database.
database-name required (none) String Database name of the SQLServer database to monitor.
schema-name required (none) String Schema name of the SQLServer database to monitor.
table-name required (none) String Table name of the SQLServer database to monitor.
port optional 1433 Integer Integer port number of the SQLServer database.
server-time-zone optional UTC String The session time zone in database server, e.g. "Asia/Shanghai".
debezium.* optional (none) String Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from SQLServer. For example: 'debezium.snapshot.mode' = 'initial_only'. See more about the Debezium's SQLServer Connector properties

Available Metadata

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

Key DataType Description
table_name STRING NOT NULL Name of the table that contain the row.
schema_name STRING NOT NULL Name of the schema that contain the row.
database_name STRING NOT NULL Name of the database that contain the row.
op_ts TIMESTAMP_LTZ(3) NOT NULL It indicates the time that the change was made in the database.
If the record is read from snapshot of the table instead of the change stream, the value is always 0.

Limitation

Can't perform checkpoint during scanning snapshot of tables

During scanning snapshot of database tables, since there is no recoverable position, we can't perform checkpoints. In order to not perform checkpoints, SqlServer CDC source will keep the checkpoint waiting to timeout. The timeout checkpoint will be recognized as failed checkpoint, by default, this will trigger a failover for the Flink job. So if the database table is large, it is recommended to add following Flink configurations to avoid failover because of the timeout checkpoints:

execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647

The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

CREATE TABLE products (
    table_name STRING METADATA  FROM 'table_name' VIRTUAL,
    schema_name STRING METADATA  FROM 'schema_name' VIRTUAL,
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    id INT NOT NULL,
    name STRING,
    description STRING,
    weight DECIMAL(10,3)
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'localhost',
    'port' = '1433',
    'username' = 'sa',
    'password' = 'Password!',
    'database-name' = 'inventory',
    'schema-name' = 'dbo',
    'table-name' = 'products'
);

Features

Exactly-Once Processing

The SQLServer CDC connector is a Flink Source connector which will read database snapshot first and then continues to read change events with exactly-once processing even failures happen. Please read How the connector works.

Startup Reading Position

The config option scan.startup.mode specifies the startup mode for SQLServer CDC consumer. The valid enumerations are:

  • initial (default): Takes a snapshot of structure and data of captured tables; useful if topics should be populated with a complete representation of the data from the captured tables.
  • initial_only: Takes a snapshot of structure and data like initial but instead does not transition into streaming changes once the snapshot has completed.
  • latest_offset: Takes a snapshot of the structure of captured tables only; useful if only changes happening from now onwards should be propagated to topics.

Note: the mechanism of scan.startup.mode option relying on Debezium's snapshot.mode configuration. So please do not use them together. If you specific both scan.startup.mode and debezium.snapshot.mode options in the table DDL, it may make scan.startup.mode doesn't work.

Single Thread Reading

The SQLServer CDC source can't work in parallel reading, because there is only one task can receive change events.

DataStream Source

The SQLServer CDC connector can also be a DataStream source. You can create a SourceFunction as the following shows:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;

public class SqlServerSourceExample {
  public static void main(String[] args) throws Exception {
    SourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
      .hostname("localhost")
      .port(1433)
      .database("sqlserver") // monitor sqlserver database
      .tableList("dbo.products") // monitor products table
      .username("sa")
      .password("Password!")
      .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
      .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
      .addSource(sourceFunction)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute();
  }
}

Note: Please refer Deserialization for more details about the JSON deserialization.

Data Type Mapping

SQLServer type Flink SQL type
char(n) CHAR(n)
varchar(n)
nvarchar(n)
nchar(n)
VARCHAR(n)
text
ntext
xml
STRING
decimal(p, s)
money
smallmoney
DECIMAL(p, s)
numeric NUMERIC
float
real
FLOAT
bit BOOLEAN
int INT
tinyint TINYINT
smallint SMALLINT
bigint BIGINT
date DATE
time(n) TIME(n)
datetime2
datetime
smalldatetime
TIMESTAMP(n)
datetimeoffset TIMESTAMP_LTZ(3)