Skip to content

Commit

Permalink
store default log store type in stream sink
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jan 16, 2024
1 parent 924d49d commit 00c42d3
Showing 1 changed file with 22 additions and 25 deletions.
47 changes: 22 additions & 25 deletions src/frontend/src/optimizer/plan_node/stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use risingwave_connector::sink::{
SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::SinkLogStoreType;

use super::derive::{derive_columns, derive_pk};
use super::generic::GenericPlanRef;
Expand All @@ -54,11 +55,16 @@ pub struct StreamSink {
pub base: PlanBase<Stream>,
input: PlanRef,
sink_desc: SinkDesc,
default_log_store_type: SinkLogStoreType,
}

impl StreamSink {
#[must_use]
pub fn new(input: PlanRef, sink_desc: SinkDesc) -> Self {
pub fn new(
input: PlanRef,
sink_desc: SinkDesc,
default_log_store_type: SinkLogStoreType,
) -> Self {
let base = input
.plan_base()
.into_stream()
Expand All @@ -68,6 +74,7 @@ impl StreamSink {
base,
input,
sink_desc,
default_log_store_type,
}
}

Expand Down Expand Up @@ -109,7 +116,7 @@ impl StreamSink {
|sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)));

// check and ensure that the sink connector is specified and supported
match sink.properties.get(CONNECTOR_TYPE_KEY) {
let default_sink_decouple_fn = match sink.properties.get(CONNECTOR_TYPE_KEY) {
Some(connector) => {
match_sink_name_str!(
connector.to_lowercase().as_str(),
Expand All @@ -119,20 +126,27 @@ impl StreamSink {
if connector == TABLE_SINK && sink.target_table.is_none() {
unsupported_sink(TABLE_SINK)
} else {
Ok(())
Ok(SinkType::default_sink_decouple as for<'a> fn(&'a SinkDesc) -> bool)
}
},
|other: &str| unsupported_sink(other)
)?;
)?
}
None => {
return Err(
SinkError::Config(anyhow!("connector not specified when create sink")).into(),
);
}
}
};

let default_sink_decouple = default_sink_decouple_fn(&sink);
let default_log_store_type = if default_sink_decouple {
SinkLogStoreType::KvLogStore
} else {
SinkLogStoreType::InMemoryLogStore
};

Ok(Self::new(input, sink))
Ok(Self::new(input, sink, default_log_store_type))
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -407,7 +421,7 @@ impl PlanTreeNodeUnary for StreamSink {
}

fn clone_with_input(&self, input: PlanRef) -> Self {
Self::new(input, self.sink_desc.clone())
Self::new(input, self.sink_desc.clone(), self.default_log_store_type)
// TODO(nanderstabel): Add assertions (assert_eq!)
}
}
Expand Down Expand Up @@ -461,24 +475,7 @@ impl StreamNode for StreamSink {
sink_desc: Some(self.sink_desc.to_proto()),
table: Some(table.to_internal_table_prost()),
log_store_type: match self.base.ctx().session_ctx().config().sink_decouple() {
SinkDecouple::Default => {
let enable_sink_decouple =
match_sink_name_str!(
self.sink_desc.properties.get(CONNECTOR_TYPE_KEY).expect(
"have checked connector is contained when create the `StreamSink`"
).to_lowercase().as_str(),
SinkTypeName,
SinkTypeName::default_sink_decouple(&self.sink_desc),
|_unsupported| unreachable!(
"have checked connector is supported when create the `StreamSink`"
)
);
if enable_sink_decouple {
SinkLogStoreType::KvLogStore as i32
} else {
SinkLogStoreType::InMemoryLogStore as i32
}
}
SinkDecouple::Default => self.default_log_store_type as i32,
SinkDecouple::Enable => SinkLogStoreType::KvLogStore as i32,
SinkDecouple::Disable => SinkLogStoreType::InMemoryLogStore as i32,
},
Expand Down

0 comments on commit 00c42d3

Please sign in to comment.