Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): add clickhouse delete column for CollapsingMergeTree #17385

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions ci/scripts/e2e-clickhouse-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ sleep 1
echo "--- create clickhouse table"
curl https://clickhouse.com/ | sh
sleep 2
./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test(v1 Int32,v2 Int64,v3 String,v4 Enum16('A'=1,'B'=2), v5 decimal64(3))ENGINE = ReplacingMergeTree PRIMARY KEY (v1);"
./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test_append_only(v1 Int32,v2 Int64,v3 String,v4 Enum16('A'=1,'B'=2), v5 decimal64(3))ENGINE = ReplacingMergeTree PRIMARY KEY (v1);"
./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test_upsert1(v1 Int32,v2 Int64,v3 String,ver DateTime64,del UInt8)ENGINE = ReplacingMergeTree(ver, del) PRIMARY KEY (v1);"
./clickhouse client --host=clickhouse-server --port=9000 --query="CREATE table demo_test_upsert2(v1 Int32,v2 Int64,v3 String,del Int8)ENGINE = CollapsingMergeTree(del) PRIMARY KEY (v1);"

echo "--- testing sinks"
echo "--- testing sinks append_only"
sqllogictest -p 4566 -d dev './e2e_test/sink/clickhouse_sink.slt'
sleep 5
./clickhouse client --host=clickhouse-server --port=9000 --query="select * from demo_test FORMAT CSV;" > ./query_result.csv
./clickhouse client --host=clickhouse-server --port=9000 --query="select * from demo_test_append_only FORMAT CSV;" > ./query_result.csv


# check sink destination using shell
Expand All @@ -56,5 +58,33 @@ else
exit 1
fi

echo "--- testing sinks upsert1"
./clickhouse client --host=clickhouse-server --port=9000 --query="select * from demo_test_upsert1 final FORMAT CSV;" > ./query_result2.csv

if cat ./query_result2.csv | sort | awk -F "," '{
if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"" && $4 == "\"2013-01-02 00:01:02.000\"" && $4 == 0) c2++;
if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"" && $4 == "\"2013-01-03 00:01:02.000\"" && $4 == 0) c3++; }
END { exit !(c2 == 1 && c3 == 1); }'; then
echo "Clickhouse sink check passed"
else
echo "The output is not as expected."
cat ./query_result2.csv
# exit 1
fi

echo "--- testing sinks upsert2"
./clickhouse client --host=clickhouse-server --port=9000 --query="select * from demo_test_upsert2 final FORMAT CSV;" > ./query_result3.csv

if cat ./query_result3.csv | sort | awk -F "," '{
if ($1 == 2 && $2 == 2 && $3 == "\"2-2\"" && $4 == 1) c2++;
if ($1 == 3 && $2 == 2 && $3 == "\"3-2\"" && $4 == 1) c3++; }
END { exit !(c2 == 1 && c3 == 1); }'; then
echo "Clickhouse sink check passed"
else
echo "The output is not as expected."
cat ./query_result3.csv
exit 1
fi

echo "--- Kill cluster"
risedev ci-kill
73 changes: 68 additions & 5 deletions e2e_test/sink/clickhouse_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,22 @@ statement ok
CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar, v4 smallint, v5 decimal);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
CREATE TABLE t7 (v1 int primary key, v2 bigint, v3 varchar,ver timestamptz);

statement ok
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4, mv6.v5 as v5 from mv6 WITH (
CREATE TABLE t8 (v1 int primary key, v2 bigint, v3 varchar);


statement ok
CREATE SINK s6 from t6 WITH (
connector = 'clickhouse',
type = 'append-only',
force_append_only='true',
clickhouse.url = 'http://clickhouse-server:8123',
clickhouse.user = 'default',
clickhouse.password = '',
clickhouse.database = 'default',
clickhouse.table='demo_test',
clickhouse.table='demo_test_append_only',
);

statement ok
Expand All @@ -25,11 +29,70 @@ INSERT INTO t6 VALUES (1, 50, '1-50', 1, 1.1), (2, 2, '2-2', 2, 2.2), (3, 2, '3-
statement ok
FLUSH;

statement ok
CREATE SINK s7 from t7 WITH (
connector = 'clickhouse',
type = 'upsert',
primary_key = 'v1',
clickhouse.url = 'http://clickhouse-server:8123',
clickhouse.user = 'default',
clickhouse.password = '',
clickhouse.database = 'default',
clickhouse.table='demo_test_upsert1',
clickhouse.delete.column = 'del'
);

statement ok
INSERT INTO t7 VALUES (1, 50, '1-50', '2013-01-01 01:01:02+01:00'), (2, 2, '2-2', '2013-01-02 01:01:02+01:00'), (3, 2, '3-2','2013-01-03 01:01:02+01:00');

statement ok
FLUSH;

statement ok
delete from t7 where v1 = 1;

statement ok
FLUSH;

statement ok
CREATE SINK s8 from t8 WITH (
connector = 'clickhouse',
type = 'upsert',
primary_key = 'v1',
clickhouse.url = 'http://clickhouse-server:8123',
clickhouse.user = 'default',
clickhouse.password = '',
clickhouse.database = 'default',
clickhouse.table='demo_test_upsert2',
clickhouse.delete.column = 'del'
);

statement ok
INSERT INTO t8 VALUES (1, 50, '1-50'), (2, 2, '2-2'), (3, 2, '3-2');

statement ok
FLUSH;

statement ok
delete from t8 where v1 = 1;

statement ok
FLUSH;

statement ok
DROP SINK s6;

statement ok
DROP MATERIALIZED VIEW mv6;
DROP TABLE t6;

statement ok
DROP SINK s7;

statement ok
DROP TABLE t7;

statement ok
DROP SINK s8;

statement ok
DROP TABLE t6;
DROP TABLE t8;
133 changes: 72 additions & 61 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,6 @@ impl ClickHouseEngine {
}
}

pub fn get_delete_col(&self) -> Option<String> {
match self {
ClickHouseEngine::ReplacingMergeTree(Some(delete_col)) => Some(delete_col.to_string()),
ClickHouseEngine::ReplicatedReplacingMergeTree(Some(delete_col)) => {
Some(delete_col.to_string())
}
_ => None,
}
}

pub fn get_sign_name(&self) -> Option<String> {
match self {
ClickHouseEngine::CollapsingMergeTree(sign_name) => Some(sign_name.to_string()),
Expand All @@ -126,6 +116,10 @@ impl ClickHouseEngine {
ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(sign_name) => {
Some(sign_name.to_string())
}
ClickHouseEngine::ReplacingMergeTree(Some(delete_col)) => Some(delete_col.to_string()),
ClickHouseEngine::ReplicatedReplacingMergeTree(Some(delete_col)) => {
Some(delete_col.to_string())
}
_ => None,
}
}
Expand All @@ -144,30 +138,42 @@ impl ClickHouseEngine {
"AggregatingMergeTree" => Ok(ClickHouseEngine::AggregatingMergeTree),
// VersionedCollapsingMergeTree(sign_name,"a")
"VersionedCollapsingMergeTree" => {
let sign_name = engine_name
.create_table_query
.split("VersionedCollapsingMergeTree(")
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.split(',')
.next()
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.trim()
.to_string();
let sign_name = match config.common.delete_column.clone() {
Some(sign) => sign,
None => {
warn!("If you use `upsert`, it is recommended to fill in `delete_column`, otherwise we will use string matching to get it, which may have some problems");
engine_name
.create_table_query
.split("VersionedCollapsingMergeTree(")
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.split(',')
.next()
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.trim()
.to_string()
}
};
Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name))
}
// CollapsingMergeTree(sign_name)
"CollapsingMergeTree" => {
let sign_name = engine_name
.create_table_query
.split("CollapsingMergeTree(")
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.split(')')
.next()
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.trim()
.to_string();
let sign_name = match config.common.delete_column.clone() {
Some(sign) => sign,
None => {
warn!("If you use `upsert`, it is recommended to fill in `delete_column`, otherwise we will use string matching to get it, which may have some problems");
engine_name
.create_table_query
.split("CollapsingMergeTree(")
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.split(')')
.next()
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.trim()
.to_string()
}
};
Ok(ClickHouseEngine::CollapsingMergeTree(sign_name))
}
"GraphiteMergeTree" => Ok(ClickHouseEngine::GraphiteMergeTree),
Expand All @@ -184,34 +190,46 @@ impl ClickHouseEngine {
}
// ReplicatedVersionedCollapsingMergeTree("a","b",sign_name,"c")
"ReplicatedVersionedCollapsingMergeTree" => {
let sign_name = engine_name
.create_table_query
.split("ReplicatedVersionedCollapsingMergeTree(")
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.split(',')
.rev()
.nth(1)
.ok_or_else(|| SinkError::ClickHouse("must have index 1".to_string()))?
.trim()
.to_string();
let sign_name = match config.common.delete_column.clone() {
Some(sign) => sign,
None => {
warn!("If you use `upsert`, it is recommended to fill in `delete_column`, otherwise we will use string matching to get it, which may have some problems");
engine_name
.create_table_query
.split("ReplicatedVersionedCollapsingMergeTree(")
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.split(',')
.rev()
.nth(1)
.ok_or_else(|| SinkError::ClickHouse("must have index 1".to_string()))?
.trim()
.to_string()
}
};
Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name))
}
// ReplicatedCollapsingMergeTree("a","b",sign_name)
"ReplicatedCollapsingMergeTree" => {
let sign_name = engine_name
.create_table_query
.split("ReplicatedCollapsingMergeTree(")
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.split(')')
.next()
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.split(',')
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.trim()
.to_string();
let sign_name = match config.common.delete_column.clone() {
Some(sign) => sign,
None => {
warn!("If you use `upsert`, it is recommended to fill in `delete_column`, otherwise we will use string matching to get it, which may have some problems");
engine_name
.create_table_query
.split("ReplicatedCollapsingMergeTree(")
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.split(')')
.next()
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.split(',')
.last()
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.trim()
.to_string()
}
};
Ok(ClickHouseEngine::CollapsingMergeTree(sign_name))
}
"ReplicatedGraphiteMergeTree" => Ok(ClickHouseEngine::ReplicatedGraphiteMergeTree),
Expand Down Expand Up @@ -508,9 +526,6 @@ impl ClickHouseSinkWriter {
if let Some(sign) = clickhouse_engine.get_sign_name() {
rw_fields_name_after_calibration.push(sign);
}
if let Some(delete_col) = clickhouse_engine.get_delete_col() {
rw_fields_name_after_calibration.push(delete_col);
}
Ok(Self {
config,
schema,
Expand Down Expand Up @@ -715,10 +730,6 @@ async fn query_column_engine_from_ck(
clickhouse_column.retain(|a| sign.ne(&a.name))
}

if let Some(delete_col) = &clickhouse_engine.get_delete_col() {
clickhouse_column.retain(|a| delete_col.ne(&a.name))
}

Ok((clickhouse_column, clickhouse_engine))
}

Expand Down
Loading