diff --git a/integration_tests/clickhouse-sink/README.md b/integration_tests/clickhouse-sink/README.md index 607621faefeae..a383f3fba5ee4 100644 --- a/integration_tests/clickhouse-sink/README.md +++ b/integration_tests/clickhouse-sink/README.md @@ -23,6 +23,8 @@ docker compose exec clickhouse-server bash /opt/clickhouse/clickhouse-sql/run-sq - create_mv.sql - create_sink.sql +`upsert` is only supported with ClickHouse's `CollapsingMergeTree` and `VersionedCollapsingMergeTree` table engines. + 4. Execute a simple query: ```sh diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index f4fdf9b761f38..fb06baf42920c 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -11,18 +11,17 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - use core::fmt::Debug; use std::collections::{HashMap, HashSet}; use std::time::Duration; use anyhow::anyhow; -use clickhouse::{Client, Client as ClickHouseClient, Row as ClickHouseRow}; +use clickhouse::{Client as ClickHouseClient, Row as ClickHouseRow}; use itertools::Itertools; -use risingwave_common::array::{Op, RowRef, StreamChunk}; +use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::catalog::Schema; use risingwave_common::row::Row; -use risingwave_common::types::{DataType, ScalarRefImpl, Serial}; +use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial}; use serde::ser::{SerializeSeq, SerializeStruct}; use serde::Serialize; use serde_derive::Deserialize; @@ -38,6 +37,10 @@ use crate::sink::{ Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; +const QUERY_ENGINE: &str = + "select distinct ?fields from system.tables where database = ? and table = ?"; +const QUERY_COLUMN: &str = + "select distinct ?fields from system.columns where database = ? and table = ? 
order by ?"; pub const CLICKHOUSE_SINK: &str = "clickhouse"; const BUFFER_SIZE: usize = 1024; @@ -55,6 +58,75 @@ pub struct ClickHouseCommon { pub table: String, } +#[allow(clippy::enum_variant_names)] +#[derive(Debug)] +enum ClickHouseEngine { + MergeTree, + ReplacingMergeTree, + SummingMergeTree, + AggregatingMergeTree, + CollapsingMergeTree(String), + VersionedCollapsingMergeTree(String), + GraphiteMergeTree, +} +impl ClickHouseEngine { + pub fn is_collapsing_engine(&self) -> bool { + matches!( + self, + ClickHouseEngine::CollapsingMergeTree(_) + | ClickHouseEngine::VersionedCollapsingMergeTree(_) + ) + } + + pub fn get_sign_name(&self) -> Option { + match self { + ClickHouseEngine::CollapsingMergeTree(sign_name) => Some(sign_name.to_string()), + ClickHouseEngine::VersionedCollapsingMergeTree(sign_name) => { + Some(sign_name.to_string()) + } + _ => None, + } + } + + pub fn from_query_engine(engine_name: &ClickhouseQueryEngine) -> Result { + match engine_name.engine.as_str() { + "MergeTree" => Ok(ClickHouseEngine::MergeTree), + "ReplacingMergeTree" => Ok(ClickHouseEngine::ReplacingMergeTree), + "SummingMergeTree" => Ok(ClickHouseEngine::SummingMergeTree), + "AggregatingMergeTree" => Ok(ClickHouseEngine::AggregatingMergeTree), + "VersionedCollapsingMergeTree" => { + let sign_name = engine_name + .create_table_query + .split("VersionedCollapsingMergeTree(") + .last() + .ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))? + .split(',') + .next() + .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? + .to_string(); + Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name)) + } + "CollapsingMergeTree" => { + let sign_name = engine_name + .create_table_query + .split("CollapsingMergeTree(") + .last() + .ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))? + .split(')') + .next() + .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? 
+ .to_string(); + Ok(ClickHouseEngine::CollapsingMergeTree(sign_name)) + } + "GraphiteMergeTree" => Ok(ClickHouseEngine::GraphiteMergeTree), + _ => Err(SinkError::ClickHouse(format!( + "Cannot find clickhouse engine {:?}", + engine_name.engine + ))), + } + } +} + const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(5); impl ClickHouseCommon { @@ -198,9 +270,7 @@ impl ClickHouseSink { } risingwave_common::types::DataType::Float32 => Ok(ck_column.r#type.contains("Float32")), risingwave_common::types::DataType::Float64 => Ok(ck_column.r#type.contains("Float64")), - risingwave_common::types::DataType::Decimal => { - Err(SinkError::ClickHouse("can not support Decimal".to_string())) - } + risingwave_common::types::DataType::Decimal => Ok(ck_column.r#type.contains("Decimal")), risingwave_common::types::DataType::Date => Ok(ck_column.r#type.contains("Date32")), risingwave_common::types::DataType::Varchar => Ok(ck_column.r#type.contains("String")), risingwave_common::types::DataType::Time => Err(SinkError::ClickHouse( @@ -232,7 +302,7 @@ impl ClickHouseSink { Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64")) } risingwave_common::types::DataType::Int256 => Err(SinkError::ClickHouse( - "clickhouse can not support Interval".to_string(), + "clickhouse can not support Int256".to_string(), )), }; if !is_match? { @@ -264,20 +334,15 @@ impl Sink for ClickHouseSink { // check reachability let client = self.config.common.build_client()?; - let query_column = "select distinct ?fields from system.columns where database = ? and table = ? 
order by ?".to_string(); - let clickhouse_column = client - .query(&query_column) - .bind(self.config.common.database.clone()) - .bind(self.config.common.table.clone()) - .bind("position") - .fetch_all::() - .await?; - if clickhouse_column.is_empty() { - return Err(SinkError::ClickHouse(format!( - "table {:?}.{:?} is not find in clickhouse", - self.config.common.database, self.config.common.table - ))); + + let (clickhouse_column, clickhouse_engine) = + query_column_engine_from_ck(client, &self.config).await?; + + if !self.is_append_only && !clickhouse_engine.is_collapsing_engine() { + return Err(SinkError::ClickHouse( + "If you want to use upsert, please modify your engine is `VersionedCollapsingMergeTree` or `CollapsingMergeTree` in ClickHouse".to_owned())); } + self.check_column_name_and_type(&clickhouse_column)?; if !self.is_append_only { self.check_pk_match(&clickhouse_column)?; @@ -300,17 +365,20 @@ pub struct ClickHouseSinkWriter { pub config: ClickHouseConfig, schema: Schema, pk_indices: Vec, - client: Client, + client: ClickHouseClient, is_append_only: bool, // Save some features of the clickhouse column type column_correct_vec: Vec, - clickhouse_fields_name: Vec, + rw_fields_name_after_calibration: Vec, + clickhouse_engine: ClickHouseEngine, } #[derive(Debug)] struct ClickHouseSchemaFeature { can_null: bool, // Time accuracy in clickhouse for rw and ck conversions accuracy_time: u8, + + accuracy_decimal: (u8, u8), } impl ClickHouseSinkWriter { @@ -320,25 +388,23 @@ impl ClickHouseSinkWriter { pk_indices: Vec, is_append_only: bool, ) -> Result { - if !is_append_only { - tracing::warn!("Update and delete are not recommended because of their impact on clickhouse performance."); - } let client = config.common.build_client()?; - let query_column = "select distinct ?fields from system.columns where database = ? and table = ? 
order by position".to_string(); - let clickhouse_column = client - .query(&query_column) - .bind(config.common.database.clone()) - .bind(config.common.table.clone()) - .fetch_all::() - .await?; + + let (clickhouse_column, clickhouse_engine) = + query_column_engine_from_ck(client.clone(), &config).await?; + let column_correct_vec: Result> = clickhouse_column .iter() .map(Self::build_column_correct_vec) .collect(); - let clickhouse_fields_name = build_fields_name_type_from_schema(&schema)? + let mut rw_fields_name_after_calibration = build_fields_name_type_from_schema(&schema)? .iter() .map(|(a, _)| a.clone()) .collect_vec(); + + if let Some(sign) = clickhouse_engine.get_sign_name() { + rw_fields_name_after_calibration.push(sign); + } Ok(Self { config, schema, @@ -346,7 +412,8 @@ impl ClickHouseSinkWriter { client, is_append_only, column_correct_vec: column_correct_vec?, - clickhouse_fields_name, + rw_fields_name_after_calibration, + clickhouse_engine, }) } @@ -368,138 +435,84 @@ impl ClickHouseSinkWriter { } else { 0_u8 }; + let accuracy_decimal = if ck_column.r#type.contains("Decimal(") { + let decimal_all = ck_column + .r#type + .split("Decimal(") + .last() + .ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))? + .split(')') + .next() + .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? + .split(", ") + .collect_vec(); + let length = decimal_all + .first() + .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? + .parse::() + .map_err(|e| SinkError::ClickHouse(format!("clickhouse sink error {}", e)))?; + + if length > 38 { + return Err(SinkError::ClickHouse( + "RW don't support Decimal256".to_string(), + )); + } + + let scale = decimal_all + .last() + .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? 
+ .parse::() + .map_err(|e| SinkError::ClickHouse(format!("clickhouse sink error {}", e)))?; + (length, scale) + } else { + (0_u8, 0_u8) + }; Ok(ClickHouseSchemaFeature { can_null, accuracy_time, + accuracy_decimal, }) } - async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { + async fn write(&mut self, chunk: StreamChunk) -> Result<()> { let mut insert = self.client.insert_with_fields_name( &self.config.common.table, - self.clickhouse_fields_name.clone(), + self.rw_fields_name_after_calibration.clone(), )?; for (op, row) in chunk.rows() { - if op != Op::Insert { - tracing::warn!( - "append only click house sink receive an {:?} which will be ignored.", - op - ); - continue; - } let mut clickhouse_filed_vec = vec![]; for (index, data) in row.iter().enumerate() { clickhouse_filed_vec.extend(ClickHouseFieldWithNull::from_scalar_ref( data, &self.column_correct_vec, index, - true, )?); } - let clickhouse_column = ClickHouseColumn { - row: clickhouse_filed_vec, - }; - insert.write(&clickhouse_column).await?; - } - insert.end().await?; - Ok(()) - } - - async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> { - let get_pk_names_and_data = |row: RowRef<'_>, index: usize| { - let pk_names = self - .schema - .names() - .iter() - .cloned() - .enumerate() - .filter(|(index, _)| self.pk_indices.contains(index)) - .map(|(_, b)| b) - .collect_vec(); - let mut pk_data = vec![]; - for pk_index in &self.pk_indices { - if let ClickHouseFieldWithNull::WithoutSome(v) = - ClickHouseFieldWithNull::from_scalar_ref( - row.datum_at(*pk_index), - &self.column_correct_vec, - index, - false, - )? 
- .pop() - .unwrap() - { - pk_data.push(v) - } else { - return Err(SinkError::ClickHouse("pk can not be null".to_string())); - } - } - Ok((pk_names, pk_data)) - }; - - for (index, (op, row)) in chunk.rows().enumerate() { match op { - Op::Insert => { - let mut insert = self.client.insert_with_fields_name( - &self.config.common.table, - self.clickhouse_fields_name.clone(), - )?; - let mut clickhouse_filed_vec = vec![]; - for (index, data) in row.iter().enumerate() { - clickhouse_filed_vec.extend(ClickHouseFieldWithNull::from_scalar_ref( - data, - &self.column_correct_vec, - index, - true, - )?); + Op::Insert | Op::UpdateInsert => { + if self.clickhouse_engine.get_sign_name().is_some() { + clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome( + ClickHouseField::Int8(1), + )); } - let clickhouse_column = ClickHouseColumn { - row: clickhouse_filed_vec, - }; - insert.write(&clickhouse_column).await?; - insert.end().await?; } - Op::Delete => { - let (delete_pk_names, delete_pk_data) = get_pk_names_and_data(row, index)?; - self.client - .delete(&self.config.common.table, delete_pk_names) - .delete(delete_pk_data) - .await?; - } - Op::UpdateDelete => continue, - Op::UpdateInsert => { - let (update_pk_names, update_pk_data) = get_pk_names_and_data(row, index)?; - let mut clickhouse_update_filed_vec = vec![]; - for (index, data) in row.iter().enumerate() { - if !self.pk_indices.contains(&index) { - clickhouse_update_filed_vec.extend( - ClickHouseFieldWithNull::from_scalar_ref( - data, - &self.column_correct_vec, - index, - false, - )?, - ); - } + Op::Delete | Op::UpdateDelete => { + if !self.clickhouse_engine.is_collapsing_engine() { + return Err(SinkError::ClickHouse( + "Clickhouse engine don't support upsert".to_string(), + )); } - // Get the names of the columns excluding pk, and use them to update. 
- let fields_name_update = self - .clickhouse_fields_name - .iter() - .filter(|n| !update_pk_names.contains(n)) - .map(|s| s.to_string()) - .collect_vec(); - - let update = self.client.update( - &self.config.common.table, - update_pk_names, - fields_name_update.clone(), - ); - update - .update_fields(clickhouse_update_filed_vec, update_pk_data) - .await?; + clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome( + ClickHouseField::Int8(-1), + )) } } + let clickhouse_column = ClickHouseColumn { + row: clickhouse_filed_vec, + }; + insert.write(&clickhouse_column).await?; } + insert.end().await?; Ok(()) } } @@ -510,11 +523,7 @@ impl AsyncTruncateSinkWriter for ClickHouseSinkWriter { chunk: StreamChunk, _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, ) -> Result<()> { - if self.is_append_only { - self.append_only(chunk).await - } else { - self.upsert(chunk).await - } + self.write(chunk).await } } @@ -525,6 +534,48 @@ struct SystemColumn { is_in_primary_key: u8, } +#[derive(ClickHouseRow, Deserialize)] +struct ClickhouseQueryEngine { + name: String, + engine: String, + create_table_query: String, +} + +async fn query_column_engine_from_ck( + client: ClickHouseClient, + config: &ClickHouseConfig, +) -> Result<(Vec, ClickHouseEngine)> { + let query_engine = QUERY_ENGINE; + let query_column = QUERY_COLUMN; + + let clickhouse_engine = client + .query(query_engine) + .bind(config.common.database.clone()) + .bind(config.common.table.clone()) + .fetch_all::() + .await?; + let mut clickhouse_column = client + .query(query_column) + .bind(config.common.database.clone()) + .bind(config.common.table.clone()) + .bind("position") + .fetch_all::() + .await?; + if clickhouse_engine.is_empty() || clickhouse_column.is_empty() { + return Err(SinkError::ClickHouse(format!( + "table {:?}.{:?} is not find in clickhouse", + config.common.database, config.common.table + ))); + } + + let clickhouse_engine = 
ClickHouseEngine::from_query_engine(clickhouse_engine.get(0).unwrap())?; + + if let Some(sign) = &clickhouse_engine.get_sign_name() { + clickhouse_column.retain(|a| sign.ne(&a.name)) + } + Ok((clickhouse_column, clickhouse_engine)) +} + /// Serialize this structure to simulate the `struct` call clickhouse interface #[derive(ClickHouseRow, Debug)] struct ClickHouseColumn { @@ -543,6 +594,26 @@ enum ClickHouseField { String(String), Bool(bool), List(Vec), + Int8(i8), + Decimal(ClickHouseDecimal), +} +#[derive(Debug)] +enum ClickHouseDecimal { + Decimal32(i32), + Decimal64(i64), + Decimal128(i128), +} +impl Serialize for ClickHouseDecimal { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + match self { + ClickHouseDecimal::Decimal32(v) => serializer.serialize_i32(*v), + ClickHouseDecimal::Decimal64(v) => serializer.serialize_i64(*v), + ClickHouseDecimal::Decimal128(v) => serializer.serialize_i128(*v), + } + } } /// Enum that support clickhouse nullable @@ -558,7 +629,6 @@ impl ClickHouseFieldWithNull { data: Option>, clickhouse_schema_feature_vec: &Vec, clickhouse_schema_feature_index: usize, - is_insert: bool, ) -> Result> { let clickhouse_schema_feature = clickhouse_schema_feature_vec .get(clickhouse_schema_feature_index) @@ -586,8 +656,29 @@ impl ClickHouseFieldWithNull { ScalarRefImpl::Float64(v) => ClickHouseField::Float64(v.into_inner()), ScalarRefImpl::Utf8(v) => ClickHouseField::String(v.to_string()), ScalarRefImpl::Bool(v) => ClickHouseField::Bool(v), - ScalarRefImpl::Decimal(_) => { - return Err(SinkError::ClickHouse("can not support Decimal".to_string())) + ScalarRefImpl::Decimal(d) => { + if let Decimal::Normalized(d) = d { + let scale = + clickhouse_schema_feature.accuracy_decimal.1 as i32 - d.scale() as i32; + + let scale = if scale < 0 { + d.mantissa() / 10_i128.pow(scale.unsigned_abs()) + } else { + d.mantissa() * 10_i128.pow(scale as u32) + }; + + if clickhouse_schema_feature.accuracy_decimal.0 <= 9 { + 
ClickHouseField::Decimal(ClickHouseDecimal::Decimal32(scale as i32)) + } else if clickhouse_schema_feature.accuracy_decimal.0 <= 18 { + ClickHouseField::Decimal(ClickHouseDecimal::Decimal64(scale as i64)) + } else { + ClickHouseField::Decimal(ClickHouseDecimal::Decimal128(scale)) + } + } else { + return Err(SinkError::ClickHouse( + "clickhouse can not support Decimal NAN,-INF and INF".to_string(), + )); + } } ScalarRefImpl::Interval(_) => { return Err(SinkError::ClickHouse( @@ -604,14 +695,9 @@ impl ClickHouseFieldWithNull { )) } ScalarRefImpl::Timestamp(v) => { - if is_insert { - let time = v.get_timestamp_nanos() - / 10_i32.pow((9 - clickhouse_schema_feature.accuracy_time).into()) as i64; - ClickHouseField::Int64(time) - } else { - let time = v.truncate_micros().to_string(); - ClickHouseField::String(time) - } + let time = v.get_timestamp_nanos() + / 10_i32.pow((9 - clickhouse_schema_feature.accuracy_time).into()) as i64; + ClickHouseField::Int64(time) } ScalarRefImpl::Timestamptz(_) => { return Err(SinkError::ClickHouse( @@ -630,7 +716,6 @@ impl ClickHouseFieldWithNull { field, clickhouse_schema_feature_vec, clickhouse_schema_feature_index + index, - is_insert, )?; struct_vec.push(ClickHouseFieldWithNull::WithoutSome(ClickHouseField::List( a, @@ -645,7 +730,6 @@ impl ClickHouseFieldWithNull { i, clickhouse_schema_feature_vec, clickhouse_schema_feature_index, - is_insert, )?) 
} return Ok(vec![ClickHouseFieldWithNull::WithoutSome( @@ -658,9 +742,7 @@ impl ClickHouseFieldWithNull { )) } }; - // Insert needs to be serialized with `Some`, update doesn't need to be serialized with - // `Some` - let data = if is_insert && clickhouse_schema_feature.can_null { + let data = if clickhouse_schema_feature.can_null { vec![ClickHouseFieldWithNull::WithSome(data)] } else { vec![ClickHouseFieldWithNull::WithoutSome(data)] @@ -690,6 +772,8 @@ impl Serialize for ClickHouseField { } s.end() } + ClickHouseField::Decimal(v) => v.serialize(serializer), + ClickHouseField::Int8(v) => serializer.serialize_i8(*v), } } }