diff --git a/Cargo.lock b/Cargo.lock index cb4a7d0303f0..463e66e00e7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4274,70 +4274,6 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" -[[package]] -name = "encoding" -version = "0.2.33" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b0d943856b990d12d3b55b359144ff341533e516d94098b1d3fc1ac666d36ec" -dependencies = [ - "encoding-index-japanese", - "encoding-index-korean", - "encoding-index-simpchinese", - "encoding-index-singlebyte", - "encoding-index-tradchinese", -] - -[[package]] -name = "encoding-index-japanese" -version = "1.20141219.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04e8b2ff42e9a05335dbf8b5c6f7567e5591d0d916ccef4e0b1710d32a0d0c91" -dependencies = [ - "encoding_index_tests", -] - -[[package]] -name = "encoding-index-korean" -version = "1.20141219.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dc33fb8e6bcba213fe2f14275f0963fd16f0a02c878e3095ecfdf5bee529d81" -dependencies = [ - "encoding_index_tests", -] - -[[package]] -name = "encoding-index-simpchinese" -version = "1.20141219.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d87a7194909b9118fc707194baa434a4e3b0fb6a5a757c73c3adb07aa25031f7" -dependencies = [ - "encoding_index_tests", -] - -[[package]] -name = "encoding-index-singlebyte" -version = "1.20141219.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3351d5acffb224af9ca265f435b859c7c01537c0849754d3db3fdf2bfe2ae84a" -dependencies = [ - "encoding_index_tests", -] - -[[package]] -name = "encoding-index-tradchinese" -version = "1.20141219.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd0e20d5688ce3cab59eb3ef3a2083a5c77bf496cb798dc6fcdb75f323890c18" -dependencies = [ - "encoding_index_tests", -] - -[[package]] -name = "encoding_index_tests" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a246d82be1c9d791c5dfde9a2bd045fc3cbba3fa2b11ad558f27d01712f00569" - [[package]] name = "encoding_rs" version = "0.8.33" @@ -7915,6 +7851,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-src" +version = "300.3.1+3.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7259953d42a81bf137fbbd73bd30a8e1914d6dce43c2b90ed575783a22608b91" +dependencies = [ + "cc", +] + [[package]] name = "openssl-sys" version = "0.9.103" @@ -7923,6 +7868,7 @@ checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6" dependencies = [ "cc", "libc", + "openssl-src", "pkg-config", "vcpkg", ] @@ -7999,6 +7945,20 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "opentls" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f561874f8d6ecfb674fc08863414040c93cc90c0b6963fe679895fab8b65560" +dependencies = [ + "futures-util", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "url", +] + [[package]] name = "ordered-float" version = "2.10.0" @@ -11892,18 +11852,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "rustls" -version = "0.20.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99" -dependencies = [ - "log", - "ring 0.16.20", - "sct", - "webpki", -] - [[package]] name = "rustls" version = "0.21.11" @@ -13776,8 +13724,7 @@ dependencies = [ [[package]] name = "tiberius" version = "0.12.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc6e2bf3e4b5be181a2a2ceff4b9b12e2684010d436a6958bd564fbc8094d44d" +source = "git+https://github.com/risingwavelabs/tiberius.git?rev=f834f2deeb9e2fb08afaf73865f330cf31a3876a#f834f2deeb9e2fb08afaf73865f330cf31a3876a" dependencies = [ "async-trait", "asynchronous-codec", @@ -13786,19 +13733,18 @@ dependencies = [ "bytes", "chrono", "connection-string", - "encoding", + "encoding_rs", "enumflags2", "futures-util", "num-traits", "once_cell", + "opentls", "pin-project-lite", "pretty-hex", "rust_decimal", - "rustls-native-certs 0.6.3", - "rustls-pemfile 1.0.4", "thiserror", "time", - "tokio-rustls 0.23.4", + "tokio", "tokio-util", "tracing", "uuid", @@ -14009,17 +13955,6 @@ dependencies = [ "rand", ] -[[package]] -name = "tokio-rustls" -version = "0.23.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" -dependencies = [ - "rustls 0.20.9", - "tokio", - "webpki", -] - [[package]] name = "tokio-rustls" version = "0.24.1" @@ -15338,16 +15273,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webpki" -version = "0.22.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" -dependencies = [ - "ring 0.17.5", - "untrusted 0.9.0", -] - [[package]] name = "webpki-roots" version = "0.25.2" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 83cb000566d4..78ad69c0995a 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -74,6 +74,7 @@ services: image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731 depends_on: - mysql + - sqlserver-server - db - message_queue - schemaregistry @@ -215,6 +216,7 @@ services: environment: ACCEPT_EULA: 'Y' SA_PASSWORD: 'SomeTestOnly@SA' + MSSQL_AGENT_ENABLED: "true" starrocks-fe-server: container_name: starrocks-fe-server diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index c4b4713af81c..b6eff317a79e 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -63,6 +63,14 @@ echo "--- starting risingwave cluster" RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ risedev ci-start ci-1cn-1fe-with-recovery +echo "--- Install sql server client" +curl https://packages.microsoft.com/keys/microsoft.asc | sudo apt-key add - +curl https://packages.microsoft.com/config/ubuntu/20.04/prod.list | sudo tee /etc/apt/sources.list.d/msprod.list +apt-get update -y +ACCEPT_EULA=Y DEBIAN_FRONTEND=noninteractive apt-get install -y mssql-tools unixodbc-dev +export PATH="/opt/mssql-tools/bin/:$PATH" +sleep 2 + echo "--- mongodb cdc test" # install the mongo shell wget http://archive.ubuntu.com/ubuntu/pool/main/o/openssl/libssl1.1_1.1.1f-1ubuntu2_amd64.deb @@ -79,6 +87,7 @@ risedev slt './e2e_test/source/cdc/mongodb/**/*.slt' echo "--- inline cdc test" export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456 +export SQLCMDSERVER=sqlserver-server SQLCMDUSER=SA SQLCMDPASSWORD="SomeTestOnly@SA" SQLCMDDBNAME=mydb SQLCMDPORT=1433 risedev slt './e2e_test/source/cdc_inline/**/*.slt' echo "--- opendal source test" diff --git 
a/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc.slt b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc.slt new file mode 100644 index 000000000000..05321ea9db38 --- /dev/null +++ b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc.slt @@ -0,0 +1,258 @@ + +control substitution on + +# ------------ data prepare stage ------------ +system ok +sqlcmd -C -d master -Q 'create database mydb;' -b + +system ok +sqlcmd -C -i e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_prepare.sql -b + +# ------------ validate stage ------------ +# TODO(kexiang): add more tests here +# invalid username +# invalid password +# invalid table name +# invalid primary key +# column name mismatch +# column data type mismatch +# format & encode provided and match with debezium json, this is okay +# format & encode provided but mismatch with debezium json, this is not allowed + +statement error Protocol error: connector sqlserver-cdc does not support `CREATE TABLE`, please use `CREATE SOURCE` instead +CREATE TABLE orders ( + order_id INT PRIMARY KEY, + order_date BIGINT, + customer_name VARCHAR, + price DECIMAL, + product_id INT, + order_status SMALLINT +) WITH ( + connector = 'sqlserver-cdc', + hostname = '${SQLCMDSERVER:sqlserver-server}', + port = '${SQLCMDPORT:1433}', + username = '${SQLCMDUSER:SA}', + password = '${SQLCMDPASSWORD}', + table.name = 'orders', + database.name = '${SQLCMDDBNAME}', +); + +statement error Protocol error: connector sqlserver-cdc does not support `CREATE TABLE`, please use `CREATE SOURCE` instead +CREATE TABLE single_type ( + id INT, + c_time time, + PRIMARY KEY (id) +) WITH ( + connector = 'sqlserver-cdc', + hostname = '${SQLCMDSERVER:sqlserver-server}', + port = '${SQLCMDPORT:1433}', + username = '${SQLCMDUSER:SA}', + password = '${SQLCMDPASSWORD}', + table.name = 'single_type', + database.name = '${SQLCMDDBNAME}', +); + +statement error Protocol error: connector sqlserver-cdc does not support `CREATE TABLE`, please use `CREATE SOURCE` instead +CREATE TABLE sqlserver_all_data_types ( + id INT PRIMARY KEY, + c_bit BOOLEAN, + c_tinyint SMALLINT, + c_smallint SMALLINT, + c_int INTEGER, + c_bigint BIGINT, + c_decimal DECIMAL, + c_real REAL, + c_float FLOAT, + c_varchar VARCHAR, + c_varbinary BYTEA, + c_date DATE, + c_time TIME, + c_datetime2 TIMESTAMP, + c_datetimeoffset TIMESTAMPTZ +) WITH ( + connector = 'sqlserver-cdc', + hostname = '${SQLCMDSERVER:sqlserver-server}', + port = '${SQLCMDPORT:1433}', + username = '${SQLCMDUSER:SA}', + password = '${SQLCMDPASSWORD}', + table.name = 'sqlserver_all_data_types', + database.name = '${SQLCMDDBNAME}', +); + + +# ------------ Create source/table/mv stage ------------ +# create a cdc source job, which format fixed to `FORMAT PLAIN ENCODE JSON` +statement ok +CREATE SOURCE mssql_source WITH ( + connector = 'sqlserver-cdc', + hostname = '${SQLCMDSERVER:sqlserver-server}', + port = '${SQLCMDPORT:1433}', + username = '${SQLCMDUSER:SA}', + password = '${SQLCMDPASSWORD}', + database.name = '${SQLCMDDBNAME}', +); + +statement error Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source +create materialized view mv as select * from mssql_source; + +statement error The upstream table name must contain schema name prefix* +CREATE TABLE shared_orders ( + order_id INT, + order_date BIGINT, + customer_name VARCHAR, + price DECIMAL, + product_id INT, + order_status SMALLINT, + PRIMARY KEY (order_id) +) from mssql_source table 'orders'; + +statement ok +CREATE TABLE shared_orders ( + order_id INT, + order_date 
BIGINT, + customer_name VARCHAR, + price DECIMAL, + product_id INT, + order_status SMALLINT, + PRIMARY KEY (order_id) +) from mssql_source table 'dbo.orders'; + +statement ok +CREATE TABLE shared_single_type ( + id INT, + c_time time, + PRIMARY KEY (id) +) from mssql_source table 'dbo.single_type'; + +statement ok +CREATE TABLE shared_sqlserver_all_data_types ( + id INT, + c_bit BOOLEAN, + c_tinyint SMALLINT, + c_smallint SMALLINT, + c_int INTEGER, + c_bigint BIGINT, + c_decimal DECIMAL, + c_real REAL, + c_float FLOAT, + c_varchar VARCHAR, + c_varbinary BYTEA, + c_date DATE, + c_time TIME, + c_datetime2 TIMESTAMP, + c_datetimeoffset TIMESTAMPTZ, + PRIMARY KEY (id) +) from mssql_source table 'dbo.sqlserver_all_data_types'; + +statement ok +create materialized view shared_orders_cnt as select count(*) as cnt from shared_orders; + +statement ok +create materialized view shared_single_type_cnt as select count(*) as cnt from shared_single_type; + +statement ok +create materialized view shared_sqlserver_all_data_types_cnt as select count(*) as cnt from shared_sqlserver_all_data_types; + +# sleep to ensure the data in mssql tables is consumed from Debezium message instead of backfill. +sleep 20s + +# ------------ check stage ------------ +query I +select cnt from shared_orders_cnt; +---- +3 + +query I +select cnt from shared_single_type_cnt; +---- +1 + +query I +select cnt from shared_sqlserver_all_data_types_cnt; +---- +3 + +query III +select * from shared_orders order by order_id; +---- +1 1558430840000 Bob 11 1 1 +2 1558430840001 Alice 21 2 1 +3 1558430840002 Alice 19 2 1 + +query I +SELECT * from shared_single_type order by id; +---- +3 23:59:59.999 + +query TTTTTTT +SELECT * from shared_sqlserver_all_data_types order by id; +---- +1 f 0 0 0 0 0 0 0 (empty) NULL 2001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 00:00:00+00:00 +2 t 255 -32768 -2147483648 -9223372036854775808 -10 -10000 -10000 aa \xff 1990-01-01 13:59:59.123 2000-01-01 11:00:00.123 1990-01-01 00:00:01.123+00:00 +3 t 127 32767 2147483647 9223372036854775807 -10 10000 10000 zzzz \xffffffff 2999-12-31 23:59:59.999 2099-12-31 23:59:59.999 2999-12-31 23:59:59.999+00:00 + +# ------------ kill cluster ------------ +# system ok +# risedev kill + +# sleep 30s + +# ------------ add rows stage ------------ +system ok +sqlcmd -C -i e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_insert.sql -b + +sleep 10s + +# ------------ recover cluster ------------ +# system ok +# risedev dev ci-1cn-1fe-with-recovery + +# sleep 30s + +# ------------ check after recovery stage ------------ + +query I +select cnt from shared_orders_cnt; +---- +6 + +query I +select cnt from shared_single_type_cnt; +---- +2 + +query I +select cnt from shared_sqlserver_all_data_types_cnt; +---- +6 + + +query III +select * from shared_orders order by order_id; +---- +1 1558430840000 Bob 11 1 1 +2 1558430840001 Alice 21 2 1 +3 1558430840002 Alice 19 2 1 +11 1558430840000 Bob 11 1 1 +12 1558430840001 Alice 21 2 1 +13 1558430840002 Alice 19 2 1 + +query I +SELECT * from shared_single_type order by id; +---- +3 23:59:59.999 +13 23:59:59.999 + +query TTTTTTT +SELECT * from shared_sqlserver_all_data_types order by id; +---- +1 f 0 0 0 0 0 0 0 (empty) NULL 2001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 00:00:00+00:00 +2 t 255 -32768 -2147483648 -9223372036854775808 -10 -10000 -10000 aa \xff 1990-01-01 13:59:59.123 2000-01-01 11:00:00.123 1990-01-01 00:00:01.123+00:00 +3 t 127 32767 2147483647 9223372036854775807 -10 10000 10000 zzzz \xffffffff 2999-12-31 23:59:59.999 
2099-12-31 23:59:59.999 2999-12-31 23:59:59.999+00:00 +11 f 0 0 0 0 0 0 0 (empty) NULL 2001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 00:00:00+00:00 +12 t 255 -32768 -2147483648 -9223372036854775808 -10 -10000 -10000 aa \xff 1990-01-01 13:59:59.123 2000-01-01 11:00:00.123 1990-01-01 00:00:01.123+00:00 +13 t 127 32767 2147483647 9223372036854775807 -10 10000 10000 zzzz \xffffffff 2999-12-31 23:59:59.999 2099-12-31 23:59:59.999 2999-12-31 23:59:59.999+00:00 + +# ------------ drop stage ------------ +statement ok +drop source mssql_source cascade; diff --git a/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_insert.sql b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_insert.sql new file mode 100644 index 000000000000..e5089aa86d02 --- /dev/null +++ b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_insert.sql @@ -0,0 +1,24 @@ + +INSERT INTO + orders ( + order_id, + order_date, + customer_name, + price, + product_id, + order_status + ) +VALUES + (11, 1558430840000, 'Bob', 10.50, 1, 1), + (12, 1558430840001, 'Alice', 20.50, 2, 1), + (13, 1558430840002, 'Alice', 18.50, 2, 1); + + +INSERT INTO single_type VALUES (13, '23:59:59.999') + + +INSERT INTO sqlserver_all_data_types VALUES (11, 'False', 0, 0, 0, 0, 0, 0, 0, '', NULL, '2001-01-01', '00:00:00', '2001-01-01 00:00:00', '2001-01-01 00:00:00'); + +INSERT INTO sqlserver_all_data_types VALUES (12, 'True', 255, -32768, -2147483648, -9223372036854775808, -10.0, -9999.999999, -10000.0, 'aa', 0xff, '1990-01-01', '13:59:59.123', '2000-01-01 11:00:00.123', '1990-01-01 00:00:01.123'); + +INSERT INTO sqlserver_all_data_types VALUES (13, 'True', 127, 32767, 2147483647, 9223372036854775807, -10.0, 9999.999999, 10000.0, 'zzzz', 0xffffffff, '2999-12-31', '23:59:59.999', '2099-12-31 23:59:59.999', '2999-12-31 23:59:59.999') diff --git a/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_prepare.sql b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_prepare.sql new file mode 100644 index 000000000000..bec368ab00d4 --- /dev/null +++ b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_prepare.sql @@ -0,0 +1,72 @@ +EXEC sys.sp_cdc_enable_db; + +CREATE TABLE orders ( + order_id INT PRIMARY KEY, + order_date BIGINT, + customer_name NVARCHAR(200), + price DECIMAL, + product_id INT, + order_status SMALLINT +); + +EXEC sys.sp_cdc_enable_table + @source_schema = 'dbo', + @source_name = 'orders', + @role_name = NULL; + + +INSERT INTO + orders ( + order_id, + order_date, + customer_name, + price, + product_id, + order_status + ) +VALUES + (1, 1558430840000, 'Bob', 10.50, 1, 1), + (2, 1558430840001, 'Alice', 20.50, 2, 1), + (3, 1558430840002, 'Alice', 18.50, 2, 1); + +CREATE TABLE single_type ( + id INT PRIMARY KEY, + c_time time, +); + +EXEC sys.sp_cdc_enable_table + @source_schema = 'dbo', + @source_name = 'single_type', + @role_name = NULL; + +INSERT INTO single_type VALUES (3, '23:59:59.999') + + +CREATE TABLE sqlserver_all_data_types ( + id INT PRIMARY KEY, + c_bit bit, + c_tinyint tinyint, + c_smallint smallint, + c_int int, + c_bigint bigint, + c_decimal DECIMAL(28), + c_real real, + c_float float, + c_varchar varchar(4), + c_varbinary varbinary(4), + c_date date, + c_time time, + c_datetime2 datetime2, + c_datetimeoffset datetimeoffset +); + +EXEC sys.sp_cdc_enable_table + @source_schema = 'dbo', + @source_name = 'sqlserver_all_data_types', + @role_name = NULL; + +INSERT INTO sqlserver_all_data_types VALUES (1, 'False', 0, 0, 0, 0, 0, 0, 0, '', NULL, '2001-01-01', '00:00:00', '2001-01-01 00:00:00', '2001-01-01 
00:00:00'); + +INSERT INTO sqlserver_all_data_types VALUES (2, 'True', 255, -32768, -2147483648, -9223372036854775808, -10.0, -9999.999999, -10000.0, 'aa', 0xff, '1990-01-01', '13:59:59.123', '2000-01-01 11:00:00.123', '1990-01-01 00:00:01.123'); + +INSERT INTO sqlserver_all_data_types VALUES (3, 'True', 127, 32767, 2147483647, 9223372036854775807, -10.0, 9999.999999, 10000.0, 'zzzz', 0xffffffff, '2999-12-31', '23:59:59.999', '2099-12-31 23:59:59.999', '2999-12-31 23:59:59.999') diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/SourceTypeE.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/SourceTypeE.java index 0c9858ab4fd5..21a548bcb825 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/SourceTypeE.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/SourceTypeE.java @@ -21,6 +21,7 @@ public enum SourceTypeE { POSTGRES, CITUS, MONGODB, + SQL_SERVER, INVALID; public static SourceTypeE valueOf(ConnectorServiceProto.SourceType type) { @@ -33,6 +34,8 @@ public static SourceTypeE valueOf(ConnectorServiceProto.SourceType type) { return SourceTypeE.CITUS; case MONGODB: return SourceTypeE.MONGODB; + case SQL_SERVER: + return SourceTypeE.SQL_SERVER; default: return SourceTypeE.INVALID; } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java index 309ab8db7af4..458ed8a6d7a3 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java @@ -157,6 +157,14 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re var validator = new MongoDbValidator(props); validator.validateDbConfig(); break; + case SQL_SERVER: + ensureRequiredProps(props, isCdcSourceJob); + ensurePropNotBlank(props, DbzConnectorConfig.SQL_SERVER_SCHEMA_NAME); + try (var sqlServerValidator = + new SqlServerValidator(props, tableSchema, isCdcSourceJob)) { + sqlServerValidator.validateAll(); + } + break; default: LOG.warn("Unknown source type"); throw ValidatorUtils.invalidArgument("Unknown source type"); diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index faae0048649b..fb8aa62916f6 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -55,11 +55,15 @@ public class DbzConnectorConfig { public static final String PG_PUB_CREATE = "publication.create.enable"; public static final String PG_SCHEMA_NAME = "schema.name"; + /* Sql Server configs */ + public static final String SQL_SERVER_SCHEMA_NAME = "schema.name"; + /* RisingWave configs */ private static final String DBZ_CONFIG_FILE = "debezium.properties"; private static final String MYSQL_CONFIG_FILE = "mysql.properties"; private static final String 
POSTGRES_CONFIG_FILE = "postgres.properties"; private static final String MONGODB_CONFIG_FILE = "mongodb.properties"; + private static final String SQL_SERVER_CONFIG_FILE = "sql_server.properties"; private static final String DBZ_PROPERTY_PREFIX = "debezium."; @@ -249,7 +253,38 @@ public DbzConnectorConfig( mongodbProps.setProperty("name", connectorName); dbzProps.putAll(mongodbProps); + } else if (source == SourceTypeE.SQL_SERVER) { + var sqlServerProps = initiateDbConfig(SQL_SERVER_CONFIG_FILE, substitutor); + // disable snapshot locking at all + sqlServerProps.setProperty("snapshot.locking.mode", "none"); + if (isCdcBackfill) { + // if startOffset is specified, we should continue + // reading changes from the given offset + if (null != startOffset && !startOffset.isBlank()) { + // skip the initial snapshot for cdc backfill + sqlServerProps.setProperty("snapshot.mode", "recovery"); + sqlServerProps.setProperty( + ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); + } else { + sqlServerProps.setProperty("snapshot.mode", "no_data"); + } + } else { + // if snapshot phase is finished and offset is specified, we will continue reading + // changes from the given offset + if (snapshotDone && null != startOffset && !startOffset.isBlank()) { + sqlServerProps.setProperty("snapshot.mode", "recovery"); + sqlServerProps.setProperty( + ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); + } + } + dbzProps.putAll(sqlServerProps); + if (isCdcSourceJob) { + // remove table filtering for the shared Sql Server source, since we + // allow user to ingest tables in different schemas + LOG.info("Disable table filtering for the shared Sql Server source"); + dbzProps.remove("table.include.list"); + } } else { throw new RuntimeException("unsupported source type: " + source); } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java index 83c6d59fac92..13ca00261287 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java @@ -19,6 +19,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; import javax.management.JMException; @@ -112,6 +113,9 @@ public static boolean waitForStreamingRunning( } else if (sourceType == SourceTypeE.POSTGRES) { return waitForStreamingRunningInner( "postgres", dbServerName, waitStreamingStartTimeout); + } else if (sourceType == SourceTypeE.SQL_SERVER) { + return waitForStreamingRunningInner( + "sql_server", dbServerName, waitStreamingStartTimeout); } else { LOG.info("Unsupported backfill source, just return true for {}", dbServerName); return true; @@ -162,12 +166,23 @@ private static boolean isStreamingRunning(String connector, String server, Strin private static ObjectName getStreamingMetricsObjectName( String connector, String server, String context) throws MalformedObjectNameException { - return new ObjectName( - "debezium." 
- + connector - + ":type=connector-metrics,context=" - + context - + ",server=" - + server); + if (Objects.equals(connector, "sql_server")) { + // TODO: fulfill the task id here, by WKX + return new ObjectName( + "debezium." + + connector + + ":type=connector-metrics,task=0,context=" + + context + + ",server=" + + server); + } else { + return new ObjectName( + "debezium." + + connector + + ":type=connector-metrics,context=" + + context + + ",server=" + + server); + } } } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/SqlServerValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/SqlServerValidator.java new file mode 100644 index 000000000000..1c0e8176add7 --- /dev/null +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/SqlServerValidator.java @@ -0,0 +1,310 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.risingwave.connector.source.common; + +import com.risingwave.connector.api.TableSchema; +import com.risingwave.connector.api.source.SourceTypeE; +import com.risingwave.proto.Data; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SqlServerValidator extends DatabaseValidator implements AutoCloseable { + static final Logger LOG = LoggerFactory.getLogger(SqlServerValidator.class); + + private final TableSchema tableSchema; + + private final Connection jdbcConnection; + + private final String user; + private final String dbName; + private final String schemaName; + private final String tableName; + + // Whether the properties to validate is shared by multiple tables. 
+ // If true, we will skip the validation check for the table + private final boolean isCdcSourceJob; + + public SqlServerValidator( + Map<String, String> userProps, TableSchema tableSchema, boolean isCdcSourceJob) + throws SQLException { + this.tableSchema = tableSchema; + + var dbHost = userProps.get(DbzConnectorConfig.HOST); + var dbPort = userProps.get(DbzConnectorConfig.PORT); + var dbName = userProps.get(DbzConnectorConfig.DB_NAME); + var user = userProps.get(DbzConnectorConfig.USER); + var password = userProps.get(DbzConnectorConfig.PASSWORD); + + var jdbcUrl = ValidatorUtils.getJdbcUrl(SourceTypeE.SQL_SERVER, dbHost, dbPort, dbName); + this.jdbcConnection = DriverManager.getConnection(jdbcUrl, user, password); + + this.dbName = dbName; + this.user = user; + this.schemaName = userProps.get(DbzConnectorConfig.SQL_SERVER_SCHEMA_NAME); + this.tableName = userProps.get(DbzConnectorConfig.TABLE_NAME); + this.isCdcSourceJob = isCdcSourceJob; + } + + @Override + public void validateDbConfig() { + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("sqlserver.db.cdc.enabled"))) { + // check whether cdc has been enabled + var res = stmt.executeQuery(); + while (res.next()) { + if (!res.getString(1).equals(dbName)) { + throw ValidatorUtils.invalidArgument( + "Sql Server's DB_NAME() '" + + res.getString(1) + + "' does not match db_name '" + + dbName + + "'."); + } + if (res.getInt(2) != 1) { + throw ValidatorUtils.invalidArgument( + "Sql Server's '" + + dbName + + "' has not enabled CDC.\nPlease enable CDC in your Sql Server database with 'EXEC sys.sp_cdc_enable_db'."); + } + } + } catch (SQLException e) { + throw ValidatorUtils.internalError(e.getMessage()); + } + if (isCdcSourceJob) { + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("sqlserver.sql.agent.enabled"))) { + // check whether sql server agent is enabled.
It's required to run + // fn_cdc_get_max_lsn + var res = stmt.executeQuery(); + while (res.next()) { + if (res.wasNull()) { + throw ValidatorUtils.invalidArgument( + "Sql Server's sql server agent is not activated.\nYou can check it by running `SELECT servicename, startup_type_desc, status_desc FROM sys.dm_server_services WHERE servicename LIKE 'SQL Server Agent%'` in Sql Server."); + } + } + } catch (SQLException e) { + throw ValidatorUtils.internalError(e.getMessage()); + } + } + } + + @Override + public void validateUserPrivilege() { + try { + validatePrivileges(); + } catch (SQLException e) { + throw ValidatorUtils.internalError(e.getMessage()); + } + } + + @Override + public void validateTable() { + try { + validateTableSchema(); + } catch (SQLException e) { + throw ValidatorUtils.internalError(e.getMessage()); + } + } + + @Override + boolean isCdcSourceJob() { + return isCdcSourceJob; + } + + private void validateTableSchema() throws SQLException { + if (isCdcSourceJob) { + return; + } + // check whether table exist + try (var stmt = jdbcConnection.prepareStatement(ValidatorUtils.getSql("sqlserver.table"))) { + stmt.setString(1, schemaName); + stmt.setString(2, tableName); + var res = stmt.executeQuery(); + while (res.next()) { + if (res.getInt(1) == 0) { + throw ValidatorUtils.invalidArgument( + String.format( + "Sql Server table '%s'.'%s' doesn't exist", + schemaName, tableName)); + } + } + } + + // check cdc enabled + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("sqlserver.table.cdc.enabled"))) { + stmt.setString(1, schemaName); + stmt.setString(2, tableName); + var res = stmt.executeQuery(); + while (res.next()) { + if (res.getInt(1) != 1) { + throw ValidatorUtils.invalidArgument( + "Table '" + + schemaName + + "." 
+ + tableName + + "' has not enabled CDC.\nPlease ensure CDC is enabled."); + } + } + } + + // check primary key + try (var stmt = jdbcConnection.prepareStatement(ValidatorUtils.getSql("sqlserver.pk"))) { + stmt.setString(1, this.schemaName); + stmt.setString(2, this.tableName); + var res = stmt.executeQuery(); + var pkFields = new HashSet<String>(); + while (res.next()) { + var name = res.getString(1); + pkFields.add(name); + } + + if (!isPrimaryKeyMatch(tableSchema, pkFields)) { + throw ValidatorUtils.invalidArgument("Primary key mismatch"); + } + } + + // Check whether source schema matches table schema on upstream + // All columns defined must exist in upstream database + try (var stmt = + jdbcConnection.prepareStatement(ValidatorUtils.getSql("sqlserver.table_schema"))) { + stmt.setString(1, this.schemaName); + stmt.setString(2, this.tableName); + var res = stmt.executeQuery(); + + // Field names in lower case -> data type + Map<String, String> schema = new HashMap<>(); + while (res.next()) { + var field = res.getString(1); + var dataType = res.getString(2); + schema.put(field.toLowerCase(), dataType); + } + + for (var e : tableSchema.getColumnTypes().entrySet()) { + // skip validating internal columns + if (e.getKey().startsWith(ValidatorUtils.INTERNAL_COLUMN_PREFIX)) { + continue; + } + var dataType = schema.get(e.getKey().toLowerCase()); + if (dataType == null) { + throw ValidatorUtils.invalidArgument( + "Column '" + e.getKey() + "' not found in the upstream database"); + } + if (!isDataTypeCompatible(dataType, e.getValue())) { + throw ValidatorUtils.invalidArgument( + "Incompatible data type of column " + e.getKey()); + } + } + } + } + + private void validatePrivileges() throws SQLException { + if (isCdcSourceJob) { + return; + } + + try (var stmt = + jdbcConnection.prepareStatement(ValidatorUtils.getSql("sqlserver.has.perms"))) { + stmt.setString(1, this.schemaName); + stmt.setString(2, this.tableName); + var res = stmt.executeQuery(); + while (res.next()) { + if (res.getInt(1) != 1) { + throw ValidatorUtils.invalidArgument( + "Sql Server user '" + + user + + "' must have select privilege on table '" + + schemaName + + "."
+ + tableName + + "''s CDC table."); + } + } + } + } + + @Override + public void close() throws Exception { + if (null != jdbcConnection) { + jdbcConnection.close(); + } + } + + public static boolean isPrimaryKeyMatch(TableSchema sourceSchema, Set<String> pkFields) { + if (sourceSchema.getPrimaryKeys().size() != pkFields.size()) { + return false; + } + for (var colName : sourceSchema.getPrimaryKeys()) { + if (!pkFields.contains(colName)) { + return false; + } + } + return true; + } + + private boolean isDataTypeCompatible(String ssDataType, Data.DataType.TypeName typeName) { + // TODO: add more data type compatibility checks, by WKX + int val = typeName.getNumber(); + switch (ssDataType) { + case "bit": + return val == Data.DataType.TypeName.BOOLEAN_VALUE; + case "tinyint": + case "smallint": + return Data.DataType.TypeName.INT16_VALUE <= val + && val <= Data.DataType.TypeName.INT64_VALUE; + case "integer": + return Data.DataType.TypeName.INT32_VALUE <= val + && val <= Data.DataType.TypeName.INT64_VALUE; + case "bigint": + return val == Data.DataType.TypeName.INT64_VALUE; + case "float": + case "real": + return val == Data.DataType.TypeName.FLOAT_VALUE + || val == Data.DataType.TypeName.DOUBLE_VALUE; + case "boolean": + return val == Data.DataType.TypeName.BOOLEAN_VALUE; + case "double": + case "double precision": + return val == Data.DataType.TypeName.DOUBLE_VALUE; + case "decimal": + case "numeric": + return val == Data.DataType.TypeName.DECIMAL_VALUE; + case "varchar": + case "character varying": + return val == Data.DataType.TypeName.VARCHAR_VALUE; + case "varbinary": + return val == Data.DataType.TypeName.BYTEA_VALUE; + case "date": + return val == Data.DataType.TypeName.DATE_VALUE; + case "time": + return val == Data.DataType.TypeName.TIME_VALUE; + case "datetime": + case "datetime2": + case "smalldatetime": + return val == Data.DataType.TypeName.TIMESTAMP_VALUE; + case "datetimeoffset": + return val == Data.DataType.TypeName.TIMESTAMPTZ_VALUE; + default: + return true; // true for other uncovered types + } + } +} diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/ValidatorUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/ValidatorUtils.java index 20d631a3267c..4b79280e62da 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/ValidatorUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/ValidatorUtils.java @@ -66,6 +66,10 @@ public static String getJdbcUrl( case POSTGRES: case CITUS: return String.format("jdbc:postgresql://%s:%s/%s", host, port, database); + case SQL_SERVER: + return String.format( + "jdbc:sqlserver://%s:%s;databaseName=%s;encrypt=false", + host, port, database); default: throw ValidatorUtils.invalidArgument("Unknown source type: " + sourceType); } } diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/sql_server.properties b/java/connector-node/risingwave-connector-service/src/main/resources/sql_server.properties new file mode 100644 index 000000000000..0e0c55c939ef --- /dev/null +++ b/java/connector-node/risingwave-connector-service/src/main/resources/sql_server.properties @@ -0,0 +1,24 @@ +# configs for sql server connector +connector.class=io.debezium.connector.sqlserver.SqlServerConnector +# default snapshot mode to initial +snapshot.mode=${debezium.snapshot.mode:-initial}
+database.hostname=${hostname} +database.port=${port} +database.user=${username} +database.password=${password} +database.names=${database.name} +table.include.list=${schema.name}.${table.name:-*} +# only read table schema of the captured tables in the specified database +schema.history.internal.store.only.captured.tables.ddl=true +schema.history.internal.store.only.captured.databases.ddl=true +# default to disable schema change events +include.schema.changes=${debezium.include.schema.changes:-false} +# default heartbeat interval 5 mins +heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-300000} +# In sharing cdc source mode, we will subscribe to multiple tables in the given database, +# so here we set ${table.name} to a default value `RW_CDC_Sharing` just for display. +name=${hostname}:${port}:${database.name}.${schema.name}.${table.name:-RW_CDC_Sharing} +# In sharing cdc mode, transaction metadata will be enabled in frontend. +# For sql server, it's always false actually. +provide.transaction.metadata=${transactional:-false} +database.encrypt=false diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties index 769c3cb1c8fb..54214b6b4619 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties @@ -37,5 +37,12 @@ GROUP BY r1.rolname \ ), \ tmp AS (SELECT DISTINCT(UNNEST(m)) AS members FROM base) \ SELECT ARRAY_AGG(members) AS members FROM tmp +sqlserver.db.cdc.enabled=SELECT name, is_cdc_enabled FROM sys.databases WHERE name = DB_NAME() +sqlserver.table=SELECT count(*) FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? +sqlserver.table.cdc.enabled=SELECT COUNT(*) FROM cdc.change_tables AS ct INNER JOIN sys.tables AS t ON ct.source_object_id = t.object_id INNER JOIN sys.schemas AS s ON t.schema_id = s.schema_id WHERE s.name = ? AND t.name = ? +sqlserver.pk=SELECT k.column_name FROM information_schema.table_constraints t INNER JOIN information_schema.key_column_usage k ON t.constraint_name = k.constraint_name AND t.table_name = k.table_name WHERE t.constraint_type = 'PRIMARY KEY' AND t.table_schema = ? AND t.table_name = ? +sqlserver.table_schema=SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? ORDER BY ORDINAL_POSITION +sqlserver.has.perms=SELECT HAS_PERMS_BY_NAME('cdc.' + ct.capture_instance + '_CT', 'OBJECT', 'SELECT') FROM cdc.change_tables AS ct INNER JOIN sys.tables AS t ON ct.source_object_id = t.object_id INNER JOIN sys.schemas AS s ON t.schema_id = s.schema_id WHERE s.name = ? AND t.name = ? +sqlserver.sql.agent.enabled=SELECT sys.fn_cdc_get_max_lsn() citus.distributed_table=select citus_table_type from citus_tables where table_name=?::regclass postgres.rds.role.check=SELECT r.rolname, r.rolsuper, r.rolinherit, r.rolcreaterole, r.rolcreatedb, r.rolcanlogin, r.rolconnlimit, r.rolvaliduntil, ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid) as memberof , r.rolreplication , r.rolbypassrls FROM pg_catalog.pg_roles r WHERE r.rolname = ? 
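Note: the new sqlserver.* validation queries above can also be exercised by hand before creating a source, which is handy when CDC setup fails. A minimal sqlcmd sketch, assuming the CI environment prepared earlier in this diff (SQLCMD* variables exported in e2e-source-test.sh, database mydb and table dbo.orders from the prepare script); the statements are taken from validate_sql.properties, with the dbo.orders placeholders filled in:
# database-level CDC flag checked by sqlserver.db.cdc.enabled
sqlcmd -C -d mydb -Q 'SELECT name, is_cdc_enabled FROM sys.databases WHERE name = DB_NAME();' -b
# sqlserver.sql.agent.enabled: a NULL result means SQL Server Agent is not running
sqlcmd -C -d mydb -Q 'SELECT sys.fn_cdc_get_max_lsn();' -b
# sqlserver.table: the captured table must exist in the given schema
sqlcmd -C -d mydb -Q "SELECT count(*) FROM information_schema.TABLES WHERE TABLE_SCHEMA = 'dbo' AND TABLE_NAME = 'orders';" -b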
diff --git a/java/connector-node/risingwave-source-cdc/pom.xml b/java/connector-node/risingwave-source-cdc/pom.xml index 839de9c1f319..fb1519f6a183 100644 --- a/java/connector-node/risingwave-source-cdc/pom.xml +++ b/java/connector-node/risingwave-source-cdc/pom.xml @@ -47,6 +47,10 @@ io.debezium debezium-connector-mongodb + + io.debezium + debezium-connector-sqlserver + diff --git a/java/pom.xml b/java/pom.xml index 58f33e9da56d..c0521bd6dfc4 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -173,6 +173,11 @@ debezium-connector-mongodb ${debezium.version} + + io.debezium + debezium-connector-sqlserver + ${debezium.version} + org.postgresql postgresql diff --git a/proto/catalog.proto b/proto/catalog.proto index e60c99814d9e..d407dbe3936a 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -77,7 +77,7 @@ message StreamSourceInfo { // // Currently, the following sources can be shared: // - // - Direct CDC sources (mysql & postgresql) + // - Direct CDC sources (mysql & postgresql & sqlserver) // - MQ sources (Kafka) bool cdc_source_job = 13; // Only used when `cdc_source_job` is `true`. diff --git a/proto/connector_service.proto b/proto/connector_service.proto index cf549a8e2e49..964d22745254 100644 --- a/proto/connector_service.proto +++ b/proto/connector_service.proto @@ -173,6 +173,7 @@ enum SourceType { POSTGRES = 2; CITUS = 3; MONGODB = 4; + SQL_SERVER = 5; } message GetEventStreamRequest { diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs index aaaa6321666e..974cb15259e2 100644 --- a/src/compute/tests/cdc_tests.rs +++ b/src/compute/tests/cdc_tests.rs @@ -79,6 +79,8 @@ impl MockOffsetGenExecutor { lsn: None, txid: None, tx_usec: None, + change_lsn: None, + commit_lsn: None, }, is_heartbeat: false, }; diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index a5db0c4fcb93..9647c95d05d9 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -147,8 +147,19 @@ strum = "0.26" strum_macros = "0.26" tempfile = "3" thiserror = "1" + thiserror-ext = { workspace = true } -tiberius = { version = "0.12", default-features = false, features = ["chrono", "time", "tds73", "rust_decimal", "bigdecimal", "rustls"] } +# To easiy get the type_name and impl IntoSql for rust_decimal, we fork the crate. +# Another reason is that we are planning to refactor their IntoSql trait to allow specifying the type to convert. +tiberius = { git = "https://github.com/risingwavelabs/tiberius.git", rev = "f834f2deeb9e2fb08afaf73865f330cf31a3876a", default-features = false, features = [ + "chrono", + "sql-browser-tokio", + "vendored-openssl", + "rust_decimal", + "bigdecimal", + "tds73", + "time", +] } time = "0.3.30" tokio = { version = "0.2", package = "madsim-tokio", features = [ "rt", diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs index 076163469316..9dbd17ae58a1 100644 --- a/src/connector/src/error.rs +++ b/src/connector/src/error.rs @@ -52,6 +52,7 @@ def_anyhow_newtype! { sqlx::Error => transparent, // believed to be self-explanatory mysql_async::Error => "MySQL error", tokio_postgres::Error => "Postgres error", + tiberius::error::Error => "Sql Server error", apache_avro::Error => "Avro error", rdkafka::error::KafkaError => "Kafka error", pulsar::Error => "Pulsar error", diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index 48f4e16c747b..b422a029fa76 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -21,7 +21,8 @@ macro_rules! 
for_all_classified_sources { { Mysql }, { Postgres }, { Citus }, - { Mongodb } + { Mongodb }, + { SqlServer } }, // other sources // todo: file source do not nest with mq source. diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 055eab777be5..6cd843c0384b 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -47,6 +47,7 @@ pub use self::mysql::mysql_row_to_owned_row; use self::plain_parser::PlainParser; pub use self::postgres::postgres_row_to_owned_row; use self::simd_json_parser::DebeziumJsonAccessBuilder; +pub use self::sql_server::{sql_server_row_to_owned_row, ScalarImplTiberiusWrapper}; pub use self::unified::json::{JsonAccess, TimestamptzHandling}; pub use self::unified::Access; use self::unified::AccessImpl; @@ -82,6 +83,7 @@ mod mysql; pub mod parquet_parser; pub mod plain_parser; mod postgres; +mod sql_server; mod protobuf; pub mod scalar_adapter; diff --git a/src/connector/src/parser/sql_server.rs b/src/connector/src/parser/sql_server.rs new file mode 100644 index 000000000000..91f78b41ffb6 --- /dev/null +++ b/src/connector/src/parser/sql_server.rs @@ -0,0 +1,297 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::LazyLock; + +use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; +use risingwave_common::catalog::Schema; +use risingwave_common::log::LogSuppresser; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{Date, Decimal, ScalarImpl, Time, Timestamp, Timestamptz}; +use rust_decimal::Decimal as RustDecimal; +use thiserror_ext::AsReport; +use tiberius::Row; + +use crate::parser::util::log_error; + +static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default); + +pub fn sql_server_row_to_owned_row(row: &mut Row, schema: &Schema) -> OwnedRow { + let mut datums: Vec<Option<ScalarImpl>> = vec![]; + for i in 0..schema.fields.len() { + let rw_field = &schema.fields[i]; + let name = rw_field.name.as_str(); + let datum = match row.try_get::<ScalarImplTiberiusWrapper, usize>(i) { + Ok(datum) => datum.map(|d| d.0), + Err(err) => { + log_error!(name, err, "parse column failed"); + None + } + }; + datums.push(datum); + } + OwnedRow::new(datums) +} +macro_rules! impl_tiberius_wrapper { + ($wrapper_name:ident, $variant_name:ident) => { + pub struct $wrapper_name($variant_name); + + impl From<$variant_name> for $wrapper_name { + fn from(value: $variant_name) -> Self { + Self(value) + } + } + }; +} + +impl_tiberius_wrapper!(ScalarImplTiberiusWrapper, ScalarImpl); +impl_tiberius_wrapper!(TimeTiberiusWrapper, Time); +impl_tiberius_wrapper!(DateTiberiusWrapper, Date); +impl_tiberius_wrapper!(TimestampTiberiusWrapper, Timestamp); +impl_tiberius_wrapper!(TimestamptzTiberiusWrapper, Timestamptz); +impl_tiberius_wrapper!(DecimalTiberiusWrapper, Decimal); + +macro_rules!
impl_chrono_tiberius_wrapper { + ($wrapper_name:ident, $variant_name:ident, $chrono:ty) => { + impl<'a> tiberius::IntoSql<'a> for $wrapper_name { + fn into_sql(self) -> tiberius::ColumnData<'a> { + self.0 .0.into_sql() + } + } + + impl<'a> tiberius::FromSql<'a> for $wrapper_name { + fn from_sql( + value: &'a tiberius::ColumnData<'static>, + ) -> tiberius::Result<Option<Self>> { + let instant = <$chrono as tiberius::FromSql>::from_sql(value)?; + let time = instant.map($variant_name::from).map($wrapper_name::from); + tiberius::Result::Ok(time) + } + } + }; +} + +impl_chrono_tiberius_wrapper!(TimeTiberiusWrapper, Time, NaiveTime); +impl_chrono_tiberius_wrapper!(DateTiberiusWrapper, Date, NaiveDate); +impl_chrono_tiberius_wrapper!(TimestampTiberiusWrapper, Timestamp, NaiveDateTime); + +impl<'a> tiberius::IntoSql<'a> for DecimalTiberiusWrapper { + fn into_sql(self) -> tiberius::ColumnData<'a> { + match self.0 { + Decimal::Normalized(d) => d.into_sql(), + Decimal::NaN => tiberius::ColumnData::Numeric(None), + Decimal::PositiveInf => tiberius::ColumnData::Numeric(None), + Decimal::NegativeInf => tiberius::ColumnData::Numeric(None), + } + } +} + +impl<'a> tiberius::FromSql<'a> for DecimalTiberiusWrapper { + // TODO(kexiang): will sql server have inf/-inf/nan for decimal? + fn from_sql(value: &'a tiberius::ColumnData<'static>) -> tiberius::Result<Option<Self>> { + tiberius::Result::Ok( + <RustDecimal as tiberius::FromSql>::from_sql(value)? + .map(Decimal::Normalized) + .map(DecimalTiberiusWrapper::from), + ) + } +} + +impl<'a> tiberius::IntoSql<'a> for TimestamptzTiberiusWrapper { + fn into_sql(self) -> tiberius::ColumnData<'a> { + self.0.to_datetime_utc().into_sql() + } +} + +impl<'a> tiberius::FromSql<'a> for TimestamptzTiberiusWrapper { + fn from_sql(value: &'a tiberius::ColumnData<'static>) -> tiberius::Result<Option<Self>> { + let instant = <DateTime<Utc> as tiberius::FromSql>::from_sql(value)?; + let time = instant + .map(Timestamptz::from) + .map(TimestamptzTiberiusWrapper::from); + tiberius::Result::Ok(time) + } +} + +/// The following table shows the mapping between Rust types and Sql Server types in tiberius. +/// |Rust Type|Sql Server Type| +/// |`u8`|`tinyint`| +/// |`i16`|`smallint`| +/// |`i32`|`int`| +/// |`i64`|`bigint`| +/// |`f32`|`float(24)`| +/// |`f64`|`float(53)`| +/// |`bool`|`bit`| +/// |`String`/`&str`|`nvarchar`/`varchar`/`nchar`/`char`/`ntext`/`text`| +/// |`Vec<u8>`/`&[u8]`|`binary`/`varbinary`/`image`| +/// |[`Uuid`]|`uniqueidentifier`| +/// |[`Numeric`]|`numeric`/`decimal`| +/// |[`Decimal`] (with feature flag `rust_decimal`)|`numeric`/`decimal`| +/// |[`XmlData`]|`xml`| +/// |[`NaiveDateTime`] (with feature flag `chrono`)|`datetime`/`datetime2`/`smalldatetime`| +/// |[`NaiveDate`] (with feature flag `chrono`)|`date`| +/// |[`NaiveTime`] (with feature flag `chrono`)|`time`| +/// |[`DateTime`] (with feature flag `chrono`)|`datetimeoffset`| +/// +/// See the [`time`] module for more information about the date and time structs.
+/// +/// [`Row#get`]: struct.Row.html#method.get +/// [`Row#try_get`]: struct.Row.html#method.try_get +/// [`time`]: time/index.html +/// [`Uuid`]: struct.Uuid.html +/// [`Numeric`]: numeric/struct.Numeric.html +/// [`Decimal`]: numeric/struct.Decimal.html +/// [`XmlData`]: xml/struct.XmlData.html +/// [`NaiveDateTime`]: time/chrono/struct.NaiveDateTime.html +/// [`NaiveDate`]: time/chrono/struct.NaiveDate.html +/// [`NaiveTime`]: time/chrono/struct.NaiveTime.html +/// [`DateTime`]: time/chrono/struct.DateTime.html +impl<'a> tiberius::FromSql<'a> for ScalarImplTiberiusWrapper { + fn from_sql(value: &'a tiberius::ColumnData<'static>) -> tiberius::Result<Option<Self>> { + Ok(match &value { + tiberius::ColumnData::U8(_) => <u8 as tiberius::FromSql>::from_sql(value)? + .map(|v| ScalarImplTiberiusWrapper::from(ScalarImpl::from(v as i16))), + tiberius::ColumnData::I16(_) => <i16 as tiberius::FromSql>::from_sql(value)? + .map(ScalarImpl::from) + .map(ScalarImplTiberiusWrapper::from), + tiberius::ColumnData::I32(_) => <i32 as tiberius::FromSql>::from_sql(value)? + .map(ScalarImpl::from) + .map(ScalarImplTiberiusWrapper::from), + tiberius::ColumnData::I64(_) => <i64 as tiberius::FromSql>::from_sql(value)? + .map(ScalarImpl::from) + .map(ScalarImplTiberiusWrapper::from), + tiberius::ColumnData::F32(_) => <f32 as tiberius::FromSql>::from_sql(value)? + .map(ScalarImpl::from) + .map(ScalarImplTiberiusWrapper::from), + tiberius::ColumnData::F64(_) => <f64 as tiberius::FromSql>::from_sql(value)? + .map(ScalarImpl::from) + .map(ScalarImplTiberiusWrapper::from), + tiberius::ColumnData::Bit(_) => <bool as tiberius::FromSql>::from_sql(value)? + .map(ScalarImpl::from) + .map(ScalarImplTiberiusWrapper::from), + tiberius::ColumnData::String(_) => <&str as tiberius::FromSql>::from_sql(value)? + .map(ScalarImpl::from) + .map(ScalarImplTiberiusWrapper::from), + tiberius::ColumnData::Numeric(_) => { + <DecimalTiberiusWrapper as tiberius::FromSql>::from_sql(value)? + .map(|w| ScalarImpl::from(w.0)) + .map(ScalarImplTiberiusWrapper::from) + } + tiberius::ColumnData::DateTime(_) + | tiberius::ColumnData::DateTime2(_) + | tiberius::ColumnData::SmallDateTime(_) => { + <TimestampTiberiusWrapper as tiberius::FromSql>::from_sql(value)? + .map(|w| ScalarImpl::from(w.0)) + .map(ScalarImplTiberiusWrapper::from) + } + tiberius::ColumnData::Time(_) => { + <TimeTiberiusWrapper as tiberius::FromSql>::from_sql(value)? + .map(|w| ScalarImpl::from(w.0)) + .map(ScalarImplTiberiusWrapper::from) + } + tiberius::ColumnData::Date(_) => { + <DateTiberiusWrapper as tiberius::FromSql>::from_sql(value)? + .map(|w| ScalarImpl::from(w.0)) + .map(ScalarImplTiberiusWrapper::from) + } + tiberius::ColumnData::DateTimeOffset(_) => { + <TimestamptzTiberiusWrapper as tiberius::FromSql>::from_sql(value)? + .map(|w| ScalarImpl::from(w.0)) + .map(ScalarImplTiberiusWrapper::from) + } + tiberius::ColumnData::Binary(_) => <&[u8] as tiberius::FromSql>::from_sql(value)? + .map(ScalarImpl::from) + .map(ScalarImplTiberiusWrapper::from), + tiberius::ColumnData::Guid(_) | tiberius::ColumnData::Xml(_) => { + return Err(tiberius::error::Error::Conversion( + format!( + "the sql server decoding for {:?} is unsupported", + value.type_name() + ) + .into(), + )) + } + }) + } +} + +/// The following table shows the mapping between Rust types and Sql Server types in tiberius.
+/// |Rust type|Sql Server type| +/// |--------|--------| +/// |`u8`|`tinyint`| +/// |`i16`|`smallint`| +/// |`i32`|`int`| +/// |`i64`|`bigint`| +/// |`f32`|`float(24)`| +/// |`f64`|`float(53)`| +/// |`bool`|`bit`| +/// |`String`/`&str` (< 4000 characters)|`nvarchar(4000)`| +/// |`String`/`&str`|`nvarchar(max)`| +/// |`Vec`/`&[u8]` (< 8000 bytes)|`varbinary(8000)`| +/// |`Vec`/`&[u8]`|`varbinary(max)`| +/// |[`Uuid`]|`uniqueidentifier`| +/// |[`Numeric`]|`numeric`/`decimal`| +/// |[`Decimal`] (with feature flag `rust_decimal`)|`numeric`/`decimal`| +/// |[`BigDecimal`] (with feature flag `bigdecimal`)|`numeric`/`decimal`| +/// |[`XmlData`]|`xml`| +/// |[`NaiveDate`] (with `chrono` feature, TDS 7.3 >)|`date`| +/// |[`NaiveTime`] (with `chrono` feature, TDS 7.3 >)|`time`| +/// |[`DateTime`] (with `chrono` feature, TDS 7.3 >)|`datetimeoffset`| +/// |[`NaiveDateTime`] (with `chrono` feature, TDS 7.3 >)|`datetime2`| +/// |[`NaiveDateTime`] (with `chrono` feature, TDS 7.2)|`datetime`| +/// +/// It is possible to use some of the types to write into columns that are not +/// of the same type. For example on systems following the TDS 7.3 standard (SQL +/// Server 2008 and later), the chrono type `NaiveDateTime` can also be used to +/// write to `datetime`, `datetime2` and `smalldatetime` columns. All string +/// types can also be used with `ntext`, `text`, `varchar`, `nchar` and `char` +/// columns. All binary types can also be used with `binary` and `image` +/// columns. +/// +/// See the [`time`] module for more information about the date and time structs. +/// +/// [`Client#query`]: struct.Client.html#method.query +/// [`Client#execute`]: struct.Client.html#method.execute +/// [`time`]: time/index.html +/// [`Uuid`]: struct.Uuid.html +/// [`Numeric`]: numeric/struct.Numeric.html +/// [`Decimal`]: numeric/struct.Decimal.html +/// [`BigDecimal`]: numeric/struct.BigDecimal.html +/// [`XmlData`]: xml/struct.XmlData.html +/// [`NaiveDateTime`]: time/chrono/struct.NaiveDateTime.html +/// [`NaiveDate`]: time/chrono/struct.NaiveDate.html +/// [`NaiveTime`]: time/chrono/struct.NaiveTime.html +/// [`DateTime`]: time/chrono/struct.DateTime.html +impl<'a> tiberius::IntoSql<'a> for ScalarImplTiberiusWrapper { + fn into_sql(self) -> tiberius::ColumnData<'a> { + match self.0 { + ScalarImpl::Int16(v) => v.into_sql(), + ScalarImpl::Int32(v) => v.into_sql(), + ScalarImpl::Int64(v) => v.into_sql(), + ScalarImpl::Float32(v) => v.0.into_sql(), + ScalarImpl::Float64(v) => v.0.into_sql(), + ScalarImpl::Bool(v) => v.into_sql(), + ScalarImpl::Decimal(v) => DecimalTiberiusWrapper::from(v).into_sql(), + ScalarImpl::Date(v) => DateTiberiusWrapper::from(v).into_sql(), + ScalarImpl::Timestamp(v) => TimestampTiberiusWrapper::from(v).into_sql(), + ScalarImpl::Timestamptz(v) => TimestamptzTiberiusWrapper::from(v).into_sql(), + ScalarImpl::Time(v) => TimeTiberiusWrapper::from(v).into_sql(), + // ScalarImpl::Bytea(v) => (*v.clone()).into_sql(), + value => { + // Utf8, Serial, Interval, Jsonb, Int256, Struct, List are not supported yet + unimplemented!("the sql server decoding for {:?} is unsupported", value); + } + } + } +} diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index e4ec3f9870b4..5b9e6e1c1e72 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -106,6 +106,7 @@ pub fn parse_transaction_meta( // The id field has different meanings for different databases: // PG: txID:LSN // MySQL: source_id:transaction_id 
(e.g. 3E11FA47-71CA-11E1-9E33-C80AA9429562:23) + // SQL Server: commit_lsn (e.g. 00000027:00000ac0:0002) match status { DEBEZIUM_TRANSACTION_STATUS_BEGIN => match *connector_props { ConnectorProperties::PostgresCdc(_) => { @@ -115,6 +116,9 @@ pub fn parse_transaction_meta( ConnectorProperties::MysqlCdc(_) => { return Ok(TransactionControl::Begin { id: id.into() }) } + ConnectorProperties::SqlServerCdc(_) => { + return Ok(TransactionControl::Begin { id: id.into() }) + } _ => {} }, DEBEZIUM_TRANSACTION_STATUS_COMMIT => match *connector_props { @@ -125,6 +129,9 @@ pub fn parse_transaction_meta( ConnectorProperties::MysqlCdc(_) => { return Ok(TransactionControl::Commit { id: id.into() }) } + ConnectorProperties::SqlServerCdc(_) => { + return Ok(TransactionControl::Commit { id: id.into() }) + } _ => {} }, _ => {} diff --git a/src/connector/src/sink/sqlserver.rs b/src/connector/src/sink/sqlserver.rs index 0b8c7fa1f779..0295d3d3de9a 100644 --- a/src/connector/src/sink/sqlserver.rs +++ b/src/connector/src/sink/sqlserver.rs @@ -159,7 +159,7 @@ impl Sink for SqlServerSink { // Query table metadata from SQL Server. let mut sql_server_table_metadata = HashMap::new(); - let mut sql_client = SqlClient::new(&self.config).await?; + let mut sql_client = SqlServerClient::new(&self.config).await?; let query_table_metadata_error = || { SinkError::SqlServer(anyhow!(format!( "SQL Server table {} metadata error", @@ -181,7 +181,7 @@ WHERE ORDER BY col.column_id;"#; let rows = sql_client - .client + .inner_client .query(QUERY_TABLE_METADATA, &[&self.config.table]) .await? .into_results() @@ -272,7 +272,7 @@ pub struct SqlServerSinkWriter { schema: Schema, pk_indices: Vec<usize>, is_append_only: bool, - sql_client: SqlClient, + sql_client: SqlServerClient, ops: Vec<Op>, } @@ -283,7 +283,7 @@ impl SqlServerSinkWriter { pk_indices: Vec<usize>, is_append_only: bool, ) -> Result<Self> { - let sql_client = SqlClient::new(&config).await?; + let sql_client = SqlServerClient::new(&config).await?; let writer = Self { config, schema, @@ -444,7 +444,7 @@ impl SqlServerSinkWriter { } } } - query.execute(&mut self.sql_client.client).await?; + query.execute(&mut self.sql_client.inner_client).await?; Ok(()) } } @@ -495,11 +495,12 @@ impl SinkWriter for SqlServerSinkWriter { } } -struct SqlClient { - client: Client<Compat<TcpStream>>, +#[derive(Debug)] +pub struct SqlServerClient { + pub inner_client: Client<Compat<TcpStream>>, } -impl SqlClient { +impl SqlServerClient { async fn new(msconfig: &SqlServerConfig) -> Result<Self> { let mut config = Config::new(); config.host(&msconfig.host); @@ -507,7 +508,10 @@ impl SqlClient { config.authentication(AuthMethod::sql_server(&msconfig.user, &msconfig.password)); config.database(&msconfig.database); config.trust_cert(); + Self::new_with_config(config).await + } + pub async fn new_with_config(mut config: Config) -> Result<Self> { let tcp = TcpStream::connect(config.get_addr()) .await .context("failed to connect to sql server") @@ -515,8 +519,32 @@ impl SqlClient { tcp.set_nodelay(true) .context("failed to setting nodelay when connecting to sql server") .map_err(SinkError::SqlServer)?; - let client = Client::connect(config, tcp.compat_write()).await?; - Ok(Self { client }) + + let client = match Client::connect(config.clone(), tcp.compat_write()).await { + // Connection successful.
+ Ok(client) => client, + // The server wants us to redirect to a different address + Err(tiberius::error::Error::Routing { host, port }) => { + config.host(&host); + config.port(port); + let tcp = TcpStream::connect(config.get_addr()) + .await + .context("failed to connect to sql server after routing") + .map_err(SinkError::SqlServer)?; + tcp.set_nodelay(true) + .context( + "failed to setting nodelay when connecting to sql server after routing", + ) + .map_err(SinkError::SqlServer)?; + // we should not have more than one redirect, so we'll short-circuit here. + Client::connect(config, tcp.compat_write()).await? + } + Err(e) => return Err(e.into()), + }; + + Ok(Self { + inner_client: client, + }) } } diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 4c44f9610bd1..dc00fee24200 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -45,7 +45,7 @@ use crate::error::ConnectorResult as Result; use crate::parser::ParserConfig; use crate::source::filesystem::FsPageItem; use crate::source::monitor::EnumeratorMetrics; -use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc}; +use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc}; use crate::with_options::WithOptions; use crate::{ dispatch_source_prop, dispatch_split_impl, for_all_sources, impl_connector_properties, @@ -472,7 +472,7 @@ impl SplitImpl { pub fn is_cdc_split(&self) -> bool { matches!( self, - MysqlCdc(_) | PostgresCdc(_) | MongodbCdc(_) | CitusCdc(_) + MysqlCdc(_) | PostgresCdc(_) | MongodbCdc(_) | CitusCdc(_) | SqlServerCdc(_) ) } @@ -483,6 +483,7 @@ impl SplitImpl { PostgresCdc(split) => split.start_offset().clone().unwrap_or_default(), MongodbCdc(split) => split.start_offset().clone().unwrap_or_default(), CitusCdc(split) => split.start_offset().clone().unwrap_or_default(), + SqlServerCdc(split) => split.start_offset().clone().unwrap_or_default(), _ => unreachable!("get_cdc_split_offset() is only for cdc split"), } } diff --git a/src/connector/src/source/cdc/enumerator/mod.rs b/src/connector/src/source/cdc/enumerator/mod.rs index 1c0237b0654c..ae8241a76910 100644 --- a/src/connector/src/source/cdc/enumerator/mod.rs +++ b/src/connector/src/source/cdc/enumerator/mod.rs @@ -28,7 +28,7 @@ use risingwave_pb::connector_service::{SourceType, ValidateSourceRequest, Valida use crate::error::ConnectorResult; use crate::source::cdc::{ table_schema_exclude_additional_columns, CdcProperties, CdcSourceTypeTrait, Citus, - DebeziumCdcSplit, Mongodb, Mysql, Postgres, + DebeziumCdcSplit, Mongodb, Mysql, Postgres, SqlServer, }; use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; @@ -185,3 +185,15 @@ impl ListCdcSplits for DebeziumSplitEnumerator { )] } } + +impl ListCdcSplits for DebeziumSplitEnumerator { + type CdcSourceType = SqlServer; + + fn list_cdc_splits(&mut self) -> Vec> { + vec![DebeziumCdcSplit::::new( + self.source_id, + None, + None, + )] + } +} diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index 9b266506023b..be1c891b8d07 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -14,6 +14,7 @@ pub mod mock_external_table; pub mod postgres; +pub mod sql_server; #[cfg(not(madsim))] mod maybe_tls_connector; @@ -42,6 +43,9 @@ use crate::source::cdc::external::mysql::{ use crate::source::cdc::external::postgres::{ PostgresExternalTable, PostgresExternalTableReader, PostgresOffset, }; +use 
crate::source::cdc::external::sql_server::{ + SqlServerExternalTable, SqlServerExternalTableReader, SqlServerOffset, +}; use crate::source::cdc::CdcSourceType; use crate::WithPropertiesExt; @@ -50,6 +54,7 @@ pub enum CdcTableType { Undefined, MySql, Postgres, + SqlServer, Citus, } @@ -60,14 +65,26 @@ impl CdcTableType { "mysql-cdc" => Self::MySql, "postgres-cdc" => Self::Postgres, "citus-cdc" => Self::Citus, + "sqlserver-cdc" => Self::SqlServer, _ => Self::Undefined, } } pub fn can_backfill(&self) -> bool { + matches!(self, Self::MySql | Self::Postgres | Self::SqlServer) + } + + pub fn enable_transaction_metadata(&self) -> bool { + // In Debezium, transactional metadata cause delay of the newest events, as the `END` message is never sent unless a new transaction starts. + // So we only allow transactional metadata for MySQL and Postgres. + // See more in https://debezium.io/documentation/reference/2.6/connectors/sqlserver.html#sqlserver-transaction-metadata matches!(self, Self::MySql | Self::Postgres) } + pub fn shareable_only(&self) -> bool { + matches!(self, Self::SqlServer) + } + pub async fn create_table_reader( &self, config: ExternalTableConfig, @@ -81,6 +98,9 @@ impl CdcTableType { Self::Postgres => Ok(ExternalTableReaderImpl::Postgres( PostgresExternalTableReader::new(config, schema, pk_indices).await?, )), + Self::SqlServer => Ok(ExternalTableReaderImpl::SqlServer( + SqlServerExternalTableReader::new(config, schema, pk_indices).await?, + )), _ => bail!("invalid external table type: {:?}", *self), } } @@ -110,6 +130,7 @@ impl SchemaTableName { CdcTableType::Postgres | CdcTableType::Citus => { properties.get(SCHEMA_NAME_KEY).cloned().unwrap_or_default() } + CdcTableType::SqlServer => properties.get(SCHEMA_NAME_KEY).cloned().unwrap_or_default(), _ => { unreachable!("invalid external table type: {:?}", table_type); } @@ -126,6 +147,7 @@ impl SchemaTableName { pub enum CdcOffset { MySql(MySqlOffset), Postgres(PostgresOffset), + SqlServer(SqlServerOffset), } // Example debezium offset for Postgres: @@ -169,6 +191,10 @@ pub struct DebeziumSourceOffset { #[serde(rename = "txId")] pub txid: Option, pub tx_usec: Option, + + // sql server offset + pub commit_lsn: Option, + pub change_lsn: Option, } pub type CdcOffsetParseFunc = Box ConnectorResult + Send>; @@ -188,6 +214,7 @@ pub trait ExternalTableReader { pub enum ExternalTableReaderImpl { MySql(MySqlExternalTableReader), Postgres(PostgresExternalTableReader), + SqlServer(SqlServerExternalTableReader), Mock(MockExternalTableReader), } @@ -259,6 +286,7 @@ impl ExternalTableReader for ExternalTableReaderImpl { match self { ExternalTableReaderImpl::MySql(mysql) => mysql.current_cdc_offset().await, ExternalTableReaderImpl::Postgres(postgres) => postgres.current_cdc_offset().await, + ExternalTableReaderImpl::SqlServer(sql_server) => sql_server.current_cdc_offset().await, ExternalTableReaderImpl::Mock(mock) => mock.current_cdc_offset().await, } } @@ -281,6 +309,9 @@ impl ExternalTableReaderImpl { ExternalTableReaderImpl::Postgres(_) => { PostgresExternalTableReader::get_cdc_offset_parser() } + ExternalTableReaderImpl::SqlServer(_) => { + SqlServerExternalTableReader::get_cdc_offset_parser() + } ExternalTableReaderImpl::Mock(_) => MockExternalTableReader::get_cdc_offset_parser(), } } @@ -300,6 +331,9 @@ impl ExternalTableReaderImpl { ExternalTableReaderImpl::Postgres(postgres) => { postgres.snapshot_read(table_name, start_pk, primary_keys, limit) } + ExternalTableReaderImpl::SqlServer(sql_server) => { + sql_server.snapshot_read(table_name, 
start_pk, primary_keys, limit) + } ExternalTableReaderImpl::Mock(mock) => { mock.snapshot_read(table_name, start_pk, primary_keys, limit) } @@ -317,6 +351,7 @@ impl ExternalTableReaderImpl { pub enum ExternalTableImpl { MySql(MySqlExternalTable), Postgres(PostgresExternalTable), + SqlServer(SqlServerExternalTable), } impl ExternalTableImpl { @@ -329,6 +364,9 @@ impl ExternalTableImpl { CdcSourceType::Postgres => Ok(ExternalTableImpl::Postgres( PostgresExternalTable::connect(config).await?, )), + CdcSourceType::SqlServer => Ok(ExternalTableImpl::SqlServer( + SqlServerExternalTable::connect(config).await?, + )), _ => Err(anyhow!("Unsupported cdc connector type: {}", config.connector).into()), } } @@ -337,6 +375,7 @@ impl ExternalTableImpl { match self { ExternalTableImpl::MySql(mysql) => mysql.column_descs(), ExternalTableImpl::Postgres(postgres) => postgres.column_descs(), + ExternalTableImpl::SqlServer(sql_server) => sql_server.column_descs(), } } @@ -344,6 +383,7 @@ impl ExternalTableImpl { match self { ExternalTableImpl::MySql(mysql) => mysql.pk_names(), ExternalTableImpl::Postgres(postgres) => postgres.pk_names(), + ExternalTableImpl::SqlServer(sql_server) => sql_server.pk_names(), } } } diff --git a/src/connector/src/source/cdc/external/sql_server.rs b/src/connector/src/source/cdc/external/sql_server.rs new file mode 100644 index 000000000000..76b8e8e09fa4 --- /dev/null +++ b/src/connector/src/source/cdc/external/sql_server.rs @@ -0,0 +1,439 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
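+
+//! SQL Server CDC external table access.
+//!
+//! `SqlServerExternalTable` connects to the upstream SQL Server database to read the
+//! table's column definitions and primary-key names, while `SqlServerExternalTableReader`
+//! snapshots rows for backfill and reports the current max LSN as a `SqlServerOffset`.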
+ +use std::cmp::Ordering; + +use anyhow::Context; +use futures::stream::BoxStream; +use futures::{pin_mut, StreamExt, TryStreamExt}; +use futures_async_stream::try_stream; +use itertools::Itertools; +use risingwave_common::bail; +use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema}; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, ScalarImpl}; +use serde_derive::{Deserialize, Serialize}; +use tiberius::{ColumnType, Config, Query, QueryItem}; + +use crate::error::{ConnectorError, ConnectorResult}; +use crate::parser::{sql_server_row_to_owned_row, ScalarImplTiberiusWrapper}; +use crate::sink::sqlserver::SqlServerClient; +use crate::source::cdc::external::{ + CdcOffset, CdcOffsetParseFunc, DebeziumOffset, ExternalTableConfig, ExternalTableReader, + SchemaTableName, +}; + +// The maximum commit_lsn value in Sql Server +const MAX_COMMIT_LSN: &str = "ffffffff:ffffffff:ffff"; + +#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] +pub struct SqlServerOffset { + // https://learn.microsoft.com/en-us/answers/questions/1328359/how-to-accurately-sequence-change-data-capture-dat + pub change_lsn: String, + pub commit_lsn: String, +} + +// only compare the lsn field +impl PartialOrd for SqlServerOffset { + fn partial_cmp(&self, other: &Self) -> Option { + match self.change_lsn.partial_cmp(&other.change_lsn) { + Some(Ordering::Equal) => self.commit_lsn.partial_cmp(&other.commit_lsn), + other => other, + } + } +} + +impl SqlServerOffset { + pub fn parse_debezium_offset(offset: &str) -> ConnectorResult { + let dbz_offset: DebeziumOffset = serde_json::from_str(offset) + .with_context(|| format!("invalid upstream offset: {}", offset))?; + + Ok(Self { + change_lsn: dbz_offset + .source_offset + .change_lsn + .context("invalid sql server change_lsn")?, + commit_lsn: dbz_offset + .source_offset + .commit_lsn + .context("invalid sql server commit_lsn")?, + }) + } +} + +pub struct SqlServerExternalTable { + column_descs: Vec, + pk_names: Vec, +} + +impl SqlServerExternalTable { + pub async fn connect(config: ExternalTableConfig) -> ConnectorResult { + tracing::debug!("connect to sql server"); + + let mut client_config = Config::new(); + + client_config.host(&config.host); + client_config.database(&config.database); + client_config.port(config.port.parse::().unwrap()); + client_config.authentication(tiberius::AuthMethod::sql_server( + &config.username, + &config.password, + )); + // TODO(kexiang): add ssl support + // TODO(kexiang): use trust_cert_ca, trust_cert is not secure + client_config.trust_cert(); + + let mut client = SqlServerClient::new_with_config(client_config).await?; + + let mut column_descs = vec![]; + let mut pk_names = vec![]; + { + // With `WHERE 1 = 0`, we only fetch the metadata (column names and types) of the table + let sql = Query::new(format!( + "SELECT * FROM {} WHERE 1 = 0", + SqlServerExternalTableReader::get_normalized_table_name(&SchemaTableName { + schema_name: config.schema.clone(), + table_name: config.table.clone(), + }), + )); + + let mut stream = sql.query(&mut client.inner_client).await?; + while let Some(item) = stream.try_next().await? 
{ + match item { + QueryItem::Metadata(meta) => { + for col in meta.columns() { + column_descs.push(ColumnDesc::named( + col.name(), + ColumnId::placeholder(), + type_to_rw_type(&col.column_type())?, + )); + } + } + QueryItem::Row(row) => { + unreachable!("Unexpected row: {:?}, `SELECT * FROM {} WHERE 1 = 0` should never return rows", row, config.table.clone()); + } + } + } + } + { + let sql = Query::new(format!( + "SELECT kcu.COLUMN_NAME + FROM + INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc + JOIN + INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS kcu + ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME AND + tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA AND + tc.TABLE_NAME = kcu.TABLE_NAME + WHERE + tc.CONSTRAINT_TYPE = 'PRIMARY KEY' AND + tc.TABLE_SCHEMA = '{}' AND tc.TABLE_NAME = '{}'", + config.schema, config.table, + )); + + let mut stream = sql.query(&mut client.inner_client).await?; + while let Some(item) = stream.try_next().await? { + match item { + QueryItem::Metadata(_) => {} + QueryItem::Row(row) => { + let pk_name: &str = row.try_get(0)?.unwrap(); + pk_names.push(pk_name.to_string()); + } + } + } + } + + Ok(Self { + column_descs, + pk_names, + }) + } + + pub fn column_descs(&self) -> &Vec { + &self.column_descs + } + + pub fn pk_names(&self) -> &Vec { + &self.pk_names + } +} + +fn type_to_rw_type(col_type: &ColumnType) -> ConnectorResult { + let dtype = match col_type { + ColumnType::Bit => DataType::Boolean, + ColumnType::Bitn => DataType::Bytea, + ColumnType::Int1 => DataType::Int16, + ColumnType::Int2 => DataType::Int16, + ColumnType::Int4 => DataType::Int32, + ColumnType::Int8 => DataType::Int64, + ColumnType::Float4 => DataType::Float32, + ColumnType::Float8 => DataType::Float64, + ColumnType::Decimaln | ColumnType::Numericn => DataType::Decimal, + ColumnType::Daten => DataType::Date, + ColumnType::Timen => DataType::Time, + ColumnType::Datetime + | ColumnType::Datetimen + | ColumnType::Datetime2 + | ColumnType::Datetime4 => DataType::Timestamp, + ColumnType::DatetimeOffsetn => DataType::Timestamptz, + ColumnType::NVarchar | ColumnType::NChar | ColumnType::NText | ColumnType::Text => { + DataType::Varchar + } + // Null, Guid, Image, Money, Money4, Intn, Bitn, Floatn, Xml, Udt, SSVariant, BigVarBin, BigVarChar, BigBinary, BigChar + mssql_type => { + // NOTES: user-defined enum type is classified as `Unknown` + tracing::warn!( + "Unknown Sql Server data type: {:?}, map to varchar", + mssql_type + ); + DataType::Varchar + } + }; + Ok(dtype) +} + +#[derive(Debug)] +pub struct SqlServerExternalTableReader { + rw_schema: Schema, + field_names: String, + client: tokio::sync::Mutex, +} + +impl ExternalTableReader for SqlServerExternalTableReader { + async fn current_cdc_offset(&self) -> ConnectorResult { + let mut client = self.client.lock().await; + // start a transaction to read max start_lsn. + let row = client + .inner_client + .simple_query(String::from("SELECT sys.fn_cdc_get_max_lsn()")) + .await? + .into_row() + .await? + .expect("No result returned by `SELECT sys.fn_cdc_get_max_lsn()`"); + // An example of change_lsn or commit_lsn: "00000027:00000ac0:0002" from debezium + // sys.fn_cdc_get_max_lsn() returns a 10 bytes array, we convert it to a hex string here. + let max_lsn = match row.try_get::<&[u8], usize>(0)? { + Some(bytes) => { + let mut hex_string = String::with_capacity(bytes.len() * 2 + 2); + assert_eq!( + bytes.len(), + 10, + "sys.fn_cdc_get_max_lsn() should return a 10 bytes array." 
+                );
+                for byte in &bytes[0..4] {
+                    hex_string.push_str(&format!("{:02x}", byte));
+                }
+                hex_string.push(':');
+                for byte in &bytes[4..8] {
+                    hex_string.push_str(&format!("{:02x}", byte));
+                }
+                hex_string.push(':');
+                for byte in &bytes[8..10] {
+                    hex_string.push_str(&format!("{:02x}", byte));
+                }
+                hex_string
+            }
+            None => bail!("None is returned by `SELECT sys.fn_cdc_get_max_lsn()`, please ensure Sql Server Agent is running."),
+        };
+
+        tracing::debug!("current max_lsn: {}", max_lsn);
+
+        Ok(CdcOffset::SqlServer(SqlServerOffset {
+            change_lsn: max_lsn,
+            commit_lsn: MAX_COMMIT_LSN.into(),
+        }))
+    }
+
+    fn snapshot_read(
+        &self,
+        table_name: SchemaTableName,
+        start_pk: Option<OwnedRow>,
+        primary_keys: Vec<String>,
+        limit: u32,
+    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
+        self.snapshot_read_inner(table_name, start_pk, primary_keys, limit)
+    }
+}
+
+impl SqlServerExternalTableReader {
+    pub async fn new(
+        config: ExternalTableConfig,
+        rw_schema: Schema,
+        pk_indices: Vec<usize>,
+    ) -> ConnectorResult<Self> {
+        tracing::info!(
+            ?rw_schema,
+            ?pk_indices,
+            "create sql server external table reader"
+        );
+        let mut client_config = Config::new();
+
+        client_config.host(&config.host);
+        client_config.database(&config.database);
+        client_config.port(config.port.parse::<u16>().unwrap());
+        client_config.authentication(tiberius::AuthMethod::sql_server(
+            &config.username,
+            &config.password,
+        ));
+        // TODO(kexiang): add ssl support
+        // TODO(kexiang): use trust_cert_ca, trust_cert is not secure
+        client_config.trust_cert();
+
+        let client = SqlServerClient::new_with_config(client_config).await?;
+
+        let field_names = rw_schema
+            .fields
+            .iter()
+            .map(|f| Self::quote_column(&f.name))
+            .join(",");
+
+        Ok(Self {
+            rw_schema,
+            field_names,
+            client: tokio::sync::Mutex::new(client),
+        })
+    }
+
+    pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
+        Box::new(move |offset| {
+            Ok(CdcOffset::SqlServer(
+                SqlServerOffset::parse_debezium_offset(offset)?,
+            ))
+        })
+    }
+
+    #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
+    async fn snapshot_read_inner(
+        &self,
+        table_name: SchemaTableName,
+        start_pk_row: Option<OwnedRow>,
+        primary_keys: Vec<String>,
+        limit: u32,
+    ) {
+        let order_key = primary_keys
+            .iter()
+            .map(|col| Self::quote_column(col))
+            .join(",");
+        let mut sql = Query::new(if start_pk_row.is_none() {
+            format!(
+                "SELECT {} FROM {} ORDER BY {} OFFSET 0 ROWS FETCH NEXT {limit} ROWS ONLY",
+                self.field_names,
+                Self::get_normalized_table_name(&table_name),
+                order_key,
+            )
+        } else {
+            let filter_expr = Self::filter_expression(&primary_keys);
+            format!(
+                "SELECT {} FROM {} WHERE {} ORDER BY {} OFFSET 0 ROWS FETCH NEXT {limit} ROWS ONLY",
+                self.field_names,
+                Self::get_normalized_table_name(&table_name),
+                filter_expr,
+                order_key,
+            )
+        });
+
+        let mut client = self.client.lock().await;
+
+        // FIXME(kexiang): Set session timezone to UTC
+        if let Some(pk_row) = start_pk_row {
+            let params: Vec<Option<ScalarImpl>> = pk_row.into_iter().collect();
+            for param in params {
+                // primary key should not be null, so it's safe to unwrap
+                sql.bind(ScalarImplTiberiusWrapper::from(param.unwrap()));
+            }
+        }
+
+        let stream = sql.query(&mut client.inner_client).await?.into_row_stream();
+
+        let row_stream = stream.map(|res| {
+            // convert sql server row into OwnedRow
+            let mut row = res?;
+            Ok::<_, ConnectorError>(sql_server_row_to_owned_row(&mut row, &self.rw_schema))
+        });
+
+        pin_mut!(row_stream);
+
+        #[for_await]
+        for row in row_stream {
+            let row = row?;
+            yield row;
+        }
+    }
+
+    pub fn get_normalized_table_name(table_name: &SchemaTableName) ->
String { + format!( + "\"{}\".\"{}\"", + table_name.schema_name, table_name.table_name + ) + } + + // sql server cannot leverage the given key to narrow down the range of scan, + // we need to rewrite the comparison conditions by our own. + // (a, b) > (x, y) => ("a" > @P1) OR (("a" = @P1) AND ("b" > @P2)) + fn filter_expression(columns: &[String]) -> String { + let mut conditions = vec![]; + // push the first condition + conditions.push(format!("({} > @P{})", Self::quote_column(&columns[0]), 1)); + for i in 2..=columns.len() { + // '=' condition + let mut condition = String::new(); + for (j, col) in columns.iter().enumerate().take(i - 1) { + if j == 0 { + condition.push_str(&format!("({} = @P{})", Self::quote_column(col), j + 1)); + } else { + condition.push_str(&format!( + " AND ({} = @P{})", + Self::quote_column(col), + j + 1 + )); + } + } + // '>' condition + condition.push_str(&format!( + " AND ({} > @P{})", + Self::quote_column(&columns[i - 1]), + i + )); + conditions.push(format!("({})", condition)); + } + if columns.len() > 1 { + conditions.join(" OR ") + } else { + conditions.join("") + } + } + + fn quote_column(column: &str) -> String { + format!("\"{}\"", column) + } +} + +#[cfg(test)] +mod tests { + use crate::source::cdc::external::SqlServerExternalTableReader; + + #[test] + fn test_sql_server_filter_expr() { + let cols = vec!["id".to_string()]; + let expr = SqlServerExternalTableReader::filter_expression(&cols); + assert_eq!(expr, "(\"id\" > @P1)"); + + let cols = vec!["aa".to_string(), "bb".to_string(), "cc".to_string()]; + let expr = SqlServerExternalTableReader::filter_expression(&cols); + assert_eq!( + expr, + "(\"aa\" > @P1) OR ((\"aa\" = @P1) AND (\"bb\" > @P2)) OR ((\"aa\" = @P1) AND (\"bb\" = @P2) AND (\"cc\" > @P3))" + ); + } +} diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index c86450b59471..605dd47ecd0e 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -50,6 +50,7 @@ pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME; pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME; pub const CITUS_CDC_CONNECTOR: &str = Citus::CDC_CONNECTOR_NAME; pub const MONGODB_CDC_CONNECTOR: &str = Mongodb::CDC_CONNECTOR_NAME; +pub const SQL_SERVER_CDC_CONNECTOR: &str = SqlServer::CDC_CONNECTOR_NAME; pub trait CdcSourceTypeTrait: Send + Sync + Clone + 'static { const CDC_CONNECTOR_NAME: &'static str; @@ -65,6 +66,7 @@ impl<'a> From<&'a str> for CdcSourceType { POSTGRES_CDC_CONNECTOR => CdcSourceType::Postgres, CITUS_CDC_CONNECTOR => CdcSourceType::Citus, MONGODB_CDC_CONNECTOR => CdcSourceType::Mongodb, + SQL_SERVER_CDC_CONNECTOR => CdcSourceType::SqlServer, _ => CdcSourceType::Unspecified, } } @@ -77,6 +79,7 @@ impl CdcSourceType { CdcSourceType::Postgres => "Postgres", CdcSourceType::Citus => "Citus", CdcSourceType::Mongodb => "MongoDB", + CdcSourceType::SqlServer => "SQL Server", CdcSourceType::Unspecified => "Unspecified", } } diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index 75f8165f7928..135daa5e0480 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -167,7 +167,10 @@ impl SplitReader for CdcSplitReader { tracing::info!(?source_id, "cdc connector started"); let instance = match T::source_type() { - CdcSourceType::Mysql | CdcSourceType::Postgres | CdcSourceType::Mongodb => Self { + CdcSourceType::Mysql + | CdcSourceType::Postgres + | 
CdcSourceType::Mongodb + | CdcSourceType::SqlServer => Self { source_id: split.split_id() as u64, start_offset: split.start_offset().clone(), server_addr: None, diff --git a/src/connector/src/source/cdc/split.rs b/src/connector/src/source/cdc/split.rs index d38200910f7e..d6415135ea5c 100644 --- a/src/connector/src/source/cdc/split.rs +++ b/src/connector/src/source/cdc/split.rs @@ -92,6 +92,11 @@ pub struct MongoDbCdcSplit { pub inner: CdcSplitBase, } +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)] +pub struct SqlServerCdcSplit { + pub inner: CdcSplitBase, +} + impl MySqlCdcSplit { pub fn new(split_id: u32, start_offset: Option) -> Self { let split = CdcSplitBase { @@ -214,6 +219,38 @@ impl CdcSplitTrait for MongoDbCdcSplit { } } +impl SqlServerCdcSplit { + pub fn new(split_id: u32, start_offset: Option) -> Self { + let split = CdcSplitBase { + split_id, + start_offset, + snapshot_done: false, + }; + Self { inner: split } + } +} + +impl CdcSplitTrait for SqlServerCdcSplit { + fn split_id(&self) -> u32 { + self.inner.split_id + } + + fn start_offset(&self) -> &Option { + &self.inner.start_offset + } + + fn is_snapshot_done(&self) -> bool { + self.inner.snapshot_done + } + + fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> { + // if snapshot_done is already true, it will remain true + self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?; + self.inner.start_offset = Some(last_seen_offset); + Ok(()) + } +} + /// We use this struct to wrap the specific split, which act as an interface to other modules #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)] pub struct DebeziumCdcSplit { @@ -223,18 +260,19 @@ pub struct DebeziumCdcSplit { pub postgres_split: Option, pub citus_split: Option, pub mongodb_split: Option, + pub sql_server_split: Option, #[serde(skip)] pub _phantom: PhantomData, } macro_rules! dispatch_cdc_split_inner { - ($dbz_split:expr, $as_type:tt, {$($cdc_source_type:tt),*}, $body:expr) => { + ($dbz_split:expr, $as_type:tt, {$({$cdc_source_type:tt, $cdc_source_split:tt}),*}, $body:expr) => { match T::source_type() { $( CdcSourceType::$cdc_source_type => { $crate::paste! { - $dbz_split.[<$cdc_source_type:lower _split>] + $dbz_split.[<$cdc_source_split>] .[]() .expect(concat!(stringify!([<$cdc_source_type:lower>]), " split must exist")) .$body @@ -251,7 +289,13 @@ macro_rules! dispatch_cdc_split_inner { // call corresponding split method of the specific cdc source type macro_rules! 
dispatch_cdc_split { ($dbz_split:expr, $as_type:tt, $body:expr) => { - dispatch_cdc_split_inner!($dbz_split, $as_type, {Mysql, Postgres, Citus, Mongodb}, $body) + dispatch_cdc_split_inner!($dbz_split, $as_type, { + {Mysql, mysql_split}, + {Postgres, postgres_split}, + {Citus, citus_split}, + {Mongodb, mongodb_split}, + {SqlServer, sql_server_split} + }, $body) } } @@ -280,6 +324,7 @@ impl DebeziumCdcSplit { postgres_split: None, citus_split: None, mongodb_split: None, + sql_server_split: None, _phantom: PhantomData, }; match T::source_type() { @@ -299,6 +344,10 @@ impl DebeziumCdcSplit { let split = MongoDbCdcSplit::new(split_id, start_offset); ret.mongodb_split = Some(split); } + CdcSourceType::SqlServer => { + let split = SqlServerCdcSplit::new(split_id, start_offset); + ret.sql_server_split = Some(split); + } CdcSourceType::Unspecified => { unreachable!("invalid debezium split") } diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs index 2258617f84b4..10d549df0c49 100644 --- a/src/connector/src/source/reader/reader.rs +++ b/src/connector/src/source/reader/reader.rs @@ -107,9 +107,7 @@ impl SourceReader { /// Refer to `WaitCheckpointWorker` for more details. pub async fn create_wait_checkpoint_task(&self) -> ConnectorResult> { Ok(match &self.config { - ConnectorProperties::PostgresCdc(_prop) => { - Some(WaitCheckpointTask::CommitCdcOffset(None)) - } + ConnectorProperties::PostgresCdc(_) => Some(WaitCheckpointTask::CommitCdcOffset(None)), ConnectorProperties::GooglePubsub(prop) => Some(WaitCheckpointTask::AckPubsubMessage( prop.subscription_client().await?, vec![], diff --git a/src/connector/src/with_options.rs b/src/connector/src/with_options.rs index bb96a6298f5d..eef5ccbd9cbf 100644 --- a/src/connector/src/with_options.rs +++ b/src/connector/src/with_options.rs @@ -113,6 +113,16 @@ pub trait WithPropertiesExt: Get + Sized { self.is_cdc_connector() && CdcTableType::from_properties(self).can_backfill() } + /// Tables with MySQL and PostgreSQL connectors are maintained for backward compatibility. + /// The newly added SQL Server CDC connector is only supported when created as shared. 
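+    /// "Shared only" means the connector can only be used through a shared `CREATE SOURCE`
+    /// (with tables created from it afterwards); a standalone `CREATE TABLE` that embeds
+    /// the connector is rejected.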
+ fn is_shareable_only_cdc_connector(&self) -> bool { + self.is_cdc_connector() && CdcTableType::from_properties(self).shareable_only() + } + + fn enable_transaction_metadata(&self) -> bool { + CdcTableType::from_properties(self).enable_transaction_metadata() + } + #[inline(always)] fn is_iceberg_connector(&self) -> bool { let Some(connector) = self.get_connector() else { diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 487c972f9c5a..32628a1abe8e 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -27,6 +27,7 @@ use risingwave_common::catalog::{ debug_assert_column_ids_distinct, ColumnCatalog, ColumnDesc, ColumnId, Schema, TableId, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME, }; +use risingwave_common::license::Feature; use risingwave_common::secret::LocalSecretManager; use risingwave_common::types::DataType; use risingwave_connector::parser::additional_columns::{ @@ -45,7 +46,7 @@ use risingwave_connector::sink::iceberg::IcebergConfig; use risingwave_connector::source::cdc::{ CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY, CDC_WAIT_FOR_STREAMING_START_TIMEOUT, CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, - MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, + MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR, }; use risingwave_connector::source::datagen::DATAGEN_CONNECTOR; use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR; @@ -1094,10 +1095,24 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock hashmap!( Format::None => vec![Encode::None], - ) + ), + SQL_SERVER_CDC_CONNECTOR => hashmap!( + Format::Debezium => vec![Encode::Json], + // support source stream job + Format::Plain => vec![Encode::Json], + ), )) }); +pub fn validate_license(connector: &str) -> Result<()> { + if connector == SQL_SERVER_CDC_CONNECTOR { + Feature::SqlServerCdcSource + .check_available() + .map_err(|e| anyhow::anyhow!(e))?; + } + Ok(()) +} + pub fn validate_compatibility( source_schema: &ConnectorSchema, props: &mut BTreeMap, @@ -1115,6 +1130,8 @@ pub fn validate_compatibility( CONNECTORS_COMPATIBLE_FORMATS.keys() ))) })?; + + validate_license(&connector)?; if connector != KAFKA_CONNECTOR { let res = match (&source_schema.format, &source_schema.row_encode) { (Format::Plain, Encode::Protobuf) | (Format::Plain, Encode::Avro) => { @@ -1193,6 +1210,12 @@ pub fn validate_compatibility( props.insert("publication.create.enable".into(), "true".into()); } } + + if connector == SQL_SERVER_CDC_CONNECTOR && !props.contains_key("schema.name") { + // Default schema name is "dbo" + props.insert("schema.name".into(), "dbo".into()); + } + Ok(()) } @@ -1374,13 +1397,22 @@ pub fn bind_connector_props( let mut with_properties = handler_args.with_options.clone().into_connector_props(); validate_compatibility(source_schema, &mut with_properties)?; let create_cdc_source_job = with_properties.is_shareable_cdc_connector(); + + if !is_create_source && with_properties.is_shareable_only_cdc_connector() { + return Err(RwError::from(ProtocolError(format!( + "connector {} does not support `CREATE TABLE`, please use `CREATE SOURCE` instead", + with_properties.get_connector().unwrap(), + )))); + } if is_create_source && create_cdc_source_job { // set connector to backfill mode with_properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into()); // enable cdc sharing mode, which will capture all tables in the given `database.name` 
with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into()); // enable transactional cdc - with_properties.insert(CDC_TRANSACTIONAL_KEY.into(), "true".into()); + if with_properties.enable_transaction_metadata() { + with_properties.insert(CDC_TRANSACTIONAL_KEY.into(), "true".into()); + } with_properties.insert( CDC_WAIT_FOR_STREAMING_START_TIMEOUT.into(), handler_args diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 9855843ad001..ac58bb808867 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -855,7 +855,7 @@ fn derive_connect_properties( source_with_properties: &WithOptionsSecResolved, external_table_name: String, ) -> Result { - use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR}; + use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR}; // we should remove the prefix from `full_table_name` let mut connect_properties = source_with_properties.clone(); if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) { @@ -880,6 +880,16 @@ fn derive_connect_properties( table_name } + SQL_SERVER_CDC_CONNECTOR => { + let (schema_name, table_name) = external_table_name + .split_once('.') + .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'dbo.table'"))?; + + // insert 'schema.name' into connect properties + connect_properties.insert(SCHEMA_NAME_KEY.into(), schema_name.into()); + + table_name + } _ => { return Err(RwError::from(anyhow!( "connector {} is not supported for cdc table", diff --git a/src/license/src/feature.rs b/src/license/src/feature.rs index e7cef4d1ffcf..00c6d200aa0a 100644 --- a/src/license/src/feature.rs +++ b/src/license/src/feature.rs @@ -50,6 +50,7 @@ macro_rules! for_all_features { { SecretManagement, Paid, "Secret management." }, { CdcTableSchemaMap, Paid, "Automatically map upstream schema to CDC Table."}, { SqlServerSink, Paid, "Sink data from RisingWave to SQL Server." }, + { SqlServerCdcSource, Paid, "CDC source connector for Sql Server." }, } }; }
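For reviewer context, a minimal usage sketch of the connector added above. The patch itself establishes the connector name `sqlserver-cdc`, that it can only be created as a shared `CREATE SOURCE` (a plain `CREATE TABLE` with the connector embedded is rejected), that `schema.name` defaults to `dbo`, that the upstream table name in `CREATE TABLE ... FROM ... TABLE` must be schema-qualified, and that the feature is gated behind the paid `SqlServerCdcSource` license. The WITH property names below (hostname, port, username, password, database.name) are assumed to mirror the existing MySQL/Postgres CDC connectors and are illustrative only; the values are placeholders.

    -- assumed property names; values are placeholders
    CREATE SOURCE mssql_source WITH (
        connector = 'sqlserver-cdc',
        hostname = 'mssql-host',
        port = '1433',
        username = 'sa',
        password = 'YourPassword',
        database.name = 'my_database'
    );

    -- the upstream table must be schema-qualified; the default schema is 'dbo'
    CREATE TABLE orders (
        order_id INT PRIMARY KEY,
        customer_name VARCHAR
    ) FROM mssql_source TABLE 'dbo.orders';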