From 7aa52fa5e12c037a2e8487c91682baffc829564a Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Mon, 25 Nov 2024 12:48:24 +0800 Subject: [PATCH] fix conflict --- src/frontend/src/handler/create_sink.rs | 2 +- src/frontend/src/optimizer/mod.rs | 9 ++++----- src/frontend/src/optimizer/plan_node/stream_sink.rs | 7 +------ 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 8d58237411477..978eb12265377 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -28,7 +28,7 @@ use risingwave_common::catalog::{ use risingwave_common::secret::LocalSecretManager; use risingwave_common::types::DataType; use risingwave_common::{bail, catalog}; -use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType}; +use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc}; use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK}; use risingwave_connector::sink::{ CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL, diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index db671b6d153a6..a85a7a5faedc2 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -961,10 +961,9 @@ impl PlanRoot { self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?; assert_eq!(self.phase, PlanPhase::Stream); assert_eq!(stream_plan.convention(), Convention::Stream); - let target_columns_to_plan_mapping = target_table.as_ref().map(|t| { - let columns = t.columns_without_rw_timestamp(); - self.target_columns_to_plan_mapping(&columns, user_specified_columns) - }); + let target_columns_to_plan_mapping = target_table + .as_ref() + .map(|t| self.target_columns_to_plan_mapping(t.columns(), user_specified_columns)); StreamSink::create( stream_plan, sink_name, @@ -1024,7 +1023,7 @@ impl PlanRoot { tar_cols .iter() .enumerate() - .filter(|(_, tar_col)| tar_col.can_dml()) + .filter(|(_, tar_col)| !tar_col.is_generated()) .map(|(tar_i, tar_col)| { if user_specified_columns { visible_col_idxes_by_name.get(tar_col.name()).cloned() diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 74ce8587ef6d4..238b3d24d1ff6 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -405,12 +405,7 @@ impl StreamSink { ); } }; - // For file sink, it must have sink_decouple turned on. - if !sink_decouple && sink_desc.is_file_sink() { - return Err( - SinkError::Config(anyhow!("File sink can only be created with sink_decouple enabled. Please run `set sink_decouple = true` first.")).into(), - ); - } + let log_store_type = if sink_decouple { SinkLogStoreType::KvLogStore } else {