diff --git a/src/connector/src/sink/iceberg/log_sink.rs b/src/connector/src/sink/decouple_checkpoint_log_sink.rs similarity index 96% rename from src/connector/src/sink/iceberg/log_sink.rs rename to src/connector/src/sink/decouple_checkpoint_log_sink.rs index dc9ee34f7dda..ed30d83eae69 100644 --- a/src/connector/src/sink/iceberg/log_sink.rs +++ b/src/connector/src/sink/decouple_checkpoint_log_sink.rs @@ -21,13 +21,13 @@ use crate::sink::log_store::{LogStoreReadItem, TruncateOffset}; use crate::sink::writer::SinkWriter; use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics}; -pub struct IcebergLogSinkerOf { +pub struct DecoupleCheckpointLogSinkerOf { writer: W, sink_metrics: SinkMetrics, commit_checkpoint_interval: NonZeroU64, } -impl IcebergLogSinkerOf { +impl DecoupleCheckpointLogSinkerOf { /// Create a log sinker with a commit checkpoint interval. The sinker should be used with a /// decouple log reader `KvLogStoreReader`. pub fn new( @@ -35,7 +35,7 @@ impl IcebergLogSinkerOf { sink_metrics: SinkMetrics, commit_checkpoint_interval: NonZeroU64, ) -> Self { - IcebergLogSinkerOf { + DecoupleCheckpointLogSinkerOf { writer, sink_metrics, commit_checkpoint_interval, @@ -44,7 +44,7 @@ impl IcebergLogSinkerOf { } #[async_trait] -impl> LogSinker for IcebergLogSinkerOf { +impl> LogSinker for DecoupleCheckpointLogSinkerOf { async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> { let mut sink_writer = self.writer; let sink_metrics = self.sink_metrics; diff --git a/src/connector/src/sink/deltalake.rs b/src/connector/src/sink/deltalake.rs index e81bb141a4c3..f5940df637ca 100644 --- a/src/connector/src/sink/deltalake.rs +++ b/src/connector/src/sink/deltalake.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use core::num::NonZeroU64; use std::collections::HashMap; use std::sync::Arc; @@ -30,6 +31,7 @@ use risingwave_common::array::StreamChunk; use risingwave_common::bail; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; +use risingwave_common::session_config::sink_decouple::SinkDecouple; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; @@ -39,13 +41,15 @@ use serde_derive::{Deserialize, Serialize}; use serde_with::serde_as; use with_options::WithOptions; +use super::catalog::desc::SinkDesc; use super::coordinate::CoordinatedSinkWriter; -use super::writer::{LogSinkerOf, SinkWriter}; +use super::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf; +use super::writer::SinkWriter; use super::{ Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, }; -use crate::sink::writer::SinkWriterExt; +use crate::deserialize_optional_u64_from_string; pub const DELTALAKE_SINK: &str = "deltalake"; pub const DEFAULT_REGION: &str = "us-east-1"; @@ -65,6 +69,9 @@ pub struct DeltaLakeCommon { pub s3_endpoint: Option, #[serde(rename = "gcs.service.account")] pub gcs_service_account: Option, + // Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint. + #[serde(default, deserialize_with = "deserialize_optional_u64_from_string")] + pub commit_checkpoint_interval: Option, } impl DeltaLakeCommon { pub async fn create_deltalake_client(&self) -> Result { @@ -269,10 +276,34 @@ fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) - impl Sink for DeltaLakeSink { type Coordinator = DeltaLakeSinkCommitter; - type LogSinker = LogSinkerOf>; + type LogSinker = DecoupleCheckpointLogSinkerOf>; const SINK_NAME: &'static str = DELTALAKE_SINK; + fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + let config_decouple = if let Some(interval) = + desc.properties.get("commit_checkpoint_interval") + && interval.parse::().unwrap_or(0) > 1 + { + true + } else { + false + }; + + match user_specified { + SinkDecouple::Default => Ok(config_decouple), + SinkDecouple::Disable => { + if config_decouple { + return Err(SinkError::Config(anyhow!( + "config conflict: DeltaLake config `commit_checkpoint_interval` bigger than 1 which means that must enable sink decouple, but session config sink decouple is disabled" + ))); + } + Ok(false) + } + SinkDecouple::Enable => Ok(true), + } + } + async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { let inner = DeltaLakeSinkWriter::new( self.config.clone(), @@ -280,7 +311,7 @@ impl Sink for DeltaLakeSink { self.param.downstream_pk.clone(), ) .await?; - Ok(CoordinatedSinkWriter::new( + let writer = CoordinatedSinkWriter::new( writer_param .meta_client .expect("should have meta client") @@ -294,8 +325,18 @@ impl Sink for DeltaLakeSink { })?, inner, ) - .await? - .into_log_sinker(writer_param.sink_metrics)) + .await?; + + let commit_checkpoint_interval = + NonZeroU64::new(self.config.common.commit_checkpoint_interval.unwrap_or(1)).expect( + "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation", + ); + + Ok(DecoupleCheckpointLogSinkerOf::new( + writer, + writer_param.sink_metrics, + commit_checkpoint_interval, + )) } async fn validate(&self) -> Result<()> { diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index f2e3b45d5915..deceeee249b5 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -13,7 +13,6 @@ // limitations under the License. mod jni_catalog; -mod log_sink; mod mock_catalog; mod prometheus; @@ -54,11 +53,11 @@ use thiserror_ext::AsReport; use url::Url; use with_options::WithOptions; -use self::log_sink::IcebergLogSinkerOf; use self::mock_catalog::MockCatalog; use self::prometheus::monitored_base_file_writer::MonitoredBaseFileWriterBuilder; use self::prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder; use super::catalog::desc::SinkDesc; +use super::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf; use super::{ Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; @@ -516,7 +515,7 @@ impl IcebergSink { impl Sink for IcebergSink { type Coordinator = IcebergSinkCommitter; - type LogSinker = IcebergLogSinkerOf>; + type LogSinker = DecoupleCheckpointLogSinkerOf>; const SINK_NAME: &'static str = ICEBERG_SINK; @@ -577,7 +576,7 @@ impl Sink for IcebergSink { "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation", ); - Ok(IcebergLogSinkerOf::new( + Ok(DecoupleCheckpointLogSinkerOf::new( writer, writer_param.sink_metrics, commit_checkpoint_interval, diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 2ef4bb953b67..699398870979 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -17,6 +17,7 @@ pub mod boxed; pub mod catalog; pub mod clickhouse; pub mod coordinate; +pub mod decouple_checkpoint_log_sink; pub mod deltalake; pub mod doris; pub mod doris_starrocks_connector; diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 3dbe5d394a8e..83f7d3bba3cd 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -107,6 +107,10 @@ DeltaLakeConfig: - name: gcs.service.account field_type: String required: false + - name: commit_checkpoint_interval + field_type: u64 + required: false + default: Default::default - name: r#type field_type: String required: true