Skip to content

Commit

Permalink
feat(sink): support clickhouse sink checkpoint decouple (#17491)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Jun 28, 2024
1 parent 18f9980 commit ed09372
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 21 deletions.
87 changes: 66 additions & 21 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
// limitations under the License.

use core::fmt::Debug;
use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use anyhow::anyhow;
use clickhouse::insert::Insert;
use clickhouse::{Client as ClickHouseClient, Row as ClickHouseRow};
use itertools::Itertools;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
Expand All @@ -29,16 +32,16 @@ use serde::Serialize;
use serde_derive::Deserialize;
use serde_with::serde_as;
use thiserror_ext::AsReport;
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf;
use super::writer::SinkWriter;
use super::{DummySinkCommitCoordinator, SinkWriterParam};
use crate::deserialize_optional_u64_from_string;
use crate::error::ConnectorResult;
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
use crate::sink::writer::{
AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt,
};
use crate::sink::{
Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};
Expand All @@ -63,6 +66,9 @@ pub struct ClickHouseCommon {
pub table: String,
#[serde(rename = "clickhouse.delete.column")]
pub delete_column: Option<String>,
/// Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint.
#[serde(default, deserialize_with = "deserialize_optional_u64_from_string")]
pub commit_checkpoint_interval: Option<u64>,
}

#[allow(clippy::enum_variant_names)]
Expand Down Expand Up @@ -406,14 +412,31 @@ impl ClickHouseSink {
}
impl Sink for ClickHouseSink {
type Coordinator = DummySinkCommitCoordinator;
type LogSinker = AsyncTruncateLogSinkerOf<ClickHouseSinkWriter>;
type LogSinker = DecoupleCheckpointLogSinkerOf<ClickHouseSinkWriter>;

const SINK_NAME: &'static str = CLICKHOUSE_SINK;

fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
let config_decouple = if let Some(interval) =
desc.properties.get("commit_checkpoint_interval")
&& interval.parse::<u64>().unwrap_or(0) > 1
{
true
} else {
false
};

match user_specified {
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => Ok(false),
SinkDecouple::Default => Ok(config_decouple),
SinkDecouple::Disable => {
if config_decouple {
return Err(SinkError::Config(anyhow!(
"config conflict: Clickhouse config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
)));
}
Ok(false)
}
SinkDecouple::Enable => Ok(true),
}
}

Expand Down Expand Up @@ -446,18 +469,33 @@ impl Sink for ClickHouseSink {
if !self.is_append_only {
self.check_pk_match(&clickhouse_column)?;
}

if self.config.common.commit_checkpoint_interval == Some(0) {
return Err(SinkError::Config(anyhow!(
"commit_checkpoint_interval must be greater than 0"
)));
}
Ok(())
}

async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
Ok(ClickHouseSinkWriter::new(
async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
let writer = ClickHouseSinkWriter::new(
self.config.clone(),
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
)
.await?
.into_log_sinker(usize::MAX))
.await?;
let commit_checkpoint_interval =
NonZeroU64::new(self.config.common.commit_checkpoint_interval.unwrap_or(1)).expect(
"commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
);

Ok(DecoupleCheckpointLogSinkerOf::new(
writer,
writer_param.sink_metrics,
commit_checkpoint_interval,
))
}
}
pub struct ClickHouseSinkWriter {
Expand Down Expand Up @@ -647,23 +685,30 @@ impl ClickHouseSinkWriter {
}
}

impl AsyncTruncateSinkWriter for ClickHouseSinkWriter {
async fn write_chunk<'a>(
&'a mut self,
chunk: StreamChunk,
_add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
) -> Result<()> {
#[async_trait]
impl SinkWriter for ClickHouseSinkWriter {
async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
self.write(chunk).await
}

async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
Ok(())
}

async fn abort(&mut self) -> Result<()> {
Ok(())
}

async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
if let Some(inserter) = self.inserter.take()
&& is_checkpoint
{
if is_checkpoint && let Some(inserter) = self.inserter.take() {
inserter.end().await?;
}
Ok(())
}

async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc<Bitmap>) -> Result<()> {
Ok(())
}
}

#[derive(ClickHouseRow, Deserialize, Clone)]
Expand Down
5 changes: 5 additions & 0 deletions src/connector/src/sink/deltalake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,11 @@ impl Sink for DeltaLakeSink {
)));
}
}
if self.config.common.commit_checkpoint_interval == Some(0) {
return Err(SinkError::Config(anyhow!(
"commit_checkpoint_interval must be greater than 0"
)));
}
Ok(())
}

Expand Down
5 changes: 5 additions & 0 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ ClickHouseConfig:
- name: clickhouse.delete.column
field_type: String
required: false
- name: commit_checkpoint_interval
field_type: u64
comments: Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint.
required: false
default: Default::default
- name: r#type
field_type: String
required: true
Expand Down

0 comments on commit ed09372

Please sign in to comment.