Skip to content

Commit

Permalink
feat(sink): support delta lake checkpoint decouple (#16777)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored May 21, 2024
1 parent 57d4da2 commit 3d58cc7
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@ use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
use crate::sink::writer::SinkWriter;
use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics};

pub struct IcebergLogSinkerOf<W> {
pub struct DecoupleCheckpointLogSinkerOf<W> {
writer: W,
sink_metrics: SinkMetrics,
commit_checkpoint_interval: NonZeroU64,
}

impl<W> IcebergLogSinkerOf<W> {
impl<W> DecoupleCheckpointLogSinkerOf<W> {
/// Create a log sinker with a commit checkpoint interval. The sinker should be used with a
/// decouple log reader `KvLogStoreReader`.
pub fn new(
writer: W,
sink_metrics: SinkMetrics,
commit_checkpoint_interval: NonZeroU64,
) -> Self {
IcebergLogSinkerOf {
DecoupleCheckpointLogSinkerOf {
writer,
sink_metrics,
commit_checkpoint_interval,
Expand All @@ -44,7 +44,7 @@ impl<W> IcebergLogSinkerOf<W> {
}

#[async_trait]
impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for IcebergLogSinkerOf<W> {
impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSinkerOf<W> {
async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> {
let mut sink_writer = self.writer;
let sink_metrics = self.sink_metrics;
Expand Down
53 changes: 47 additions & 6 deletions src/connector/src/sink/deltalake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use core::num::NonZeroU64;
use std::collections::HashMap;
use std::sync::Arc;

Expand All @@ -30,6 +31,7 @@ use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
Expand All @@ -39,13 +41,15 @@ use serde_derive::{Deserialize, Serialize};
use serde_with::serde_as;
use with_options::WithOptions;

use super::catalog::desc::SinkDesc;
use super::coordinate::CoordinatedSinkWriter;
use super::writer::{LogSinkerOf, SinkWriter};
use super::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf;
use super::writer::SinkWriter;
use super::{
Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterParam,
SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use crate::sink::writer::SinkWriterExt;
use crate::deserialize_optional_u64_from_string;

pub const DELTALAKE_SINK: &str = "deltalake";
pub const DEFAULT_REGION: &str = "us-east-1";
Expand All @@ -65,6 +69,9 @@ pub struct DeltaLakeCommon {
pub s3_endpoint: Option<String>,
#[serde(rename = "gcs.service.account")]
pub gcs_service_account: Option<String>,
// Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint.
#[serde(default, deserialize_with = "deserialize_optional_u64_from_string")]
pub commit_checkpoint_interval: Option<u64>,
}
impl DeltaLakeCommon {
pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
Expand Down Expand Up @@ -269,18 +276,42 @@ fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -

impl Sink for DeltaLakeSink {
type Coordinator = DeltaLakeSinkCommitter;
type LogSinker = LogSinkerOf<CoordinatedSinkWriter<DeltaLakeSinkWriter>>;
type LogSinker = DecoupleCheckpointLogSinkerOf<CoordinatedSinkWriter<DeltaLakeSinkWriter>>;

const SINK_NAME: &'static str = DELTALAKE_SINK;

fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
let config_decouple = if let Some(interval) =
desc.properties.get("commit_checkpoint_interval")
&& interval.parse::<u64>().unwrap_or(0) > 1
{
true
} else {
false
};

match user_specified {
SinkDecouple::Default => Ok(config_decouple),
SinkDecouple::Disable => {
if config_decouple {
return Err(SinkError::Config(anyhow!(
"config conflict: DeltaLake config `commit_checkpoint_interval` bigger than 1 which means that must enable sink decouple, but session config sink decouple is disabled"
)));
}
Ok(false)
}
SinkDecouple::Enable => Ok(true),
}
}

async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
let inner = DeltaLakeSinkWriter::new(
self.config.clone(),
self.param.schema().clone(),
self.param.downstream_pk.clone(),
)
.await?;
Ok(CoordinatedSinkWriter::new(
let writer = CoordinatedSinkWriter::new(
writer_param
.meta_client
.expect("should have meta client")
Expand All @@ -294,8 +325,18 @@ impl Sink for DeltaLakeSink {
})?,
inner,
)
.await?
.into_log_sinker(writer_param.sink_metrics))
.await?;

let commit_checkpoint_interval =
NonZeroU64::new(self.config.common.commit_checkpoint_interval.unwrap_or(1)).expect(
"commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
);

Ok(DecoupleCheckpointLogSinkerOf::new(
writer,
writer_param.sink_metrics,
commit_checkpoint_interval,
))
}

async fn validate(&self) -> Result<()> {
Expand Down
7 changes: 3 additions & 4 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

mod jni_catalog;
mod log_sink;
mod mock_catalog;
mod prometheus;

Expand Down Expand Up @@ -54,11 +53,11 @@ use thiserror_ext::AsReport;
use url::Url;
use with_options::WithOptions;

use self::log_sink::IcebergLogSinkerOf;
use self::mock_catalog::MockCatalog;
use self::prometheus::monitored_base_file_writer::MonitoredBaseFileWriterBuilder;
use self::prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
use super::catalog::desc::SinkDesc;
use super::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf;
use super::{
Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};
Expand Down Expand Up @@ -516,7 +515,7 @@ impl IcebergSink {

impl Sink for IcebergSink {
type Coordinator = IcebergSinkCommitter;
type LogSinker = IcebergLogSinkerOf<CoordinatedSinkWriter<IcebergWriter>>;
type LogSinker = DecoupleCheckpointLogSinkerOf<CoordinatedSinkWriter<IcebergWriter>>;

const SINK_NAME: &'static str = ICEBERG_SINK;

Expand Down Expand Up @@ -577,7 +576,7 @@ impl Sink for IcebergSink {
"commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
);

Ok(IcebergLogSinkerOf::new(
Ok(DecoupleCheckpointLogSinkerOf::new(
writer,
writer_param.sink_metrics,
commit_checkpoint_interval,
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod boxed;
pub mod catalog;
pub mod clickhouse;
pub mod coordinate;
pub mod decouple_checkpoint_log_sink;
pub mod deltalake;
pub mod doris;
pub mod doris_starrocks_connector;
Expand Down
4 changes: 4 additions & 0 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ DeltaLakeConfig:
- name: gcs.service.account
field_type: String
required: false
- name: commit_checkpoint_interval
field_type: u64
required: false
default: Default::default
- name: r#type
field_type: String
required: true
Expand Down

0 comments on commit 3d58cc7

Please sign in to comment.