diff --git a/ci/scripts/e2e-clickhouse-sink-test.sh b/ci/scripts/e2e-clickhouse-sink-test.sh index 3464bd3c3c14d..a99951e62fab9 100755 --- a/ci/scripts/e2e-clickhouse-sink-test.sh +++ b/ci/scripts/e2e-clickhouse-sink-test.sh @@ -31,7 +31,7 @@ sleep 1 echo "--- create clickhouse table" curl https://clickhouse.com/ | sh sleep 2 -./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test(v1 Int32,v2 Int64,v3 String)ENGINE = ReplacingMergeTree PRIMARY KEY (v1);" +./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test(v1 Int32,v2 Int64,v3 String,v4 Enum16('A'=1,'B'=2))ENGINE = ReplacingMergeTree PRIMARY KEY (v1);" echo "--- testing sinks" sqllogictest -p 4566 -d dev './e2e_test/sink/clickhouse_sink.slt' @@ -41,13 +41,13 @@ sleep 5 # check sink destination using shell if cat ./query_result.csv | sort | awk -F "," '{ -if ($1 == 1 && $2 == 50 && $3 == "\"1-50\"") c1++; - if ($1 == 13 && $2 == 2 && $3 == "\"13-2\"") c2++; - if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"") c3++; - if ($1 == 21 && $2 == 2 && $3 == "\"21-2\"") c4++; - if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"") c5++; - if ($1 == 5 && $2 == 2 && $3 == "\"5-2\"") c6++; - if ($1 == 8 && $2 == 2 && $3 == "\"8-2\"") c7++; } +if ($1 == 1 && $2 == 50 && $3 == "\"1-50\"" && $4 == "\"A\"") c1++; + if ($1 == 13 && $2 == 2 && $3 == "\"13-2\"" && $4 == "\"B\"") c2++; + if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"" && $4 == "\"B\"") c3++; + if ($1 == 21 && $2 == 2 && $3 == "\"21-2\"" && $4 == "\"A\"") c4++; + if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"" && $4 == "\"A\"") c5++; + if ($1 == 5 && $2 == 2 && $3 == "\"5-2\"" && $4 == "\"B\"") c6++; + if ($1 == 8 && $2 == 2 && $3 == "\"8-2\"" && $4 == "\"A\"") c7++; } END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1 && c6 == 1 && c7 == 1); }'; then echo "Clickhouse sink check passed" else diff --git a/e2e_test/sink/clickhouse_sink.slt b/e2e_test/sink/clickhouse_sink.slt index 909bdbfd6356b..9791f484326d7 100644 --- a/e2e_test/sink/clickhouse_sink.slt +++ b/e2e_test/sink/clickhouse_sink.slt @@ -1,11 +1,11 @@ statement ok -CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar); +CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar, v4 smallint); statement ok CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6; statement ok -CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH ( +CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4 from mv6 WITH ( connector = 'clickhouse', type = 'append-only', force_append_only='true', @@ -17,7 +17,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH ); statement ok -INSERT INTO t6 VALUES (1, 50, '1-50'), (2, 2, '2-2'), (3, 2, '3-2'), (5, 2, '5-2'), (8, 2, '8-2'), (13, 2, '13-2'), (21, 2, '21-2'); +INSERT INTO t6 VALUES (1, 50, '1-50', 1), (2, 2, '2-2', 2), (3, 2, '3-2', 1), (5, 2, '5-2', 2), (8, 2, '8-2', 1), (13, 2, '13-2', 2), (21, 2, '21-2', 1); statement ok FLUSH; diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index 2a9a2e5a39eb6..cd692c3c6cd58 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -261,9 +261,11 @@ impl ClickHouseSink { ) -> Result<()> { let is_match = match fields_type { risingwave_common::types::DataType::Boolean => Ok(ck_column.r#type.contains("Bool")), - risingwave_common::types::DataType::Int16 => { - Ok(ck_column.r#type.contains("UInt16") | ck_column.r#type.contains("Int16")) - } + risingwave_common::types::DataType::Int16 => Ok(ck_column.r#type.contains("UInt16") + | ck_column.r#type.contains("Int16") + // Allow Int16 to be pushed to Enum16, they share an encoding and value range + // No special care is taken to ensure values are valid. + | ck_column.r#type.contains("Enum16")), risingwave_common::types::DataType::Int32 => { Ok(ck_column.r#type.contains("UInt32") | ck_column.r#type.contains("Int32")) }