Skip to content

Commit

Permalink
feat(sink): support different default sink decouple for different sink (
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored and wenym1 committed Oct 10, 2023
1 parent 9c1192d commit 7636d4b
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 16 deletions.
2 changes: 1 addition & 1 deletion e2e_test/sink/append_only_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ create sink invalid_sink_type from t with (connector = 'blackhole', type = 'inva
statement error `force_append_only` must be true or false
create sink invalid_force_append_only from t with (connector = 'blackhole', force_append_only = 'invalid');

statement error db error: ERROR: QueryError: internal error: Sink error: config error: unsupported sink connector invalid
statement error db error: ERROR: QueryError: Sink error: config error: unsupported sink type invalid
create sink invalid_connector from t with (connector = 'invalid');

statement ok
Expand Down
7 changes: 4 additions & 3 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
mod over_window;
mod query_mode;
mod search_path;
pub mod sink_decouple;
mod transaction_isolation_level;
mod visibility_mode;

Expand All @@ -30,6 +31,7 @@ pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
use tracing::info;

use crate::error::{ErrorCode, RwError};
use crate::session_config::sink_decouple::SinkDecouple;
use crate::session_config::transaction_isolation_level::IsolationLevel;
pub use crate::session_config::visibility_mode::VisibilityMode;
use crate::util::epoch::Epoch;
Expand Down Expand Up @@ -335,7 +337,6 @@ type ServerVersionNum = ConfigI32<SERVER_VERSION_NUM, 90_500>;
type ForceSplitDistinctAgg = ConfigBool<FORCE_SPLIT_DISTINCT_AGG, false>;
type ClientMinMessages = ConfigString<CLIENT_MIN_MESSAGES>;
type ClientEncoding = ConfigString<CLIENT_ENCODING>;
type SinkDecouple = ConfigBool<SINK_DECOUPLE, false>;
type SynchronizeSeqscans = ConfigBool<SYNCHRONIZE_SEQSCANS, false>;
type StatementTimeout = ConfigI32<STATEMENT_TIMEOUT, 0>;
type LockTimeout = ConfigI32<LOCK_TIMEOUT, 0>;
Expand Down Expand Up @@ -1026,8 +1027,8 @@ impl ConfigMap {
&self.client_encoding
}

pub fn get_sink_decouple(&self) -> bool {
self.sink_decouple.0
pub fn get_sink_decouple(&self) -> SinkDecouple {
self.sink_decouple
}

pub fn get_standard_conforming_strings(&self) -> &str {
Expand Down
70 changes: 70 additions & 0 deletions src/common/src/session_config/sink_decouple.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::error::ErrorCode::InvalidConfigValue;
use crate::error::{ErrorCode, RwError};
use crate::session_config::{ConfigEntry, CONFIG_KEYS, SINK_DECOUPLE};

#[derive(Copy, Default, Debug, Clone, PartialEq, Eq)]
pub enum SinkDecouple {
// default sink couple config of specific sink
#[default]
Default,
// enable sink decouple
Enable,
// disable sink decouple
Disable,
}

impl<'a> TryFrom<&'a [&'a str]> for SinkDecouple {
type Error = RwError;

fn try_from(value: &'a [&'a str]) -> Result<Self, Self::Error> {
if value.len() != 1 {
return Err(ErrorCode::InternalError(format!(
"SET {} takes only one argument",
Self::entry_name()
))
.into());
}

let s = value[0];
match s.to_ascii_lowercase().as_str() {
"true" | "enable" => Ok(Self::Enable),
"false" | "disable" => Ok(Self::Disable),
"default" => Ok(Self::Default),
_ => Err(InvalidConfigValue {
config_entry: Self::entry_name().to_string(),
config_value: s.to_string(),
}
.into()),
}
}
}

impl ConfigEntry for SinkDecouple {
fn entry_name() -> &'static str {
CONFIG_KEYS[SINK_DECOUPLE]
}
}

impl ToString for SinkDecouple {
fn to_string(&self) -> String {
match self {
Self::Default => "default".to_string(),
Self::Enable => "enable".to_string(),
Self::Disable => "disable".to_string(),
}
}
}
5 changes: 5 additions & 0 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use strum_macros::{Display, EnumString};
use super::catalog::{SinkFormat, SinkFormatDesc};
use super::{Sink, SinkError, SinkParam};
use crate::common::KafkaCommon;
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::{
DeliveryFutureManager, DeliveryFutureManagerAddFuture, LogReader, LogStoreReadItem,
Expand Down Expand Up @@ -294,6 +295,10 @@ impl Sink for KafkaSink {

const SINK_NAME: &'static str = KAFKA_SINK;

fn default_sink_decouple(desc: &SinkDesc) -> bool {
desc.sink_type.is_append_only()
}

async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
let formatter = SinkFormatterImpl::new(
&self.format_desc,
Expand Down
10 changes: 8 additions & 2 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use thiserror::Error;
pub use tracing;

use self::catalog::{SinkFormatDesc, SinkType};
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::catalog::{SinkCatalog, SinkId};
use crate::sink::log_store::LogReader;
use crate::sink::writer::SinkWriter;
Expand Down Expand Up @@ -102,7 +103,8 @@ macro_rules! dispatch_sink {

#[macro_export]
macro_rules! match_sink_name_str {
({$({$variant_name:ident, $sink_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {
({$({$variant_name:ident, $sink_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
use $crate::sink::Sink;
match $name_str {
$(
<$sink_type>::SINK_NAME => {
Expand All @@ -114,7 +116,7 @@ macro_rules! match_sink_name_str {
)*
other => ($on_other_closure)(other),
}
};
}};
($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
$crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
}};
Expand Down Expand Up @@ -263,6 +265,10 @@ pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
type LogSinker: LogSinker;
type Coordinator: SinkCommitCoordinator;

fn default_sink_decouple(_desc: &SinkDesc) -> bool {
false
}

async fn validate(&self) -> Result<()>;
async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;
#[expect(clippy::unused_async)]
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ pub mod tests {
frontend.run_sql(sql).await.unwrap();

let sql = r#"CREATE SINK snk1 FROM mv1
WITH (connector = 'mysql', mysql.endpoint = '127.0.0.1:3306', mysql.table =
WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
'<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#.to_string();
frontend.run_sql(sql).await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/drop_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ mod tests {
async fn test_drop_sink_handler() {
let sql_create_table = "create table t (v1 smallint primary key);";
let sql_create_mv = "create materialized view mv as select v1 from t;";
let sql_create_sink = "create sink snk from mv with( connector = 'mysql')";
let sql_create_sink = "create sink snk from mv with( connector = 'kafka')";
let sql_drop_sink = "drop sink snk;";
let frontend = LocalFrontend::new(Default::default()).await;
frontend.run_sql(sql_create_table).await.unwrap();
Expand Down
50 changes: 44 additions & 6 deletions src/frontend/src/optimizer/plan_node/stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::assert_matches::assert_matches;
use std::io::{Error, ErrorKind};

use anyhow::anyhow;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
Expand All @@ -23,12 +24,14 @@ use risingwave_common::constants::log_store::{
EPOCH_COLUMN_INDEX, KV_LOG_STORE_PREDEFINED_COLUMNS, SEQ_ID_COLUMN_INDEX,
};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::util::sort_util::OrderType;
use risingwave_connector::match_sink_name_str;
use risingwave_connector::sink::catalog::desc::SinkDesc;
use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
use risingwave_connector::sink::{
SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
SINK_USER_FORCE_APPEND_ONLY_OPTION,
SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use tracing::info;
Expand Down Expand Up @@ -94,6 +97,24 @@ impl StreamSink {
format_desc,
)?;

// check and ensure that the sink connector is specified and supported
match sink.properties.get(CONNECTOR_TYPE_KEY) {
Some(connector) => match_sink_name_str!(
connector.to_lowercase().as_str(),
SinkType,
Ok(()),
|other| Err(SinkError::Config(anyhow!(
"unsupported sink type {}",
other
)))
)?,
None => {
return Err(
SinkError::Config(anyhow!("connector not specified when create sink")).into(),
);
}
}

Ok(Self::new(input, sink))
}

Expand Down Expand Up @@ -388,10 +409,27 @@ impl StreamNode for StreamSink {
PbNodeBody::Sink(SinkNode {
sink_desc: Some(self.sink_desc.to_proto()),
table: Some(table.to_internal_table_prost()),
log_store_type: if self.base.ctx.session_ctx().config().get_sink_decouple() {
SinkLogStoreType::KvLogStore as i32
} else {
SinkLogStoreType::InMemoryLogStore as i32
log_store_type: match self.base.ctx.session_ctx().config().get_sink_decouple() {
SinkDecouple::Default => {
let enable_sink_decouple =
match_sink_name_str!(
self.sink_desc.properties.get(CONNECTOR_TYPE_KEY).expect(
"have checked connector is contained when create the `StreamSink`"
).to_lowercase().as_str(),
SinkTypeName,
SinkTypeName::default_sink_decouple(&self.sink_desc),
|_unsupported| unreachable!(
"have checked connector is supported when create the `StreamSink`"
)
);
if enable_sink_decouple {
SinkLogStoreType::KvLogStore as i32
} else {
SinkLogStoreType::InMemoryLogStore as i32
}
}
SinkDecouple::Enable => SinkLogStoreType::KvLogStore as i32,
SinkDecouple::Disable => SinkLogStoreType::InMemoryLogStore as i32,
},
})
}
Expand Down
2 changes: 0 additions & 2 deletions src/stream/src/from_proto/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ impl ExecutorBuilder for SinkExecutorBuilder {
SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY))
})?;

use risingwave_connector::sink::Sink;

match_sink_name_str!(
sink_type.to_lowercase().as_str(),
SinkType,
Expand Down

0 comments on commit 7636d4b

Please sign in to comment.