From 267fd7a942d89f0dfe44c67b0b2654bcd667e117 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 19 Dec 2023 19:04:33 +0800 Subject: [PATCH 1/8] fix(sink): derive append only from optimizer when no format declared --- .../src/optimizer/plan_node/stream_sink.rs | 79 ++++++++++++------- 1 file changed, 52 insertions(+), 27 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 68814531d9293..7d2d7f5d85131 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -57,6 +57,24 @@ pub struct StreamSink { sink_desc: SinkDesc, } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +enum UserDefinedAppendOnly { + DefinedAppendOnly, + DefinedNonAppendOnly, + NotDefined, +} + +impl From<&SinkFormat> for UserDefinedAppendOnly { + fn from(value: &SinkFormat) -> Self { + match value { + SinkFormat::AppendOnly => UserDefinedAppendOnly::DefinedAppendOnly, + SinkFormat::Upsert | SinkFormat::Debezium => { + UserDefinedAppendOnly::DefinedNonAppendOnly + } + } + } +} + impl StreamSink { #[must_use] pub fn new(input: PlanRef, sink_desc: SinkDesc) -> Self { @@ -211,12 +229,15 @@ impl StreamSink { Ok((input, sink_desc)) } - fn is_user_defined_append_only(properties: &WithOptions) -> Result { - if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) { - if sink_type != SINK_TYPE_APPEND_ONLY - && sink_type != SINK_TYPE_DEBEZIUM - && sink_type != SINK_TYPE_UPSERT - { + fn user_defined_append_only(properties: &WithOptions) -> Result { + if properties.value_eq_ignore_case(SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY) { + return Ok(UserDefinedAppendOnly::DefinedAppendOnly); + } else if properties.value_eq_ignore_case(SINK_TYPE_OPTION, SINK_TYPE_DEBEZIUM) + || properties.value_eq_ignore_case(SINK_TYPE_OPTION, SINK_TYPE_UPSERT) + { + return Ok(UserDefinedAppendOnly::DefinedNonAppendOnly); + } else { + if properties.get(SINK_TYPE_OPTION).is_some() { return Err(ErrorCode::SinkError(Box::new(Error::new( ErrorKind::InvalidInput, format!( @@ -228,9 +249,10 @@ impl StreamSink { ), ))) .into()); + } else { + return Ok(UserDefinedAppendOnly::NotDefined); } - } - Ok(properties.value_eq_ignore_case(SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY)) + }; } fn is_user_force_append_only(properties: &WithOptions) -> Result { @@ -256,38 +278,41 @@ impl StreamSink { format_desc: Option<&SinkFormatDesc>, ) -> Result { let frontend_derived_append_only = input_append_only; - let (user_defined_append_only, user_force_append_only, syntax_legacy) = match format_desc { - Some(f) => ( - f.format == SinkFormat::AppendOnly, - Self::is_user_force_append_only(&WithOptions::from_inner(f.options.clone()))?, - false, - ), - None => ( - Self::is_user_defined_append_only(properties)?, - Self::is_user_force_append_only(properties)?, - true, - ), - }; + + use UserDefinedAppendOnly::*; + let (user_defined_append_only, user_force_append_only, from_format_clause) = + match format_desc { + Some(f) => ( + UserDefinedAppendOnly::from(&f.format), + Self::is_user_force_append_only(&WithOptions::from_inner(f.options.clone()))?, + false, + ), + None => ( + Self::user_defined_append_only(properties)?, + Self::is_user_force_append_only(properties)?, + true, + ), + }; match ( frontend_derived_append_only, user_defined_append_only, user_force_append_only, ) { - (true, true, _) => Ok(SinkType::AppendOnly), - (false, true, true) => Ok(SinkType::ForceAppendOnly), - (_, false, false) => Ok(SinkType::Upsert), - (false, true, false) => { + (true, DefinedAppendOnly|NotDefined, _) => Ok(SinkType::AppendOnly), + (false, DefinedAppendOnly, true) => Ok(SinkType::ForceAppendOnly), + (_, NotDefined|DefinedNonAppendOnly, false) => Ok(SinkType::Upsert), + (false, DefinedAppendOnly, false) => { Err(ErrorCode::SinkError(Box::new(Error::new( ErrorKind::InvalidInput, - format!("The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. Notice that this will cause the sink executor to drop any UPDATE or DELETE message.", if syntax_legacy {"WITH"} else {"FORMAT ENCODE"}), + format!("The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. Notice that this will cause the sink executor to drop any UPDATE or DELETE message.", if from_format_clause {"WITH"} else {"FORMAT ENCODE"}), ))) .into()) } - (_, false, true) => { + (_, DefinedNonAppendOnly|NotDefined, true) => { Err(ErrorCode::SinkError(Box::new(Error::new( ErrorKind::InvalidInput, - format!("Cannot force the sink to be append-only without \"{}\".", if syntax_legacy {"type='append-only'"} else {"FORMAT PLAIN"}), + format!("Cannot force the sink to be append-only without \"{}\".", if from_format_clause {"type='append-only'"} else {"FORMAT PLAIN"}), ))) .into()) } From ef0f8b1feebf3158882f4e49b6ff24be3a3675a8 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 19 Dec 2023 19:05:05 +0800 Subject: [PATCH 2/8] planner test --- .../tests/testdata/output/emit_on_window_close.yaml | 2 +- src/frontend/planner_test/tests/testdata/output/sink.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml b/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml index acdc201f0c6fb..657b7dc85b0af 100644 --- a/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml +++ b/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml @@ -210,7 +210,7 @@ emit on window close WITH (connector = 'blackhole'); explain_output: | - StreamSink { type: upsert, columns: [tm, foo, bar, t._row_id(hidden), lag, max, sum], pk: [t._row_id, t.bar] } + StreamSink { type: append-only, columns: [tm, foo, bar, t._row_id(hidden), lag, max, sum] } └─StreamEowcOverWindow { window_functions: [first_value(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 2 PRECEDING AND 2 PRECEDING), max(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), sum(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 2 PRECEDING AND CURRENT ROW EXCLUDE CURRENT ROW)] } └─StreamEowcSort { sort_column: t.tm } └─StreamExchange { dist: HashShard(t.bar) } diff --git a/src/frontend/planner_test/tests/testdata/output/sink.yaml b/src/frontend/planner_test/tests/testdata/output/sink.yaml index f2fbad0f9add1..c66b0ad7641eb 100644 --- a/src/frontend/planner_test/tests/testdata/output/sink.yaml +++ b/src/frontend/planner_test/tests/testdata/output/sink.yaml @@ -70,6 +70,6 @@ create table t2 (a int, b int, watermark for b as b - 4) append only; explain create sink sk1 from t2 emit on window close with (connector='blackhole'); explain_output: | - StreamSink { type: upsert, columns: [a, b, t2._row_id(hidden)], pk: [t2._row_id] } + StreamSink { type: append-only, columns: [a, b, t2._row_id(hidden)] } └─StreamEowcSort { sort_column: t2.b } └─StreamTableScan { table: t2, columns: [a, b, _row_id] } From 8158d8b4396919e5638a474accbd7241e77967ed Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 19 Dec 2023 19:20:04 +0800 Subject: [PATCH 3/8] allow nont defined force append only --- src/frontend/src/optimizer/plan_node/stream_sink.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 7d2d7f5d85131..db6a5c9ebaa90 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -300,7 +300,7 @@ impl StreamSink { user_force_append_only, ) { (true, DefinedAppendOnly|NotDefined, _) => Ok(SinkType::AppendOnly), - (false, DefinedAppendOnly, true) => Ok(SinkType::ForceAppendOnly), + (false, DefinedAppendOnly|NotDefined, true) => Ok(SinkType::ForceAppendOnly), (_, NotDefined|DefinedNonAppendOnly, false) => Ok(SinkType::Upsert), (false, DefinedAppendOnly, false) => { Err(ErrorCode::SinkError(Box::new(Error::new( @@ -309,7 +309,7 @@ impl StreamSink { ))) .into()) } - (_, DefinedNonAppendOnly|NotDefined, true) => { + (_, DefinedNonAppendOnly, true) => { Err(ErrorCode::SinkError(Box::new(Error::new( ErrorKind::InvalidInput, format!("Cannot force the sink to be append-only without \"{}\".", if from_format_clause {"type='append-only'"} else {"FORMAT PLAIN"}), From cfab268bd8b5620115cb68fad799152c98d625ae Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 19 Dec 2023 19:21:35 +0800 Subject: [PATCH 4/8] allow force append only in with option --- src/frontend/src/optimizer/plan_node/stream_sink.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index db6a5c9ebaa90..1bc448dd5e1ef 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -284,7 +284,8 @@ impl StreamSink { match format_desc { Some(f) => ( UserDefinedAppendOnly::from(&f.format), - Self::is_user_force_append_only(&WithOptions::from_inner(f.options.clone()))?, + Self::is_user_force_append_only(&WithOptions::from_inner(f.options.clone()))? + || Self::is_user_force_append_only(properties)?, false, ), None => ( From 26bb5abe19fbd3373467c0eaeb179674adef0ea6 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 19 Dec 2023 19:35:13 +0800 Subject: [PATCH 5/8] explain sink type --- src/frontend/src/optimizer/plan_node/stream_sink.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 1bc448dd5e1ef..59591a4a45bfc 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -278,7 +278,6 @@ impl StreamSink { format_desc: Option<&SinkFormatDesc>, ) -> Result { let frontend_derived_append_only = input_append_only; - use UserDefinedAppendOnly::*; let (user_defined_append_only, user_force_append_only, from_format_clause) = match format_desc { @@ -417,11 +416,6 @@ impl_plan_tree_node_for_unary! { StreamSink } impl Distill for StreamSink { fn distill<'a>(&self) -> XmlNode<'a> { - let sink_type = if self.sink_desc.sink_type.is_append_only() { - "append-only" - } else { - "upsert" - }; let column_names = self .sink_desc .columns @@ -431,7 +425,10 @@ impl Distill for StreamSink { .collect(); let column_names = Pretty::Array(column_names); let mut vec = Vec::with_capacity(3); - vec.push(("type", Pretty::from(sink_type))); + vec.push(( + "type", + Pretty::from(format!("{:?}", self.sink_desc.sink_type)), + )); vec.push(("columns", column_names)); if self.sink_desc.sink_type.is_upsert() { let pk = IndicesDisplay { From 713dc0ae52072e5d7d6545d9d08c0dac14a5c5f0 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Wed, 20 Dec 2023 14:38:40 +0800 Subject: [PATCH 6/8] apply planner test --- .../testdata/output/emit_on_window_close.yaml | 2 +- .../tests/testdata/output/nexmark.yaml | 14 +++++++------- .../planner_test/tests/testdata/output/sink.yaml | 12 ++++++------ 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml b/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml index 657b7dc85b0af..e6d5119ad325e 100644 --- a/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml +++ b/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml @@ -210,7 +210,7 @@ emit on window close WITH (connector = 'blackhole'); explain_output: | - StreamSink { type: append-only, columns: [tm, foo, bar, t._row_id(hidden), lag, max, sum] } + StreamSink { type: AppendOnly, columns: [tm, foo, bar, t._row_id(hidden), lag, max, sum] } └─StreamEowcOverWindow { window_functions: [first_value(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 2 PRECEDING AND 2 PRECEDING), max(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), sum(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 2 PRECEDING AND CURRENT ROW EXCLUDE CURRENT ROW)] } └─StreamEowcSort { sort_column: t.tm } └─StreamExchange { dist: HashShard(t.bar) } diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml index 579e87e17e021..b93ad0ff2873f 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml @@ -49,7 +49,7 @@ BatchExchange { order: [], dist: Single } └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time], distribution: SomeShard } sink_plan: |- - StreamSink { type: append-only, columns: [auction, bidder, price, date_time, bid._row_id(hidden)] } + StreamSink { type: AppendOnly, columns: [auction, bidder, price, date_time, bid._row_id(hidden)] } └─StreamExchange { dist: Single } └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } stream_plan: |- @@ -94,7 +94,7 @@ └─BatchProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, bid.date_time] } └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time], distribution: SomeShard } sink_plan: |- - StreamSink { type: append-only, columns: [auction, bidder, price, date_time, bid._row_id(hidden)] } + StreamSink { type: AppendOnly, columns: [auction, bidder, price, date_time, bid._row_id(hidden)] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, bid.date_time, bid._row_id] } └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } @@ -136,7 +136,7 @@ └─BatchFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) } └─BatchScan { table: bid, columns: [bid.auction, bid.price], distribution: SomeShard } sink_plan: |- - StreamSink { type: append-only, columns: [auction, price, bid._row_id(hidden)] } + StreamSink { type: AppendOnly, columns: [auction, price, bid._row_id(hidden)] } └─StreamExchange { dist: Single } └─StreamFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) } └─StreamTableScan { table: bid, columns: [bid.auction, bid.price, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } @@ -1024,7 +1024,7 @@ └─BatchProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, ToChar(bid.date_time, 'YYYY-MM-DD':Varchar) as $expr1, ToChar(bid.date_time, 'HH:MI':Varchar) as $expr2] } └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time], distribution: SomeShard } sink_plan: |- - StreamSink { type: append-only, columns: [auction, bidder, price, date_time, date, time, bid._row_id(hidden)] } + StreamSink { type: AppendOnly, columns: [auction, bidder, price, date_time, date, time, bid._row_id(hidden)] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, ToChar(bid.date_time, 'YYYY-MM-DD':Varchar) as $expr1, ToChar(bid.date_time, 'HH:MI':Varchar) as $expr2, bid._row_id] } └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } @@ -1127,7 +1127,7 @@ JOIN side_input FOR SYSTEM_TIME AS OF PROCTIME() S ON mod(B.auction, 10000) = S.key sink_plan: |- - StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10018(hidden), side_input.key(hidden)] } + StreamSink { type: AppendOnly, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10018(hidden), side_input.key(hidden)] } └─StreamTemporalJoin { type: Inner, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } ├─StreamExchange { dist: HashShard($expr1) } │ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] } @@ -1208,7 +1208,7 @@ └─BatchFilter { predicate: ((0.908:Decimal * bid.price::Decimal) > 1000000:Decimal) AND ((0.908:Decimal * bid.price::Decimal) < 50000000:Decimal) } └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid.extra], distribution: SomeShard } sink_plan: |- - StreamSink { type: append-only, columns: [auction, bidder, price, bidtimetype, date_time, extra, bid._row_id(hidden)] } + StreamSink { type: AppendOnly, columns: [auction, bidder, price, bidtimetype, date_time, extra, bid._row_id(hidden)] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, Case(((Extract('HOUR':Varchar, bid.date_time) >= 8:Decimal) AND (Extract('HOUR':Varchar, bid.date_time) <= 18:Decimal)), 'dayTime':Varchar, ((Extract('HOUR':Varchar, bid.date_time) <= 6:Decimal) OR (Extract('HOUR':Varchar, bid.date_time) >= 20:Decimal)), 'nightTime':Varchar, 'otherTime':Varchar) as $expr2, bid.date_time, bid.extra, bid._row_id] } └─StreamFilter { predicate: ((0.908:Decimal * bid.price::Decimal) > 1000000:Decimal) AND ((0.908:Decimal * bid.price::Decimal) < 50000000:Decimal) } @@ -1908,7 +1908,7 @@ └─BatchProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, SplitPart(bid.url, '/':Varchar, 4:Int32) as $expr1, SplitPart(bid.url, '/':Varchar, 5:Int32) as $expr2, SplitPart(bid.url, '/':Varchar, 6:Int32) as $expr3] } └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url], distribution: SomeShard } sink_plan: |- - StreamSink { type: append-only, columns: [auction, bidder, price, channel, dir1, dir2, dir3, bid._row_id(hidden)] } + StreamSink { type: AppendOnly, columns: [auction, bidder, price, channel, dir1, dir2, dir3, bid._row_id(hidden)] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, SplitPart(bid.url, '/':Varchar, 4:Int32) as $expr1, SplitPart(bid.url, '/':Varchar, 5:Int32) as $expr2, SplitPart(bid.url, '/':Varchar, 6:Int32) as $expr3, bid._row_id] } └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } diff --git a/src/frontend/planner_test/tests/testdata/output/sink.yaml b/src/frontend/planner_test/tests/testdata/output/sink.yaml index c66b0ad7641eb..88b38248c9e1d 100644 --- a/src/frontend/planner_test/tests/testdata/output/sink.yaml +++ b/src/frontend/planner_test/tests/testdata/output/sink.yaml @@ -9,7 +9,7 @@ table.name='t1sink', type='upsert'); explain_output: | - StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] } + StreamSink { type: Upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] } └─StreamExchange { dist: HashShard(t1.v1, t1.v2) } └─StreamTableScan { table: t1, columns: [v1, v2, v3, v5, v4] } - id: create_upsert_jdbc_sink_with_downstream_pk2 @@ -22,7 +22,7 @@ table.name='t1sink', type='upsert'); explain_output: | - StreamSink { type: upsert, columns: [v1, v2, v3, v5], pk: [t1.v1, t1.v2] } + StreamSink { type: Upsert, columns: [v1, v2, v3, v5], pk: [t1.v1, t1.v2] } └─StreamExchange { dist: HashShard(t1.v3, t1.v5) } └─StreamTableScan { table: t1, columns: [v1, v2, v3, v5] } - id: create_appendonly_jdbc_sink @@ -35,7 +35,7 @@ type='append-only', force_append_only='true'); explain_output: | - StreamSink { type: append-only, columns: [v1, v2, v3, v5] } + StreamSink { type: ForceAppendOnly, columns: [v1, v2, v3, v5] } └─StreamTableScan { table: t1, columns: [v1, v2, v3, v5] } - id: create_upsert_kafka_sink_with_downstream_pk1 sql: | @@ -47,7 +47,7 @@ primary_key='v1,v2' ); explain_output: | - StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] } + StreamSink { type: Upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] } └─StreamExchange { dist: HashShard(t1.v1, t1.v2) } └─StreamTableScan { table: t1, columns: [v1, v2, v3, v5, v4] } - id: downstream_pk_same_with_upstream @@ -60,7 +60,7 @@ primary_key='v2,v1' ); explain_output: | - StreamSink { type: upsert, columns: [v2, v1, count], pk: [t1.v1, t1.v2] } + StreamSink { type: Upsert, columns: [v2, v1, count], pk: [t1.v1, t1.v2] } └─StreamProject { exprs: [t1.v2, t1.v1, count] } └─StreamHashAgg { group_key: [t1.v1, t1.v2], aggs: [count] } └─StreamExchange { dist: HashShard(t1.v1, t1.v2) } @@ -70,6 +70,6 @@ create table t2 (a int, b int, watermark for b as b - 4) append only; explain create sink sk1 from t2 emit on window close with (connector='blackhole'); explain_output: | - StreamSink { type: append-only, columns: [a, b, t2._row_id(hidden)] } + StreamSink { type: AppendOnly, columns: [a, b, t2._row_id(hidden)] } └─StreamEowcSort { sort_column: t2.b } └─StreamTableScan { table: t2, columns: [a, b, _row_id] } From f23c22486563215c4784ab35afafa7444c39408f Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Wed, 20 Dec 2023 14:57:46 +0800 Subject: [PATCH 7/8] clippy --- .../src/optimizer/plan_node/stream_sink.rs | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 59591a4a45bfc..538d1e1cb9f08 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -231,28 +231,26 @@ impl StreamSink { fn user_defined_append_only(properties: &WithOptions) -> Result { if properties.value_eq_ignore_case(SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY) { - return Ok(UserDefinedAppendOnly::DefinedAppendOnly); + Ok(UserDefinedAppendOnly::DefinedAppendOnly) } else if properties.value_eq_ignore_case(SINK_TYPE_OPTION, SINK_TYPE_DEBEZIUM) || properties.value_eq_ignore_case(SINK_TYPE_OPTION, SINK_TYPE_UPSERT) { return Ok(UserDefinedAppendOnly::DefinedNonAppendOnly); + } else if properties.get(SINK_TYPE_OPTION).is_some() { + return Err(ErrorCode::SinkError(Box::new(Error::new( + ErrorKind::InvalidInput, + format!( + "`{}` must be {}, {}, or {}", + SINK_TYPE_OPTION, + SINK_TYPE_APPEND_ONLY, + SINK_TYPE_DEBEZIUM, + SINK_TYPE_UPSERT + ), + ))) + .into()); } else { - if properties.get(SINK_TYPE_OPTION).is_some() { - return Err(ErrorCode::SinkError(Box::new(Error::new( - ErrorKind::InvalidInput, - format!( - "`{}` must be {}, {}, or {}", - SINK_TYPE_OPTION, - SINK_TYPE_APPEND_ONLY, - SINK_TYPE_DEBEZIUM, - SINK_TYPE_UPSERT - ), - ))) - .into()); - } else { - return Ok(UserDefinedAppendOnly::NotDefined); - } - }; + return Ok(UserDefinedAppendOnly::NotDefined); + } } fn is_user_force_append_only(properties: &WithOptions) -> Result { From 89922a6e5a522a107948b5c3108b49697f434b4b Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Thu, 21 Dec 2023 14:57:25 +0800 Subject: [PATCH 8/8] fmt --- src/frontend/src/optimizer/plan_node/stream_sink.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 538d1e1cb9f08..51651aee1b9b9 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -241,10 +241,7 @@ impl StreamSink { ErrorKind::InvalidInput, format!( "`{}` must be {}, {}, or {}", - SINK_TYPE_OPTION, - SINK_TYPE_APPEND_ONLY, - SINK_TYPE_DEBEZIUM, - SINK_TYPE_UPSERT + SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_UPSERT ), ))) .into());