diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs
index 367cf4ce35ac4..cab147d0000bb 100644
--- a/src/common/src/session_config/mod.rs
+++ b/src/common/src/session_config/mod.rs
@@ -15,6 +15,7 @@
 mod over_window;
 mod query_mode;
 mod search_path;
+pub mod sink_decouple;
 mod transaction_isolation_level;
 mod visibility_mode;
 
@@ -30,6 +31,7 @@ pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
 use tracing::info;
 
 use crate::error::{ErrorCode, RwError};
+use crate::session_config::sink_decouple::SinkDecouple;
 use crate::session_config::transaction_isolation_level::IsolationLevel;
 pub use crate::session_config::visibility_mode::VisibilityMode;
 use crate::util::epoch::Epoch;
@@ -333,7 +335,6 @@ type ServerVersionNum = ConfigI32<SERVER_VERSION_NUM, 90_500>;
 type ForceSplitDistinctAgg = ConfigBool<FORCE_SPLIT_DISTINCT_AGG, false>;
 type ClientMinMessages = ConfigString<CLIENT_MIN_MESSAGES>;
 type ClientEncoding = ConfigString<CLIENT_ENCODING>;
-type SinkDecouple = ConfigBool<SINK_DECOUPLE, false>;
 type SynchronizeSeqscans = ConfigBool<SYNCHRONIZE_SEQSCANS, false>;
 type StatementTimeout = ConfigI32<STATEMENT_TIMEOUT, 0>;
 type LockTimeout = ConfigI32<LOCK_TIMEOUT, 0>;
@@ -1012,8 +1013,8 @@ impl ConfigMap {
         &self.client_encoding
     }
 
-    pub fn get_sink_decouple(&self) -> bool {
-        self.sink_decouple.0
+    pub fn get_sink_decouple(&self) -> SinkDecouple {
+        self.sink_decouple
     }
 
     pub fn get_standard_conforming_strings(&self) -> &str {
diff --git a/src/common/src/session_config/sink_decouple.rs b/src/common/src/session_config/sink_decouple.rs
new file mode 100644
index 0000000000000..7a3c1925ebd8e
--- /dev/null
+++ b/src/common/src/session_config/sink_decouple.rs
@@ -0,0 +1,70 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use crate::error::ErrorCode::InvalidConfigValue;
+use crate::error::{ErrorCode, RwError};
+use crate::session_config::{ConfigEntry, CONFIG_KEYS, SINK_DECOUPLE};
+
+#[derive(Copy, Default, Debug, Clone, PartialEq, Eq)]
+pub enum SinkDecouple {
+    // default sink couple config of specific sink
+    #[default]
+    Default,
+    // enable sink decouple
+    Enable,
+    // disable sink decouple
+    Disable,
+}
+
+impl<'a> TryFrom<&'a [&'a str]> for SinkDecouple {
+    type Error = RwError;
+
+    fn try_from(value: &'a [&'a str]) -> Result<Self, Self::Error> {
+        if value.len() != 1 {
+            return Err(ErrorCode::InternalError(format!(
+                "SET {} takes only one argument",
+                Self::entry_name()
+            ))
+            .into());
+        }
+
+        let s = value[0];
+        match s.to_ascii_lowercase().as_str() {
+            "true" | "enable" => Ok(Self::Enable),
+            "false" | "disable" => Ok(Self::Disable),
+            "default" => Ok(Self::Default),
+            _ => Err(InvalidConfigValue {
+                config_entry: Self::entry_name().to_string(),
+                config_value: s.to_string(),
+            }
+            .into()),
+        }
+    }
+}
+
+impl ConfigEntry for SinkDecouple {
+    fn entry_name() -> &'static str {
+        CONFIG_KEYS[SINK_DECOUPLE]
+    }
+}
+
+impl ToString for SinkDecouple {
+    fn to_string(&self) -> String {
+        match self {
+            Self::Default => "default".to_string(),
+            Self::Enable => "enable".to_string(),
+            Self::Disable => "disable".to_string(),
+        }
+    }
+}
diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs
index c67fdb8c1983e..2080f512d1cac 100644
--- a/src/connector/src/sink/mod.rs
+++ b/src/connector/src/sink/mod.rs
@@ -100,7 +100,8 @@ macro_rules! dispatch_sink {
 
 #[macro_export]
 macro_rules! match_sink_name_str {
-    ({$({$variant_name:ident, $sink_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {
+    ({$({$variant_name:ident, $sink_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
+        use $crate::sink::Sink;
         match $name_str {
             $(
                 <$sink_type>::SINK_NAME => {
@@ -112,7 +113,7 @@ macro_rules! match_sink_name_str {
             )*
             other => ($on_other_closure)(other),
         }
-    };
+    }};
     ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
         $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
     }};
@@ -225,6 +226,10 @@ pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
     type LogSinker: LogSinker;
     type Coordinator: SinkCommitCoordinator;
 
+    fn default_sink_decouple() -> bool {
+        false
+    }
+
     async fn validate(&self) -> Result<()>;
     async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;
     #[expect(clippy::unused_async)]
diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs
index 4b9250c4b23f5..749ed60b4ff33 100644
--- a/src/frontend/src/optimizer/plan_node/stream_sink.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs
@@ -15,6 +15,7 @@
 use std::assert_matches::assert_matches;
 use std::io::{Error, ErrorKind};
 
+use anyhow::anyhow;
 use fixedbitset::FixedBitSet;
 use itertools::Itertools;
 use pretty_xmlish::{Pretty, XmlNode};
@@ -23,12 +24,14 @@ use risingwave_common::constants::log_store::{
     EPOCH_COLUMN_INDEX, KV_LOG_STORE_PREDEFINED_COLUMNS, SEQ_ID_COLUMN_INDEX,
 };
 use risingwave_common::error::{ErrorCode, Result};
+use risingwave_common::session_config::sink_decouple::SinkDecouple;
 use risingwave_common::util::sort_util::OrderType;
+use risingwave_connector::match_sink_name_str;
 use risingwave_connector::sink::catalog::desc::SinkDesc;
 use risingwave_connector::sink::catalog::{SinkId, SinkType};
 use risingwave_connector::sink::{
-    SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
-    SINK_USER_FORCE_APPEND_ONLY_OPTION,
+    SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
+    SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
 };
 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
 use tracing::info;
@@ -92,6 +95,24 @@ impl StreamSink {
             properties,
         )?;
 
+        // check and ensure that the sink connector is specified and supported
+        match sink.properties.get(CONNECTOR_TYPE_KEY) {
+            Some(connector) => match_sink_name_str!(
+                connector.to_lowercase().as_str(),
+                SinkType,
+                Ok(()),
+                |other| Err(SinkError::Config(anyhow!(
+                    "unsupported sink type {}",
+                    other
+                )))
+            )?,
+            None => {
+                return Err(
+                    SinkError::Config(anyhow!("connector not specified when create sink")).into(),
+                );
+            }
+        }
+
         Ok(Self::new(input, sink))
     }
 
@@ -367,10 +388,27 @@ impl StreamNode for StreamSink {
         PbNodeBody::Sink(SinkNode {
             sink_desc: Some(self.sink_desc.to_proto()),
             table: Some(table.to_internal_table_prost()),
-            log_store_type: if self.base.ctx.session_ctx().config().get_sink_decouple() {
-                SinkLogStoreType::KvLogStore as i32
-            } else {
-                SinkLogStoreType::InMemoryLogStore as i32
+            log_store_type: match self.base.ctx.session_ctx().config().get_sink_decouple() {
+                SinkDecouple::Default => {
+                    let enable_sink_decouple =
+                        match_sink_name_str!(
+                        self.sink_desc.properties.get(CONNECTOR_TYPE_KEY).expect(
+                            "have checked connector is contained when create the `StreamSink`"
+                        ).to_lowercase().as_str(),
+                        SinkTypeName,
+                        SinkTypeName::default_sink_decouple(),
+                        |_unsupported| unreachable!(
+                            "have checked connector is supported when create the `StreamSink`"
+                        )
+                    );
+                    if enable_sink_decouple {
+                        SinkLogStoreType::KvLogStore as i32
+                    } else {
+                        SinkLogStoreType::InMemoryLogStore as i32
+                    }
+                }
+                SinkDecouple::Enable => SinkLogStoreType::KvLogStore as i32,
+                SinkDecouple::Disable => SinkLogStoreType::InMemoryLogStore as i32,
             },
         })
     }
diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs
index 295a36376b376..8eb2710b34372 100644
--- a/src/stream/src/from_proto/sink.rs
+++ b/src/stream/src/from_proto/sink.rs
@@ -66,8 +66,6 @@ impl ExecutorBuilder for SinkExecutorBuilder {
                 SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY))
             })?;
 
-            use risingwave_connector::sink::Sink;
-
             match_sink_name_str!(
                 sink_type.to_lowercase().as_str(),
                 SinkType,