From 317b36108cd41b767ee376e91c8bfabe6174591e Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Wed, 24 Jan 2024 18:36:15 +0800 Subject: [PATCH] fix: missing columns in format_encode_options (#14771) --- .../src/from_proto/source/trad_source.rs | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index 29c7ba5e0158a..cb6f7536aa316 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -16,7 +16,9 @@ use risingwave_common::catalog::{ default_key_column_name_version_mapping, ColumnId, TableId, KAFKA_TIMESTAMP_COLUMN_NAME, }; use risingwave_connector::source::reader::desc::SourceDescBuilder; -use risingwave_connector::source::{ConnectorProperties, SourceCtrlOpts}; +use risingwave_connector::source::{ + should_copy_to_format_encode_options, ConnectorProperties, SourceCtrlOpts, UPSTREAM_SOURCE_KEY, +}; use risingwave_pb::data::data_type::TypeName as PbTypeName; use risingwave_pb::plan_common::{ AdditionalColumnType, ColumnDescVersion, FormatType, PbEncodeType, @@ -52,7 +54,23 @@ impl ExecutorBuilder for SourceExecutorBuilder { let executor = { let source_id = TableId::new(source.source_id); let source_name = source.source_name.clone(); - let source_info = source.get_info()?; + let mut source_info = source.get_info()?.clone(); + + if source_info.format_encode_options.is_empty() { + // compatible code: quick fix for , + // will move the logic to FragmentManager::init in release 1.7. + let connector = source + .with_properties + .get(UPSTREAM_SOURCE_KEY) + .unwrap_or(&String::default()) + .to_owned(); + source_info.format_encode_options.extend( + source.with_properties.iter().filter_map(|(k, v)| { + should_copy_to_format_encode_options(k, &connector) + .then_some((k.to_owned(), v.to_owned())) + }), + ); + } let mut source_columns = source.columns.clone(); @@ -113,7 +131,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { params.env.source_metrics(), source.row_id_index.map(|x| x as _), source.with_properties.clone(), - source_info.clone(), + source_info, params.env.connector_params(), params.env.config().developer.connector_message_buffer_size, // `pk_indices` is used to ensure that a message will be skipped instead of parsed