Skip to content

Commit

Permalink
fix: missing columns in format_encode_options (#14771)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rossil2012 authored Jan 24, 2024
1 parent c67e4e4 commit 317b361
Showing 1 changed file with 21 additions and 3 deletions.
24 changes: 21 additions & 3 deletions src/stream/src/from_proto/source/trad_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use risingwave_common::catalog::{
default_key_column_name_version_mapping, ColumnId, TableId, KAFKA_TIMESTAMP_COLUMN_NAME,
};
use risingwave_connector::source::reader::desc::SourceDescBuilder;
use risingwave_connector::source::{ConnectorProperties, SourceCtrlOpts};
use risingwave_connector::source::{
should_copy_to_format_encode_options, ConnectorProperties, SourceCtrlOpts, UPSTREAM_SOURCE_KEY,
};
use risingwave_pb::data::data_type::TypeName as PbTypeName;
use risingwave_pb::plan_common::{
AdditionalColumnType, ColumnDescVersion, FormatType, PbEncodeType,
Expand Down Expand Up @@ -52,7 +54,23 @@ impl ExecutorBuilder for SourceExecutorBuilder {
let executor = {
let source_id = TableId::new(source.source_id);
let source_name = source.source_name.clone();
let source_info = source.get_info()?;
let mut source_info = source.get_info()?.clone();

if source_info.format_encode_options.is_empty() {
// compatible code: quick fix for <https://github.com/risingwavelabs/risingwave/issues/14755>,
// will move the logic to FragmentManager::init in release 1.7.
let connector = source
.with_properties
.get(UPSTREAM_SOURCE_KEY)
.unwrap_or(&String::default())
.to_owned();
source_info.format_encode_options.extend(
source.with_properties.iter().filter_map(|(k, v)| {
should_copy_to_format_encode_options(k, &connector)
.then_some((k.to_owned(), v.to_owned()))
}),
);
}

let mut source_columns = source.columns.clone();

Expand Down Expand Up @@ -113,7 +131,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
params.env.source_metrics(),
source.row_id_index.map(|x| x as _),
source.with_properties.clone(),
source_info.clone(),
source_info,
params.env.connector_params(),
params.env.config().developer.connector_message_buffer_size,
// `pk_indices` is used to ensure that a message will be skipped instead of parsed
Expand Down

0 comments on commit 317b361

Please sign in to comment.