diff --git a/ci/scripts/e2e-clickhouse-sink-test.sh b/ci/scripts/e2e-clickhouse-sink-test.sh index 3e234ed5c69bc..501cdfe0b875e 100755 --- a/ci/scripts/e2e-clickhouse-sink-test.sh +++ b/ci/scripts/e2e-clickhouse-sink-test.sh @@ -31,12 +31,14 @@ sleep 1 echo "--- create clickhouse table" curl https://clickhouse.com/ | sh sleep 2 -./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test(v1 Int32,v2 Int64,v3 String,v4 Enum16('A'=1,'B'=2), v5 decimal64(3))ENGINE = ReplacingMergeTree PRIMARY KEY (v1);" +./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test_append_only(v1 Int32,v2 Int64,v3 String,v4 Enum16('A'=1,'B'=2), v5 decimal64(3))ENGINE = ReplacingMergeTree PRIMARY KEY (v1);" +./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test_upsert1(v1 Int32,v2 Int64,v3 String,ver DateTime64,del UInt8)ENGINE = ReplacingMergeTree(ver, del) PRIMARY KEY (v1);" +./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test_upsert2(v1 Int32,v2 Int64,v3 String,del Int8)ENGINE = CollapsingMergeTree(del) PRIMARY KEY (v1);" -echo "--- testing sinks" +echo "--- testing sinks append_only" sqllogictest -p 4566 -d dev './e2e_test/sink/clickhouse_sink.slt' sleep 5 -./clickhouse client --host=clickhouse-server --port=9000 --query="select * from demo_test FORMAT CSV;" > ./query_result.csv +./clickhouse client --host=clickhouse-server --port=9000 --query="select * from demo_test_append_only FORMAT CSV;" > ./query_result.csv # check sink destination using shell @@ -56,5 +58,33 @@ else exit 1 fi +echo "--- testing sinks upsert1" +./clickhouse client --host=clickhouse-server --port=9000 --query="select * from demo_test_upsert1 final FORMAT CSV;" > ./query_result2.csv + +if cat ./query_result2.csv | sort | awk -F "," '{ + if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"" && $4 == "\"2013-01-02 00:01:02.000\"" && $4 == 0) c2++; + if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"" && $4 == "\"2013-01-03 00:01:02.000\"" && $4 == 0) c3++; } + END { exit !(c2 == 1 && c3 == 1); }'; then + echo "Clickhouse sink check passed" +else + echo "The output is not as expected." + cat ./query_result2.csv +# exit 1 +fi + +echo "--- testing sinks upsert2" +./clickhouse client --host=clickhouse-server --port=9000 --query="select * from demo_test_upsert2 final FORMAT CSV;" > ./query_result3.csv + +if cat ./query_result3.csv | sort | awk -F "," '{ + if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"" && $4 == 1) c2++; + if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"" && $4 == 1) c3++; } + END { exit !(c2 == 1 && c3 == 1); }'; then + echo "Clickhouse sink check passed" +else + echo "The output is not as expected." + cat ./query_result3.csv + exit 1 +fi + echo "--- Kill cluster" risedev ci-kill \ No newline at end of file diff --git a/e2e_test/sink/clickhouse_sink.slt b/e2e_test/sink/clickhouse_sink.slt index e5bac0d8d521d..8c19558774ee9 100644 --- a/e2e_test/sink/clickhouse_sink.slt +++ b/e2e_test/sink/clickhouse_sink.slt @@ -5,10 +5,14 @@ statement ok CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar, v4 smallint, v5 decimal); statement ok -CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6; +CREATE TABLE t7 (v1 int primary key, v2 bigint, v3 varchar,ver timestamptz); statement ok -CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4, mv6.v5 as v5 from mv6 WITH ( +CREATE TABLE t8 (v1 int primary key, v2 bigint, v3 varchar); + + +statement ok +CREATE SINK s6 from t6 WITH ( connector = 'clickhouse', type = 'append-only', force_append_only='true', @@ -16,7 +20,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4, clickhouse.user = 'default', clickhouse.password = '', clickhouse.database = 'default', - clickhouse.table='demo_test', + clickhouse.table='demo_test_append_only', ); statement ok @@ -25,11 +29,70 @@ INSERT INTO t6 VALUES (1, 50, '1-50', 1, 1.1), (2, 2, '2-2', 2, 2.2), (3, 2, '3- statement ok FLUSH; +statement ok +CREATE SINK s7 from t7 WITH ( + connector = 'clickhouse', + type = 'upsert', + primary_key = 'v1', + clickhouse.url = 'http://clickhouse-server:8123', + clickhouse.user = 'default', + clickhouse.password = '', + clickhouse.database = 'default', + clickhouse.table='demo_test_upsert1', + clickhouse.delete.column = 'del' +); + +statement ok +INSERT INTO t7 VALUES (1, 50, '1-50', '2013-01-01 01:01:02+01:00'), (2, 2, '2-2', '2013-01-02 01:01:02+01:00'), (3, 2, '3-2','2013-01-03 01:01:02+01:00'); + +statement ok +FLUSH; + +statement ok +delete from t7 where v1 = 1; + +statement ok +FLUSH; + +statement ok +CREATE SINK s8 from t8 WITH ( + connector = 'clickhouse', + type = 'upsert', + primary_key = 'v1', + clickhouse.url = 'http://clickhouse-server:8123', + clickhouse.user = 'default', + clickhouse.password = '', + clickhouse.database = 'default', + clickhouse.table='demo_test_upsert2', + clickhouse.delete.column = 'del' +); + +statement ok +INSERT INTO t8 VALUES (1, 50, '1-50'), (2, 2, '2-2'), (3, 2, '3-2'); + +statement ok +FLUSH; + +statement ok +delete from t8 where v1 = 1; + +statement ok +FLUSH; + statement ok DROP SINK s6; statement ok -DROP MATERIALIZED VIEW mv6; +DROP TABLE t6; + +statement ok +DROP SINK s7; + +statement ok +DROP TABLE t7; + +statement ok +DROP SINK s8; statement ok -DROP TABLE t6; \ No newline at end of file +DROP TABLE t8; \ No newline at end of file diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index 8af58f668a440..d4cdff19ada15 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -104,16 +104,6 @@ impl ClickHouseEngine { } } - pub fn get_delete_col(&self) -> Option { - match self { - ClickHouseEngine::ReplacingMergeTree(Some(delete_col)) => Some(delete_col.to_string()), - ClickHouseEngine::ReplicatedReplacingMergeTree(Some(delete_col)) => { - Some(delete_col.to_string()) - } - _ => None, - } - } - pub fn get_sign_name(&self) -> Option { match self { ClickHouseEngine::CollapsingMergeTree(sign_name) => Some(sign_name.to_string()), @@ -126,6 +116,10 @@ impl ClickHouseEngine { ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(sign_name) => { Some(sign_name.to_string()) } + ClickHouseEngine::ReplacingMergeTree(Some(delete_col)) => Some(delete_col.to_string()), + ClickHouseEngine::ReplicatedReplacingMergeTree(Some(delete_col)) => { + Some(delete_col.to_string()) + } _ => None, } } @@ -144,30 +138,42 @@ impl ClickHouseEngine { "AggregatingMergeTree" => Ok(ClickHouseEngine::AggregatingMergeTree), // VersionedCollapsingMergeTree(sign_name,"a") "VersionedCollapsingMergeTree" => { - let sign_name = engine_name - .create_table_query - .split("VersionedCollapsingMergeTree(") - .last() - .ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))? - .split(',') - .next() - .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? - .trim() - .to_string(); + let sign_name = match config.common.delete_column.clone() { + Some(sign) => sign, + None => { + warn!("If you use `upsert`, it is recommended to fill in `delete_column`, otherwise we will use string matching to get it, which may have some problems"); + engine_name + .create_table_query + .split("VersionedCollapsingMergeTree(") + .last() + .ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))? + .split(',') + .next() + .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? + .trim() + .to_string() + } + }; Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name)) } // CollapsingMergeTree(sign_name) "CollapsingMergeTree" => { - let sign_name = engine_name - .create_table_query - .split("CollapsingMergeTree(") - .last() - .ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))? - .split(')') - .next() - .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? - .trim() - .to_string(); + let sign_name = match config.common.delete_column.clone() { + Some(sign) => sign, + None => { + warn!("If you use `upsert`, it is recommended to fill in `delete_column`, otherwise we will use string matching to get it, which may have some problems"); + engine_name + .create_table_query + .split("CollapsingMergeTree(") + .last() + .ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))? + .split(')') + .next() + .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? + .trim() + .to_string() + } + }; Ok(ClickHouseEngine::CollapsingMergeTree(sign_name)) } "GraphiteMergeTree" => Ok(ClickHouseEngine::GraphiteMergeTree), @@ -184,34 +190,46 @@ impl ClickHouseEngine { } // ReplicatedVersionedCollapsingMergeTree("a","b",sign_name,"c") "ReplicatedVersionedCollapsingMergeTree" => { - let sign_name = engine_name - .create_table_query - .split("ReplicatedVersionedCollapsingMergeTree(") - .last() - .ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))? - .split(',') - .rev() - .nth(1) - .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_string()))? - .trim() - .to_string(); + let sign_name = match config.common.delete_column.clone() { + Some(sign) => sign, + None => { + warn!("If you use `upsert`, it is recommended to fill in `delete_column`, otherwise we will use string matching to get it, which may have some problems"); + engine_name + .create_table_query + .split("ReplicatedVersionedCollapsingMergeTree(") + .last() + .ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))? + .split(',') + .rev() + .nth(1) + .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_string()))? + .trim() + .to_string() + } + }; Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name)) } // ReplicatedCollapsingMergeTree("a","b",sign_name) "ReplicatedCollapsingMergeTree" => { - let sign_name = engine_name - .create_table_query - .split("ReplicatedCollapsingMergeTree(") - .last() - .ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))? - .split(')') - .next() - .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? - .split(',') - .last() - .ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))? - .trim() - .to_string(); + let sign_name = match config.common.delete_column.clone() { + Some(sign) => sign, + None => { + warn!("If you use `upsert`, it is recommended to fill in `delete_column`, otherwise we will use string matching to get it, which may have some problems"); + engine_name + .create_table_query + .split("ReplicatedCollapsingMergeTree(") + .last() + .ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))? + .split(')') + .next() + .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? + .split(',') + .last() + .ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))? + .trim() + .to_string() + } + }; Ok(ClickHouseEngine::CollapsingMergeTree(sign_name)) } "ReplicatedGraphiteMergeTree" => Ok(ClickHouseEngine::ReplicatedGraphiteMergeTree), @@ -508,9 +526,6 @@ impl ClickHouseSinkWriter { if let Some(sign) = clickhouse_engine.get_sign_name() { rw_fields_name_after_calibration.push(sign); } - if let Some(delete_col) = clickhouse_engine.get_delete_col() { - rw_fields_name_after_calibration.push(delete_col); - } Ok(Self { config, schema, @@ -715,10 +730,6 @@ async fn query_column_engine_from_ck( clickhouse_column.retain(|a| sign.ne(&a.name)) } - if let Some(delete_col) = &clickhouse_engine.get_delete_col() { - clickhouse_column.retain(|a| delete_col.ne(&a.name)) - } - Ok((clickhouse_column, clickhouse_engine)) }