From fd808374156f660c7956a82e4e685ebe583538a9 Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Fri, 29 Sep 2023 12:02:36 +0800 Subject: [PATCH] derive_sink_type handles both old and new syntax --- src/frontend/src/handler/create_sink.rs | 12 +++++-- .../src/optimizer/plan_node/stream_sink.rs | 35 ++++++++++++++----- src/frontend/src/utils/with_options.rs | 4 +++ 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 7bec7ec87e4a8..1eaf02eca9f35 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -19,7 +19,9 @@ use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::{ConnectionId, DatabaseId, SchemaId, UserId}; use risingwave_common::error::{ErrorCode, Result}; use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc}; -use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION}; +use risingwave_connector::sink::{ + CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, +}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_sqlparser::ast::{ CreateSink, CreateSinkStatement, EmitMode, ObjectName, Query, Select, SelectItem, SetExpr, @@ -124,8 +126,12 @@ pub fn gen_sink_plan( Some(f) => Some(bind_sink_format_desc(f)?), None => match with_options.get(SINK_TYPE_OPTION) { // Case B: old syntax `type = '...'` - Some(t) => SinkFormatDesc::from_legacy_type(connector, t)?.inspect(|_| { - session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.") + Some(t) => SinkFormatDesc::from_legacy_type(connector, t)?.map(|mut f| { + session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`."); + if let Some(v) = with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) { + f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into()); + } + f }), // Case C: no format + encode required None => None, diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index aa3f26368df88..d1cd407a7e7dd 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -25,7 +25,7 @@ use risingwave_common::constants::log_store::{ use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::util::sort_util::OrderType; use risingwave_connector::sink::catalog::desc::SinkDesc; -use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkId, SinkType}; +use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType}; use risingwave_connector::sink::{ SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION, @@ -109,7 +109,8 @@ impl StreamSink { properties: WithOptions, format_desc: Option, ) -> Result<(PlanRef, SinkDesc)> { - let sink_type = Self::derive_sink_type(input.append_only(), &properties)?; + let sink_type = + Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?; let (pk, _) = derive_pk(input.clone(), user_order_by, &columns); let downstream_pk = Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?; @@ -165,7 +166,7 @@ impl StreamSink { Ok((input, sink_desc)) } - fn derive_sink_type(input_append_only: bool, properties: &WithOptions) -> Result { + fn is_user_defined_append_only(properties: &WithOptions) -> Result { if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) { if sink_type != SINK_TYPE_APPEND_ONLY && sink_type != SINK_TYPE_DEBEZIUM @@ -184,7 +185,10 @@ impl StreamSink { .into()); } } + Ok(properties.value_eq_ignore_case(SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY)) + } + fn is_user_force_append_only(properties: &WithOptions) -> Result { if properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION) && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true") && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "false") @@ -198,12 +202,25 @@ impl StreamSink { ))) .into()); } + Ok(properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true")) + } + fn derive_sink_type( + input_append_only: bool, + properties: &WithOptions, + format_desc: Option<&SinkFormatDesc>, + ) -> Result { let frontend_derived_append_only = input_append_only; - let user_defined_append_only = - properties.value_eq_ignore_case(SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY); - let user_force_append_only = - properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true"); + let (user_defined_append_only, user_force_append_only) = match format_desc { + Some(f) => ( + f.format == SinkFormat::AppendOnly, + Self::is_user_force_append_only(&WithOptions::from_inner(f.options.clone()))?, + ), + None => ( + Self::is_user_defined_append_only(properties)?, + Self::is_user_force_append_only(properties)?, + ), + }; match ( frontend_derived_append_only, @@ -216,14 +233,14 @@ impl StreamSink { (false, true, false) => { Err(ErrorCode::SinkError(Box::new(Error::new( ErrorKind::InvalidInput, - "The sink cannot be append-only. Please add \"force_append_only='true'\" in WITH options to force the sink to be append-only. Notice that this will cause the sink executor to drop any UPDATE or DELETE message.", + "The sink cannot be append-only. Please add \"force_append_only='true'\" in options to force the sink to be append-only. Notice that this will cause the sink executor to drop any UPDATE or DELETE message.", ))) .into()) } (_, false, true) => { Err(ErrorCode::SinkError(Box::new(Error::new( ErrorKind::InvalidInput, - "Cannot force the sink to be append-only without \"type='append-only'\"in WITH options.", + "Cannot force the sink to be append-only without \"FORMAT PLAIN\" or \"type='append-only'\".", ))) .into()) } diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index baf03184c4a14..4b0a70ef856dc 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -61,6 +61,10 @@ impl WithOptions { } } + pub fn from_inner(inner: BTreeMap) -> Self { + Self { inner } + } + /// Get the reference of the inner map. pub fn inner(&self) -> &BTreeMap { &self.inner