diff --git a/e2e_test/sink/sink_into_table/parallelism.slt b/e2e_test/sink/sink_into_table/parallelism.slt
index 8359d2731d196..d5bbecf10f01a 100644
--- a/e2e_test/sink/sink_into_table/parallelism.slt
+++ b/e2e_test/sink/sink_into_table/parallelism.slt
@@ -8,16 +8,16 @@ statement ok
 SET STREAMING_PARALLELISM TO 2;
 
 statement ok
-create table t_simple (v1 int, v2 int);
+create table t_simple (v1 int, v2 int) append only;
 
 statement ok
-create table m_simple (v1 int primary key, v2 int);
+create table m_simple (v1 int, v2 int) append only;
 
 statement ok
 SET STREAMING_PARALLELISM TO 3;
 
 statement ok
-create sink s_simple_1 into m_simple as select v1, v2 from t_simple;
+create sink s_simple_1 into m_simple as select v1, v2 from t_simple with (type = 'append-only');
 
 query I
 select distinct parallelism from rw_fragment_parallelism where name in ('t_simple', 'm_simple', 's_simple_1');
diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs
index 675b99ad0e145..c03e19c985e6b 100644
--- a/src/frontend/planner_test/src/lib.rs
+++ b/src/frontend/planner_test/src/lib.rs
@@ -850,6 +850,7 @@ impl TestCase {
                 false,
                 None,
                 None,
+                false,
             ) {
                 Ok(sink_plan) => {
                     ret.sink_plan = Some(explain_plan(&sink_plan.into()));
diff --git a/src/frontend/planner_test/tests/testdata/input/sink_into_table.yaml b/src/frontend/planner_test/tests/testdata/input/sink_into_table.yaml
index 1191cd6a68966..24b3df8902faf 100644
--- a/src/frontend/planner_test/tests/testdata/input/sink_into_table.yaml
+++ b/src/frontend/planner_test/tests/testdata/input/sink_into_table.yaml
@@ -9,3 +9,15 @@
     explain create sink ss into t from s with (type = 'append-only');
   expected_outputs:
   - explain_output
+- sql: |
+    create table t1 (a int primary key, b int);
+    create table t2 (a int, b int primary key);
+    explain create sink s into t1 from t2;
+  expected_outputs:
+  - explain_output
+- sql: |
+    create table t1 (a int primary key, b int);
+    create table t2 (a int, b int primary key);
+    explain create sink s into t1 as select b from t2;
+  expected_outputs:
+  - explain_output
diff --git a/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml b/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml
index 090d086482ec3..3a45cddf1f6c3 100644
--- a/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/emit_on_window_close.yaml
@@ -206,7 +206,7 @@
     emit on window close
     WITH (connector = 'blackhole');
   explain_output: |
-    StreamSink { type: upsert, columns: [tm, foo, bar, t._row_id(hidden), lag, max, sum], pk: [t._row_id, t.bar] }
+    StreamSink { type: upsert, columns: [tm, foo, bar, t._row_id(hidden), lag, max, sum], downstream_pk: [] }
     └─StreamEowcOverWindow { window_functions: [first_value(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 2 PRECEDING AND 2 PRECEDING), max(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), sum(t.foo) OVER(PARTITION BY t.bar ORDER BY t.tm ASC ROWS BETWEEN 2 PRECEDING AND CURRENT ROW EXCLUDE CURRENT ROW)] }
       └─StreamEowcSort { sort_column: t.tm }
         └─StreamExchange { dist: HashShard(t.bar) }
diff --git a/src/frontend/planner_test/tests/testdata/output/sink.yaml b/src/frontend/planner_test/tests/testdata/output/sink.yaml
index 369ffc748fb73..16571bb87952f 100644
--- a/src/frontend/planner_test/tests/testdata/output/sink.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/sink.yaml
@@ -9,7 +9,7 @@
       table.name='t1sink',
      type='upsert');
   explain_output: |
-    StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] }
+    StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], downstream_pk: [t1.v1, t1.v2] }
     └─StreamExchange { dist: HashShard(t1.v1, t1.v2) }
       └─StreamTableScan { table: t1, columns: [v1, v2, v3, v5, v4] }
 - id: create_upsert_jdbc_sink_with_downstream_pk2
@@ -22,7 +22,7 @@
       table.name='t1sink',
       type='upsert');
   explain_output: |
-    StreamSink { type: upsert, columns: [v1, v2, v3, v5], pk: [t1.v1, t1.v2] }
+    StreamSink { type: upsert, columns: [v1, v2, v3, v5], downstream_pk: [t1.v3, t1.v5] }
     └─StreamExchange { dist: HashShard(t1.v3, t1.v5) }
       └─StreamTableScan { table: t1, columns: [v1, v2, v3, v5] }
 - id: create_upsert_jdbc_sink_with_downstream_pk1
@@ -36,7 +36,7 @@
      type='upsert');
   explain_output: |+
     Fragment 0
-    StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] }
+    StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], downstream_pk: [t1.v1, t1.v2] }
     ├── tables: [ Sink: 0 ]
     ├── output: [ t1.v1, t1.v2, t1.v3, t1.v5, t1.v4 ]
     ├── stream key: [ t1.v3, t1.v4 ]
@@ -88,7 +88,7 @@
      type='upsert');
   explain_output: |+
     Fragment 0
-    StreamSink { type: upsert, columns: [v1, v2, v3, v5], pk: [t1.v1, t1.v2] }
+    StreamSink { type: upsert, columns: [v1, v2, v3, v5], downstream_pk: [t1.v3, t1.v5] }
     ├── tables: [ Sink: 0 ]
     ├── output: [ t1.v1, t1.v2, t1.v3, t1.v5 ]
     ├── stream key: [ t1.v1, t1.v2 ]
@@ -150,7 +150,7 @@
       primary_key='v1,v2'
     );
   explain_output: |
-    StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], pk: [t1.v3, t1.v4] }
+    StreamSink { type: upsert, columns: [v1, v2, v3, v5, t1.v4(hidden)], downstream_pk: [t1.v1, t1.v2] }
     └─StreamExchange { dist: HashShard(t1.v1, t1.v2) }
       └─StreamTableScan { table: t1, columns: [v1, v2, v3, v5, v4] }
 - id: downstream_pk_same_with_upstream
@@ -163,7 +163,7 @@
       primary_key='v2,v1'
     );
   explain_output: |
-    StreamSink { type: upsert, columns: [v2, v1, count], pk: [t1.v1, t1.v2] }
+    StreamSink { type: upsert, columns: [v2, v1, count], downstream_pk: [t1.v2, t1.v1] }
     └─StreamProject { exprs: [t1.v2, t1.v1, count] }
       └─StreamHashAgg { group_key: [t1.v1, t1.v2], aggs: [count] }
         └─StreamExchange { dist: HashShard(t1.v1, t1.v2) }
@@ -173,7 +173,7 @@
     create table t2 (a int, b int, watermark for b as b - 4) append only;
     explain create sink sk1 from t2 emit on window close with (connector='blackhole');
   explain_output: |
-    StreamSink { type: upsert, columns: [a, b, t2._row_id(hidden)], pk: [t2._row_id] }
+    StreamSink { type: upsert, columns: [a, b, t2._row_id(hidden)], downstream_pk: [] }
     └─StreamEowcSort { sort_column: t2.b }
       └─StreamTableScan { table: t2, columns: [a, b, _row_id] }
 - id: create_mock_iceberg_sink_append_only_with_sparse_partition
@@ -236,7 +236,7 @@
       primary_key = 'v1'
     );
   explain_output: |
-    StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], pk: [t1._row_id] }
+    StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], downstream_pk: [t1.v1] }
     └─StreamExchange { dist: HashShard($expr1) }
       └─StreamProject { exprs: [t1.v1, t1.v2, t1.v3, t1.v4, t1._row_id, Row(t1.v1, IcebergTransform('bucket[1]':Varchar, t1.v2), IcebergTransform('truncate[1]':Varchar, t1.v3), null:Int32) as $expr1] }
         └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] }
@@ -258,5 +258,5 @@
       primary_key = 'v1'
     );
   explain_output: |
-    StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], pk: [t1._row_id] }
+    StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], downstream_pk: [t1.v1] }
     └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] }
diff --git a/src/frontend/planner_test/tests/testdata/output/sink_into_table.yaml b/src/frontend/planner_test/tests/testdata/output/sink_into_table.yaml
index 1fc6df6613a98..ac9485bc61abb 100644
--- a/src/frontend/planner_test/tests/testdata/output/sink_into_table.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/sink_into_table.yaml
@@ -12,3 +12,20 @@
     StreamProject { exprs: [s.x, Proctime as $expr1, (Proctime - '00:01:00':Interval) as $expr2, null:Serial], output_watermarks: [$expr1, $expr2] }
     └─StreamSink { type: append-only, columns: [x, s._row_id(hidden)] }
       └─StreamTableScan { table: s, columns: [x, _row_id] }
+- sql: |
+    create table t1 (a int primary key, b int);
+    create table t2 (a int, b int primary key);
+    explain create sink s into t1 from t2;
+  explain_output: |
+    StreamProject { exprs: [t2.a, t2.b] }
+    └─StreamSink { type: upsert, columns: [a, b], downstream_pk: [t2.a] }
+      └─StreamExchange { dist: HashShard(t2.a) }
+        └─StreamTableScan { table: t2, columns: [a, b] }
+- sql: |
+    create table t1 (a int primary key, b int);
+    create table t2 (a int, b int primary key);
+    explain create sink s into t1 as select b from t2;
+  explain_output: |
+    StreamProject { exprs: [t2.b, null:Int32] }
+    └─StreamSink { type: upsert, columns: [b], downstream_pk: [t2.b] }
+      └─StreamTableScan { table: t2, columns: [b] }
diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs
index 59bbe49652a00..cc587930fee5e 100644
--- a/src/frontend/src/handler/create_sink.rs
+++ b/src/frontend/src/handler/create_sink.rs
@@ -28,7 +28,7 @@ use risingwave_common::catalog::{
 use risingwave_common::secret::LocalSecretManager;
 use risingwave_common::types::DataType;
 use risingwave_common::{bail, catalog};
-use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType};
+use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
 use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK};
 use risingwave_connector::sink::kafka::KAFKA_SINK;
 use risingwave_connector::sink::{
@@ -234,8 +234,6 @@ pub async fn gen_sink_plan(
         }
     }
 
-    let target_table = target_table_catalog.as_ref().map(|catalog| catalog.id());
-
     let sink_plan = plan_root.gen_sink_plan(
         sink_table_name,
         definition,
@@ -245,8 +243,9 @@ pub async fn gen_sink_plan(
         sink_from_table_name,
         format_desc,
         without_backfill,
-        target_table,
+        target_table_catalog.clone(),
        partition_info,
+        user_specified_columns,
    )?;
 
     let sink_desc = sink_plan.sink_desc().clone();
@@ -277,23 +276,6 @@ pub async fn gen_sink_plan(
                 unreachable!("can not derive generated columns in a sink's catalog, but meet one");
             }
         }
-
-        let user_defined_primary_key_table = table_catalog.row_id_index.is_none();
-        let sink_is_append_only = sink_catalog.sink_type == SinkType::AppendOnly
-            || sink_catalog.sink_type == SinkType::ForceAppendOnly;
-
-        if !user_defined_primary_key_table && !sink_is_append_only {
-            return Err(RwError::from(ErrorCode::BindError(
-                "Only append-only sinks can sink to a table without primary keys. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(),
-            )));
-        }
-
-        if table_catalog.append_only && !sink_is_append_only {
-            return Err(RwError::from(ErrorCode::BindError(
-                "Only append-only sinks can sink to a append only table. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(),
-            )));
-        }
-
         let exprs = derive_default_column_project_for_sink(
             &sink_catalog,
             sink_plan.schema(),
diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs
index e08f6b2c4dd4a..b91270d0607ea 100644
--- a/src/frontend/src/optimizer/mod.rs
+++ b/src/frontend/src/optimizer/mod.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 use std::num::NonZeroU32;
 use std::ops::DerefMut;
+use std::sync::Arc;
 
 pub mod plan_node;
 
@@ -41,7 +42,7 @@ mod plan_expr_visitor;
 mod rule;
 
 use std::assert_matches::assert_matches;
-use std::collections::HashMap;
+use std::collections::{BTreeMap, HashMap};
 
 use fixedbitset::FixedBitSet;
 use itertools::Itertools as _;
@@ -51,7 +52,7 @@ use plan_expr_rewriter::ConstEvalRewriter;
 use property::Order;
 use risingwave_common::bail;
 use risingwave_common::catalog::{
-    ColumnCatalog, ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId,
+    ColumnCatalog, ColumnDesc, ColumnId, ConflictBehavior, Field, Schema,
 };
 use risingwave_common::types::DataType;
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
@@ -83,6 +84,7 @@ use crate::optimizer::plan_node::{
 use crate::optimizer::plan_visitor::TemporalJoinValidator;
 use crate::optimizer::property::Distribution;
 use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};
+use crate::TableCatalog;
 
 /// `PlanRoot` is used to describe a plan. planner will construct a `PlanRoot` with `LogicalNode`.
 /// and required distribution and order. And `PlanRoot` can generate corresponding streaming or
@@ -943,8 +945,9 @@ impl PlanRoot {
         sink_from_table_name: String,
         format_desc: Option<SinkFormatDesc>,
         without_backfill: bool,
-        target_table: Option<TableId>,
+        target_table: Option<Arc<TableCatalog>>,
         partition_info: Option<PartitionComputeInfo>,
+        user_specified_columns: bool,
     ) -> Result<StreamSink> {
         let stream_scan_type = if without_backfill {
             StreamScanType::UpstreamOnly
@@ -962,12 +965,16 @@ impl PlanRoot {
             self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
         assert_eq!(self.phase, PlanPhase::Stream);
         assert_eq!(stream_plan.convention(), Convention::Stream);
+        let target_columns_to_plan_mapping = target_table
+            .as_ref()
+            .map(|t| self.target_columns_to_plan_mapping(t.columns(), user_specified_columns));
         StreamSink::create(
             stream_plan,
             sink_name,
             db_name,
             sink_from_table_name,
             target_table,
+            target_columns_to_plan_mapping,
             self.required_dist.clone(),
             self.required_order.clone(),
             self.out_fields.clone(),
@@ -997,6 +1004,39 @@ impl PlanRoot {
             .config()
             .streaming_use_snapshot_backfill()
     }
+
+    /// used when the plan has a target relation such as DML and sink into table, return the mapping from table's columns to the plan's schema
+    pub fn target_columns_to_plan_mapping(
+        &self,
+        tar_cols: &[ColumnCatalog],
+        user_specified_columns: bool,
+    ) -> Vec<Option<usize>> {
+        #[allow(clippy::disallowed_methods)]
+        let visible_cols: Vec<(usize, String)> = self
+            .out_fields
+            .ones()
+            .zip_eq(self.out_names.iter().cloned())
+            .collect_vec();
+
+        let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
+        let visible_col_idxes_by_name = visible_cols
+            .iter()
+            .map(|(i, name)| (name.as_ref(), *i))
+            .collect::<BTreeMap<_, _>>();
+
+        tar_cols
+            .iter()
+            .enumerate()
+            .filter(|(_, tar_col)| !tar_col.is_generated())
+            .map(|(tar_i, tar_col)| {
+                if user_specified_columns {
+                    visible_col_idxes_by_name.get(tar_col.name()).cloned()
+                } else {
+                    (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
+                }
+            })
+            .collect()
+    }
 }
 
 fn find_version_column_index(
diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs
index 48dc4dad85c5c..74ce8587ef6d4 100644
--- a/src/frontend/src/optimizer/plan_node/stream_sink.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs
@@ -14,13 +14,14 @@
 
 use std::assert_matches::assert_matches;
 use std::io::{Error, ErrorKind};
+use std::sync::Arc;
 
 use anyhow::anyhow;
 use fixedbitset::FixedBitSet;
 use icelake::types::Transform;
 use itertools::Itertools;
 use pretty_xmlish::{Pretty, XmlNode};
-use risingwave_common::catalog::{ColumnCatalog, CreateType, TableId};
+use risingwave_common::catalog::{ColumnCatalog, CreateType};
 use risingwave_common::types::{DataType, StructType};
 use risingwave_common::util::iter_util::ZipEqDebug;
 use risingwave_connector::match_sink_name_str;
@@ -43,7 +44,7 @@ use super::utils::{
     childless_record, infer_kv_log_store_table_catalog_inner, Distill, IndicesDisplay,
 };
 use super::{generic, ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProject};
-use crate::error::{ErrorCode, Result};
+use crate::error::{ErrorCode, Result, RwError};
 use crate::expr::{ExprImpl, FunctionCall, InputRef};
 use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
 use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
@@ -195,85 +196,6 @@ impl StreamSink {
         &self.sink_desc
     }
 
-    #[allow(clippy::too_many_arguments)]
-    pub fn create(
-        input: PlanRef,
-        name: String,
-        db_name: String,
-        sink_from_table_name: String,
-        target_table: Option<TableId>,
-        user_distributed_by: RequiredDist,
-        user_order_by: Order,
-        user_cols: FixedBitSet,
-        out_names: Vec<String>,
-        definition: String,
-        properties: WithOptionsSecResolved,
-        format_desc: Option<SinkFormatDesc>,
-        partition_info: Option<PartitionComputeInfo>,
-    ) -> Result<Self> {
-        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
-        let (input, mut sink) = Self::derive_sink_desc(
-            input,
-            user_distributed_by,
-            name,
-            db_name,
-            sink_from_table_name,
-            target_table,
-            user_order_by,
-            columns,
-            definition,
-            properties,
-            format_desc,
-            partition_info,
-        )?;
-
-        let unsupported_sink =
-            |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)));
-
-        // check and ensure that the sink connector is specified and supported
-        let sink_decouple = match sink.properties.get(CONNECTOR_TYPE_KEY) {
-            Some(connector) => {
-                match_sink_name_str!(
-                    connector.to_lowercase().as_str(),
-                    SinkType,
-                    {
-                        // the table sink is created by with properties
-                        if connector == TABLE_SINK && sink.target_table.is_none() {
-                            unsupported_sink(TABLE_SINK)
-                        } else {
-                            SinkType::set_default_commit_checkpoint_interval(
-                                &mut sink,
-                                &input.ctx().session_ctx().config().sink_decouple(),
-                            )?;
-                            SinkType::is_sink_decouple(
-                                &input.ctx().session_ctx().config().sink_decouple(),
-                            )
-                        }
-                    },
-                    |other: &str| unsupported_sink(other)
-                )?
-            }
-            None => {
-                return Err(
-                    SinkError::Config(anyhow!("connector not specified when create sink")).into(),
-                );
-            }
-        };
-        // For file sink, it must have sink_decouple turned on.
-        if !sink_decouple && sink.is_file_sink() {
-            return Err(
-                SinkError::Config(anyhow!("File sink can only be created with sink_decouple enabled. Please run `set sink_decouple = true` first.")).into(),
-            );
-        }
-        let log_store_type = if sink_decouple {
-            SinkLogStoreType::KvLogStore
-        } else {
-            SinkLogStoreType::InMemoryLogStore
-        };
-
-        Ok(Self::new(input, sink, log_store_type))
-    }
-
     fn derive_iceberg_sink_distribution(
         input: PlanRef,
         partition_info: Option<PartitionComputeInfo>,
@@ -308,27 +230,67 @@ impl StreamSink {
     }
 
     #[allow(clippy::too_many_arguments)]
-    fn derive_sink_desc(
+    pub fn create(
         mut input: PlanRef,
-        user_distributed_by: RequiredDist,
         name: String,
         db_name: String,
-        sink_from_name: String,
-        target_table: Option<TableId>,
+        sink_from_table_name: String,
+        target_table: Option<Arc<TableCatalog>>,
+        target_table_mapping: Option<Vec<Option<usize>>>,
+        user_distributed_by: RequiredDist,
         user_order_by: Order,
-        columns: Vec<ColumnCatalog>,
+        user_cols: FixedBitSet,
+        out_names: Vec<String>,
         definition: String,
         properties: WithOptionsSecResolved,
         format_desc: Option<SinkFormatDesc>,
         partition_info: Option<PartitionComputeInfo>,
-    ) -> Result<(PlanRef, SinkDesc)> {
+    ) -> Result<Self> {
         let sink_type =
             Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?;
+
+        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
         let (pk, _) = derive_pk(input.clone(), user_order_by, &columns);
 
-        let mut downstream_pk =
-            Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?;
+        let mut downstream_pk = {
+            let from_properties =
+                Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?;
+            if let Some(t) = &target_table {
+                let user_defined_primary_key_table = t.row_id_index.is_none();
+                let sink_is_append_only =
+                    sink_type == SinkType::AppendOnly || sink_type == SinkType::ForceAppendOnly;
+
+                if !user_defined_primary_key_table && !sink_is_append_only {
+                    return Err(RwError::from(ErrorCode::BindError(
+                        "Only append-only sinks can sink to a table without primary keys. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(),
+                    )));
+                }
+
+                if t.append_only && !sink_is_append_only {
+                    return Err(RwError::from(ErrorCode::BindError(
+                        "Only append-only sinks can sink to a append only table. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(),
+                    )));
+                }
+
+                if sink_type != SinkType::Upsert {
+                    vec![]
+                } else {
+                    let target_table_mapping = target_table_mapping.unwrap();
+                    t.pk()
+                        .iter()
+                        .map(|c| {
+                            target_table_mapping[c.column_index].ok_or(
+                            ErrorCode::SinkError(Box::new(Error::new(ErrorKind::InvalidInput,
+                            "When using non append only sink into table, the primary key of the table must be included in the sink result.".to_string()
+                            ))).into())})
+                        .try_collect::<_, _, RwError>()?
+                }
+            } else {
+                from_properties
+            }
+        };
         let mut extra_partition_col_idx = None;
+
         let required_dist = match input.distribution() {
             Distribution::Single => RequiredDist::single(),
             _ => {
@@ -392,11 +354,11 @@ impl StreamSink {
             CreateType::Foreground
         };
         let (properties, secret_refs) = properties.into_parts();
-        let sink_desc = SinkDesc {
+        let mut sink_desc = SinkDesc {
             id: SinkId::placeholder(),
             name,
             db_name,
-            sink_from_name,
+            sink_from_name: sink_from_table_name,
             definition,
             columns,
             plan_pk: pk,
@@ -406,11 +368,56 @@ impl StreamSink {
             secret_refs,
             sink_type,
             format_desc,
-            target_table,
+            target_table: target_table.as_ref().map(|catalog| catalog.id()),
             extra_partition_col_idx,
             create_type,
         };
-        Ok((input, sink_desc))
+
+        let unsupported_sink =
+            |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)));
+
+        // check and ensure that the sink connector is specified and supported
+        let sink_decouple = match sink_desc.properties.get(CONNECTOR_TYPE_KEY) {
+            Some(connector) => {
+                match_sink_name_str!(
+                    connector.to_lowercase().as_str(),
+                    SinkType,
+                    {
+                        // the table sink is created by with properties
+                        if connector == TABLE_SINK && sink_desc.target_table.is_none() {
+                            unsupported_sink(TABLE_SINK)
+                        } else {
+                            SinkType::set_default_commit_checkpoint_interval(
+                                &mut sink_desc,
+                                &input.ctx().session_ctx().config().sink_decouple(),
+                            )?;
+                            SinkType::is_sink_decouple(
+                                &input.ctx().session_ctx().config().sink_decouple(),
+                            )
+                        }
+                    },
+                    |other: &str| unsupported_sink(other)
+                )?
+            }
+            None => {
+                return Err(
+                    SinkError::Config(anyhow!("connector not specified when create sink")).into(),
+                );
+            }
+        };
+        // For file sink, it must have sink_decouple turned on.
+        if !sink_decouple && sink_desc.is_file_sink() {
+            return Err(
+                SinkError::Config(anyhow!("File sink can only be created with sink_decouple enabled. Please run `set sink_decouple = true` first.")).into(),
+            );
+        }
+        let log_store_type = if sink_decouple {
+            SinkLogStoreType::KvLogStore
+        } else {
+            SinkLogStoreType::InMemoryLogStore
+        };
+
+        Ok(Self::new(input, sink_desc, log_store_type))
     }
 
     fn is_user_defined_append_only(properties: &WithOptionsSecResolved) -> Result<bool> {
@@ -572,16 +579,11 @@ impl Distill for StreamSink {
         vec.push(("type", Pretty::from(sink_type)));
         vec.push(("columns", column_names));
         if self.sink_desc.sink_type.is_upsert() {
-            let pk = IndicesDisplay {
-                indices: &self
-                    .sink_desc
-                    .plan_pk
-                    .iter()
-                    .map(|k| k.column_index)
-                    .collect_vec(),
+            let sink_pk = IndicesDisplay {
+                indices: &self.sink_desc.downstream_pk.clone(),
                 schema: self.base.schema(),
             };
-            vec.push(("pk", pk.distill()));
+            vec.push(("downstream_pk", sink_pk.distill()));
         }
         childless_record("StreamSink", vec)
     }