diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index c506f00e6d2ca..44581a8e7387f 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -61,20 +61,22 @@ pub struct ClickHouseCommon { pub database: String, #[serde(rename = "clickhouse.table")] pub table: String, + #[serde(rename = "clickhouse.delete.column")] + pub delete_column: Option, } #[allow(clippy::enum_variant_names)] #[derive(Debug)] enum ClickHouseEngine { MergeTree, - ReplacingMergeTree, + ReplacingMergeTree(Option), SummingMergeTree, AggregatingMergeTree, CollapsingMergeTree(String), VersionedCollapsingMergeTree(String), GraphiteMergeTree, ReplicatedMergeTree, - ReplicatedReplacingMergeTree, + ReplicatedReplacingMergeTree(Option), ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree, #[expect(dead_code)] @@ -94,6 +96,22 @@ impl ClickHouseEngine { ) } + pub fn is_delete_replacing_engine(&self) -> bool { + match self { + ClickHouseEngine::ReplacingMergeTree(delete_col) => delete_col.is_some(), + ClickHouseEngine::ReplicatedReplacingMergeTree(delete_col) => delete_col.is_some(), + _ => false, + } + } + + pub fn get_delete_col(&self) -> Option { + match self { + ClickHouseEngine::ReplacingMergeTree(delete_col) => delete_col.clone(), + ClickHouseEngine::ReplicatedReplacingMergeTree(delete_col) => delete_col.clone(), + _ => None, + } + } + pub fn get_sign_name(&self) -> Option { match self { ClickHouseEngine::CollapsingMergeTree(sign_name) => Some(sign_name.to_string()), @@ -110,10 +128,16 @@ impl ClickHouseEngine { } } - pub fn from_query_engine(engine_name: &ClickhouseQueryEngine) -> Result { + pub fn from_query_engine( + engine_name: &ClickhouseQueryEngine, + config: &ClickHouseConfig, + ) -> Result { match engine_name.engine.as_str() { "MergeTree" => Ok(ClickHouseEngine::MergeTree), - "ReplacingMergeTree" => Ok(ClickHouseEngine::ReplacingMergeTree), + "ReplacingMergeTree" => { + let delete_column = config.common.delete_column.clone(); + Ok(ClickHouseEngine::ReplacingMergeTree(delete_column)) + } "SummingMergeTree" => Ok(ClickHouseEngine::SummingMergeTree), "AggregatingMergeTree" => Ok(ClickHouseEngine::AggregatingMergeTree), // VersionedCollapsingMergeTree(sign_name,"a") @@ -146,7 +170,12 @@ impl ClickHouseEngine { } "GraphiteMergeTree" => Ok(ClickHouseEngine::GraphiteMergeTree), "ReplicatedMergeTree" => Ok(ClickHouseEngine::ReplicatedMergeTree), - "ReplicatedReplacingMergeTree" => Ok(ClickHouseEngine::ReplicatedReplacingMergeTree), + "ReplicatedReplacingMergeTree" => { + let delete_column = config.common.delete_column.clone(); + Ok(ClickHouseEngine::ReplicatedReplacingMergeTree( + delete_column, + )) + } "ReplicatedSummingMergeTree" => Ok(ClickHouseEngine::ReplicatedSummingMergeTree), "ReplicatedAggregatingMergeTree" => { Ok(ClickHouseEngine::ReplicatedAggregatingMergeTree) @@ -399,9 +428,16 @@ impl Sink for ClickHouseSink { let (clickhouse_column, clickhouse_engine) = query_column_engine_from_ck(client, &self.config).await?; - if !self.is_append_only && !clickhouse_engine.is_collapsing_engine() { - return Err(SinkError::ClickHouse( - "If you want to use upsert, please modify your engine is `VersionedCollapsingMergeTree` or `CollapsingMergeTree` in ClickHouse".to_owned())); + if !self.is_append_only + && !clickhouse_engine.is_collapsing_engine() + && !clickhouse_engine.is_delete_replacing_engine() + { + return match clickhouse_engine { + ClickHouseEngine::ReplicatedReplacingMergeTree(None) | ClickHouseEngine::ReplacingMergeTree(None) => { + Err(SinkError::ClickHouse("To enable upsert with a `ReplacingMergeTree`, you must set a `clickhouse.delete.column` to the UInt8 column in ClickHouse used to signify deletes. See https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree#is_deleted for more information".to_owned())) + } + _ => Err(SinkError::ClickHouse("If you want to use upsert, please use either `VersionedCollapsingMergeTree` or `CollapsingMergeTree` in ClickHouse".to_owned())) + }; } self.check_column_name_and_type(&clickhouse_column)?; @@ -470,6 +506,9 @@ impl ClickHouseSinkWriter { if let Some(sign) = clickhouse_engine.get_sign_name() { rw_fields_name_after_calibration.push(sign); } + if let Some(delete_col) = clickhouse_engine.get_delete_col() { + rw_fields_name_after_calibration.push(delete_col); + } Ok(Self { config, schema, @@ -567,16 +606,30 @@ impl ClickHouseSinkWriter { ClickHouseField::Int8(1), )); } + if self.clickhouse_engine.get_delete_col().is_some() { + clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome( + ClickHouseField::Int8(0), + )) + } } Op::Delete | Op::UpdateDelete => { - if !self.clickhouse_engine.is_collapsing_engine() { + if !self.clickhouse_engine.is_collapsing_engine() + && !self.clickhouse_engine.is_delete_replacing_engine() + { return Err(SinkError::ClickHouse( "Clickhouse engine don't support upsert".to_string(), )); } - clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome( - ClickHouseField::Int8(-1), - )) + if self.clickhouse_engine.get_sign_name().is_some() { + clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome( + ClickHouseField::Int8(-1), + )); + } + if self.clickhouse_engine.get_delete_col().is_some() { + clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome( + ClickHouseField::Int8(1), + )) + } } } let clickhouse_column = ClickHouseColumn { @@ -654,11 +707,16 @@ async fn query_column_engine_from_ck( } let clickhouse_engine = - ClickHouseEngine::from_query_engine(clickhouse_engine.first().unwrap())?; + ClickHouseEngine::from_query_engine(clickhouse_engine.first().unwrap(), config)?; if let Some(sign) = &clickhouse_engine.get_sign_name() { clickhouse_column.retain(|a| sign.ne(&a.name)) } + + if let Some(delete_col) = &clickhouse_engine.get_delete_col() { + clickhouse_column.retain(|a| delete_col.ne(&a.name)) + } + Ok((clickhouse_column, clickhouse_engine)) } diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 731bb900335ee..cd560699b812f 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -84,6 +84,9 @@ ClickHouseConfig: - name: clickhouse.table field_type: String required: true + - name: clickhouse.delete.column + field_type: String + required: false - name: r#type field_type: String required: true