diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index 8234188e9a4fc..0538dbf1535ee 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -13,7 +13,9 @@ // limitations under the License. use risingwave_common::catalog::{default_key_column_name_version_mapping, ColumnId, TableId}; -use risingwave_connector::source::{ConnectorProperties, SourceCtrlOpts}; +use risingwave_connector::source::{ + should_copy_to_format_encode_options, ConnectorProperties, SourceCtrlOpts, UPSTREAM_SOURCE_KEY, +}; use risingwave_pb::data::data_type::TypeName as PbTypeName; use risingwave_pb::plan_common::{ AdditionalColumnType, ColumnDescVersion, FormatType, PbEncodeType, @@ -52,7 +54,23 @@ impl ExecutorBuilder for SourceExecutorBuilder { let executor = { let source_id = TableId::new(source.source_id); let source_name = source.source_name.clone(); - let source_info = source.get_info()?; + let mut source_info = source.get_info()?.clone(); + + if source_info.format_encode_options.is_empty() { + // compatible code: quick fix for , + // will move the logic to FragmentManager::init in release 1.7. + let connector = source + .with_properties + .get(UPSTREAM_SOURCE_KEY) + .unwrap_or(&String::default()) + .to_owned(); + source_info.format_encode_options.extend( + source.with_properties.iter().filter_map(|(k, v)| { + should_copy_to_format_encode_options(k, &connector) + .then_some((k.to_owned(), v.to_owned())) + }), + ); + } let mut source_columns = source.columns.clone(); @@ -88,7 +106,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { params.env.source_metrics(), source.row_id_index.map(|x| x as _), source.with_properties.clone(), - source_info.clone(), + source_info, params.env.connector_params(), params.env.config().developer.connector_message_buffer_size, // `pk_indices` is used to ensure that a message will be skipped instead of parsed