diff --git a/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml b/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml index acdc201f0c6fb..e6d5119ad325e 100644 --- a/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml +++ b/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml @@ -210,7 +210,7 @@ emit on window close WITH (connector = 'blackhole'); explain_output: | - StreamSink { type: upsert, columns: [tm, foo, bar, t._row_id(hidden), lag, max, sum], pk: [t._row_id, t.bar] } + StreamSink { type: AppendOnly, columns: [tm, foo, bar, t._row_id(hidden), lag, max, sum] } └─StreamEowcOverWindow { window_functions: [first_value(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 2 PRECEDING AND 2 PRECEDING), max(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), sum(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 2 PRECEDING AND CURRENT ROW EXCLUDE CURRENT ROW)] } └─StreamEowcSort { sort_column: t.tm } └─StreamExchange { dist: HashShard(t.bar) } diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml index 579e87e17e021..b93ad0ff2873f 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml @@ -49,7 +49,7 @@ BatchExchange { order: [], dist: Single } └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time], distribution: SomeShard } sink_plan: |- - StreamSink { type: append-only, columns: [auction, bidder, price, date_time, bid._row_id(hidden)] } + StreamSink { type: AppendOnly, columns: [auction, bidder, price, date_time, bid._row_id(hidden)] } └─StreamExchange { dist: Single } └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } stream_plan: |- @@ -94,7 +94,7 @@ └─BatchProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, bid.date_time] } └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time], distribution: SomeShard } sink_plan: |- - StreamSink { type: append-only, columns: [auction, bidder, price, date_time, bid._row_id(hidden)] } + StreamSink { type: AppendOnly, columns: [auction, bidder, price, date_time, bid._row_id(hidden)] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, bid.date_time, bid._row_id] } └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } @@ -136,7 +136,7 @@ └─BatchFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) } └─BatchScan { table: bid, columns: [bid.auction, bid.price], distribution: SomeShard } sink_plan: |- - StreamSink { type: append-only, columns: [auction, price, bid._row_id(hidden)] } + StreamSink { type: AppendOnly, columns: [auction, price, bid._row_id(hidden)] } └─StreamExchange { dist: Single } └─StreamFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) } └─StreamTableScan { table: bid, columns: [bid.auction, bid.price, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } @@ -1024,7 +1024,7 @@ └─BatchProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, ToChar(bid.date_time, 'YYYY-MM-DD':Varchar) as $expr1, ToChar(bid.date_time, 'HH:MI':Varchar) as $expr2] } └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time], distribution: SomeShard } sink_plan: |- - StreamSink { type: append-only, columns: [auction, bidder, price, date_time, date, time, bid._row_id(hidden)] } + StreamSink { type: AppendOnly, columns: [auction, bidder, price, date_time, date, time, bid._row_id(hidden)] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, ToChar(bid.date_time, 'YYYY-MM-DD':Varchar) as $expr1, ToChar(bid.date_time, 'HH:MI':Varchar) as $expr2, bid._row_id] } └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } @@ -1127,7 +1127,7 @@ JOIN side_input FOR SYSTEM_TIME AS OF PROCTIME() S ON mod(B.auction, 10000) = S.key sink_plan: |- - StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10018(hidden), side_input.key(hidden)] } + StreamSink { type: AppendOnly, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10018(hidden), side_input.key(hidden)] } └─StreamTemporalJoin { type: Inner, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } ├─StreamExchange { dist: HashShard($expr1) } │ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] } @@ -1208,7 +1208,7 @@ └─BatchFilter { predicate: ((0.908:Decimal * bid.price::Decimal) > 1000000:Decimal) AND ((0.908:Decimal * bid.price::Decimal) < 50000000:Decimal) } └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid.extra], distribution: SomeShard } sink_plan: |- - StreamSink { type: append-only, columns: [auction, bidder, price, bidtimetype, date_time, extra, bid._row_id(hidden)] } + StreamSink { type: AppendOnly, columns: [auction, bidder, price, bidtimetype, date_time, extra, bid._row_id(hidden)] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, Case(((Extract('HOUR':Varchar, bid.date_time) >= 8:Decimal) AND (Extract('HOUR':Varchar, bid.date_time) <= 18:Decimal)), 'dayTime':Varchar, ((Extract('HOUR':Varchar, bid.date_time) <= 6:Decimal) OR (Extract('HOUR':Varchar, bid.date_time) >= 20:Decimal)), 'nightTime':Varchar, 'otherTime':Varchar) as $expr2, bid.date_time, bid.extra, bid._row_id] } └─StreamFilter { predicate: ((0.908:Decimal * bid.price::Decimal) > 1000000:Decimal) AND ((0.908:Decimal * bid.price::Decimal) < 50000000:Decimal) } @@ -1908,7 +1908,7 @@ └─BatchProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, SplitPart(bid.url, '/':Varchar, 4:Int32) as $expr1, SplitPart(bid.url, '/':Varchar, 5:Int32) as $expr2, SplitPart(bid.url, '/':Varchar, 6:Int32) as $expr3] } └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url], distribution: SomeShard } sink_plan: |- - StreamSink { type: append-only, columns: [auction, bidder, price, channel, dir1, dir2, dir3, bid._row_id(hidden)] } + StreamSink { type: AppendOnly, columns: [auction, bidder, price, channel, dir1, dir2, dir3, bid._row_id(hidden)] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, SplitPart(bid.url, '/':Varchar, 4:Int32) as $expr1, SplitPart(bid.url, '/':Varchar, 5:Int32) as $expr2, SplitPart(bid.url, '/':Varchar, 6:Int32) as $expr3, bid._row_id] } └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } diff --git a/src/frontend/planner_test/tests/testdata/output/sink.yaml b/src/frontend/planner_test/tests/testdata/output/sink.yaml index f2fbad0f9add1..88b38248c9e1d 100644 --- a/src/frontend/planner_test/tests/testdata/output/sink.yaml +++ b/src/frontend/planner_test/tests/testdata/output/sink.yaml @@ -9,7 +9,7 @@ table.name='t1sink', type='upsert'); explain_output: | - StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] } + StreamSink { type: Upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] } └─StreamExchange { dist: HashShard(t1.v1, t1.v2) } └─StreamTableScan { table: t1, columns: [v1, v2, v3, v5, v4] } - id: create_upsert_jdbc_sink_with_downstream_pk2 @@ -22,7 +22,7 @@ table.name='t1sink', type='upsert'); explain_output: | - StreamSink { type: upsert, columns: [v1, v2, v3, v5], pk: [t1.v1, t1.v2] } + StreamSink { type: Upsert, columns: [v1, v2, v3, v5], pk: [t1.v1, t1.v2] } └─StreamExchange { dist: HashShard(t1.v3, t1.v5) } └─StreamTableScan { table: t1, columns: [v1, v2, v3, v5] } - id: create_appendonly_jdbc_sink @@ -35,7 +35,7 @@ type='append-only', force_append_only='true'); explain_output: | - StreamSink { type: append-only, columns: [v1, v2, v3, v5] } + StreamSink { type: ForceAppendOnly, columns: [v1, v2, v3, v5] } └─StreamTableScan { table: t1, columns: [v1, v2, v3, v5] } - id: create_upsert_kafka_sink_with_downstream_pk1 sql: | @@ -47,7 +47,7 @@ primary_key='v1,v2' ); explain_output: | - StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] } + StreamSink { type: Upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] } └─StreamExchange { dist: HashShard(t1.v1, t1.v2) } └─StreamTableScan { table: t1, columns: [v1, v2, v3, v5, v4] } - id: downstream_pk_same_with_upstream @@ -60,7 +60,7 @@ primary_key='v2,v1' ); explain_output: | - StreamSink { type: upsert, columns: [v2, v1, count], pk: [t1.v1, t1.v2] } + StreamSink { type: Upsert, columns: [v2, v1, count], pk: [t1.v1, t1.v2] } └─StreamProject { exprs: [t1.v2, t1.v1, count] } └─StreamHashAgg { group_key: [t1.v1, t1.v2], aggs: [count] } └─StreamExchange { dist: HashShard(t1.v1, t1.v2) } @@ -70,6 +70,6 @@ create table t2 (a int, b int, watermark for b as b - 4) append only; explain create sink sk1 from t2 emit on window close with (connector='blackhole'); explain_output: | - StreamSink { type: upsert, columns: [a, b, t2._row_id(hidden)], pk: [t2._row_id] } + StreamSink { type: AppendOnly, columns: [a, b, t2._row_id(hidden)] } └─StreamEowcSort { sort_column: t2.b } └─StreamTableScan { table: t2, columns: [a, b, _row_id] } diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 68814531d9293..51651aee1b9b9 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -57,6 +57,24 @@ pub struct StreamSink { sink_desc: SinkDesc, } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +enum UserDefinedAppendOnly { + DefinedAppendOnly, + DefinedNonAppendOnly, + NotDefined, +} + +impl From<&SinkFormat> for UserDefinedAppendOnly { + fn from(value: &SinkFormat) -> Self { + match value { + SinkFormat::AppendOnly => UserDefinedAppendOnly::DefinedAppendOnly, + SinkFormat::Upsert | SinkFormat::Debezium => { + UserDefinedAppendOnly::DefinedNonAppendOnly + } + } + } +} + impl StreamSink { #[must_use] pub fn new(input: PlanRef, sink_desc: SinkDesc) -> Self { @@ -211,26 +229,25 @@ impl StreamSink { Ok((input, sink_desc)) } - fn is_user_defined_append_only(properties: &WithOptions) -> Result { - if let Some(sink_type) = properties.get(SINK_TYPE_OPTION) { - if sink_type != SINK_TYPE_APPEND_ONLY - && sink_type != SINK_TYPE_DEBEZIUM - && sink_type != SINK_TYPE_UPSERT - { - return Err(ErrorCode::SinkError(Box::new(Error::new( - ErrorKind::InvalidInput, - format!( - "`{}` must be {}, {}, or {}", - SINK_TYPE_OPTION, - SINK_TYPE_APPEND_ONLY, - SINK_TYPE_DEBEZIUM, - SINK_TYPE_UPSERT - ), - ))) - .into()); - } + fn user_defined_append_only(properties: &WithOptions) -> Result { + if properties.value_eq_ignore_case(SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY) { + Ok(UserDefinedAppendOnly::DefinedAppendOnly) + } else if properties.value_eq_ignore_case(SINK_TYPE_OPTION, SINK_TYPE_DEBEZIUM) + || properties.value_eq_ignore_case(SINK_TYPE_OPTION, SINK_TYPE_UPSERT) + { + return Ok(UserDefinedAppendOnly::DefinedNonAppendOnly); + } else if properties.get(SINK_TYPE_OPTION).is_some() { + return Err(ErrorCode::SinkError(Box::new(Error::new( + ErrorKind::InvalidInput, + format!( + "`{}` must be {}, {}, or {}", + SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_UPSERT + ), + ))) + .into()); + } else { + return Ok(UserDefinedAppendOnly::NotDefined); } - Ok(properties.value_eq_ignore_case(SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY)) } fn is_user_force_append_only(properties: &WithOptions) -> Result { @@ -256,38 +273,41 @@ impl StreamSink { format_desc: Option<&SinkFormatDesc>, ) -> Result { let frontend_derived_append_only = input_append_only; - let (user_defined_append_only, user_force_append_only, syntax_legacy) = match format_desc { - Some(f) => ( - f.format == SinkFormat::AppendOnly, - Self::is_user_force_append_only(&WithOptions::from_inner(f.options.clone()))?, - false, - ), - None => ( - Self::is_user_defined_append_only(properties)?, - Self::is_user_force_append_only(properties)?, - true, - ), - }; + use UserDefinedAppendOnly::*; + let (user_defined_append_only, user_force_append_only, from_format_clause) = + match format_desc { + Some(f) => ( + UserDefinedAppendOnly::from(&f.format), + Self::is_user_force_append_only(&WithOptions::from_inner(f.options.clone()))? + || Self::is_user_force_append_only(properties)?, + false, + ), + None => ( + Self::user_defined_append_only(properties)?, + Self::is_user_force_append_only(properties)?, + true, + ), + }; match ( frontend_derived_append_only, user_defined_append_only, user_force_append_only, ) { - (true, true, _) => Ok(SinkType::AppendOnly), - (false, true, true) => Ok(SinkType::ForceAppendOnly), - (_, false, false) => Ok(SinkType::Upsert), - (false, true, false) => { + (true, DefinedAppendOnly|NotDefined, _) => Ok(SinkType::AppendOnly), + (false, DefinedAppendOnly|NotDefined, true) => Ok(SinkType::ForceAppendOnly), + (_, NotDefined|DefinedNonAppendOnly, false) => Ok(SinkType::Upsert), + (false, DefinedAppendOnly, false) => { Err(ErrorCode::SinkError(Box::new(Error::new( ErrorKind::InvalidInput, - format!("The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. Notice that this will cause the sink executor to drop any UPDATE or DELETE message.", if syntax_legacy {"WITH"} else {"FORMAT ENCODE"}), + format!("The sink cannot be append-only. Please add \"force_append_only='true'\" in {} options to force the sink to be append-only. Notice that this will cause the sink executor to drop any UPDATE or DELETE message.", if from_format_clause {"WITH"} else {"FORMAT ENCODE"}), ))) .into()) } - (_, false, true) => { + (_, DefinedNonAppendOnly, true) => { Err(ErrorCode::SinkError(Box::new(Error::new( ErrorKind::InvalidInput, - format!("Cannot force the sink to be append-only without \"{}\".", if syntax_legacy {"type='append-only'"} else {"FORMAT PLAIN"}), + format!("Cannot force the sink to be append-only without \"{}\".", if from_format_clause {"type='append-only'"} else {"FORMAT PLAIN"}), ))) .into()) } @@ -391,11 +411,6 @@ impl_plan_tree_node_for_unary! { StreamSink } impl Distill for StreamSink { fn distill<'a>(&self) -> XmlNode<'a> { - let sink_type = if self.sink_desc.sink_type.is_append_only() { - "append-only" - } else { - "upsert" - }; let column_names = self .sink_desc .columns @@ -405,7 +420,10 @@ impl Distill for StreamSink { .collect(); let column_names = Pretty::Array(column_names); let mut vec = Vec::with_capacity(3); - vec.push(("type", Pretty::from(sink_type))); + vec.push(( + "type", + Pretty::from(format!("{:?}", self.sink_desc.sink_type)), + )); vec.push(("columns", column_names)); if self.sink_desc.sink_type.is_upsert() { let pk = IndicesDisplay {