Skip to content

Commit

Permalink
fix(sink): fix sink_decouple = false and commit_checkpoint_interval d…
Browse files Browse the repository at this point in the history
…efalut = 1; (#18348)
  • Loading branch information
xxhZs authored Sep 11, 2024
1 parent 23410f0 commit 5065bb4
Show file tree
Hide file tree
Showing 44 changed files with 132 additions and 139 deletions.
5 changes: 3 additions & 2 deletions ci/scripts/e2e-cassandra-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ tar xfvz cassandra_latest.tar.gz
export LATEST_CASSANDRA_VERSION=$(get_latest_cassandra_version)
export CASSANDRA_DIR="./apache-cassandra-${LATEST_CASSANDRA_VERSION}"
# remove bundled packages, and use installed packages, because Python 3.12 has removed asyncore, but I failed to install libev support for bundled Python driver.
rm ${CASSANDRA_DIR}/lib/six-1.12.0-py2.py3-none-any.zip
rm ${CASSANDRA_DIR}/lib/cassandra-driver-internal-only-3.25.0.zip

rm ${CASSANDRA_DIR}/lib/futures-2.1.6-py2.py3-none-any.zip
rm ${CASSANDRA_DIR}/lib/cassandra-driver-internal-only-3.29.0.zip
apt-get install -y libev4 libev-dev
pip3 install --break-system-packages cassandra-driver
export CQLSH_HOST=cassandra-server
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
statement ok
set sink_decouple = false;

statement ok
set streaming_parallelism=4;

Expand Down Expand Up @@ -37,7 +34,6 @@ CREATE SINK sink1 AS select * from mv1 WITH (
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1
);

statement ok
Expand All @@ -54,7 +50,6 @@ CREATE SINK sink2 AS select * from mv1 WITH (
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1
);

sleep 20s
Expand Down
2 changes: 2 additions & 0 deletions e2e_test/iceberg/test_case/cdc/load.slt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# CDC source basic test
statement ok
set sink_decouple = false;

statement ok
create source mysql_mydb with (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ CREATE SINK s6 AS select * from mv6 WITH (
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3,
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
primary_key = 'v1',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ CREATE SINK s6 AS select * from mv6 WITH (
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3,
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
primary_key = 'v1',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ CREATE SINK s6 AS select * from mv6 WITH (
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3,
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
primary_key = 'v1',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
1 change: 0 additions & 1 deletion e2e_test/sink/clickhouse_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4,
clickhouse.password = '',
clickhouse.database = 'default',
clickhouse.table='demo_test',
commit_checkpoint_interval = 1,
);

statement ok
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/create_sink_as.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
CREATE TABLE t4 (v1 int primary key, v2 int);

Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/deltalake_rust_sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean, v10 decimal, v11 decimal[]);

Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/doris_sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamp, v9 boolean, v10 jsonb);

Expand Down
1 change: 0 additions & 1 deletion e2e_test/sink/iceberg_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH
catalog.type = 'storage',
database.name='demo_db',
table.name='e2e_demo_table',
commit_checkpoint_interval = 1
);

statement ok
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/mongodb_sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
create table t1(
a smallint,
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/redis_cluster_sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
CREATE TABLE t6 (v1 int primary key, v2 int);

Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/redis_sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean);

Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/remote/types.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
create table t5 (v1 smallint primary key, v2 int, v3 bigint, v4 float, v5 double, v6 decimal, v7 varchar, v8 timestamp, v9 boolean);

Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/sqlserver_sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
create table t_many_data_type_rw (
k1 int, k2 int,
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/starrocks_sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamp, v9 boolean, v10 jsonb, v11 decimal);

Expand Down
2 changes: 2 additions & 0 deletions integration_tests/big-query-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

-- create sink with local file
CREATE SINK bhv_big_query_sink
FROM
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/cassandra-and-scylladb-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE SINK bhv_cassandra_sink
FROM
bhv_mv WITH (
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/clickhouse-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE SINK bhv_clickhouse_sink
FROM
bhv_mv WITH (
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/deltalake-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

create sink delta_lake_sink from source
with (
connector = 'deltalake',
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/doris-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

create secret doris_secret with (backend = 'meta') as '123456';

CREATE SINK bhv_doris_sink
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/dynamodb/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE SINK dyn_sink
FROM
movies
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/elasticsearch-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE SINK bhv_es7_sink
FROM
bhv_mv WITH (
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/kafka-cdc-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE SINK IF NOT EXISTS counts_sink
FROM counts
WITH (
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/mqtt/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE SINK mqtt_sink
FROM
personnel
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/mysql-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE SINK target_count_mysql_sink
FROM
target_count WITH (
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/nats/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE TABLE
personnel (id integer, name varchar);

Expand Down
2 changes: 2 additions & 0 deletions integration_tests/postgres-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE SINK target_count_postgres_sink
FROM
target_count WITH (
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/redis-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

CREATE SINK bhv_redis_sink_1
FROM
bhv_mv WITH (
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/starrocks-sink/create_sink.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set sink_decouple = false;

create secret starrocks_secret with (backend = 'meta') as '123456';

CREATE SINK bhv_starrocks_sink_primary
Expand Down
26 changes: 0 additions & 26 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial};
use serde::ser::{SerializeSeq, SerializeStruct};
use serde::Serialize;
Expand All @@ -38,12 +37,10 @@ use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::{
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf,
DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
};
use super::writer::SinkWriter;
use super::{DummySinkCommitCoordinator, SinkWriterParam};
use crate::error::ConnectorResult;
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::{
Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};
Expand Down Expand Up @@ -497,29 +494,6 @@ impl Sink for ClickHouseSink {

const SINK_NAME: &'static str = CLICKHOUSE_SINK;

fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
let commit_checkpoint_interval =
if let Some(interval) = desc.properties.get("commit_checkpoint_interval") {
interval
.parse::<u64>()
.unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL)
} else {
DEFAULT_COMMIT_CHECKPOINT_INTERVAL
};

match user_specified {
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => {
if commit_checkpoint_interval > 1 {
return Err(SinkError::Config(anyhow!(
"config conflict: Clickhouse config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
)));
}
Ok(false)
}
}
}

async fn validate(&self) -> Result<()> {
// For upsert clickhouse sink, the primary key must be defined.
if !self.is_append_only && self.pk_indices.is_empty() {
Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/sink/decouple_checkpoint_log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ use async_trait::async_trait;
use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
use crate::sink::writer::SinkWriter;
use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics};
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL: u64 = 10;
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE: u64 = 10;
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE: u64 = 1;
pub const COMMIT_CHECKPOINT_INTERVAL: &str = "commit_checkpoint_interval";

pub fn default_commit_checkpoint_interval() -> u64 {
DEFAULT_COMMIT_CHECKPOINT_INTERVAL
DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE
}

/// The `LogSinker` implementation used for commit-decoupled sinks (such as `Iceberg`, `DeltaLake` and `StarRocks`).
Expand Down
26 changes: 0 additions & 26 deletions src/connector/src/sink/deltalake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
Expand All @@ -41,11 +40,9 @@ use serde_derive::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use with_options::WithOptions;

use super::catalog::desc::SinkDesc;
use super::coordinate::CoordinatedSinkWriter;
use super::decouple_checkpoint_log_sink::{
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf,
DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
};
use super::writer::SinkWriter;
use super::{
Expand Down Expand Up @@ -285,29 +282,6 @@ impl Sink for DeltaLakeSink {

const SINK_NAME: &'static str = DELTALAKE_SINK;

fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
let commit_checkpoint_interval =
if let Some(interval) = desc.properties.get("commit_checkpoint_interval") {
interval
.parse::<u64>()
.unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL)
} else {
DEFAULT_COMMIT_CHECKPOINT_INTERVAL
};

match user_specified {
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => {
if commit_checkpoint_interval > 1 {
return Err(SinkError::Config(anyhow!(
"config conflict: DeltaLake config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
)));
}
Ok(false)
}
}
}

async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
let inner = DeltaLakeSinkWriter::new(
self.config.clone(),
Expand Down
Loading

0 comments on commit 5065bb4

Please sign in to comment.