From ed0937213e87f4f90c2a4c0b5716f10a4e9ef7c3 Mon Sep 17 00:00:00 2001 From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> Date: Fri, 28 Jun 2024 14:57:25 +0800 Subject: [PATCH] feat(sink): support clickhouse sink checkpoint decouple (#17491) --- src/connector/src/sink/clickhouse.rs | 87 +++++++++++++++++++++------- src/connector/src/sink/deltalake.rs | 5 ++ src/connector/with_options_sink.yaml | 5 ++ 3 files changed, 76 insertions(+), 21 deletions(-) diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index 8af58f668a44..bd5ebe8a6ce8 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -13,13 +13,16 @@ // limitations under the License. use core::fmt::Debug; +use core::num::NonZeroU64; use std::collections::{BTreeMap, HashMap, HashSet}; +use std::sync::Arc; use anyhow::anyhow; use clickhouse::insert::Insert; use clickhouse::{Client as ClickHouseClient, Row as ClickHouseRow}; use itertools::Itertools; use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_common::row::Row; use risingwave_common::session_config::sink_decouple::SinkDecouple; @@ -29,16 +32,16 @@ use serde::Serialize; use serde_derive::Deserialize; use serde_with::serde_as; use thiserror_ext::AsReport; +use tonic::async_trait; use tracing::warn; use with_options::WithOptions; +use super::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf; +use super::writer::SinkWriter; use super::{DummySinkCommitCoordinator, SinkWriterParam}; +use crate::deserialize_optional_u64_from_string; use crate::error::ConnectorResult; use crate::sink::catalog::desc::SinkDesc; -use crate::sink::log_store::DeliveryFutureManagerAddFuture; -use crate::sink::writer::{ - AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, -}; use crate::sink::{ Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; @@ -63,6 +66,9 @@ pub struct ClickHouseCommon { pub table: String, #[serde(rename = "clickhouse.delete.column")] pub delete_column: Option, + /// Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint. + #[serde(default, deserialize_with = "deserialize_optional_u64_from_string")] + pub commit_checkpoint_interval: Option, } #[allow(clippy::enum_variant_names)] @@ -406,14 +412,31 @@ impl ClickHouseSink { } impl Sink for ClickHouseSink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = AsyncTruncateLogSinkerOf; + type LogSinker = DecoupleCheckpointLogSinkerOf; const SINK_NAME: &'static str = CLICKHOUSE_SINK; - fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + let config_decouple = if let Some(interval) = + desc.properties.get("commit_checkpoint_interval") + && interval.parse::().unwrap_or(0) > 1 + { + true + } else { + false + }; + match user_specified { - SinkDecouple::Default | SinkDecouple::Enable => Ok(true), - SinkDecouple::Disable => Ok(false), + SinkDecouple::Default => Ok(config_decouple), + SinkDecouple::Disable => { + if config_decouple { + return Err(SinkError::Config(anyhow!( + "config conflict: Clickhouse config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled" + ))); + } + Ok(false) + } + SinkDecouple::Enable => Ok(true), } } @@ -446,18 +469,33 @@ impl Sink for ClickHouseSink { if !self.is_append_only { self.check_pk_match(&clickhouse_column)?; } + + if self.config.common.commit_checkpoint_interval == Some(0) { + return Err(SinkError::Config(anyhow!( + "commit_checkpoint_interval must be greater than 0" + ))); + } Ok(()) } - async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { - Ok(ClickHouseSinkWriter::new( + async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + let writer = ClickHouseSinkWriter::new( self.config.clone(), self.schema.clone(), self.pk_indices.clone(), self.is_append_only, ) - .await? - .into_log_sinker(usize::MAX)) + .await?; + let commit_checkpoint_interval = + NonZeroU64::new(self.config.common.commit_checkpoint_interval.unwrap_or(1)).expect( + "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation", + ); + + Ok(DecoupleCheckpointLogSinkerOf::new( + writer, + writer_param.sink_metrics, + commit_checkpoint_interval, + )) } } pub struct ClickHouseSinkWriter { @@ -647,23 +685,30 @@ impl ClickHouseSinkWriter { } } -impl AsyncTruncateSinkWriter for ClickHouseSinkWriter { - async fn write_chunk<'a>( - &'a mut self, - chunk: StreamChunk, - _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, - ) -> Result<()> { +#[async_trait] +impl SinkWriter for ClickHouseSinkWriter { + async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { self.write(chunk).await } + async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { + Ok(()) + } + + async fn abort(&mut self) -> Result<()> { + Ok(()) + } + async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> { - if let Some(inserter) = self.inserter.take() - && is_checkpoint - { + if is_checkpoint && let Some(inserter) = self.inserter.take() { inserter.end().await?; } Ok(()) } + + async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc) -> Result<()> { + Ok(()) + } } #[derive(ClickHouseRow, Deserialize, Clone)] diff --git a/src/connector/src/sink/deltalake.rs b/src/connector/src/sink/deltalake.rs index 38427f935018..4ad98da4975f 100644 --- a/src/connector/src/sink/deltalake.rs +++ b/src/connector/src/sink/deltalake.rs @@ -380,6 +380,11 @@ impl Sink for DeltaLakeSink { ))); } } + if self.config.common.commit_checkpoint_interval == Some(0) { + return Err(SinkError::Config(anyhow!( + "commit_checkpoint_interval must be greater than 0" + ))); + } Ok(()) } diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 7deb67a524fc..27362900dd05 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -95,6 +95,11 @@ ClickHouseConfig: - name: clickhouse.delete.column field_type: String required: false + - name: commit_checkpoint_interval + field_type: u64 + comments: Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint. + required: false + default: Default::default - name: r#type field_type: String required: true