diff --git a/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc.slt b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc.slt index 57275043da202..70f35299aa177 100644 --- a/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc.slt +++ b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc.slt @@ -254,11 +254,17 @@ CREATE TABLE shared_orders ( ) from mssql_source table 'dbo.orders'; statement ok -CREATE TABLE shared_single_type ( - id INT, - c_time time, - PRIMARY KEY (id) -) from mssql_source table 'dbo.single_type'; +CREATE TABLE shared_single_type (*) from mssql_source table 'dbo.single_type'; + +statement ok +CREATE TABLE shared_sqlserver_all_data_types ( + * +) from mssql_source table 'dbo.sqlserver_all_data_types'; + +sleep 5s + +statement ok +DROP table shared_sqlserver_all_data_types; statement ok CREATE TABLE shared_sqlserver_all_data_types ( @@ -271,12 +277,18 @@ CREATE TABLE shared_sqlserver_all_data_types ( c_decimal DECIMAL, c_real REAL, c_float FLOAT, + c_char VARCHAR, c_varchar VARCHAR, + c_nvarchar VARCHAR, + c_ntext VARCHAR, + c_binary BYTEA, c_varbinary BYTEA, + c_uniqueidentifier VARCHAR, c_date DATE, c_time TIME, c_datetime2 TIMESTAMP, c_datetimeoffset TIMESTAMPTZ, + c_xml varchar, PRIMARY KEY (id) ) from mssql_source table 'dbo.sqlserver_all_data_types'; @@ -306,7 +318,12 @@ CREATE TABLE upper_table ( "ID" INT, "Name" VARCHAR, PRIMARY KEY ("ID") -) from upper_mssql_source table 'UpperSchema.UpperTable'; +) +INCLUDE TIMESTAMP AS commit_ts +INCLUDE DATABASE_NAME as database_name +INCLUDE SCHEMA_NAME as schema_name +INCLUDE TABLE_NAME as table_name +from upper_mssql_source table 'UpperSchema.UpperTable'; statement ok create materialized view shared_orders_cnt as select count(*) as cnt from shared_orders; @@ -349,11 +366,46 @@ SELECT * from shared_single_type order by id; 3 23:59:59.999 query TTTTTTT -SELECT * from shared_sqlserver_all_data_types order by id; +SELECT id, c_bit, c_tinyint, c_smallint, c_int, c_bigint from shared_sqlserver_all_data_types order by id; +---- +1 f 0 0 0 0 +2 t 255 -32768 -2147483648 -9223372036854775808 +3 t 127 32767 2147483647 9223372036854775807 + +query TTTT +SELECT id, c_decimal, c_real, c_float from shared_sqlserver_all_data_types order by id; +---- +1 0 0 0 +2 -10 -10000 -10000 +3 -10 10000 10000 + +query TTTTTTT +SELECT id, c_varchar, c_nvarchar, c_ntext, c_binary, c_varbinary, c_uniqueidentifier from shared_sqlserver_all_data_types order by id; +---- +1 (empty) 中 中 \xff000000 NULL NULL +2 aa 🌹 🌹 NULL \xff 6F9619FF-8B86-D011-B42D-00C04FC964FF +3 zzzz 🌹👍 🌹👍 \xffffffff \xffffffff 6F9619FF-8B86-D011-B42D-00C04FC964FF + +query TTT +SELECT id, char_length(c_char) from shared_sqlserver_all_data_types order by id; +---- +1 4 +2 4 +3 4 + +query TTTTTTT +SELECT id, c_date, c_time, c_datetime2, c_datetimeoffset from shared_sqlserver_all_data_types order by id; ---- -1 f 0 0 0 0 0 0 0 (empty) NULL 2001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 00:00:00+00:00 -2 t 255 -32768 -2147483648 -9223372036854775808 -10 -10000 -10000 aa \xff 1990-01-01 13:59:59.123 2000-01-01 11:00:00.123 1990-01-01 00:00:01.123+00:00 -3 t 127 32767 2147483647 9223372036854775807 -10 10000 10000 zzzz \xffffffff 2999-12-31 23:59:59.999 2099-12-31 23:59:59.999 2999-12-31 23:59:59.999+00:00 +1 2001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 00:00:00+00:00 +2 1990-01-01 13:59:59.123 2000-01-01 11:00:00.123 1990-01-01 00:00:01.123+00:00 +3 2999-12-31 23:59:59.999 2099-12-31 23:59:59.999 2999-12-31 23:59:59.999+00:00 + +query TTTTTTT +SELECT id, c_xml from shared_sqlserver_all_data_types order by id; +---- +1 John Doe30 +2 Jane Doe28 +3 Jane Doe # ------------ kill cluster ------------ # system ok @@ -410,20 +462,80 @@ SELECT * from shared_single_type order by id; 13 23:59:59.999 query TTTTTTT -SELECT * from shared_sqlserver_all_data_types order by id; +SELECT id, c_bit, c_tinyint, c_smallint, c_int, c_bigint from shared_sqlserver_all_data_types order by id; +---- +1 f 0 0 0 0 +2 t 255 -32768 -2147483648 -9223372036854775808 +3 t 127 32767 2147483647 9223372036854775807 +11 f 0 0 0 0 +12 t 255 -32768 -2147483648 -9223372036854775808 +13 t 127 32767 2147483647 9223372036854775807 + +query TTTT +SELECT id, c_decimal, c_real, c_float from shared_sqlserver_all_data_types order by id; +---- +1 0 0 0 +2 -10 -10000 -10000 +3 -10 10000 10000 +11 0 0 0 +12 -10 -10000 -10000 +13 -10 10000 10000 + +query TTTTTTT +SELECT id, c_varchar, c_nvarchar, c_ntext, c_binary, c_varbinary, c_uniqueidentifier from shared_sqlserver_all_data_types order by id; ---- -1 f 0 0 0 0 0 0 0 (empty) NULL 2001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 00:00:00+00:00 -2 t 255 -32768 -2147483648 -9223372036854775808 -10 -10000 -10000 aa \xff 1990-01-01 13:59:59.123 2000-01-01 11:00:00.123 1990-01-01 00:00:01.123+00:00 -3 t 127 32767 2147483647 9223372036854775807 -10 10000 10000 zzzz \xffffffff 2999-12-31 23:59:59.999 2099-12-31 23:59:59.999 2999-12-31 23:59:59.999+00:00 -11 f 0 0 0 0 0 0 0 (empty) NULL 2001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 00:00:00+00:00 -12 t 255 -32768 -2147483648 -9223372036854775808 -10 -10000 -10000 aa \xff 1990-01-01 13:59:59.123 2000-01-01 11:00:00.123 1990-01-01 00:00:01.123+00:00 -13 t 127 32767 2147483647 9223372036854775807 -10 10000 10000 zzzz \xffffffff 2999-12-31 23:59:59.999 2099-12-31 23:59:59.999 2999-12-31 23:59:59.999+00:00 - -query TT -SELECT * from upper_table order by "ID"; +1 (empty) 中 中 \xff000000 NULL NULL +2 aa 🌹 🌹 NULL \xff 6F9619FF-8B86-D011-B42D-00C04FC964FF +3 zzzz 🌹👍 🌹👍 \xffffffff \xffffffff 6F9619FF-8B86-D011-B42D-00C04FC964FF +11 (empty) 中 中 \xff000000 NULL NULL +12 aa 🌹 🌹 NULL \xff 6F9619FF-8B86-D011-B42D-00C04FC964FF +13 zzzz 🌹👍 🌹👍 \xffffffff \xffffffff 6F9619FF-8B86-D011-B42D-00C04FC964FF + +query TTT +SELECT id, char_length(c_char) from shared_sqlserver_all_data_types order by id; +---- +1 4 +2 4 +3 4 +11 4 +12 4 +13 4 + +query TTTTTTT +SELECT id, c_date, c_time, c_datetime2, c_datetimeoffset from shared_sqlserver_all_data_types order by id; +---- +1 2001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 00:00:00+00:00 +2 1990-01-01 13:59:59.123 2000-01-01 11:00:00.123 1990-01-01 00:00:01.123+00:00 +3 2999-12-31 23:59:59.999 2099-12-31 23:59:59.999 2999-12-31 23:59:59.999+00:00 +11 2001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 00:00:00+00:00 +12 1990-01-01 13:59:59.123 2000-01-01 11:00:00.123 1990-01-01 00:00:01.123+00:00 +13 2999-12-31 23:59:59.999 2099-12-31 23:59:59.999 2999-12-31 23:59:59.999+00:00 + +query TTTTTTT +SELECT id, c_xml from shared_sqlserver_all_data_types order by id; +---- +1 John Doe30 +2 Jane Doe28 +3 Jane Doe +11 John Doe30 +12 Jane Doe28 +13 Jane Doe + +query TTTTTTTT +SELECT "ID", "Name", database_name, schema_name, table_name from upper_table order by "ID"; +---- +1 Alice UpperDB UpperSchema UpperTable +11 Alice UpperDB UpperSchema UpperTable + +query TTTTTTTT +SELECT commit_ts from upper_table where "ID" = 1; +---- +1970-01-01 00:00:00+00:00 + +query TTTTTTTT +SELECT commit_ts > '2024-08-16 00:00:00.000+00:00' from upper_table where "ID" = 11; ---- -1 Alice -11 Alice +t # ------------ drop stage ------------ statement ok diff --git a/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_insert.sql b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_insert.sql index e5089aa86d020..caaf8eeee4b4f 100644 --- a/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_insert.sql +++ b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_insert.sql @@ -16,9 +16,8 @@ VALUES INSERT INTO single_type VALUES (13, '23:59:59.999') +INSERT INTO sqlserver_all_data_types VALUES (11, 'False', 0, 0, 0, 0, 0, 0, 0, '', '', N'中', N'中', 0xff, NULL, NULL, '2001-01-01', '00:00:00', '2001-01-01 00:00:00', '2001-01-01 00:00:00', 'John Doe30'); -INSERT INTO sqlserver_all_data_types VALUES (11, 'False', 0, 0, 0, 0, 0, 0, 0, '', NULL, '2001-01-01', '00:00:00', '2001-01-01 00:00:00', '2001-01-01 00:00:00'); +INSERT INTO sqlserver_all_data_types VALUES (12, 'True', 255, -32768, -2147483648, -9223372036854775808, -10.0, -9999.999999, -10000.0, 'aa', 'aa', N'🌹', N'🌹', NULL, 0xff, '6f9619ff-8b86-d011-b42d-00c04fc964ff', '1990-01-01', '13:59:59.123', '2000-01-01 11:00:00.123', '1990-01-01 00:00:01.123', ' Jane Doe 28 '); -INSERT INTO sqlserver_all_data_types VALUES (12, 'True', 255, -32768, -2147483648, -9223372036854775808, -10.0, -9999.999999, -10000.0, 'aa', 0xff, '1990-01-01', '13:59:59.123', '2000-01-01 11:00:00.123', '1990-01-01 00:00:01.123'); - -INSERT INTO sqlserver_all_data_types VALUES (13, 'True', 127, 32767, 2147483647, 9223372036854775807, -10.0, 9999.999999, 10000.0, 'zzzz', 0xffffffff, '2999-12-31', '23:59:59.999', '2099-12-31 23:59:59.999', '2999-12-31 23:59:59.999') +INSERT INTO sqlserver_all_data_types VALUES (13, 'True', 127, 32767, 2147483647, 9223372036854775807, -10.0, 9999.999999, 10000.0, 'zzzz', 'zzzz', N'🌹👍', N'🌹👍', 0xffffffff, 0xffffffff, '6F9619FF-8B86-D011-B42D-00C04FC964FF', '2999-12-31', '23:59:59.999', '2099-12-31 23:59:59.999', '2999-12-31 23:59:59.999', 'Jane Doe') diff --git a/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_prepare.sql b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_prepare.sql index 792f1ddae1034..64999ffc8e8a7 100644 --- a/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_prepare.sql +++ b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_prepare.sql @@ -52,12 +52,18 @@ CREATE TABLE sqlserver_all_data_types ( c_decimal DECIMAL(28), c_real real, c_float float, + c_char char(4), c_varchar varchar(4), + c_nvarchar nvarchar(4), + c_ntext ntext, + c_binary binary(4), c_varbinary varbinary(4), + c_uniqueidentifier uniqueidentifier, c_date date, c_time time, c_datetime2 datetime2, - c_datetimeoffset datetimeoffset + c_datetimeoffset datetimeoffset, + c_xml xml ); EXEC sys.sp_cdc_enable_table @@ -65,11 +71,11 @@ EXEC sys.sp_cdc_enable_table @source_name = 'sqlserver_all_data_types', @role_name = NULL; -INSERT INTO sqlserver_all_data_types VALUES (1, 'False', 0, 0, 0, 0, 0, 0, 0, '', NULL, '2001-01-01', '00:00:00', '2001-01-01 00:00:00', '2001-01-01 00:00:00'); +INSERT INTO sqlserver_all_data_types VALUES (1, 'False', 0, 0, 0, 0, 0, 0, 0, '', '', N'中', N'中', 0xff, NULL, NULL, '2001-01-01', '00:00:00', '2001-01-01 00:00:00', '2001-01-01 00:00:00', 'John Doe30'); -INSERT INTO sqlserver_all_data_types VALUES (2, 'True', 255, -32768, -2147483648, -9223372036854775808, -10.0, -9999.999999, -10000.0, 'aa', 0xff, '1990-01-01', '13:59:59.123', '2000-01-01 11:00:00.123', '1990-01-01 00:00:01.123'); +INSERT INTO sqlserver_all_data_types VALUES (2, 'True', 255, -32768, -2147483648, -9223372036854775808, -10.0, -9999.999999, -10000.0, 'aa', 'aa', N'🌹', N'🌹', NULL, 0xff, '6f9619ff-8b86-d011-b42d-00c04fc964ff', '1990-01-01', '13:59:59.123', '2000-01-01 11:00:00.123', '1990-01-01 00:00:01.123', ' Jane Doe 28 '); -INSERT INTO sqlserver_all_data_types VALUES (3, 'True', 127, 32767, 2147483647, 9223372036854775807, -10.0, 9999.999999, 10000.0, 'zzzz', 0xffffffff, '2999-12-31', '23:59:59.999', '2099-12-31 23:59:59.999', '2999-12-31 23:59:59.999') +INSERT INTO sqlserver_all_data_types VALUES (3, 'True', 127, 32767, 2147483647, 9223372036854775807, -10.0, 9999.999999, 10000.0, 'zzzz', 'zzzz', N'🌹👍', N'🌹👍', 0xffffffff, 0xffffffff, '6F9619FF-8B86-D011-B42D-00C04FC964FF', '2999-12-31', '23:59:59.999', '2099-12-31 23:59:59.999', '2999-12-31 23:59:59.999', 'Jane Doe') -- Table without enabling CDC CREATE TABLE orders_without_cdc ( diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/SqlServerValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/SqlServerValidator.java index e2b58fad4195f..aa00efea09ae1 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/SqlServerValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/SqlServerValidator.java @@ -289,6 +289,7 @@ private boolean isDataTypeCompatible(String ssDataType, Data.DataType.TypeName t return Data.DataType.TypeName.INT16_VALUE <= val && val <= Data.DataType.TypeName.INT64_VALUE; case "integer": + case "int": return Data.DataType.TypeName.INT32_VALUE <= val && val <= Data.DataType.TypeName.INT64_VALUE; case "bigint": @@ -305,9 +306,15 @@ private boolean isDataTypeCompatible(String ssDataType, Data.DataType.TypeName t case "decimal": case "numeric": return val == Data.DataType.TypeName.DECIMAL_VALUE; + case "char": + case "nchar": case "varchar": - case "character varying": + case "nvarchar": + case "text": + case "ntext": + case "uniqueidentifier": return val == Data.DataType.TypeName.VARCHAR_VALUE; + case "binary": case "varbinary": return val == Data.DataType.TypeName.BYTEA_VALUE; case "date": diff --git a/src/connector/src/parser/sql_server.rs b/src/connector/src/parser/sql_server.rs index 91f78b41ffb6b..e1ae9a8eff86e 100644 --- a/src/connector/src/parser/sql_server.rs +++ b/src/connector/src/parser/sql_server.rs @@ -21,7 +21,9 @@ use risingwave_common::row::OwnedRow; use risingwave_common::types::{Date, Decimal, ScalarImpl, Time, Timestamp, Timestamptz}; use rust_decimal::Decimal as RustDecimal; use thiserror_ext::AsReport; +use tiberius::xml::XmlData; use tiberius::Row; +use uuid::Uuid; use crate::parser::util::log_error; @@ -74,7 +76,7 @@ macro_rules! impl_chrono_tiberius_wrapper { fn from_sql( value: &'a tiberius::ColumnData<'static>, ) -> tiberius::Result> { - let instant = <$chrono as tiberius::FromSql>::from_sql(value)?; + let instant = <$chrono>::from_sql(value)?; let time = instant.map($variant_name::from).map($wrapper_name::from); tiberius::Result::Ok(time) } @@ -101,7 +103,7 @@ impl<'a> tiberius::FromSql<'a> for DecimalTiberiusWrapper { // TODO(kexiang): will sql server have inf/-inf/nan for decimal? fn from_sql(value: &'a tiberius::ColumnData<'static>) -> tiberius::Result> { tiberius::Result::Ok( - ::from_sql(value)? + RustDecimal::from_sql(value)? .map(Decimal::Normalized) .map(DecimalTiberiusWrapper::from), ) @@ -116,7 +118,7 @@ impl<'a> tiberius::IntoSql<'a> for TimestamptzTiberiusWrapper { impl<'a> tiberius::FromSql<'a> for TimestamptzTiberiusWrapper { fn from_sql(value: &'a tiberius::ColumnData<'static>) -> tiberius::Result> { - let instant = as tiberius::FromSql>::from_sql(value)?; + let instant = DateTime::::from_sql(value)?; let time = instant .map(Timestamptz::from) .map(TimestamptzTiberiusWrapper::from); @@ -160,68 +162,57 @@ impl<'a> tiberius::FromSql<'a> for TimestamptzTiberiusWrapper { impl<'a> tiberius::FromSql<'a> for ScalarImplTiberiusWrapper { fn from_sql(value: &'a tiberius::ColumnData<'static>) -> tiberius::Result> { Ok(match &value { - tiberius::ColumnData::U8(_) => ::from_sql(value)? + tiberius::ColumnData::U8(_) => u8::from_sql(value)? .map(|v| ScalarImplTiberiusWrapper::from(ScalarImpl::from(v as i16))), - tiberius::ColumnData::I16(_) => ::from_sql(value)? + tiberius::ColumnData::I16(_) => i16::from_sql(value)? .map(ScalarImpl::from) .map(ScalarImplTiberiusWrapper::from), - tiberius::ColumnData::I32(_) => ::from_sql(value)? + tiberius::ColumnData::I32(_) => i32::from_sql(value)? .map(ScalarImpl::from) .map(ScalarImplTiberiusWrapper::from), - tiberius::ColumnData::I64(_) => ::from_sql(value)? + tiberius::ColumnData::I64(_) => i64::from_sql(value)? .map(ScalarImpl::from) .map(ScalarImplTiberiusWrapper::from), - tiberius::ColumnData::F32(_) => ::from_sql(value)? + tiberius::ColumnData::F32(_) => f32::from_sql(value)? .map(ScalarImpl::from) .map(ScalarImplTiberiusWrapper::from), - tiberius::ColumnData::F64(_) => ::from_sql(value)? + tiberius::ColumnData::F64(_) => f64::from_sql(value)? .map(ScalarImpl::from) .map(ScalarImplTiberiusWrapper::from), - tiberius::ColumnData::Bit(_) => ::from_sql(value)? + tiberius::ColumnData::Bit(_) => bool::from_sql(value)? .map(ScalarImpl::from) .map(ScalarImplTiberiusWrapper::from), - tiberius::ColumnData::String(_) => <&str as tiberius::FromSql>::from_sql(value)? + tiberius::ColumnData::String(_) => <&str>::from_sql(value)? .map(ScalarImpl::from) .map(ScalarImplTiberiusWrapper::from), - tiberius::ColumnData::Numeric(_) => { - ::from_sql(value)? - .map(|w| ScalarImpl::from(w.0)) - .map(ScalarImplTiberiusWrapper::from) - } + tiberius::ColumnData::Numeric(_) => DecimalTiberiusWrapper::from_sql(value)? + .map(|w| ScalarImpl::from(w.0)) + .map(ScalarImplTiberiusWrapper::from), tiberius::ColumnData::DateTime(_) | tiberius::ColumnData::DateTime2(_) - | tiberius::ColumnData::SmallDateTime(_) => { - ::from_sql(value)? - .map(|w| ScalarImpl::from(w.0)) - .map(ScalarImplTiberiusWrapper::from) - } - tiberius::ColumnData::Time(_) => { - ::from_sql(value)? - .map(|w| ScalarImpl::from(w.0)) - .map(ScalarImplTiberiusWrapper::from) - } - tiberius::ColumnData::Date(_) => { - ::from_sql(value)? - .map(|w| ScalarImpl::from(w.0)) - .map(ScalarImplTiberiusWrapper::from) - } - tiberius::ColumnData::DateTimeOffset(_) => { - ::from_sql(value)? - .map(|w| ScalarImpl::from(w.0)) - .map(ScalarImplTiberiusWrapper::from) - } - tiberius::ColumnData::Binary(_) => <&[u8] as tiberius::FromSql>::from_sql(value)? + | tiberius::ColumnData::SmallDateTime(_) => TimestampTiberiusWrapper::from_sql(value)? + .map(|w| ScalarImpl::from(w.0)) + .map(ScalarImplTiberiusWrapper::from), + tiberius::ColumnData::Time(_) => TimeTiberiusWrapper::from_sql(value)? + .map(|w| ScalarImpl::from(w.0)) + .map(ScalarImplTiberiusWrapper::from), + tiberius::ColumnData::Date(_) => DateTiberiusWrapper::from_sql(value)? + .map(|w| ScalarImpl::from(w.0)) + .map(ScalarImplTiberiusWrapper::from), + tiberius::ColumnData::DateTimeOffset(_) => TimestamptzTiberiusWrapper::from_sql(value)? + .map(|w| ScalarImpl::from(w.0)) + .map(ScalarImplTiberiusWrapper::from), + tiberius::ColumnData::Binary(_) => <&[u8]>::from_sql(value)? + .map(ScalarImpl::from) + .map(ScalarImplTiberiusWrapper::from), + tiberius::ColumnData::Guid(_) => ::from_sql(value)? + .map(|uuid| uuid.to_string().to_uppercase()) + .map(ScalarImpl::from) + .map(ScalarImplTiberiusWrapper::from), + tiberius::ColumnData::Xml(_) => <&XmlData>::from_sql(value)? + .map(|xml| xml.clone().into_string()) .map(ScalarImpl::from) .map(ScalarImplTiberiusWrapper::from), - tiberius::ColumnData::Guid(_) | tiberius::ColumnData::Xml(_) => { - return Err(tiberius::error::Error::Conversion( - format!( - "the sql server decoding for {:?} is unsupported", - value.type_name() - ) - .into(), - )) - } }) } } diff --git a/src/connector/src/source/cdc/external/sql_server.rs b/src/connector/src/source/cdc/external/sql_server.rs index 76b8e8e09fa41..02cf1088909e3 100644 --- a/src/connector/src/source/cdc/external/sql_server.rs +++ b/src/connector/src/source/cdc/external/sql_server.rs @@ -14,7 +14,7 @@ use std::cmp::Ordering; -use anyhow::Context; +use anyhow::{anyhow, Context}; use futures::stream::BoxStream; use futures::{pin_mut, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; @@ -24,7 +24,7 @@ use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema}; use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, ScalarImpl}; use serde_derive::{Deserialize, Serialize}; -use tiberius::{ColumnType, Config, Query, QueryItem}; +use tiberius::{Config, Query, QueryItem}; use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::{sql_server_row_to_owned_row, ScalarImplTiberiusWrapper}; @@ -99,29 +99,31 @@ impl SqlServerExternalTable { let mut column_descs = vec![]; let mut pk_names = vec![]; { - // With `WHERE 1 = 0`, we only fetch the metadata (column names and types) of the table let sql = Query::new(format!( - "SELECT * FROM {} WHERE 1 = 0", - SqlServerExternalTableReader::get_normalized_table_name(&SchemaTableName { - schema_name: config.schema.clone(), - table_name: config.table.clone(), - }), + "SELECT + COLUMN_NAME, + DATA_TYPE + FROM + INFORMATION_SCHEMA.COLUMNS + WHERE + TABLE_SCHEMA = '{}' + AND TABLE_NAME = '{}'", + config.schema.clone(), + config.table.clone(), )); let mut stream = sql.query(&mut client.inner_client).await?; while let Some(item) = stream.try_next().await? { match item { - QueryItem::Metadata(meta) => { - for col in meta.columns() { - column_descs.push(ColumnDesc::named( - col.name(), - ColumnId::placeholder(), - type_to_rw_type(&col.column_type())?, - )); - } - } + QueryItem::Metadata(_) => {} QueryItem::Row(row) => { - unreachable!("Unexpected row: {:?}, `SELECT * FROM {} WHERE 1 = 0` should never return rows", row, config.table.clone()); + let col_name: &str = row.try_get(0)?.unwrap(); + let col_type: &str = row.try_get(1)?.unwrap(); + column_descs.push(ColumnDesc::named( + col_name, + ColumnId::placeholder(), + type_to_rw_type(col_type, col_name)?, + )); } } } @@ -169,35 +171,29 @@ impl SqlServerExternalTable { } } -fn type_to_rw_type(col_type: &ColumnType) -> ConnectorResult { - let dtype = match col_type { - ColumnType::Bit => DataType::Boolean, - ColumnType::Bitn => DataType::Bytea, - ColumnType::Int1 => DataType::Int16, - ColumnType::Int2 => DataType::Int16, - ColumnType::Int4 => DataType::Int32, - ColumnType::Int8 => DataType::Int64, - ColumnType::Float4 => DataType::Float32, - ColumnType::Float8 => DataType::Float64, - ColumnType::Decimaln | ColumnType::Numericn => DataType::Decimal, - ColumnType::Daten => DataType::Date, - ColumnType::Timen => DataType::Time, - ColumnType::Datetime - | ColumnType::Datetimen - | ColumnType::Datetime2 - | ColumnType::Datetime4 => DataType::Timestamp, - ColumnType::DatetimeOffsetn => DataType::Timestamptz, - ColumnType::NVarchar | ColumnType::NChar | ColumnType::NText | ColumnType::Text => { - DataType::Varchar - } - // Null, Guid, Image, Money, Money4, Intn, Bitn, Floatn, Xml, Udt, SSVariant, BigVarBin, BigVarChar, BigBinary, BigChar +fn type_to_rw_type(col_type: &str, col_name: &str) -> ConnectorResult { + let dtype = match col_type.to_lowercase().as_str() { + "bit" => DataType::Boolean, + "binary" | "varbinary" => DataType::Bytea, + "tinyint" | "smallint" => DataType::Int16, + "integer" | "int" => DataType::Int32, + "bigint" => DataType::Int64, + "real" => DataType::Float32, + "float" => DataType::Float64, + "decimal" | "numeric" => DataType::Decimal, + "date" => DataType::Date, + "time" => DataType::Time, + "datetime" | "datetime2" | "smalldatetime" => DataType::Timestamp, + "datetimeoffset" => DataType::Timestamptz, + "char" | "nchar" | "varchar" | "nvarchar" | "text" | "ntext" | "xml" + | "uniqueidentifier" => DataType::Varchar, mssql_type => { - // NOTES: user-defined enum type is classified as `Unknown` - tracing::warn!( - "Unknown Sql Server data type: {:?}, map to varchar", - mssql_type - ); - DataType::Varchar + return Err(anyhow!( + "Unsupported Sql Server data type: {:?}, column name: {}", + mssql_type, + col_name + ) + .into()); } }; Ok(dtype)