Skip to content

Commit

Permalink
feat(sink): enable sink_decouple by default for kafka, pulsar, kinesi…
Browse files Browse the repository at this point in the history
…s, google pubsub, nats, mqtt clickhouse (#17221)
  • Loading branch information
xxhZs authored Jun 14, 2024
1 parent 41fe501 commit 5c24685
Show file tree
Hide file tree
Showing 13 changed files with 32 additions and 21 deletions.
3 changes: 3 additions & 0 deletions e2e_test/sink/clickhouse_sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
CREATE TABLE t6 (v1 int primary key, v2 bigint, v3 varchar, v4 smallint, v5 decimal);

Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/kafka/avro.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
create table from_kafka ( *, gen_i32_field int as int32_field + 2, primary key (some_key) )
include key as some_key
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/kafka/create_sink.slt
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
statement ok
set rw_implicit_flush=true;

statement ok
set sink_decouple = false;

statement ok
create table t_kafka (
id integer primary key,
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/kafka/protobuf.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
create table from_kafka with (
connector = 'kafka',
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/mqtt_sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
CREATE TABLE mqtt (
device_id varchar,
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/pulsar_sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set sink_decouple = false;

statement ok
CREATE TABLE pulsar (
id BIGINT,
Expand Down
5 changes: 2 additions & 3 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,11 +379,10 @@ impl Sink for ClickHouseSink {

const SINK_NAME: &'static str = CLICKHOUSE_SINK;

fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default => Ok(desc.sink_type.is_append_only()),
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => Ok(false),
SinkDecouple::Enable => Ok(true),
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/connector/src/sink/google_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,10 @@ impl Sink for GooglePubSubSink {

const SINK_NAME: &'static str = PUBSUB_SINK;

fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default => Ok(desc.sink_type.is_append_only()),
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => Ok(false),
SinkDecouple::Enable => Ok(true),
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,10 @@ impl Sink for KafkaSink {

const SINK_NAME: &'static str = KAFKA_SINK;

fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default => Ok(desc.sink_type.is_append_only()),
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => Ok(false),
SinkDecouple::Enable => Ok(true),
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/connector/src/sink/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,10 @@ impl Sink for KinesisSink {

const SINK_NAME: &'static str = KINESIS_SINK;

fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default => Ok(desc.sink_type.is_append_only()),
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => Ok(false),
SinkDecouple::Enable => Ok(true),
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/connector/src/sink/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,10 @@ impl Sink for MqttSink {

const SINK_NAME: &'static str = MQTT_SINK;

fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default => Ok(desc.sink_type.is_append_only()),
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => Ok(false),
SinkDecouple::Enable => Ok(true),
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/connector/src/sink/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,10 @@ impl Sink for NatsSink {

const SINK_NAME: &'static str = NATS_SINK;

fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default => Ok(desc.sink_type.is_append_only()),
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => Ok(false),
SinkDecouple::Enable => Ok(true),
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/connector/src/sink/pulsar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,10 @@ impl Sink for PulsarSink {

const SINK_NAME: &'static str = PULSAR_SINK;

fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default => Ok(desc.sink_type.is_append_only()),
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => Ok(false),
SinkDecouple::Enable => Ok(true),
}
}

Expand Down

0 comments on commit 5c24685

Please sign in to comment.