From 7d20fa851b93ccc43c3dfabf4d808ddc145b1888 Mon Sep 17 00:00:00 2001
From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com>
Date: Mon, 26 Aug 2024 12:10:22 +0800
Subject: [PATCH] feat(sink): set default_sink_decouple = true for all sink
(#18182)
---
e2e_test/sink/cassandra_sink.slt | 3 ++
e2e_test/sink/clickhouse_sink.slt | 1 +
.../sink/elasticsearch/elasticsearch_sink.slt | 3 ++
src/connector/src/lib.rs | 21 --------
src/connector/src/sink/clickhouse.rs | 43 ++++++++--------
.../src/sink/decouple_checkpoint_log_sink.rs | 5 ++
src/connector/src/sink/deltalake.rs | 44 ++++++++--------
src/connector/src/sink/google_pubsub.rs | 9 ----
src/connector/src/sink/iceberg/mod.rs | 51 ++++++++++---------
src/connector/src/sink/kafka.rs | 9 ----
src/connector/src/sink/kinesis.rs | 9 ----
src/connector/src/sink/mod.rs | 4 +-
src/connector/src/sink/mongodb.rs | 10 ----
src/connector/src/sink/mqtt.rs | 9 ----
src/connector/src/sink/nats.rs | 9 ----
src/connector/src/sink/pulsar.rs | 9 ----
src/connector/src/sink/remote.rs | 6 +--
src/connector/src/sink/starrocks.rs | 42 ++++++++-------
src/connector/with_options_sink.yaml | 15 +++---
19 files changed, 121 insertions(+), 181 deletions(-)
diff --git a/e2e_test/sink/cassandra_sink.slt b/e2e_test/sink/cassandra_sink.slt
index fe5ca331b591a..ce3d9ea5d6b5a 100644
--- a/e2e_test/sink/cassandra_sink.slt
+++ b/e2e_test/sink/cassandra_sink.slt
@@ -10,6 +10,9 @@ CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float,
statement ok
CREATE TABLE t7 ("TEST_V1" int primary key, "TEST_V2" int, "TEST_V3" int);
+statement ok
+set sink_decouple = false;
+
statement ok
CREATE SINK s6
FROM
diff --git a/e2e_test/sink/clickhouse_sink.slt b/e2e_test/sink/clickhouse_sink.slt
index e5bac0d8d521d..e037618bb460e 100644
--- a/e2e_test/sink/clickhouse_sink.slt
+++ b/e2e_test/sink/clickhouse_sink.slt
@@ -17,6 +17,7 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4,
clickhouse.password = '',
clickhouse.database = 'default',
clickhouse.table='demo_test',
+ commit_checkpoint_interval = 1,
);
statement ok
diff --git a/e2e_test/sink/elasticsearch/elasticsearch_sink.slt b/e2e_test/sink/elasticsearch/elasticsearch_sink.slt
index 70b7e911c0f17..29e600ea450f2 100644
--- a/e2e_test/sink/elasticsearch/elasticsearch_sink.slt
+++ b/e2e_test/sink/elasticsearch/elasticsearch_sink.slt
@@ -1,3 +1,6 @@
+statement ok
+set sink_decouple = false;
+
statement ok
CREATE TABLE t7 (
v1 int primary key,
diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs
index 7ec84b14088e9..6fa8f75cf2112 100644
--- a/src/connector/src/lib.rs
+++ b/src/connector/src/lib.rs
@@ -75,27 +75,6 @@ where
})
}
-pub(crate) fn deserialize_optional_u64_from_string<'de, D>(
- deserializer: D,
-) -> Result