diff --git a/e2e_test/sink/append_only_sink.slt b/e2e_test/sink/append_only_sink.slt index 9a6aeabb88ae3..405ca132ae0a9 100644 --- a/e2e_test/sink/append_only_sink.slt +++ b/e2e_test/sink/append_only_sink.slt @@ -22,7 +22,7 @@ create sink invalid_sink_type from t with (connector = 'blackhole', type = 'inva statement error `force_append_only` must be true or false create sink invalid_force_append_only from t with (connector = 'blackhole', force_append_only = 'invalid'); -statement error db error: ERROR: QueryError: internal error: Sink error: config error: unsupported sink connector invalid +statement error db error: ERROR: QueryError: Sink error: config error: unsupported sink type invalid create sink invalid_connector from t with (connector = 'invalid'); statement ok diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 90ac1a31bc6f7..a4f3729f3367b 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -15,6 +15,7 @@ mod over_window; mod query_mode; mod search_path; +pub mod sink_decouple; mod transaction_isolation_level; mod visibility_mode; @@ -30,6 +31,7 @@ pub use search_path::{SearchPath, USER_NAME_WILD_CARD}; use tracing::info; use crate::error::{ErrorCode, RwError}; +use crate::session_config::sink_decouple::SinkDecouple; use crate::session_config::transaction_isolation_level::IsolationLevel; pub use crate::session_config::visibility_mode::VisibilityMode; use crate::util::epoch::Epoch; @@ -335,7 +337,6 @@ type ServerVersionNum = ConfigI32; type ForceSplitDistinctAgg = ConfigBool; type ClientMinMessages = ConfigString; type ClientEncoding = ConfigString; -type SinkDecouple = ConfigBool; type SynchronizeSeqscans = ConfigBool; type StatementTimeout = ConfigI32; type LockTimeout = ConfigI32; @@ -1026,8 +1027,8 @@ impl ConfigMap { &self.client_encoding } - pub fn get_sink_decouple(&self) -> bool { - self.sink_decouple.0 + pub fn get_sink_decouple(&self) -> SinkDecouple { + self.sink_decouple } pub fn get_standard_conforming_strings(&self) -> &str { diff --git a/src/common/src/session_config/sink_decouple.rs b/src/common/src/session_config/sink_decouple.rs new file mode 100644 index 0000000000000..7a3c1925ebd8e --- /dev/null +++ b/src/common/src/session_config/sink_decouple.rs @@ -0,0 +1,70 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::ErrorCode::InvalidConfigValue; +use crate::error::{ErrorCode, RwError}; +use crate::session_config::{ConfigEntry, CONFIG_KEYS, SINK_DECOUPLE}; + +#[derive(Copy, Default, Debug, Clone, PartialEq, Eq)] +pub enum SinkDecouple { + // default sink couple config of specific sink + #[default] + Default, + // enable sink decouple + Enable, + // disable sink decouple + Disable, +} + +impl<'a> TryFrom<&'a [&'a str]> for SinkDecouple { + type Error = RwError; + + fn try_from(value: &'a [&'a str]) -> Result { + if value.len() != 1 { + return Err(ErrorCode::InternalError(format!( + "SET {} takes only one argument", + Self::entry_name() + )) + .into()); + } + + let s = value[0]; + match s.to_ascii_lowercase().as_str() { + "true" | "enable" => Ok(Self::Enable), + "false" | "disable" => Ok(Self::Disable), + "default" => Ok(Self::Default), + _ => Err(InvalidConfigValue { + config_entry: Self::entry_name().to_string(), + config_value: s.to_string(), + } + .into()), + } + } +} + +impl ConfigEntry for SinkDecouple { + fn entry_name() -> &'static str { + CONFIG_KEYS[SINK_DECOUPLE] + } +} + +impl ToString for SinkDecouple { + fn to_string(&self) -> String { + match self { + Self::Default => "default".to_string(), + Self::Enable => "enable".to_string(), + Self::Disable => "disable".to_string(), + } + } +} diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 5bab8b5afc628..7e3143e312d42 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -35,6 +35,7 @@ use strum_macros::{Display, EnumString}; use super::catalog::{SinkFormat, SinkFormatDesc}; use super::{Sink, SinkError, SinkParam}; use crate::common::KafkaCommon; +use crate::sink::catalog::desc::SinkDesc; use crate::sink::formatter::SinkFormatterImpl; use crate::sink::log_store::{ DeliveryFutureManager, DeliveryFutureManagerAddFuture, LogReader, LogStoreReadItem, @@ -294,6 +295,10 @@ impl Sink for KafkaSink { const SINK_NAME: &'static str = KAFKA_SINK; + fn default_sink_decouple(desc: &SinkDesc) -> bool { + desc.sink_type.is_append_only() + } + async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result { let formatter = SinkFormatterImpl::new( &self.format_desc, diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 239ffdabc8ee2..7825c0919d5d1 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -53,6 +53,7 @@ use thiserror::Error; pub use tracing; use self::catalog::{SinkFormatDesc, SinkType}; +use crate::sink::catalog::desc::SinkDesc; use crate::sink::catalog::{SinkCatalog, SinkId}; use crate::sink::log_store::LogReader; use crate::sink::writer::SinkWriter; @@ -102,7 +103,8 @@ macro_rules! dispatch_sink { #[macro_export] macro_rules! match_sink_name_str { - ({$({$variant_name:ident, $sink_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => { + ({$({$variant_name:ident, $sink_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{ + use $crate::sink::Sink; match $name_str { $( <$sink_type>::SINK_NAME => { @@ -114,7 +116,7 @@ macro_rules! match_sink_name_str { )* other => ($on_other_closure)(other), } - }; + }}; ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{ $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}} }}; @@ -263,6 +265,10 @@ pub trait Sink: TryFrom { type LogSinker: LogSinker; type Coordinator: SinkCommitCoordinator; + fn default_sink_decouple(_desc: &SinkDesc) -> bool { + false + } + async fn validate(&self) -> Result<()>; async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result; #[expect(clippy::unused_async)] diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 74a3cfdcac572..80335d278c8d4 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -344,7 +344,7 @@ pub mod tests { frontend.run_sql(sql).await.unwrap(); let sql = r#"CREATE SINK snk1 FROM mv1 - WITH (connector = 'mysql', mysql.endpoint = '127.0.0.1:3306', mysql.table = + WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table = '', mysql.database = '', mysql.user = '', mysql.password = '', type = 'append-only', force_append_only = 'true');"#.to_string(); frontend.run_sql(sql).await.unwrap(); diff --git a/src/frontend/src/handler/drop_sink.rs b/src/frontend/src/handler/drop_sink.rs index 698791dceed6b..6b1a864e03964 100644 --- a/src/frontend/src/handler/drop_sink.rs +++ b/src/frontend/src/handler/drop_sink.rs @@ -72,7 +72,7 @@ mod tests { async fn test_drop_sink_handler() { let sql_create_table = "create table t (v1 smallint primary key);"; let sql_create_mv = "create materialized view mv as select v1 from t;"; - let sql_create_sink = "create sink snk from mv with( connector = 'mysql')"; + let sql_create_sink = "create sink snk from mv with( connector = 'kafka')"; let sql_drop_sink = "drop sink snk;"; let frontend = LocalFrontend::new(Default::default()).await; frontend.run_sql(sql_create_table).await.unwrap(); diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index d1cd407a7e7dd..563a067adaac5 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -15,6 +15,7 @@ use std::assert_matches::assert_matches; use std::io::{Error, ErrorKind}; +use anyhow::anyhow; use fixedbitset::FixedBitSet; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; @@ -23,12 +24,14 @@ use risingwave_common::constants::log_store::{ EPOCH_COLUMN_INDEX, KV_LOG_STORE_PREDEFINED_COLUMNS, SEQ_ID_COLUMN_INDEX, }; use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::session_config::sink_decouple::SinkDecouple; use risingwave_common::util::sort_util::OrderType; +use risingwave_connector::match_sink_name_str; use risingwave_connector::sink::catalog::desc::SinkDesc; use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType}; use risingwave_connector::sink::{ - SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, - SINK_USER_FORCE_APPEND_ONLY_OPTION, + SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, + SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION, }; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use tracing::info; @@ -94,6 +97,24 @@ impl StreamSink { format_desc, )?; + // check and ensure that the sink connector is specified and supported + match sink.properties.get(CONNECTOR_TYPE_KEY) { + Some(connector) => match_sink_name_str!( + connector.to_lowercase().as_str(), + SinkType, + Ok(()), + |other| Err(SinkError::Config(anyhow!( + "unsupported sink type {}", + other + ))) + )?, + None => { + return Err( + SinkError::Config(anyhow!("connector not specified when create sink")).into(), + ); + } + } + Ok(Self::new(input, sink)) } @@ -388,10 +409,27 @@ impl StreamNode for StreamSink { PbNodeBody::Sink(SinkNode { sink_desc: Some(self.sink_desc.to_proto()), table: Some(table.to_internal_table_prost()), - log_store_type: if self.base.ctx.session_ctx().config().get_sink_decouple() { - SinkLogStoreType::KvLogStore as i32 - } else { - SinkLogStoreType::InMemoryLogStore as i32 + log_store_type: match self.base.ctx.session_ctx().config().get_sink_decouple() { + SinkDecouple::Default => { + let enable_sink_decouple = + match_sink_name_str!( + self.sink_desc.properties.get(CONNECTOR_TYPE_KEY).expect( + "have checked connector is contained when create the `StreamSink`" + ).to_lowercase().as_str(), + SinkTypeName, + SinkTypeName::default_sink_decouple(&self.sink_desc), + |_unsupported| unreachable!( + "have checked connector is supported when create the `StreamSink`" + ) + ); + if enable_sink_decouple { + SinkLogStoreType::KvLogStore as i32 + } else { + SinkLogStoreType::InMemoryLogStore as i32 + } + } + SinkDecouple::Enable => SinkLogStoreType::KvLogStore as i32, + SinkDecouple::Disable => SinkLogStoreType::InMemoryLogStore as i32, }, }) } diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 298a0642710cf..47f21c0a223cf 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -66,8 +66,6 @@ impl ExecutorBuilder for SinkExecutorBuilder { SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)) })?; - use risingwave_connector::sink::Sink; - match_sink_name_str!( sink_type.to_lowercase().as_str(), SinkType,