diff --git a/e2e_test/sink/clickhouse_sink.slt b/e2e_test/sink/clickhouse_sink.slt index 2adc70dcf409e..e5bac0d8d521d 100644 --- a/e2e_test/sink/clickhouse_sink.slt +++ b/e2e_test/sink/clickhouse_sink.slt @@ -1,3 +1,6 @@ +statement ok +set sink_decouple = false; + statement ok CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar, v4 smallint, v5 decimal); diff --git a/e2e_test/sink/kafka/avro.slt b/e2e_test/sink/kafka/avro.slt index 1cf27b811d9be..bd684bebdf785 100644 --- a/e2e_test/sink/kafka/avro.slt +++ b/e2e_test/sink/kafka/avro.slt @@ -1,3 +1,6 @@ +statement ok +set sink_decouple = false; + statement ok create table from_kafka ( *, gen_i32_field int as int32_field + 2, primary key (some_key) ) include key as some_key diff --git a/e2e_test/sink/kafka/create_sink.slt b/e2e_test/sink/kafka/create_sink.slt index c3f738fbce2ae..f5e2e0333bc35 100644 --- a/e2e_test/sink/kafka/create_sink.slt +++ b/e2e_test/sink/kafka/create_sink.slt @@ -1,6 +1,9 @@ statement ok set rw_implicit_flush=true; +statement ok +set sink_decouple = false; + statement ok create table t_kafka ( id integer primary key, diff --git a/e2e_test/sink/kafka/protobuf.slt b/e2e_test/sink/kafka/protobuf.slt index c3f6f0d3ad8e2..61a91435567da 100644 --- a/e2e_test/sink/kafka/protobuf.slt +++ b/e2e_test/sink/kafka/protobuf.slt @@ -1,3 +1,6 @@ +statement ok +set sink_decouple = false; + statement ok create table from_kafka with ( connector = 'kafka', diff --git a/e2e_test/sink/mqtt_sink.slt b/e2e_test/sink/mqtt_sink.slt index d19addb024c97..2602d2ddc6198 100644 --- a/e2e_test/sink/mqtt_sink.slt +++ b/e2e_test/sink/mqtt_sink.slt @@ -1,3 +1,6 @@ +statement ok +set sink_decouple = false; + statement ok CREATE TABLE mqtt ( device_id varchar, diff --git a/e2e_test/sink/pulsar_sink.slt b/e2e_test/sink/pulsar_sink.slt index 2284f9c1877e3..f8d6aa1aaff4c 100644 --- a/e2e_test/sink/pulsar_sink.slt +++ b/e2e_test/sink/pulsar_sink.slt @@ -1,3 +1,6 @@ +statement ok +set sink_decouple = false; + statement ok CREATE TABLE pulsar ( id BIGINT, diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index 26a779dc94ae1..c506f00e6d2ca 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -379,11 +379,10 @@ impl Sink for ClickHouseSink { const SINK_NAME: &'static str = CLICKHOUSE_SINK; - fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { match user_specified { - SinkDecouple::Default => Ok(desc.sink_type.is_append_only()), + SinkDecouple::Default | SinkDecouple::Enable => Ok(true), SinkDecouple::Disable => Ok(false), - SinkDecouple::Enable => Ok(true), } } diff --git a/src/connector/src/sink/google_pubsub.rs b/src/connector/src/sink/google_pubsub.rs index 9f250b6692cd2..ad039ad020704 100644 --- a/src/connector/src/sink/google_pubsub.rs +++ b/src/connector/src/sink/google_pubsub.rs @@ -96,11 +96,10 @@ impl Sink for GooglePubSubSink { const SINK_NAME: &'static str = PUBSUB_SINK; - fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { match user_specified { - SinkDecouple::Default => Ok(desc.sink_type.is_append_only()), + SinkDecouple::Default | SinkDecouple::Enable => Ok(true), SinkDecouple::Disable => Ok(false), - SinkDecouple::Enable => Ok(true), } } diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 1abfed3a39a45..617f427ae71f1 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -320,11 +320,10 @@ impl Sink for KafkaSink { const SINK_NAME: &'static str = KAFKA_SINK; - fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { match user_specified { - SinkDecouple::Default => Ok(desc.sink_type.is_append_only()), + SinkDecouple::Default | SinkDecouple::Enable => Ok(true), SinkDecouple::Disable => Ok(false), - SinkDecouple::Enable => Ok(true), } } diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs index 04cd3390f1c9d..771d3c8a6f91d 100644 --- a/src/connector/src/sink/kinesis.rs +++ b/src/connector/src/sink/kinesis.rs @@ -80,11 +80,10 @@ impl Sink for KinesisSink { const SINK_NAME: &'static str = KINESIS_SINK; - fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { match user_specified { - SinkDecouple::Default => Ok(desc.sink_type.is_append_only()), + SinkDecouple::Default | SinkDecouple::Enable => Ok(true), SinkDecouple::Disable => Ok(false), - SinkDecouple::Enable => Ok(true), } } diff --git a/src/connector/src/sink/mqtt.rs b/src/connector/src/sink/mqtt.rs index 7bb0872b27f35..d9dfdbe03b21b 100644 --- a/src/connector/src/sink/mqtt.rs +++ b/src/connector/src/sink/mqtt.rs @@ -165,11 +165,10 @@ impl Sink for MqttSink { const SINK_NAME: &'static str = MQTT_SINK; - fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { match user_specified { - SinkDecouple::Default => Ok(desc.sink_type.is_append_only()), + SinkDecouple::Default | SinkDecouple::Enable => Ok(true), SinkDecouple::Disable => Ok(false), - SinkDecouple::Enable => Ok(true), } } diff --git a/src/connector/src/sink/nats.rs b/src/connector/src/sink/nats.rs index 109e52473b92b..162aca3c4d2e8 100644 --- a/src/connector/src/sink/nats.rs +++ b/src/connector/src/sink/nats.rs @@ -99,11 +99,10 @@ impl Sink for NatsSink { const SINK_NAME: &'static str = NATS_SINK; - fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { match user_specified { - SinkDecouple::Default => Ok(desc.sink_type.is_append_only()), + SinkDecouple::Default | SinkDecouple::Enable => Ok(true), SinkDecouple::Disable => Ok(false), - SinkDecouple::Enable => Ok(true), } } diff --git a/src/connector/src/sink/pulsar.rs b/src/connector/src/sink/pulsar.rs index 8b6f963c6a3a7..3f016ad94946d 100644 --- a/src/connector/src/sink/pulsar.rs +++ b/src/connector/src/sink/pulsar.rs @@ -170,11 +170,10 @@ impl Sink for PulsarSink { const SINK_NAME: &'static str = PULSAR_SINK; - fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { match user_specified { - SinkDecouple::Default => Ok(desc.sink_type.is_append_only()), + SinkDecouple::Default | SinkDecouple::Enable => Ok(true), SinkDecouple::Disable => Ok(false), - SinkDecouple::Enable => Ok(true), } }