Skip to content

Commit

Permalink
feat(sink): support different default sink decouple for different sink
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Sep 26, 2023
1 parent 09a1dcb commit 2579800
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 13 deletions.
7 changes: 4 additions & 3 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
mod over_window;
mod query_mode;
mod search_path;
pub mod sink_decouple;
mod transaction_isolation_level;
mod visibility_mode;

Expand All @@ -30,6 +31,7 @@ pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
use tracing::info;

use crate::error::{ErrorCode, RwError};
use crate::session_config::sink_decouple::SinkDecouple;
use crate::session_config::transaction_isolation_level::IsolationLevel;
pub use crate::session_config::visibility_mode::VisibilityMode;
use crate::util::epoch::Epoch;
Expand Down Expand Up @@ -333,7 +335,6 @@ type ServerVersionNum = ConfigI32<SERVER_VERSION_NUM, 90_500>;
type ForceSplitDistinctAgg = ConfigBool<FORCE_SPLIT_DISTINCT_AGG, false>;
type ClientMinMessages = ConfigString<CLIENT_MIN_MESSAGES>;
type ClientEncoding = ConfigString<CLIENT_ENCODING>;
type SinkDecouple = ConfigBool<SINK_DECOUPLE, false>;
type SynchronizeSeqscans = ConfigBool<SYNCHRONIZE_SEQSCANS, false>;
type StatementTimeout = ConfigI32<STATEMENT_TIMEOUT, 0>;
type LockTimeout = ConfigI32<LOCK_TIMEOUT, 0>;
Expand Down Expand Up @@ -1012,8 +1013,8 @@ impl ConfigMap {
&self.client_encoding
}

pub fn get_sink_decouple(&self) -> bool {
self.sink_decouple.0
pub fn get_sink_decouple(&self) -> SinkDecouple {
self.sink_decouple
}

pub fn get_standard_conforming_strings(&self) -> &str {
Expand Down
70 changes: 70 additions & 0 deletions src/common/src/session_config/sink_decouple.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::error::ErrorCode::InvalidConfigValue;
use crate::error::{ErrorCode, RwError};
use crate::session_config::{ConfigEntry, CONFIG_KEYS, SINK_DECOUPLE};

#[derive(Copy, Default, Debug, Clone, PartialEq, Eq)]
pub enum SinkDecouple {
// default sink couple config of specific sink
#[default]
Default,
// enable sink decouple
Enable,
// disable sink decouple
Disable,
}

impl<'a> TryFrom<&'a [&'a str]> for SinkDecouple {
type Error = RwError;

fn try_from(value: &'a [&'a str]) -> Result<Self, Self::Error> {
if value.len() != 1 {
return Err(ErrorCode::InternalError(format!(
"SET {} takes only one argument",
Self::entry_name()
))
.into());
}

let s = value[0];
match s.to_ascii_lowercase().as_str() {
"true" | "enable" => Ok(Self::Enable),
"false" | "disable" => Ok(Self::Disable),
"default" => Ok(Self::Default),
_ => Err(InvalidConfigValue {
config_entry: Self::entry_name().to_string(),
config_value: s.to_string(),
}
.into()),
}
}
}

impl ConfigEntry for SinkDecouple {
fn entry_name() -> &'static str {
CONFIG_KEYS[SINK_DECOUPLE]
}
}

impl ToString for SinkDecouple {
fn to_string(&self) -> String {
match self {
Self::Default => "default".to_string(),
Self::Enable => "enable".to_string(),
Self::Disable => "disable".to_string(),
}
}
}
9 changes: 7 additions & 2 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ macro_rules! dispatch_sink {

#[macro_export]
macro_rules! match_sink_name_str {
({$({$variant_name:ident, $sink_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {
({$({$variant_name:ident, $sink_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
use $crate::sink::Sink;
match $name_str {
$(
<$sink_type>::SINK_NAME => {
Expand All @@ -112,7 +113,7 @@ macro_rules! match_sink_name_str {
)*
other => ($on_other_closure)(other),
}
};
}};
($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
$crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
}};
Expand Down Expand Up @@ -225,6 +226,10 @@ pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
type LogSinker: LogSinker;
type Coordinator: SinkCommitCoordinator;

fn default_sink_decouple() -> bool {
false
}

async fn validate(&self) -> Result<()>;
async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;
#[expect(clippy::unused_async)]
Expand Down
50 changes: 44 additions & 6 deletions src/frontend/src/optimizer/plan_node/stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::assert_matches::assert_matches;
use std::io::{Error, ErrorKind};

use anyhow::anyhow;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
Expand All @@ -23,12 +24,14 @@ use risingwave_common::constants::log_store::{
EPOCH_COLUMN_INDEX, KV_LOG_STORE_PREDEFINED_COLUMNS, SEQ_ID_COLUMN_INDEX,
};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::util::sort_util::OrderType;
use risingwave_connector::match_sink_name_str;
use risingwave_connector::sink::catalog::desc::SinkDesc;
use risingwave_connector::sink::catalog::{SinkId, SinkType};
use risingwave_connector::sink::{
SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
SINK_USER_FORCE_APPEND_ONLY_OPTION,
SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use tracing::info;
Expand Down Expand Up @@ -92,6 +95,24 @@ impl StreamSink {
properties,
)?;

// check and ensure that the sink connector is specified and supported
match sink.properties.get(CONNECTOR_TYPE_KEY) {
Some(connector) => match_sink_name_str!(
connector.to_lowercase().as_str(),
SinkType,
Ok(()),
|other| Err(SinkError::Config(anyhow!(
"unsupported sink type {}",
other
)))
)?,
None => {
return Err(
SinkError::Config(anyhow!("connector not specified when create sink")).into(),
);
}
}

Ok(Self::new(input, sink))
}

Expand Down Expand Up @@ -367,10 +388,27 @@ impl StreamNode for StreamSink {
PbNodeBody::Sink(SinkNode {
sink_desc: Some(self.sink_desc.to_proto()),
table: Some(table.to_internal_table_prost()),
log_store_type: if self.base.ctx.session_ctx().config().get_sink_decouple() {
SinkLogStoreType::KvLogStore as i32
} else {
SinkLogStoreType::InMemoryLogStore as i32
log_store_type: match self.base.ctx.session_ctx().config().get_sink_decouple() {
SinkDecouple::Default => {
let enable_sink_decouple =
match_sink_name_str!(
self.sink_desc.properties.get(CONNECTOR_TYPE_KEY).expect(
"have checked connector is contained when create the `StreamSink`"
).to_lowercase().as_str(),
SinkTypeName,
SinkTypeName::default_sink_decouple(),
|_unsupported| unreachable!(
"have checked connector is supported when create the `StreamSink`"
)
);
if enable_sink_decouple {
SinkLogStoreType::KvLogStore as i32
} else {
SinkLogStoreType::InMemoryLogStore as i32
}
}
SinkDecouple::Enable => SinkLogStoreType::KvLogStore as i32,
SinkDecouple::Disable => SinkLogStoreType::InMemoryLogStore as i32,
},
})
}
Expand Down
2 changes: 0 additions & 2 deletions src/stream/src/from_proto/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ impl ExecutorBuilder for SinkExecutorBuilder {
SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY))
})?;

use risingwave_connector::sink::Sink;

match_sink_name_str!(
sink_type.to_lowercase().as_str(),
SinkType,
Expand Down

0 comments on commit 2579800

Please sign in to comment.