diff --git a/ci/scripts/e2e-cassandra-sink-test.sh b/ci/scripts/e2e-cassandra-sink-test.sh index 0e1c9a98d49e8..b222e4a944967 100755 --- a/ci/scripts/e2e-cassandra-sink-test.sh +++ b/ci/scripts/e2e-cassandra-sink-test.sh @@ -42,8 +42,9 @@ tar xfvz cassandra_latest.tar.gz export LATEST_CASSANDRA_VERSION=$(get_latest_cassandra_version) export CASSANDRA_DIR="./apache-cassandra-${LATEST_CASSANDRA_VERSION}" # remove bundled packages, and use installed packages, because Python 3.12 has removed asyncore, but I failed to install libev support for bundled Python driver. -rm ${CASSANDRA_DIR}/lib/six-1.12.0-py2.py3-none-any.zip -rm ${CASSANDRA_DIR}/lib/cassandra-driver-internal-only-3.25.0.zip + +rm ${CASSANDRA_DIR}/lib/futures-2.1.6-py2.py3-none-any.zip +rm ${CASSANDRA_DIR}/lib/cassandra-driver-internal-only-3.29.0.zip apt-get install -y libev4 libev-dev pip3 install --break-system-packages cassandra-driver export CQLSH_HOST=cassandra-server diff --git a/e2e_test/iceberg/test_case/append_only_with_checkpoint_interval.slt b/e2e_test/iceberg/test_case/append_only_with_checkpoint_interval.slt index 0dc937303a852..b0e433c819f83 100644 --- a/e2e_test/iceberg/test_case/append_only_with_checkpoint_interval.slt +++ b/e2e_test/iceberg/test_case/append_only_with_checkpoint_interval.slt @@ -1,6 +1,3 @@ -statement ok -set sink_decouple = false; - statement ok set streaming_parallelism=4; @@ -37,7 +34,6 @@ CREATE SINK sink1 AS select * from mv1 WITH ( s3.region = 'us-east-1', s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', - commit_checkpoint_interval = 1 ); statement ok @@ -54,7 +50,6 @@ CREATE SINK sink2 AS select * from mv1 WITH ( s3.region = 'us-east-1', s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', - commit_checkpoint_interval = 1 ); sleep 20s diff --git a/e2e_test/iceberg/test_case/cdc/load.slt b/e2e_test/iceberg/test_case/cdc/load.slt index df0c319990374..6e6850725f98a 100644 --- a/e2e_test/iceberg/test_case/cdc/load.slt +++ b/e2e_test/iceberg/test_case/cdc/load.slt @@ -1,4 +1,6 @@ # CDC source basic test +statement ok +set sink_decouple = false; statement ok create source mysql_mydb with ( diff --git a/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt b/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt index 66eb11da1f438..d57c3096cc1ee 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt @@ -39,7 +39,6 @@ CREATE SINK s6 AS select * from mv6 WITH ( s3.region = 'us-east-1', s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', - commit_checkpoint_interval = 1 ); statement ok diff --git a/e2e_test/iceberg/test_case/iceberg_sink_no_partition_upsert_table.slt b/e2e_test/iceberg/test_case/iceberg_sink_no_partition_upsert_table.slt index de96205a2debf..73d953bc2937a 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_no_partition_upsert_table.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_no_partition_upsert_table.slt @@ -25,7 +25,6 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', primary_key = 'v1', - commit_checkpoint_interval = 1 ); statement ok diff --git a/e2e_test/iceberg/test_case/iceberg_sink_partition_append_only_table.slt b/e2e_test/iceberg/test_case/iceberg_sink_partition_append_only_table.slt index 72f0bce46d183..3a27df42903ee 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_partition_append_only_table.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_partition_append_only_table.slt @@ -36,7 +36,6 @@ CREATE SINK s6 AS select * from mv6 WITH ( s3.region = 'us-east-1', s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', - commit_checkpoint_interval = 1 ); statement ok diff --git a/e2e_test/iceberg/test_case/iceberg_sink_partition_upsert_table.slt b/e2e_test/iceberg/test_case/iceberg_sink_partition_upsert_table.slt index 2b213a77175bd..39f170a834382 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_partition_upsert_table.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_partition_upsert_table.slt @@ -25,7 +25,6 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', primary_key = 'v1', - commit_checkpoint_interval = 1 ); statement ok diff --git a/e2e_test/iceberg/test_case/iceberg_sink_range_partition_append_only_table.slt b/e2e_test/iceberg/test_case/iceberg_sink_range_partition_append_only_table.slt index 46670ac362599..f0cf9f5fa3133 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_range_partition_append_only_table.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_range_partition_append_only_table.slt @@ -36,7 +36,6 @@ CREATE SINK s6 AS select * from mv6 WITH ( s3.region = 'us-east-1', s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', - commit_checkpoint_interval = 1 ); statement ok diff --git a/e2e_test/iceberg/test_case/iceberg_sink_range_partition_upsert_table.slt b/e2e_test/iceberg/test_case/iceberg_sink_range_partition_upsert_table.slt index 5637ce34c940f..f43e2788a020a 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_range_partition_upsert_table.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_range_partition_upsert_table.slt @@ -25,7 +25,6 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', primary_key = 'v1', - commit_checkpoint_interval = 1 ); statement ok diff --git a/e2e_test/sink/clickhouse_sink.slt b/e2e_test/sink/clickhouse_sink.slt index e037618bb460e..e5bac0d8d521d 100644 --- a/e2e_test/sink/clickhouse_sink.slt +++ b/e2e_test/sink/clickhouse_sink.slt @@ -17,7 +17,6 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4, clickhouse.password = '', clickhouse.database = 'default', clickhouse.table='demo_test', - commit_checkpoint_interval = 1, ); statement ok diff --git a/e2e_test/sink/create_sink_as.slt b/e2e_test/sink/create_sink_as.slt index 5c66c5623553e..dc6d0f61419c6 100644 --- a/e2e_test/sink/create_sink_as.slt +++ b/e2e_test/sink/create_sink_as.slt @@ -1,3 +1,6 @@ +statement ok +set sink_decouple = false; + statement ok CREATE TABLE t4 (v1 int primary key, v2 int); diff --git a/e2e_test/sink/deltalake_rust_sink.slt b/e2e_test/sink/deltalake_rust_sink.slt index 74dca623a9d0a..cb9f9e7817212 100644 --- a/e2e_test/sink/deltalake_rust_sink.slt +++ b/e2e_test/sink/deltalake_rust_sink.slt @@ -1,3 +1,6 @@ +statement ok +set sink_decouple = false; + statement ok CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean, v10 decimal, v11 decimal[]); diff --git a/e2e_test/sink/doris_sink.slt b/e2e_test/sink/doris_sink.slt index 3242206badaea..3e6a4aca9d9f6 100644 --- a/e2e_test/sink/doris_sink.slt +++ b/e2e_test/sink/doris_sink.slt @@ -1,3 +1,6 @@ +statement ok +set sink_decouple = false; + statement ok CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamp, v9 boolean, v10 jsonb); diff --git a/e2e_test/sink/iceberg_sink.slt b/e2e_test/sink/iceberg_sink.slt index e3917908f651b..b08abd8a4918c 100644 --- a/e2e_test/sink/iceberg_sink.slt +++ b/e2e_test/sink/iceberg_sink.slt @@ -31,7 +31,6 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH catalog.type = 'storage', database.name='demo_db', table.name='e2e_demo_table', - commit_checkpoint_interval = 1 ); statement ok diff --git a/e2e_test/sink/mongodb_sink.slt b/e2e_test/sink/mongodb_sink.slt index 2122993e3003a..ddc5a91a20c3f 100644 --- a/e2e_test/sink/mongodb_sink.slt +++ b/e2e_test/sink/mongodb_sink.slt @@ -1,3 +1,6 @@ +statement ok +set sink_decouple = false; + statement ok create table t1( a smallint, diff --git a/e2e_test/sink/redis_cluster_sink.slt b/e2e_test/sink/redis_cluster_sink.slt index 03d197485777a..3effd7795d039 100644 --- a/e2e_test/sink/redis_cluster_sink.slt +++ b/e2e_test/sink/redis_cluster_sink.slt @@ -1,3 +1,6 @@ +statement ok +set sink_decouple = false; + statement ok CREATE TABLE t6 (v1 int primary key, v2 int); diff --git a/e2e_test/sink/redis_sink.slt b/e2e_test/sink/redis_sink.slt index 7475a80ae696e..8828c22b80d27 100644 --- a/e2e_test/sink/redis_sink.slt +++ b/e2e_test/sink/redis_sink.slt @@ -1,3 +1,6 @@ +statement ok +set sink_decouple = false; + statement ok CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean); diff --git a/e2e_test/sink/remote/types.slt b/e2e_test/sink/remote/types.slt index f2421eabec906..e511d5e6a6ee7 100644 --- a/e2e_test/sink/remote/types.slt +++ b/e2e_test/sink/remote/types.slt @@ -1,3 +1,6 @@ +statement ok +set sink_decouple = false; + statement ok create table t5 (v1 smallint primary key, v2 int, v3 bigint, v4 float, v5 double, v6 decimal, v7 varchar, v8 timestamp, v9 boolean); diff --git a/e2e_test/sink/sqlserver_sink.slt b/e2e_test/sink/sqlserver_sink.slt index 156b8b865ffc8..08bbd3364ed9a 100644 --- a/e2e_test/sink/sqlserver_sink.slt +++ b/e2e_test/sink/sqlserver_sink.slt @@ -1,3 +1,6 @@ +statement ok +set sink_decouple = false; + statement ok create table t_many_data_type_rw ( k1 int, k2 int, diff --git a/e2e_test/sink/starrocks_sink.slt b/e2e_test/sink/starrocks_sink.slt index dedb01755cbbe..0aceac592618a 100644 --- a/e2e_test/sink/starrocks_sink.slt +++ b/e2e_test/sink/starrocks_sink.slt @@ -1,3 +1,6 @@ +statement ok +set sink_decouple = false; + statement ok CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamp, v9 boolean, v10 jsonb, v11 decimal); diff --git a/integration_tests/big-query-sink/create_sink.sql b/integration_tests/big-query-sink/create_sink.sql index a41fe0243120d..01fb5e340d545 100644 --- a/integration_tests/big-query-sink/create_sink.sql +++ b/integration_tests/big-query-sink/create_sink.sql @@ -1,3 +1,5 @@ +set sink_decouple = false; + -- create sink with local file CREATE SINK bhv_big_query_sink FROM diff --git a/integration_tests/cassandra-and-scylladb-sink/create_sink.sql b/integration_tests/cassandra-and-scylladb-sink/create_sink.sql index a0a305aebd0e0..fdda994d01427 100644 --- a/integration_tests/cassandra-and-scylladb-sink/create_sink.sql +++ b/integration_tests/cassandra-and-scylladb-sink/create_sink.sql @@ -1,3 +1,5 @@ +set sink_decouple = false; + CREATE SINK bhv_cassandra_sink FROM bhv_mv WITH ( diff --git a/integration_tests/clickhouse-sink/create_sink.sql b/integration_tests/clickhouse-sink/create_sink.sql index 5f730ed6ff910..b913a246b286e 100644 --- a/integration_tests/clickhouse-sink/create_sink.sql +++ b/integration_tests/clickhouse-sink/create_sink.sql @@ -1,3 +1,5 @@ +set sink_decouple = false; + CREATE SINK bhv_clickhouse_sink FROM bhv_mv WITH ( diff --git a/integration_tests/deltalake-sink/create_sink.sql b/integration_tests/deltalake-sink/create_sink.sql index f42b09d726e56..17c1c44aea255 100644 --- a/integration_tests/deltalake-sink/create_sink.sql +++ b/integration_tests/deltalake-sink/create_sink.sql @@ -1,3 +1,5 @@ +set sink_decouple = false; + create sink delta_lake_sink from source with ( connector = 'deltalake', diff --git a/integration_tests/doris-sink/create_sink.sql b/integration_tests/doris-sink/create_sink.sql index d4702219fed09..d6b28148c083d 100644 --- a/integration_tests/doris-sink/create_sink.sql +++ b/integration_tests/doris-sink/create_sink.sql @@ -1,3 +1,5 @@ +set sink_decouple = false; + create secret doris_secret with (backend = 'meta') as '123456'; CREATE SINK bhv_doris_sink diff --git a/integration_tests/dynamodb/create_sink.sql b/integration_tests/dynamodb/create_sink.sql index 6de71404a9da1..43cb2be6d1447 100644 --- a/integration_tests/dynamodb/create_sink.sql +++ b/integration_tests/dynamodb/create_sink.sql @@ -1,3 +1,5 @@ +set sink_decouple = false; + CREATE SINK dyn_sink FROM movies diff --git a/integration_tests/elasticsearch-sink/create_sink.sql b/integration_tests/elasticsearch-sink/create_sink.sql index 07046507d117d..f72f8f0e6ec3b 100644 --- a/integration_tests/elasticsearch-sink/create_sink.sql +++ b/integration_tests/elasticsearch-sink/create_sink.sql @@ -1,3 +1,5 @@ +set sink_decouple = false; + CREATE SINK bhv_es7_sink FROM bhv_mv WITH ( diff --git a/integration_tests/kafka-cdc-sink/create_sink.sql b/integration_tests/kafka-cdc-sink/create_sink.sql index 349aac0ca9b0a..0c25553adebba 100644 --- a/integration_tests/kafka-cdc-sink/create_sink.sql +++ b/integration_tests/kafka-cdc-sink/create_sink.sql @@ -1,3 +1,5 @@ +set sink_decouple = false; + CREATE SINK IF NOT EXISTS counts_sink FROM counts WITH ( diff --git a/integration_tests/mqtt/create_sink.sql b/integration_tests/mqtt/create_sink.sql index 69b6886943944..27b84aa354250 100644 --- a/integration_tests/mqtt/create_sink.sql +++ b/integration_tests/mqtt/create_sink.sql @@ -1,3 +1,5 @@ +set sink_decouple = false; + CREATE SINK mqtt_sink FROM personnel diff --git a/integration_tests/mysql-sink/create_sink.sql b/integration_tests/mysql-sink/create_sink.sql index 9776360df2914..f73b92e8ce259 100644 --- a/integration_tests/mysql-sink/create_sink.sql +++ b/integration_tests/mysql-sink/create_sink.sql @@ -1,3 +1,5 @@ +set sink_decouple = false; + CREATE SINK target_count_mysql_sink FROM target_count WITH ( diff --git a/integration_tests/nats/create_sink.sql b/integration_tests/nats/create_sink.sql index beee01afcecfb..fda1ab1c77621 100644 --- a/integration_tests/nats/create_sink.sql +++ b/integration_tests/nats/create_sink.sql @@ -1,3 +1,5 @@ +set sink_decouple = false; + CREATE TABLE personnel (id integer, name varchar); diff --git a/integration_tests/postgres-sink/create_sink.sql b/integration_tests/postgres-sink/create_sink.sql index 5041f1a36b741..ec76f16ac3037 100644 --- a/integration_tests/postgres-sink/create_sink.sql +++ b/integration_tests/postgres-sink/create_sink.sql @@ -1,3 +1,5 @@ +set sink_decouple = false; + CREATE SINK target_count_postgres_sink FROM target_count WITH ( diff --git a/integration_tests/redis-sink/create_sink.sql b/integration_tests/redis-sink/create_sink.sql index 61ffb67326227..f88a68aca2110 100644 --- a/integration_tests/redis-sink/create_sink.sql +++ b/integration_tests/redis-sink/create_sink.sql @@ -1,3 +1,5 @@ +set sink_decouple = false; + CREATE SINK bhv_redis_sink_1 FROM bhv_mv WITH ( diff --git a/integration_tests/starrocks-sink/create_sink.sql b/integration_tests/starrocks-sink/create_sink.sql index 8d7ebf98dfb20..7cfe69ef21973 100644 --- a/integration_tests/starrocks-sink/create_sink.sql +++ b/integration_tests/starrocks-sink/create_sink.sql @@ -1,3 +1,5 @@ +set sink_decouple = false; + create secret starrocks_secret with (backend = 'meta') as '123456'; CREATE SINK bhv_starrocks_sink_primary diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index 6b3e78f6a7b9d..07db42790f581 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -25,7 +25,6 @@ use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::Schema; use risingwave_common::row::Row; -use risingwave_common::session_config::sink_decouple::SinkDecouple; use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial}; use serde::ser::{SerializeSeq, SerializeStruct}; use serde::Serialize; @@ -38,12 +37,10 @@ use with_options::WithOptions; use super::decouple_checkpoint_log_sink::{ default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf, - DEFAULT_COMMIT_CHECKPOINT_INTERVAL, }; use super::writer::SinkWriter; use super::{DummySinkCommitCoordinator, SinkWriterParam}; use crate::error::ConnectorResult; -use crate::sink::catalog::desc::SinkDesc; use crate::sink::{ Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; @@ -497,29 +494,6 @@ impl Sink for ClickHouseSink { const SINK_NAME: &'static str = CLICKHOUSE_SINK; - fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { - let commit_checkpoint_interval = - if let Some(interval) = desc.properties.get("commit_checkpoint_interval") { - interval - .parse::() - .unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL) - } else { - DEFAULT_COMMIT_CHECKPOINT_INTERVAL - }; - - match user_specified { - SinkDecouple::Default | SinkDecouple::Enable => Ok(true), - SinkDecouple::Disable => { - if commit_checkpoint_interval > 1 { - return Err(SinkError::Config(anyhow!( - "config conflict: Clickhouse config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled" - ))); - } - Ok(false) - } - } - } - async fn validate(&self) -> Result<()> { // For upsert clickhouse sink, the primary key must be defined. if !self.is_append_only && self.pk_indices.is_empty() { diff --git a/src/connector/src/sink/decouple_checkpoint_log_sink.rs b/src/connector/src/sink/decouple_checkpoint_log_sink.rs index 61a2f0f70fd05..59e3335eb36db 100644 --- a/src/connector/src/sink/decouple_checkpoint_log_sink.rs +++ b/src/connector/src/sink/decouple_checkpoint_log_sink.rs @@ -20,10 +20,12 @@ use async_trait::async_trait; use crate::sink::log_store::{LogStoreReadItem, TruncateOffset}; use crate::sink::writer::SinkWriter; use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics}; -pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL: u64 = 10; +pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE: u64 = 10; +pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE: u64 = 1; +pub const COMMIT_CHECKPOINT_INTERVAL: &str = "commit_checkpoint_interval"; pub fn default_commit_checkpoint_interval() -> u64 { - DEFAULT_COMMIT_CHECKPOINT_INTERVAL + DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE } /// The `LogSinker` implementation used for commit-decoupled sinks (such as `Iceberg`, `DeltaLake` and `StarRocks`). diff --git a/src/connector/src/sink/deltalake.rs b/src/connector/src/sink/deltalake.rs index 2dedffa3469e3..494adb2dd6fed 100644 --- a/src/connector/src/sink/deltalake.rs +++ b/src/connector/src/sink/deltalake.rs @@ -31,7 +31,6 @@ use risingwave_common::array::StreamChunk; use risingwave_common::bail; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::Schema; -use risingwave_common::session_config::sink_decouple::SinkDecouple; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; @@ -41,11 +40,9 @@ use serde_derive::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use with_options::WithOptions; -use super::catalog::desc::SinkDesc; use super::coordinate::CoordinatedSinkWriter; use super::decouple_checkpoint_log_sink::{ default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf, - DEFAULT_COMMIT_CHECKPOINT_INTERVAL, }; use super::writer::SinkWriter; use super::{ @@ -285,29 +282,6 @@ impl Sink for DeltaLakeSink { const SINK_NAME: &'static str = DELTALAKE_SINK; - fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { - let commit_checkpoint_interval = - if let Some(interval) = desc.properties.get("commit_checkpoint_interval") { - interval - .parse::() - .unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL) - } else { - DEFAULT_COMMIT_CHECKPOINT_INTERVAL - }; - - match user_specified { - SinkDecouple::Default | SinkDecouple::Enable => Ok(true), - SinkDecouple::Disable => { - if commit_checkpoint_interval > 1 { - return Err(SinkError::Config(anyhow!( - "config conflict: DeltaLake config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled" - ))); - } - Ok(false) - } - } - } - async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { let inner = DeltaLakeSinkWriter::new( self.config.clone(), diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 1b135cd4d3b40..e295938a45a61 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -65,10 +65,8 @@ use with_options::WithOptions; use self::mock_catalog::MockCatalog; use self::prometheus::monitored_base_file_writer::MonitoredBaseFileWriterBuilder; use self::prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder; -use super::catalog::desc::SinkDesc; use super::decouple_checkpoint_log_sink::{ default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf, - DEFAULT_COMMIT_CHECKPOINT_INTERVAL, }; use super::{ Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, @@ -76,7 +74,7 @@ use super::{ use crate::error::ConnectorResult; use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::writer::SinkWriter; -use crate::sink::{Result, SinkCommitCoordinator, SinkDecouple, SinkParam}; +use crate::sink::{Result, SinkCommitCoordinator, SinkParam}; use crate::{ deserialize_bool_from_string, deserialize_optional_bool_from_string, deserialize_optional_string_seq_from_string, @@ -843,31 +841,6 @@ impl Sink for IcebergSink { const SINK_NAME: &'static str = ICEBERG_SINK; - fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { - let commit_checkpoint_interval = - desc.properties - .get("commit_checkpoint_interval") - .map(|interval| { - interval - .parse::() - .unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL) - }); - - match user_specified { - SinkDecouple::Default | SinkDecouple::Enable => Ok(true), - SinkDecouple::Disable => { - if let Some(commit_checkpoint_interval) = commit_checkpoint_interval - && commit_checkpoint_interval > 1 - { - return Err(SinkError::Config(anyhow!( - "config conflict: Iceberg config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled" - ))); - } - Ok(false) - } - } - } - async fn validate(&self) -> Result<()> { if "glue".eq_ignore_ascii_case(self.config.catalog_type()) { risingwave_common::license::Feature::IcebergSinkWithGlue @@ -1399,7 +1372,7 @@ mod test { use risingwave_common::catalog::Field; - use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL; + use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE; use crate::sink::iceberg::IcebergConfig; use crate::source::DataType; @@ -1482,7 +1455,7 @@ mod test { .into_iter() .map(|(k, v)| (k.to_string(), v.to_string())) .collect(), - commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL, + commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE, create_table_if_not_exists: false, }; diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index dafbc856207a9..b453af53cca41 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -53,6 +53,13 @@ use ::deltalake::DeltaTableError; use ::redis::RedisError; use anyhow::anyhow; use async_trait::async_trait; +use clickhouse::CLICKHOUSE_SINK; +use decouple_checkpoint_log_sink::{ + COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE, + DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE, +}; +use deltalake::DELTALAKE_SINK; +use iceberg::ICEBERG_SINK; use opendal::Error as OpendalError; use risingwave_common::array::ArrayError; use risingwave_common::bitmap::Bitmap; @@ -66,6 +73,7 @@ use risingwave_pb::catalog::PbSinkType; use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema}; use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::MetaClient; +use starrocks::STARROCKS_SINK; use thiserror::Error; use thiserror_ext::AsReport; pub use tracing; @@ -366,13 +374,54 @@ impl SinkWriterParam { } } +fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool { + matches!( + sink_name, + ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK + ) +} pub trait Sink: TryFrom { const SINK_NAME: &'static str; type LogSinker: LogSinker; type Coordinator: SinkCommitCoordinator; + fn set_default_commit_checkpoint_interval( + desc: &mut SinkDesc, + user_specified: &SinkDecouple, + ) -> Result<()> { + if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) { + match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) { + Some(commit_checkpoint_interval) => { + let commit_checkpoint_interval = commit_checkpoint_interval + .parse::() + .map_err(|e| SinkError::Config(anyhow!(e)))?; + if matches!(user_specified, SinkDecouple::Disable) + && commit_checkpoint_interval > 1 + { + return Err(SinkError::Config(anyhow!("config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"))); + } + } + None => match user_specified { + SinkDecouple::Default | SinkDecouple::Enable => { + desc.properties.insert( + COMMIT_CHECKPOINT_INTERVAL.to_string(), + DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(), + ); + } + SinkDecouple::Disable => { + desc.properties.insert( + COMMIT_CHECKPOINT_INTERVAL.to_string(), + DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(), + ); + } + }, + } + } + Ok(()) + } + /// `user_specified` is the value of `sink_decouple` config. - fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + fn is_sink_decouple(user_specified: &SinkDecouple) -> Result { match user_specified { SinkDecouple::Default | SinkDecouple::Enable => Ok(true), SinkDecouple::Disable => Ok(false), diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 4988a00b95645..aa8ca0625d05f 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -59,7 +59,6 @@ use tracing::warn; use super::elasticsearch::{is_es_sink, StreamChunkConverter, ES_OPTION_DELIMITER}; use crate::error::ConnectorResult; -use crate::sink::catalog::desc::SinkDesc; use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset}; use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; @@ -115,7 +114,7 @@ def_remote_sink!(); pub trait RemoteSinkTrait: Send + Sync + 'static { const SINK_NAME: &'static str; - fn default_sink_decouple(_desc: &SinkDesc) -> bool { + fn default_sink_decouple() -> bool { true } } @@ -143,9 +142,9 @@ impl Sink for RemoteSink { const SINK_NAME: &'static str = R::SINK_NAME; - fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + fn is_sink_decouple(user_specified: &SinkDecouple) -> Result { match user_specified { - SinkDecouple::Default => Ok(R::default_sink_decouple(desc)), + SinkDecouple::Default => Ok(R::default_sink_decouple()), SinkDecouple::Enable => Ok(true), SinkDecouple::Disable => Ok(false), } diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 21a4fc371b940..5c3e724721d18 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -24,7 +24,6 @@ use mysql_async::Opts; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::Schema; -use risingwave_common::session_config::sink_decouple::SinkDecouple; use risingwave_common::types::DataType; use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; use risingwave_pb::connector_service::sink_metadata::SerializedMetadata; @@ -38,7 +37,7 @@ use tokio::task::JoinHandle; use url::form_urlencoded; use with_options::WithOptions; -use super::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL; +use super::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE; use super::doris_starrocks_connector::{ HeaderBuilder, InserterInner, StarrocksTxnRequestBuilder, STARROCKS_DELETE_SIGN, STARROCKS_SUCCESS_STATUS, @@ -48,7 +47,6 @@ use super::{ SinkCommitCoordinator, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; -use crate::sink::catalog::desc::SinkDesc; use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf; use crate::sink::{Result, Sink, SinkWriter, SinkWriterParam}; @@ -118,7 +116,7 @@ pub struct StarrocksConfig { } fn default_commit_checkpoint_interval() -> u64 { - DEFAULT_COMMIT_CHECKPOINT_INTERVAL + DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE } impl StarrocksConfig { @@ -264,29 +262,6 @@ impl Sink for StarrocksSink { const SINK_NAME: &'static str = STARROCKS_SINK; - fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { - let commit_checkpoint_interval = - if let Some(interval) = desc.properties.get("commit_checkpoint_interval") { - interval - .parse::() - .unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL) - } else { - DEFAULT_COMMIT_CHECKPOINT_INTERVAL - }; - - match user_specified { - SinkDecouple::Default | SinkDecouple::Enable => Ok(true), - SinkDecouple::Disable => { - if commit_checkpoint_interval > 1 { - return Err(SinkError::Config(anyhow!( - "config conflict: Starrocks config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled" - ))); - } - Ok(false) - } - } - } - async fn validate(&self) -> Result<()> { if !self.is_append_only && self.pk_indices.is_empty() { return Err(SinkError::Config(anyhow!( diff --git a/src/connector/src/sink/trivial.rs b/src/connector/src/sink/trivial.rs index 5c5e093c8e0f0..e19f99943338c 100644 --- a/src/connector/src/sink/trivial.rs +++ b/src/connector/src/sink/trivial.rs @@ -17,7 +17,6 @@ use std::marker::PhantomData; use async_trait::async_trait; use risingwave_common::session_config::sink_decouple::SinkDecouple; -use super::catalog::desc::SinkDesc; use crate::sink::log_store::{LogStoreReadItem, TruncateOffset}; use crate::sink::{ DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkError, SinkLogReader, SinkParam, @@ -67,7 +66,7 @@ impl Sink for TrivialSink { const SINK_NAME: &'static str = T::SINK_NAME; // Disable sink decoupling for all trivial sinks because it introduces overhead without any benefit - fn is_sink_decouple(_desc: &SinkDesc, _user_specified: &SinkDecouple) -> Result { + fn is_sink_decouple(_user_specified: &SinkDecouple) -> Result { Ok(false) } diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index e8a8efff68801..1af3435eaea24 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -115,7 +115,7 @@ ClickHouseConfig: field_type: u64 comments: Commit every n(>0) checkpoints, default is 10. required: false - default: DEFAULT_COMMIT_CHECKPOINT_INTERVAL + default: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE - name: r#type field_type: String required: true @@ -143,7 +143,7 @@ DeltaLakeConfig: field_type: u64 comments: Commit every n(>0) checkpoints, default is 10. required: false - default: DEFAULT_COMMIT_CHECKPOINT_INTERVAL + default: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE - name: r#type field_type: String required: true @@ -339,7 +339,7 @@ IcebergConfig: field_type: u64 comments: Commit every n(>0) checkpoints, default is 10. required: false - default: DEFAULT_COMMIT_CHECKPOINT_INTERVAL + default: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE - name: create_table_if_not_exists field_type: bool required: false @@ -1021,7 +1021,7 @@ StarrocksConfig: also, in this time, the `sink_decouple` option should be enabled as well. Defaults to 10 if commit_checkpoint_interval <= 0 required: false - default: DEFAULT_COMMIT_CHECKPOINT_INTERVAL + default: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE - name: starrocks.partial_update field_type: String comments: Enable partial update diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 2717c454e6435..3e34475c8d4bb 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -212,7 +212,7 @@ impl StreamSink { partition_info: Option, ) -> Result { let columns = derive_columns(input.schema(), out_names, &user_cols)?; - let (input, sink) = Self::derive_sink_desc( + let (input, mut sink) = Self::derive_sink_desc( input, user_distributed_by, name, @@ -241,8 +241,11 @@ impl StreamSink { if connector == TABLE_SINK && sink.target_table.is_none() { unsupported_sink(TABLE_SINK) } else { + SinkType::set_default_commit_checkpoint_interval( + &mut sink, + &input.ctx().session_ctx().config().sink_decouple(), + )?; SinkType::is_sink_decouple( - &sink, &input.ctx().session_ctx().config().sink_decouple(), ) }