diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 7a9b85a3e1cf6..7bdcb7c45fa8b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -34,6 +34,7 @@ use risingwave_connector::sink::{ SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION, }; use risingwave_pb::stream_plan::stream_node::PbNodeBody; +use risingwave_pb::stream_plan::SinkLogStoreType; use super::derive::{derive_columns, derive_pk}; use super::generic::GenericPlanRef; @@ -54,11 +55,16 @@ pub struct StreamSink { pub base: PlanBase, input: PlanRef, sink_desc: SinkDesc, + default_log_store_type: SinkLogStoreType, } impl StreamSink { #[must_use] - pub fn new(input: PlanRef, sink_desc: SinkDesc) -> Self { + pub fn new( + input: PlanRef, + sink_desc: SinkDesc, + default_log_store_type: SinkLogStoreType, + ) -> Self { let base = input .plan_base() .into_stream() @@ -68,6 +74,7 @@ impl StreamSink { base, input, sink_desc, + default_log_store_type, } } @@ -109,7 +116,7 @@ impl StreamSink { |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink))); // check and ensure that the sink connector is specified and supported - match sink.properties.get(CONNECTOR_TYPE_KEY) { + let default_sink_decouple_fn = match sink.properties.get(CONNECTOR_TYPE_KEY) { Some(connector) => { match_sink_name_str!( connector.to_lowercase().as_str(), @@ -119,20 +126,27 @@ impl StreamSink { if connector == TABLE_SINK && sink.target_table.is_none() { unsupported_sink(TABLE_SINK) } else { - Ok(()) + Ok(SinkType::default_sink_decouple as for<'a> fn(&'a SinkDesc) -> bool) } }, |other: &str| unsupported_sink(other) - )?; + )? } None => { return Err( SinkError::Config(anyhow!("connector not specified when create sink")).into(), ); } - } + }; + + let default_sink_decouple = default_sink_decouple_fn(&sink); + let default_log_store_type = if default_sink_decouple { + SinkLogStoreType::KvLogStore + } else { + SinkLogStoreType::InMemoryLogStore + }; - Ok(Self::new(input, sink)) + Ok(Self::new(input, sink, default_log_store_type)) } #[allow(clippy::too_many_arguments)] @@ -407,7 +421,7 @@ impl PlanTreeNodeUnary for StreamSink { } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new(input, self.sink_desc.clone()) + Self::new(input, self.sink_desc.clone(), self.default_log_store_type) // TODO(nanderstabel): Add assertions (assert_eq!) } } @@ -461,24 +475,7 @@ impl StreamNode for StreamSink { sink_desc: Some(self.sink_desc.to_proto()), table: Some(table.to_internal_table_prost()), log_store_type: match self.base.ctx().session_ctx().config().sink_decouple() { - SinkDecouple::Default => { - let enable_sink_decouple = - match_sink_name_str!( - self.sink_desc.properties.get(CONNECTOR_TYPE_KEY).expect( - "have checked connector is contained when create the `StreamSink`" - ).to_lowercase().as_str(), - SinkTypeName, - SinkTypeName::default_sink_decouple(&self.sink_desc), - |_unsupported| unreachable!( - "have checked connector is supported when create the `StreamSink`" - ) - ); - if enable_sink_decouple { - SinkLogStoreType::KvLogStore as i32 - } else { - SinkLogStoreType::InMemoryLogStore as i32 - } - } + SinkDecouple::Default => self.default_log_store_type as i32, SinkDecouple::Enable => SinkLogStoreType::KvLogStore as i32, SinkDecouple::Disable => SinkLogStoreType::InMemoryLogStore as i32, },