Skip to content

Commit

Permalink
refactor(frontend): refine decouple config (#15939)
Browse files Browse the repository at this point in the history
Co-authored-by: ZENOTME <[email protected]>
  • Loading branch information
ZENOTME and ZENOTME authored Mar 27, 2024
1 parent 3f4e493 commit 3368bce
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 34 deletions.
9 changes: 7 additions & 2 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use itertools::Itertools;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial};
use serde::ser::{SerializeSeq, SerializeStruct};
use serde::Serialize;
Expand Down Expand Up @@ -326,8 +327,12 @@ impl Sink for ClickHouseSink {

const SINK_NAME: &'static str = CLICKHOUSE_SINK;

fn default_sink_decouple(desc: &SinkDesc) -> bool {
desc.sink_type.is_append_only()
fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default => Ok(desc.sink_type.is_append_only()),
SinkDecouple::Disable => Ok(false),
SinkDecouple::Enable => Ok(true),
}
}

async fn validate(&self) -> Result<()> {
Expand Down
9 changes: 7 additions & 2 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use rdkafka::types::RDKafkaErrorCode;
use rdkafka::ClientConfig;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use serde_derive::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use strum_macros::{Display, EnumString};
Expand Down Expand Up @@ -306,8 +307,12 @@ impl Sink for KafkaSink {

const SINK_NAME: &'static str = KAFKA_SINK;

fn default_sink_decouple(desc: &SinkDesc) -> bool {
desc.sink_type.is_append_only()
fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default => Ok(desc.sink_type.is_append_only()),
SinkDecouple::Disable => Ok(false),
SinkDecouple::Enable => Ok(true),
}
}

async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
Expand Down
9 changes: 7 additions & 2 deletions src/connector/src/sink/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use aws_sdk_kinesis::primitives::Blob;
use aws_sdk_kinesis::Client as KinesisClient;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use serde_derive::Deserialize;
use serde_with::serde_as;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
Expand Down Expand Up @@ -75,8 +76,12 @@ impl Sink for KinesisSink {

const SINK_NAME: &'static str = KINESIS_SINK;

fn default_sink_decouple(desc: &SinkDesc) -> bool {
desc.sink_type.is_append_only()
fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default => Ok(desc.sink_type.is_append_only()),
SinkDecouple::Disable => Ok(false),
SinkDecouple::Enable => Ok(true),
}
}

async fn validate(&self) -> Result<()> {
Expand Down
9 changes: 7 additions & 2 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::metrics::{
LabelGuardedHistogram, LabelGuardedIntCounter, LabelGuardedIntGauge,
};
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_pb::catalog::PbSinkType;
use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
use risingwave_rpc_client::error::RpcError;
Expand Down Expand Up @@ -331,8 +332,12 @@ pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
type LogSinker: LogSinker;
type Coordinator: SinkCommitCoordinator;

fn default_sink_decouple(_desc: &SinkDesc) -> bool {
false
/// `user_specified` is the value of `sink_decouple` config.
fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Disable | SinkDecouple::Default => Ok(false),
SinkDecouple::Enable => Ok(true),
}
}

async fn validate(&self) -> Result<()>;
Expand Down
9 changes: 7 additions & 2 deletions src/connector/src/sink/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Context as _};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use rumqttc::v5::mqttbytes::QoS;
use rumqttc::v5::ConnectionError;
use serde_derive::Deserialize;
Expand Down Expand Up @@ -116,8 +117,12 @@ impl Sink for MqttSink {

const SINK_NAME: &'static str = MQTT_SINK;

fn default_sink_decouple(desc: &SinkDesc) -> bool {
desc.sink_type.is_append_only()
fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default => Ok(desc.sink_type.is_append_only()),
SinkDecouple::Disable => Ok(false),
SinkDecouple::Enable => Ok(true),
}
}

async fn validate(&self) -> Result<()> {
Expand Down
9 changes: 7 additions & 2 deletions src/connector/src/sink/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use anyhow::{anyhow, Context as _};
use async_nats::jetstream::context::Context;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use serde_derive::Deserialize;
use serde_with::serde_as;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
Expand Down Expand Up @@ -97,8 +98,12 @@ impl Sink for NatsSink {

const SINK_NAME: &'static str = NATS_SINK;

fn default_sink_decouple(desc: &SinkDesc) -> bool {
desc.sink_type.is_append_only()
fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default => Ok(desc.sink_type.is_append_only()),
SinkDecouple::Disable => Ok(false),
SinkDecouple::Enable => Ok(true),
}
}

async fn validate(&self) -> Result<()> {
Expand Down
9 changes: 7 additions & 2 deletions src/connector/src/sink/pulsar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use pulsar::producer::{Message, SendFuture};
use pulsar::{Producer, ProducerOptions, Pulsar, TokioExecutor};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use with_options::WithOptions;
Expand Down Expand Up @@ -169,8 +170,12 @@ impl Sink for PulsarSink {

const SINK_NAME: &'static str = PULSAR_SINK;

fn default_sink_decouple(desc: &SinkDesc) -> bool {
desc.sink_type.is_append_only()
fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default => Ok(desc.sink_type.is_append_only()),
SinkDecouple::Disable => Ok(false),
SinkDecouple::Enable => Ok(true),
}
}

async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
Expand Down
9 changes: 7 additions & 2 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use prost::Message;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::types::DataType;
use risingwave_jni_core::jvm_runtime::JVM;
use risingwave_jni_core::{
Expand Down Expand Up @@ -145,8 +146,12 @@ impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {

const SINK_NAME: &'static str = R::SINK_NAME;

fn default_sink_decouple(desc: &SinkDesc) -> bool {
R::default_sink_decouple(desc)
fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
match user_specified {
SinkDecouple::Default => Ok(R::default_sink_decouple(desc)),
SinkDecouple::Enable => Ok(true),
SinkDecouple::Disable => Ok(false),
}
}

async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
Expand Down
30 changes: 12 additions & 18 deletions src/frontend/src/optimizer/plan_node/stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use icelake::types::Transform;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, TableId};
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::types::{DataType, StructType};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::match_sink_name_str;
Expand Down Expand Up @@ -169,16 +168,12 @@ pub struct StreamSink {
pub base: PlanBase<Stream>,
input: PlanRef,
sink_desc: SinkDesc,
default_log_store_type: SinkLogStoreType,
log_store_type: SinkLogStoreType,
}

impl StreamSink {
#[must_use]
pub fn new(
input: PlanRef,
sink_desc: SinkDesc,
default_log_store_type: SinkLogStoreType,
) -> Self {
pub fn new(input: PlanRef, sink_desc: SinkDesc, log_store_type: SinkLogStoreType) -> Self {
let base = input
.plan_base()
.into_stream()
Expand All @@ -188,7 +183,7 @@ impl StreamSink {
base,
input,
sink_desc,
default_log_store_type,
log_store_type,
}
}

Expand Down Expand Up @@ -232,7 +227,7 @@ impl StreamSink {
|sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)));

// check and ensure that the sink connector is specified and supported
let default_sink_decouple = match sink.properties.get(CONNECTOR_TYPE_KEY) {
let sink_decouple = match sink.properties.get(CONNECTOR_TYPE_KEY) {
Some(connector) => {
match_sink_name_str!(
connector.to_lowercase().as_str(),
Expand All @@ -242,7 +237,10 @@ impl StreamSink {
if connector == TABLE_SINK && sink.target_table.is_none() {
unsupported_sink(TABLE_SINK)
} else {
Ok(SinkType::default_sink_decouple(&sink))
SinkType::is_sink_decouple(
&sink,
&input.ctx().session_ctx().config().sink_decouple(),
)
}
},
|other: &str| unsupported_sink(other)
Expand All @@ -255,13 +253,13 @@ impl StreamSink {
}
};

let default_log_store_type = if default_sink_decouple {
let log_store_type = if sink_decouple {
SinkLogStoreType::KvLogStore
} else {
SinkLogStoreType::InMemoryLogStore
};

Ok(Self::new(input, sink, default_log_store_type))
Ok(Self::new(input, sink, log_store_type))
}

fn derive_iceberg_sink_distribution(
Expand Down Expand Up @@ -519,7 +517,7 @@ impl PlanTreeNodeUnary for StreamSink {
}

fn clone_with_input(&self, input: PlanRef) -> Self {
Self::new(input, self.sink_desc.clone(), self.default_log_store_type)
Self::new(input, self.sink_desc.clone(), self.log_store_type)
// TODO(nanderstabel): Add assertions (assert_eq!)
}
}
Expand Down Expand Up @@ -572,11 +570,7 @@ impl StreamNode for StreamSink {
PbNodeBody::Sink(SinkNode {
sink_desc: Some(self.sink_desc.to_proto()),
table: Some(table.to_internal_table_prost()),
log_store_type: match self.base.ctx().session_ctx().config().sink_decouple() {
SinkDecouple::Default => self.default_log_store_type as i32,
SinkDecouple::Enable => SinkLogStoreType::KvLogStore as i32,
SinkDecouple::Disable => SinkLogStoreType::InMemoryLogStore as i32,
},
log_store_type: self.log_store_type as i32,
})
}
}
Expand Down

0 comments on commit 3368bce

Please sign in to comment.