Skip to content

Commit

Permalink
feat(sink) Support Deletes for ClickHouse ReplacingMergeTree
Browse files Browse the repository at this point in the history
- ClickHouse now supports [1] an optional `is_deleted` column when creating
  a `ReplacingMergeTree`.

- Adds a new `clickhouse.delete.column` config option which, when set,
  enables `upsert` into a `ReplacingMergeTree`.

- When a row is deleted, the value of this column is set to `1`
  instead of the default `0`.
  • Loading branch information
rickysaltzer committed Jun 17, 2024
1 parent ac7abd9 commit c79c24f
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 13 deletions.
84 changes: 71 additions & 13 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,22 @@ pub struct ClickHouseCommon {
pub database: String,
#[serde(rename = "clickhouse.table")]
pub table: String,
#[serde(rename = "clickhouse.delete.column")]
pub delete_column: Option<String>,
}

#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum ClickHouseEngine {
MergeTree,
ReplacingMergeTree,
ReplacingMergeTree(Option<String>),
SummingMergeTree,
AggregatingMergeTree,
CollapsingMergeTree(String),
VersionedCollapsingMergeTree(String),
GraphiteMergeTree,
ReplicatedMergeTree,
ReplicatedReplacingMergeTree,
ReplicatedReplacingMergeTree(Option<String>),
ReplicatedSummingMergeTree,
ReplicatedAggregatingMergeTree,
#[expect(dead_code)]
Expand All @@ -94,6 +96,22 @@ impl ClickHouseEngine {
)
}

/// Reports whether this engine is a `ReplacingMergeTree` or
/// `ReplicatedReplacingMergeTree` that carries a delete column.
///
/// Returns `false` for both of those variants when no delete column is
/// set, and for every other engine variant.
pub fn is_delete_replacing_engine(&self) -> bool {
    matches!(
        self,
        ClickHouseEngine::ReplacingMergeTree(Some(_))
            | ClickHouseEngine::ReplicatedReplacingMergeTree(Some(_))
    )
}

/// Returns an owned copy of the delete column name stored on this engine.
///
/// Only the `ReplacingMergeTree` and `ReplicatedReplacingMergeTree`
/// variants hold a delete column; the result is `None` when that column is
/// unset or when the engine is any other variant.
pub fn get_delete_col(&self) -> Option<String> {
    if let ClickHouseEngine::ReplacingMergeTree(column)
    | ClickHouseEngine::ReplicatedReplacingMergeTree(column) = self
    {
        column.clone()
    } else {
        None
    }
}

pub fn get_sign_name(&self) -> Option<String> {
match self {
ClickHouseEngine::CollapsingMergeTree(sign_name) => Some(sign_name.to_string()),
Expand All @@ -110,10 +128,16 @@ impl ClickHouseEngine {
}
}

pub fn from_query_engine(engine_name: &ClickhouseQueryEngine) -> Result<Self> {
pub fn from_query_engine(
engine_name: &ClickhouseQueryEngine,
config: &ClickHouseConfig,
) -> Result<Self> {
match engine_name.engine.as_str() {
"MergeTree" => Ok(ClickHouseEngine::MergeTree),
"ReplacingMergeTree" => Ok(ClickHouseEngine::ReplacingMergeTree),
"ReplacingMergeTree" => {
let delete_column = config.common.delete_column.clone();
Ok(ClickHouseEngine::ReplacingMergeTree(delete_column))
}
"SummingMergeTree" => Ok(ClickHouseEngine::SummingMergeTree),
"AggregatingMergeTree" => Ok(ClickHouseEngine::AggregatingMergeTree),
// VersionedCollapsingMergeTree(sign_name,"a")
Expand Down Expand Up @@ -146,7 +170,12 @@ impl ClickHouseEngine {
}
"GraphiteMergeTree" => Ok(ClickHouseEngine::GraphiteMergeTree),
"ReplicatedMergeTree" => Ok(ClickHouseEngine::ReplicatedMergeTree),
"ReplicatedReplacingMergeTree" => Ok(ClickHouseEngine::ReplicatedReplacingMergeTree),
"ReplicatedReplacingMergeTree" => {
let delete_column = config.common.delete_column.clone();
Ok(ClickHouseEngine::ReplicatedReplacingMergeTree(
delete_column,
))
}
"ReplicatedSummingMergeTree" => Ok(ClickHouseEngine::ReplicatedSummingMergeTree),
"ReplicatedAggregatingMergeTree" => {
Ok(ClickHouseEngine::ReplicatedAggregatingMergeTree)
Expand Down Expand Up @@ -399,9 +428,16 @@ impl Sink for ClickHouseSink {
let (clickhouse_column, clickhouse_engine) =
query_column_engine_from_ck(client, &self.config).await?;

if !self.is_append_only && !clickhouse_engine.is_collapsing_engine() {
return Err(SinkError::ClickHouse(
"If you want to use upsert, please modify your engine is `VersionedCollapsingMergeTree` or `CollapsingMergeTree` in ClickHouse".to_owned()));
if !self.is_append_only
&& !clickhouse_engine.is_collapsing_engine()
&& !clickhouse_engine.is_delete_replacing_engine()
{
return match clickhouse_engine {
ClickHouseEngine::ReplicatedReplacingMergeTree(None) | ClickHouseEngine::ReplacingMergeTree(None) => {
Err(SinkError::ClickHouse("To enable upsert with a `ReplacingMergeTree`, you must set a `clickhouse.delete.column` to the UInt8 column in ClickHouse used to signify deletes. See https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree#is_deleted for more information".to_owned()))
}
_ => Err(SinkError::ClickHouse("If you want to use upsert, please use either `VersionedCollapsingMergeTree` or `CollapsingMergeTree` in ClickHouse".to_owned()))
};
}

self.check_column_name_and_type(&clickhouse_column)?;
Expand Down Expand Up @@ -470,6 +506,9 @@ impl ClickHouseSinkWriter {
if let Some(sign) = clickhouse_engine.get_sign_name() {
rw_fields_name_after_calibration.push(sign);
}
if let Some(delete_col) = clickhouse_engine.get_delete_col() {
rw_fields_name_after_calibration.push(delete_col);
}
Ok(Self {
config,
schema,
Expand Down Expand Up @@ -567,16 +606,30 @@ impl ClickHouseSinkWriter {
ClickHouseField::Int8(1),
));
}
if self.clickhouse_engine.get_delete_col().is_some() {
clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
ClickHouseField::Int8(0),
))
}
}
Op::Delete | Op::UpdateDelete => {
if !self.clickhouse_engine.is_collapsing_engine() {
if !self.clickhouse_engine.is_collapsing_engine()
&& !self.clickhouse_engine.is_delete_replacing_engine()
{
return Err(SinkError::ClickHouse(
"Clickhouse engine don't support upsert".to_string(),
));
}
clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
ClickHouseField::Int8(-1),
))
if self.clickhouse_engine.get_sign_name().is_some() {
clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
ClickHouseField::Int8(-1),
));
}
if self.clickhouse_engine.get_delete_col().is_some() {
clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
ClickHouseField::Int8(1),
))
}
}
}
let clickhouse_column = ClickHouseColumn {
Expand Down Expand Up @@ -654,11 +707,16 @@ async fn query_column_engine_from_ck(
}

let clickhouse_engine =
ClickHouseEngine::from_query_engine(clickhouse_engine.first().unwrap())?;
ClickHouseEngine::from_query_engine(clickhouse_engine.first().unwrap(), config)?;

if let Some(sign) = &clickhouse_engine.get_sign_name() {
clickhouse_column.retain(|a| sign.ne(&a.name))
}

if let Some(delete_col) = &clickhouse_engine.get_delete_col() {
clickhouse_column.retain(|a| delete_col.ne(&a.name))
}

Ok((clickhouse_column, clickhouse_engine))
}

Expand Down
3 changes: 3 additions & 0 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ ClickHouseConfig:
- name: clickhouse.table
field_type: String
required: true
- name: clickhouse.delete.column
field_type: String
required: false
- name: r#type
field_type: String
required: true
Expand Down

0 comments on commit c79c24f

Please sign in to comment.