diff --git a/e2e_test/sink/cassandra_sink.slt b/e2e_test/sink/cassandra_sink.slt index fe5ca331b591a..ce3d9ea5d6b5a 100644 --- a/e2e_test/sink/cassandra_sink.slt +++ b/e2e_test/sink/cassandra_sink.slt @@ -10,6 +10,9 @@ CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, statement ok CREATE TABLE t7 ("TEST_V1" int primary key, "TEST_V2" int, "TEST_V3" int); +statement ok +set sink_decouple = false; + statement ok CREATE SINK s6 FROM diff --git a/e2e_test/sink/clickhouse_sink.slt b/e2e_test/sink/clickhouse_sink.slt index e5bac0d8d521d..e037618bb460e 100644 --- a/e2e_test/sink/clickhouse_sink.slt +++ b/e2e_test/sink/clickhouse_sink.slt @@ -17,6 +17,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4, clickhouse.password = '', clickhouse.database = 'default', clickhouse.table='demo_test', + commit_checkpoint_interval = 1, ); statement ok diff --git a/e2e_test/sink/elasticsearch/elasticsearch_sink.slt b/e2e_test/sink/elasticsearch/elasticsearch_sink.slt index 70b7e911c0f17..29e600ea450f2 100644 --- a/e2e_test/sink/elasticsearch/elasticsearch_sink.slt +++ b/e2e_test/sink/elasticsearch/elasticsearch_sink.slt @@ -1,3 +1,6 @@ +statement ok +set sink_decouple = false; + statement ok CREATE TABLE t7 ( v1 int primary key, diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 02a3b8c84b50f..c6e3cc23db0db 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -75,27 +75,6 @@ where }) } -pub(crate) fn deserialize_optional_u64_from_string<'de, D>( - deserializer: D, -) -> Result, D::Error> -where - D: de::Deserializer<'de>, -{ - let s: String = de::Deserialize::deserialize(deserializer)?; - if s.is_empty() { - Ok(None) - } else { - s.parse() - .map_err(|_| { - de::Error::invalid_value( - de::Unexpected::Str(&s), - &"integer greater than or equal to 0", - ) - }) - .map(Some) - } -} - pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>( deserializer: D, ) -> std::result::Result>, D::Error> diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index 4337f2b9d76b7..6b3e78f6a7b9d 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -30,16 +30,18 @@ use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial}; use serde::ser::{SerializeSeq, SerializeStruct}; use serde::Serialize; use serde_derive::Deserialize; -use serde_with::serde_as; +use serde_with::{serde_as, DisplayFromStr}; use thiserror_ext::AsReport; use tonic::async_trait; use tracing::warn; use with_options::WithOptions; -use super::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf; +use super::decouple_checkpoint_log_sink::{ + default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf, + DEFAULT_COMMIT_CHECKPOINT_INTERVAL, +}; use super::writer::SinkWriter; use super::{DummySinkCommitCoordinator, SinkWriterParam}; -use crate::deserialize_optional_u64_from_string; use crate::error::ConnectorResult; use crate::sink::catalog::desc::SinkDesc; use crate::sink::{ @@ -52,6 +54,7 @@ const QUERY_COLUMN: &str = "select distinct ?fields from system.columns where database = ? and table = ? order by ?"; pub const CLICKHOUSE_SINK: &str = "clickhouse"; +#[serde_as] #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct ClickHouseCommon { #[serde(rename = "clickhouse.url")] @@ -66,9 +69,10 @@ pub struct ClickHouseCommon { pub table: String, #[serde(rename = "clickhouse.delete.column")] pub delete_column: Option, - /// Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint. - #[serde(default, deserialize_with = "deserialize_optional_u64_from_string")] - pub commit_checkpoint_interval: Option, + /// Commit every n(>0) checkpoints, default is 10. + #[serde(default = "default_commit_checkpoint_interval")] + #[serde_as(as = "DisplayFromStr")] + pub commit_checkpoint_interval: u64, } #[allow(clippy::enum_variant_names)] @@ -494,26 +498,25 @@ impl Sink for ClickHouseSink { const SINK_NAME: &'static str = CLICKHOUSE_SINK; fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { - let config_decouple = if let Some(interval) = - desc.properties.get("commit_checkpoint_interval") - && interval.parse::().unwrap_or(0) > 1 - { - true - } else { - false - }; + let commit_checkpoint_interval = + if let Some(interval) = desc.properties.get("commit_checkpoint_interval") { + interval + .parse::() + .unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL) + } else { + DEFAULT_COMMIT_CHECKPOINT_INTERVAL + }; match user_specified { - SinkDecouple::Default => Ok(config_decouple), + SinkDecouple::Default | SinkDecouple::Enable => Ok(true), SinkDecouple::Disable => { - if config_decouple { + if commit_checkpoint_interval > 1 { return Err(SinkError::Config(anyhow!( "config conflict: Clickhouse config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled" ))); } Ok(false) } - SinkDecouple::Enable => Ok(true), } } @@ -552,9 +555,9 @@ impl Sink for ClickHouseSink { self.check_pk_match(&clickhouse_column)?; } - if self.config.common.commit_checkpoint_interval == Some(0) { + if self.config.common.commit_checkpoint_interval == 0 { return Err(SinkError::Config(anyhow!( - "commit_checkpoint_interval must be greater than 0" + "`commit_checkpoint_interval` must be greater than 0" ))); } Ok(()) @@ -569,7 +572,7 @@ impl Sink for ClickHouseSink { ) .await?; let commit_checkpoint_interval = - NonZeroU64::new(self.config.common.commit_checkpoint_interval.unwrap_or(1)).expect( + NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect( "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation", ); diff --git a/src/connector/src/sink/decouple_checkpoint_log_sink.rs b/src/connector/src/sink/decouple_checkpoint_log_sink.rs index 26576cf3e3666..4ba57e3adda7a 100644 --- a/src/connector/src/sink/decouple_checkpoint_log_sink.rs +++ b/src/connector/src/sink/decouple_checkpoint_log_sink.rs @@ -20,6 +20,11 @@ use async_trait::async_trait; use crate::sink::log_store::{LogStoreReadItem, TruncateOffset}; use crate::sink::writer::SinkWriter; use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics}; +pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL: u64 = 10; + +pub fn default_commit_checkpoint_interval() -> u64 { + DEFAULT_COMMIT_CHECKPOINT_INTERVAL +} /// The `LogSinker` implementation used for commit-decoupled sinks (such as `Iceberg`, `DeltaLake` and `StarRocks`). /// The concurrent/frequent commit capability of these sinks is poor, so by leveraging the decoupled log reader, diff --git a/src/connector/src/sink/deltalake.rs b/src/connector/src/sink/deltalake.rs index 661145959a0ff..2dedffa3469e3 100644 --- a/src/connector/src/sink/deltalake.rs +++ b/src/connector/src/sink/deltalake.rs @@ -38,23 +38,26 @@ use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; use risingwave_pb::connector_service::sink_metadata::SerializedMetadata; use risingwave_pb::connector_service::SinkMetadata; use serde_derive::{Deserialize, Serialize}; -use serde_with::serde_as; +use serde_with::{serde_as, DisplayFromStr}; use with_options::WithOptions; use super::catalog::desc::SinkDesc; use super::coordinate::CoordinatedSinkWriter; -use super::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf; +use super::decouple_checkpoint_log_sink::{ + default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf, + DEFAULT_COMMIT_CHECKPOINT_INTERVAL, +}; use super::writer::SinkWriter; use super::{ Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, }; -use crate::deserialize_optional_u64_from_string; pub const DELTALAKE_SINK: &str = "deltalake"; pub const DEFAULT_REGION: &str = "us-east-1"; pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key"; +#[serde_as] #[derive(Deserialize, Serialize, Debug, Clone, WithOptions)] pub struct DeltaLakeCommon { #[serde(rename = "s3.access.key")] @@ -69,10 +72,12 @@ pub struct DeltaLakeCommon { pub s3_endpoint: Option, #[serde(rename = "gcs.service.account")] pub gcs_service_account: Option, - /// Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint. - #[serde(default, deserialize_with = "deserialize_optional_u64_from_string")] - pub commit_checkpoint_interval: Option, + /// Commit every n(>0) checkpoints, default is 10. + #[serde(default = "default_commit_checkpoint_interval")] + #[serde_as(as = "DisplayFromStr")] + pub commit_checkpoint_interval: u64, } + impl DeltaLakeCommon { pub async fn create_deltalake_client(&self) -> Result { let table = match Self::get_table_url(&self.location)? { @@ -281,26 +286,25 @@ impl Sink for DeltaLakeSink { const SINK_NAME: &'static str = DELTALAKE_SINK; fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { - let config_decouple = if let Some(interval) = - desc.properties.get("commit_checkpoint_interval") - && interval.parse::().unwrap_or(0) > 1 - { - true - } else { - false - }; + let commit_checkpoint_interval = + if let Some(interval) = desc.properties.get("commit_checkpoint_interval") { + interval + .parse::() + .unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL) + } else { + DEFAULT_COMMIT_CHECKPOINT_INTERVAL + }; match user_specified { - SinkDecouple::Default => Ok(config_decouple), + SinkDecouple::Default | SinkDecouple::Enable => Ok(true), SinkDecouple::Disable => { - if config_decouple { + if commit_checkpoint_interval > 1 { return Err(SinkError::Config(anyhow!( "config conflict: DeltaLake config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled" ))); } Ok(false) } - SinkDecouple::Enable => Ok(true), } } @@ -328,7 +332,7 @@ impl Sink for DeltaLakeSink { .await?; let commit_checkpoint_interval = - NonZeroU64::new(self.config.common.commit_checkpoint_interval.unwrap_or(1)).expect( + NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect( "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation", ); @@ -380,9 +384,9 @@ impl Sink for DeltaLakeSink { ))); } } - if self.config.common.commit_checkpoint_interval == Some(0) { + if self.config.common.commit_checkpoint_interval == 0 { return Err(SinkError::Config(anyhow!( - "commit_checkpoint_interval must be greater than 0" + "`commit_checkpoint_interval` must be greater than 0" ))); } Ok(()) diff --git a/src/connector/src/sink/google_pubsub.rs b/src/connector/src/sink/google_pubsub.rs index a01daa59c1272..ea0e0e4776318 100644 --- a/src/connector/src/sink/google_pubsub.rs +++ b/src/connector/src/sink/google_pubsub.rs @@ -29,13 +29,11 @@ use google_cloud_pubsub::client::{Client, ClientConfig}; use google_cloud_pubsub::publisher::{Awaiter, Publisher}; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; -use risingwave_common::session_config::sink_decouple::SinkDecouple; use serde_derive::Deserialize; use serde_with::serde_as; use tonic::Status; use with_options::WithOptions; -use super::catalog::desc::SinkDesc; use super::catalog::SinkFormatDesc; use super::formatter::SinkFormatterImpl; use super::log_store::DeliveryFutureManagerAddFuture; @@ -114,13 +112,6 @@ impl Sink for GooglePubSubSink { const SINK_NAME: &'static str = PUBSUB_SINK; - fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { - match user_specified { - SinkDecouple::Default | SinkDecouple::Enable => Ok(true), - SinkDecouple::Disable => Ok(false), - } - } - async fn validate(&self) -> Result<()> { if !self.is_append_only { return Err(SinkError::GooglePubSub(anyhow!( diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 0e2d06d1dcaf7..609b234af05a3 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -55,6 +55,7 @@ use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; use risingwave_pb::connector_service::sink_metadata::SerializedMetadata; use risingwave_pb::connector_service::SinkMetadata; use serde_derive::Deserialize; +use serde_with::{serde_as, DisplayFromStr}; use storage_catalog::StorageCatalogConfig; use thiserror_ext::AsReport; use url::Url; @@ -64,7 +65,10 @@ use self::mock_catalog::MockCatalog; use self::prometheus::monitored_base_file_writer::MonitoredBaseFileWriterBuilder; use self::prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder; use super::catalog::desc::SinkDesc; -use super::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf; +use super::decouple_checkpoint_log_sink::{ + default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf, + DEFAULT_COMMIT_CHECKPOINT_INTERVAL, +}; use super::{ Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; @@ -72,14 +76,12 @@ use crate::error::ConnectorResult; use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::writer::SinkWriter; use crate::sink::{Result, SinkCommitCoordinator, SinkDecouple, SinkParam}; -use crate::{ - deserialize_bool_from_string, deserialize_optional_string_seq_from_string, - deserialize_optional_u64_from_string, -}; +use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string}; /// This iceberg sink is WIP. When it ready, we will change this name to "iceberg". pub const ICEBERG_SINK: &str = "iceberg"; +#[serde_as] #[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions, Default)] pub struct IcebergConfig { pub connector: String, // Avoid deny unknown field. Must be "iceberg" @@ -142,9 +144,10 @@ pub struct IcebergConfig { #[serde(skip)] pub java_catalog_props: HashMap, - // Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint. - #[serde(default, deserialize_with = "deserialize_optional_u64_from_string")] - pub commit_checkpoint_interval: Option, + /// Commit every n(>0) checkpoints, default is 10. + #[serde(default = "default_commit_checkpoint_interval")] + #[serde_as(as = "DisplayFromStr")] + pub commit_checkpoint_interval: u64, } impl IcebergConfig { @@ -196,9 +199,9 @@ impl IcebergConfig { .map(|(k, v)| (k[8..].to_string(), v.to_string())) .collect(); - if config.commit_checkpoint_interval == Some(0) { + if config.commit_checkpoint_interval == 0 { return Err(SinkError::Config(anyhow!( - "commit_checkpoint_interval must be greater than 0" + "`commit_checkpoint_interval` must be greater than 0" ))); } @@ -752,26 +755,25 @@ impl Sink for IcebergSink { const SINK_NAME: &'static str = ICEBERG_SINK; fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { - let config_decouple = if let Some(interval) = - desc.properties.get("commit_checkpoint_interval") - && interval.parse::().unwrap_or(0) > 1 - { - true - } else { - false - }; + let commit_checkpoint_interval = + if let Some(interval) = desc.properties.get("commit_checkpoint_interval") { + interval + .parse::() + .unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL) + } else { + DEFAULT_COMMIT_CHECKPOINT_INTERVAL + }; match user_specified { - SinkDecouple::Default => Ok(config_decouple), + SinkDecouple::Default | SinkDecouple::Enable => Ok(true), SinkDecouple::Disable => { - if config_decouple { + if commit_checkpoint_interval > 1 { return Err(SinkError::Config(anyhow!( - "config conflict: Iceberg config `commit_checkpoint_interval` bigger than 1 which means that must enable sink decouple, but session config sink decouple is disabled" + "config conflict: Iceberg config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled" ))); } Ok(false) } - SinkDecouple::Enable => Ok(true), } } @@ -809,7 +811,7 @@ impl Sink for IcebergSink { .await?; let commit_checkpoint_interval = - NonZeroU64::new(self.config.commit_checkpoint_interval.unwrap_or(1)).expect( + NonZeroU64::new(self.config.commit_checkpoint_interval).expect( "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation", ); @@ -1298,6 +1300,7 @@ mod test { use risingwave_common::catalog::Field; + use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL; use crate::sink::iceberg::IcebergConfig; use crate::source::DataType; @@ -1380,7 +1383,7 @@ mod test { .into_iter() .map(|(k, v)| (k.to_string(), v.to_string())) .collect(), - commit_checkpoint_interval: None, + commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL, }; assert_eq!(iceberg_config, expected_iceberg_config); diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 588c5d99ae955..ad2b4044b3c3b 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -26,7 +26,6 @@ use rdkafka::types::RDKafkaErrorCode; use rdkafka::ClientConfig; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; -use risingwave_common::session_config::sink_decouple::SinkDecouple; use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use strum_macros::{Display, EnumString}; @@ -38,7 +37,6 @@ use super::{Sink, SinkError, SinkParam}; use crate::connector_common::{ AwsAuthProps, KafkaCommon, KafkaPrivateLinkCommon, RdKafkaPropertiesCommon, }; -use crate::sink::catalog::desc::SinkDesc; use crate::sink::formatter::SinkFormatterImpl; use crate::sink::log_store::DeliveryFutureManagerAddFuture; use crate::sink::writer::{ @@ -321,13 +319,6 @@ impl Sink for KafkaSink { const SINK_NAME: &'static str = KAFKA_SINK; - fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { - match user_specified { - SinkDecouple::Default | SinkDecouple::Enable => Ok(true), - SinkDecouple::Disable => Ok(false), - } - } - async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { let formatter = SinkFormatterImpl::new( &self.format_desc, diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs index b10f25d962124..db1bf1c7def39 100644 --- a/src/connector/src/sink/kinesis.rs +++ b/src/connector/src/sink/kinesis.rs @@ -23,7 +23,6 @@ use futures::{FutureExt, TryFuture}; use itertools::Itertools; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; -use risingwave_common::session_config::sink_decouple::SinkDecouple; use serde_derive::Deserialize; use serde_with::serde_as; use with_options::WithOptions; @@ -32,7 +31,6 @@ use super::catalog::SinkFormatDesc; use super::SinkParam; use crate::connector_common::KinesisCommon; use crate::dispatch_sink_formatter_str_key_impl; -use crate::sink::catalog::desc::SinkDesc; use crate::sink::formatter::SinkFormatterImpl; use crate::sink::log_store::DeliveryFutureManagerAddFuture; use crate::sink::writer::{ @@ -79,13 +77,6 @@ impl Sink for KinesisSink { const SINK_NAME: &'static str = KINESIS_SINK; - fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { - match user_specified { - SinkDecouple::Default | SinkDecouple::Enable => Ok(true), - SinkDecouple::Disable => Ok(false), - } - } - async fn validate(&self) -> Result<()> { // Kinesis requires partition key. There is no builtin support for round-robin as in kafka/pulsar. // https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-PartitionKey diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 0ee077bbdccb8..3cfd72feabaa1 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -370,8 +370,8 @@ pub trait Sink: TryFrom { /// `user_specified` is the value of `sink_decouple` config. fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { match user_specified { - SinkDecouple::Disable | SinkDecouple::Default => Ok(false), - SinkDecouple::Enable => Ok(true), + SinkDecouple::Default | SinkDecouple::Enable => Ok(true), + SinkDecouple::Disable => Ok(false), } } diff --git a/src/connector/src/sink/mongodb.rs b/src/connector/src/sink/mongodb.rs index 8840c72176960..d09d44b0de9dc 100644 --- a/src/connector/src/sink/mongodb.rs +++ b/src/connector/src/sink/mongodb.rs @@ -25,7 +25,6 @@ use risingwave_common::catalog::Schema; use risingwave_common::log::LogSuppresser; use risingwave_common::must_match; use risingwave_common::row::Row; -use risingwave_common::session_config::sink_decouple::SinkDecouple; use risingwave_common::types::ScalarRefImpl; use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; @@ -33,7 +32,6 @@ use thiserror_ext::AsReport; use tonic::async_trait; use with_options::WithOptions; -use super::catalog::desc::SinkDesc; use super::encoder::BsonEncoder; use crate::connector_common::MongodbCommon; use crate::deserialize_bool_from_string; @@ -185,14 +183,6 @@ impl Sink for MongodbSink { const SINK_NAME: &'static str = MONGODB_SINK; - fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { - match user_specified { - // Set default sink decouple to false, because mongodb sink writer only ensure delivery on checkpoint barrier - SinkDecouple::Default | SinkDecouple::Disable => Ok(false), - SinkDecouple::Enable => Ok(true), - } - } - async fn validate(&self) -> Result<()> { if !self.is_append_only { if self.pk_indices.is_empty() { diff --git a/src/connector/src/sink/mqtt.rs b/src/connector/src/sink/mqtt.rs index 896da3a7f4d89..5caf348cd60d2 100644 --- a/src/connector/src/sink/mqtt.rs +++ b/src/connector/src/sink/mqtt.rs @@ -20,7 +20,6 @@ use anyhow::{anyhow, Context as _}; use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::catalog::Schema; use risingwave_common::row::Row; -use risingwave_common::session_config::sink_decouple::SinkDecouple; use risingwave_common::types::{DataType, ScalarRefImpl}; use rumqttc::v5::mqttbytes::QoS; use rumqttc::v5::ConnectionError; @@ -38,7 +37,6 @@ use super::writer::AsyncTruncateSinkWriterExt; use super::{DummySinkCommitCoordinator, SinkWriterParam}; use crate::connector_common::MqttCommon; use crate::deserialize_bool_from_string; -use crate::sink::catalog::desc::SinkDesc; use crate::sink::log_store::DeliveryFutureManagerAddFuture; use crate::sink::writer::{AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter}; use crate::sink::{Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY}; @@ -165,13 +163,6 @@ impl Sink for MqttSink { const SINK_NAME: &'static str = MQTT_SINK; - fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { - match user_specified { - SinkDecouple::Default | SinkDecouple::Enable => Ok(true), - SinkDecouple::Disable => Ok(false), - } - } - async fn validate(&self) -> Result<()> { if !self.is_append_only { return Err(SinkError::Mqtt(anyhow!( diff --git a/src/connector/src/sink/nats.rs b/src/connector/src/sink/nats.rs index 878cc1bee27d2..c5b1c49d5c2e0 100644 --- a/src/connector/src/sink/nats.rs +++ b/src/connector/src/sink/nats.rs @@ -21,7 +21,6 @@ use futures::prelude::TryFuture; use futures::FutureExt; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; -use risingwave_common::session_config::sink_decouple::SinkDecouple; use serde_derive::Deserialize; use serde_with::serde_as; use tokio_retry::strategy::{jitter, ExponentialBackoff}; @@ -34,7 +33,6 @@ use super::encoder::{ use super::utils::chunk_to_json; use super::{DummySinkCommitCoordinator, SinkWriterParam}; use crate::connector_common::NatsCommon; -use crate::sink::catalog::desc::SinkDesc; use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode}; use crate::sink::log_store::DeliveryFutureManagerAddFuture; use crate::sink::writer::{ @@ -107,13 +105,6 @@ impl Sink for NatsSink { const SINK_NAME: &'static str = NATS_SINK; - fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { - match user_specified { - SinkDecouple::Default | SinkDecouple::Enable => Ok(true), - SinkDecouple::Disable => Ok(false), - } - } - async fn validate(&self) -> Result<()> { if !self.is_append_only { return Err(SinkError::Nats(anyhow!( diff --git a/src/connector/src/sink/pulsar.rs b/src/connector/src/sink/pulsar.rs index a92d5b16f85e3..d2135bf493d18 100644 --- a/src/connector/src/sink/pulsar.rs +++ b/src/connector/src/sink/pulsar.rs @@ -22,7 +22,6 @@ use pulsar::producer::{Message, SendFuture}; use pulsar::{Producer, ProducerOptions, Pulsar, TokioExecutor}; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; -use risingwave_common::session_config::sink_decouple::SinkDecouple; use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use with_options::WithOptions; @@ -30,7 +29,6 @@ use with_options::WithOptions; use super::catalog::{SinkFormat, SinkFormatDesc}; use super::{Sink, SinkError, SinkParam, SinkWriterParam}; use crate::connector_common::{AwsAuthProps, PulsarCommon, PulsarOauthCommon}; -use crate::sink::catalog::desc::SinkDesc; use crate::sink::encoder::SerTo; use crate::sink::formatter::{SinkFormatter, SinkFormatterImpl}; use crate::sink::log_store::DeliveryFutureManagerAddFuture; @@ -170,13 +168,6 @@ impl Sink for PulsarSink { const SINK_NAME: &'static str = PULSAR_SINK; - fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { - match user_specified { - SinkDecouple::Default | SinkDecouple::Enable => Ok(true), - SinkDecouple::Disable => Ok(false), - } - } - async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { Ok(PulsarSinkWriter::new( self.config.clone(), diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 606965a8424d7..6fcef5d41b654 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -75,9 +75,7 @@ macro_rules! def_remote_sink { { ElasticSearch, ElasticSearchSink, "elasticsearch" } { Opensearch, OpenSearchSink, "opensearch"} { Cassandra, CassandraSink, "cassandra" } - { Jdbc, JdbcSink, "jdbc", |desc| { - desc.sink_type.is_append_only() - } } + { Jdbc, JdbcSink, "jdbc" } { DeltaLake, DeltaLakeSink, "deltalake" } { HttpJava, HttpJavaSink, "http" } } @@ -119,7 +117,7 @@ def_remote_sink!(); pub trait RemoteSinkTrait: Send + Sync + 'static { const SINK_NAME: &'static str; fn default_sink_decouple(_desc: &SinkDesc) -> bool { - false + true } } diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 84d3f95131758..54537580ea5a5 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -38,6 +38,7 @@ use tokio::task::JoinHandle; use url::form_urlencoded; use with_options::WithOptions; +use super::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL; use super::doris_starrocks_connector::{ HeaderBuilder, InserterInner, StarrocksTxnRequestBuilder, STARROCKS_DELETE_SIGN, STARROCKS_SUCCESS_STATUS, @@ -47,7 +48,6 @@ use super::{ SinkCommitCoordinator, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; -use crate::deserialize_optional_u64_from_string; use crate::sink::catalog::desc::SinkDesc; use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf; @@ -105,9 +105,10 @@ pub struct StarrocksConfig { /// to Starrocks at every n checkpoints by leveraging the /// [StreamLoad Transaction API](https://docs.starrocks.io/docs/loading/Stream_Load_transaction_interface/), /// also, in this time, the `sink_decouple` option should be enabled as well. - /// Defaults to 1 if commit_checkpoint_interval <= 0 - #[serde(default, deserialize_with = "deserialize_optional_u64_from_string")] - pub commit_checkpoint_interval: Option, + /// Defaults to 10 if commit_checkpoint_interval <= 0 + #[serde(default = "default_commit_checkpoint_interval")] + #[serde_as(as = "DisplayFromStr")] + pub commit_checkpoint_interval: u64, /// Enable partial update #[serde(rename = "starrocks.partial_update")] @@ -116,6 +117,10 @@ pub struct StarrocksConfig { pub r#type: String, // accept "append-only" or "upsert" } +fn default_commit_checkpoint_interval() -> u64 { + DEFAULT_COMMIT_CHECKPOINT_INTERVAL +} + impl StarrocksConfig { pub fn from_btreemap(properties: BTreeMap) -> Result { let config = @@ -129,9 +134,9 @@ impl StarrocksConfig { SINK_TYPE_UPSERT ))); } - if config.commit_checkpoint_interval == Some(0) { + if config.commit_checkpoint_interval == 0 { return Err(SinkError::Config(anyhow!( - "commit_checkpoint_interval must be greater than 0" + "`commit_checkpoint_interval` must be greater than 0" ))); } Ok(config) @@ -260,26 +265,25 @@ impl Sink for StarrocksSink { const SINK_NAME: &'static str = STARROCKS_SINK; fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { - let config_decouple = if let Some(interval) = - desc.properties.get("commit_checkpoint_interval") - && interval.parse::().unwrap_or(0) > 1 - { - true - } else { - false - }; + let commit_checkpoint_interval = + if let Some(interval) = desc.properties.get("commit_checkpoint_interval") { + interval + .parse::() + .unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL) + } else { + DEFAULT_COMMIT_CHECKPOINT_INTERVAL + }; match user_specified { - SinkDecouple::Default => Ok(config_decouple), + SinkDecouple::Default | SinkDecouple::Enable => Ok(true), SinkDecouple::Disable => { - if config_decouple { + if commit_checkpoint_interval > 1 { return Err(SinkError::Config(anyhow!( - "config conflict: StarRocks config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled" + "config conflict: Starrocks config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled" ))); } Ok(false) } - SinkDecouple::Enable => Ok(true), } } @@ -323,7 +327,7 @@ impl Sink for StarrocksSink { async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { let commit_checkpoint_interval = - NonZeroU64::new(self.config.commit_checkpoint_interval.unwrap_or(1)).expect( + NonZeroU64::new(self.config.commit_checkpoint_interval).expect( "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation", ); diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index c6c36917e9c21..4672da3025d56 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -98,9 +98,9 @@ ClickHouseConfig: required: false - name: commit_checkpoint_interval field_type: u64 - comments: Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint. + comments: Commit every n(>0) checkpoints, default is 10. required: false - default: Default::default + default: DEFAULT_COMMIT_CHECKPOINT_INTERVAL - name: r#type field_type: String required: true @@ -126,9 +126,9 @@ DeltaLakeConfig: required: false - name: commit_checkpoint_interval field_type: u64 - comments: Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint. + comments: Commit every n(>0) checkpoints, default is 10. required: false - default: Default::default + default: DEFAULT_COMMIT_CHECKPOINT_INTERVAL - name: r#type field_type: String required: true @@ -322,8 +322,9 @@ IcebergConfig: required: false - name: commit_checkpoint_interval field_type: u64 + comments: Commit every n(>0) checkpoints, default is 10. required: false - default: Default::default + default: DEFAULT_COMMIT_CHECKPOINT_INTERVAL KafkaConfig: fields: - name: properties.bootstrap.server @@ -988,9 +989,9 @@ StarrocksConfig: to Starrocks at every n checkpoints by leveraging the [StreamLoad Transaction API](https://docs.starrocks.io/docs/loading/Stream_Load_transaction_interface/), also, in this time, the `sink_decouple` option should be enabled as well. - Defaults to 1 if commit_checkpoint_interval <= 0 + Defaults to 10 if commit_checkpoint_interval <= 0 required: false - default: Default::default + default: DEFAULT_COMMIT_CHECKPOINT_INTERVAL - name: starrocks.partial_update field_type: String comments: Enable partial update