Skip to content

Commit

Permalink
feat(sink): Support Sinking Enum16 Ids to Clickhouse (#14668)
Browse files Browse the repository at this point in the history
Co-authored-by: Xinhao Xu <[email protected]>
  • Loading branch information
HurricanKai and xxhZs authored Feb 20, 2024
1 parent 4197ad5 commit e3972cb
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 14 deletions.
16 changes: 8 additions & 8 deletions ci/scripts/e2e-clickhouse-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ sleep 1
echo "--- create clickhouse table"
curl https://clickhouse.com/ | sh
sleep 2
./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test(v1 Int32,v2 Int64,v3 String)ENGINE = ReplacingMergeTree PRIMARY KEY (v1);"
./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test(v1 Int32,v2 Int64,v3 String,v4 Enum16('A'=1,'B'=2))ENGINE = ReplacingMergeTree PRIMARY KEY (v1);"

echo "--- testing sinks"
sqllogictest -p 4566 -d dev './e2e_test/sink/clickhouse_sink.slt'
Expand All @@ -41,13 +41,13 @@ sleep 5

# check sink destination using shell
if cat ./query_result.csv | sort | awk -F "," '{
if ($1 == 1 && $2 == 50 && $3 == "\"1-50\"") c1++;
if ($1 == 13 && $2 == 2 && $3 == "\"13-2\"") c2++;
if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"") c3++;
if ($1 == 21 && $2 == 2 && $3 == "\"21-2\"") c4++;
if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"") c5++;
if ($1 == 5 && $2 == 2 && $3 == "\"5-2\"") c6++;
if ($1 == 8 && $2 == 2 && $3 == "\"8-2\"") c7++; }
if ($1 == 1 && $2 == 50 && $3 == "\"1-50\"" && $4 == "\"A\"") c1++;
if ($1 == 13 && $2 == 2 && $3 == "\"13-2\"" && $4 == "\"B\"") c2++;
if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"" && $4 == "\"B\"") c3++;
if ($1 == 21 && $2 == 2 && $3 == "\"21-2\"" && $4 == "\"A\"") c4++;
if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"" && $4 == "\"A\"") c5++;
if ($1 == 5 && $2 == 2 && $3 == "\"5-2\"" && $4 == "\"B\"") c6++;
if ($1 == 8 && $2 == 2 && $3 == "\"8-2\"" && $4 == "\"A\"") c7++; }
END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1 && c6 == 1 && c7 == 1); }'; then
echo "Clickhouse sink check passed"
else
Expand Down
6 changes: 3 additions & 3 deletions e2e_test/sink/clickhouse_sink.slt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
statement ok
CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar);
CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar, v4 smallint);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;

statement ok
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4 from mv6 WITH (
connector = 'clickhouse',
type = 'append-only',
force_append_only='true',
Expand All @@ -17,7 +17,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH
);

statement ok
INSERT INTO t6 VALUES (1, 50, '1-50'), (2, 2, '2-2'), (3, 2, '3-2'), (5, 2, '5-2'), (8, 2, '8-2'), (13, 2, '13-2'), (21, 2, '21-2');
INSERT INTO t6 VALUES (1, 50, '1-50', 1), (2, 2, '2-2', 2), (3, 2, '3-2', 1), (5, 2, '5-2', 2), (8, 2, '8-2', 1), (13, 2, '13-2', 2), (21, 2, '21-2', 1);

statement ok
FLUSH;
Expand Down
8 changes: 5 additions & 3 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,11 @@ impl ClickHouseSink {
) -> Result<()> {
let is_match = match fields_type {
risingwave_common::types::DataType::Boolean => Ok(ck_column.r#type.contains("Bool")),
risingwave_common::types::DataType::Int16 => {
Ok(ck_column.r#type.contains("UInt16") | ck_column.r#type.contains("Int16"))
}
risingwave_common::types::DataType::Int16 => Ok(ck_column.r#type.contains("UInt16")
| ck_column.r#type.contains("Int16")
// Allow Int16 to be pushed to Enum16, they share an encoding and value range
// No special care is taken to ensure values are valid.
| ck_column.r#type.contains("Enum16")),
risingwave_common::types::DataType::Int32 => {
Ok(ck_column.r#type.contains("UInt32") | ck_column.r#type.contains("Int32"))
}
Expand Down

0 comments on commit e3972cb

Please sign in to comment.