diff --git a/proto/plan_common.proto b/proto/plan_common.proto index c85ea9ef8a60..a914ab9d2da2 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -61,7 +61,7 @@ message ColumnDesc { ColumnDescVersion version = 10; - AdditionalColumn additional_columns = 11; + AdditionalColumn additional_column = 11; } message ColumnCatalog { diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index 09fd3db09561..f82e96a80c0e 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -103,7 +103,7 @@ pub struct ColumnDesc { pub type_name: String, pub generated_or_default_column: Option, pub description: Option, - pub additional_columns: AdditionalColumn, + pub additional_column: AdditionalColumn, pub version: ColumnDescVersion, } @@ -117,7 +117,7 @@ impl ColumnDesc { type_name: String::new(), generated_or_default_column: None, description: None, - additional_columns: AdditionalColumn { column_type: None }, + additional_column: AdditionalColumn { column_type: None }, version: ColumnDescVersion::Pr13707, } } @@ -131,7 +131,7 @@ impl ColumnDesc { type_name: String::new(), generated_or_default_column: None, description: None, - additional_columns: AdditionalColumn { column_type: None }, + additional_column: AdditionalColumn { column_type: None }, version: ColumnDescVersion::Pr13707, } } @@ -150,7 +150,7 @@ impl ColumnDesc { type_name: String::new(), generated_or_default_column: None, description: None, - additional_columns: additional_column_type, + additional_column: additional_column_type, version: ColumnDescVersion::Pr13707, } } @@ -170,7 +170,7 @@ impl ColumnDesc { type_name: self.type_name.clone(), generated_or_default_column: self.generated_or_default_column.clone(), description: self.description.clone(), - additional_columns: Some(self.additional_columns.clone()), + additional_column: Some(self.additional_column.clone()), version: self.version as i32, } } @@ -198,7 +198,7 @@ impl ColumnDesc { type_name: "".to_string(), generated_or_default_column: None, description: None, - additional_columns: AdditionalColumn { column_type: None }, + additional_column: AdditionalColumn { column_type: None }, version: ColumnDescVersion::Pr13707, } } @@ -221,7 +221,7 @@ impl ColumnDesc { type_name: type_name.to_string(), generated_or_default_column: None, description: None, - additional_columns: AdditionalColumn { column_type: None }, + additional_column: AdditionalColumn { column_type: None }, version: ColumnDescVersion::Pr13707, } } @@ -239,7 +239,7 @@ impl ColumnDesc { type_name: field.type_name.clone(), description: None, generated_or_default_column: None, - additional_columns: AdditionalColumn { column_type: None }, + additional_column: AdditionalColumn { column_type: None }, version: ColumnDescVersion::Pr13707, } } @@ -265,8 +265,8 @@ impl ColumnDesc { impl From for ColumnDesc { fn from(prost: PbColumnDesc) -> Self { - let additional_columns = prost - .get_additional_columns() + let additional_column = prost + .get_additional_column() .unwrap_or(&AdditionalColumn { column_type: None }) .clone(); let version = prost.version(); @@ -283,7 +283,7 @@ impl From for ColumnDesc { field_descs, generated_or_default_column: prost.generated_or_default_column, description: prost.description.clone(), - additional_columns, + additional_column, version, } } @@ -305,7 +305,7 @@ impl From<&ColumnDesc> for PbColumnDesc { type_name: c.type_name.clone(), generated_or_default_column: c.generated_or_default_column.clone(), description: c.description.clone(), - additional_columns: c.additional_columns.clone().into(), + additional_column: c.additional_column.clone().into(), version: c.version as i32, } } diff --git a/src/common/src/catalog/test_utils.rs b/src/common/src/catalog/test_utils.rs index ca154b9bf0b0..9930a5717b84 100644 --- a/src/common/src/catalog/test_utils.rs +++ b/src/common/src/catalog/test_utils.rs @@ -35,7 +35,7 @@ impl ColumnDescTestExt for ColumnDesc { column_type: Some(data_type), column_id, name: name.to_string(), - additional_columns: Some(AdditionalColumn { column_type: None }), + additional_column: Some(AdditionalColumn { column_type: None }), version: ColumnDescVersion::Pr13707 as i32, ..Default::default() } @@ -60,7 +60,7 @@ impl ColumnDescTestExt for ColumnDesc { field_descs: fields, generated_or_default_column: None, description: None, - additional_columns: Some(AdditionalColumn { column_type: None }), + additional_column: Some(AdditionalColumn { column_type: None }), version: ColumnDescVersion::Pr13707 as i32, } } diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs index a6c5c6fbef5d..8d2d4265883e 100644 --- a/src/connector/src/parser/avro/util.rs +++ b/src/connector/src/parser/avro/util.rs @@ -61,7 +61,7 @@ fn avro_field_to_column_desc( type_name: schema_name.to_string(), generated_or_default_column: None, description: None, - additional_columns: Some(AdditionalColumn { column_type: None }), + additional_column: Some(AdditionalColumn { column_type: None }), version: ColumnDescVersion::Pr13707 as i32, }) } @@ -71,7 +71,7 @@ fn avro_field_to_column_desc( column_type: Some(data_type.to_protobuf()), column_id: *index, name: name.to_owned(), - additional_columns: Some(AdditionalColumn { column_type: None }), + additional_column: Some(AdditionalColumn { column_type: None }), version: ColumnDescVersion::Pr13707 as i32, ..Default::default() }) diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index 87aead865c6d..a1bf9c997786 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -567,7 +567,7 @@ mod tests { column_type: SourceColumnType::Normal, is_pk: false, is_hidden_addition_col: false, - additional_column_type: AdditionalColumn { column_type: None }, + additional_column: AdditionalColumn { column_type: None }, }, SourceColumnDesc::simple("o_enum", DataType::Varchar, ColumnId::from(8)), SourceColumnDesc::simple("o_char", DataType::Varchar, ColumnId::from(9)), diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index 8480138e02c0..053b9eb1fc2b 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -582,7 +582,7 @@ mod tests { column_type: SourceColumnType::Normal, is_pk: true, is_hidden_addition_col: false, - additional_column_type: AdditionalColumn { + additional_column: AdditionalColumn { column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})), }, }; diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 7884eb62bf13..bcc946c28cd1 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -318,7 +318,7 @@ impl SourceStreamChunkRowWriter<'_> { mut f: impl FnMut(&SourceColumnDesc) -> AccessResult, ) -> AccessResult<()> { let mut wrapped_f = |desc: &SourceColumnDesc| { - match (&desc.column_type, &desc.additional_column_type.column_type) { + match (&desc.column_type, &desc.additional_column.column_type) { (&SourceColumnType::Offset | &SourceColumnType::RowId, _) => { // SourceColumnType is for CDC source only. Ok(A::output_for( diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index 13da5c5b86b2..4793c3d08f5c 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -180,7 +180,7 @@ impl ProtobufParserConfig { type_name: m.full_name().to_string(), generated_or_default_column: None, description: None, - additional_columns: Some(AdditionalColumn { column_type: None }), + additional_column: Some(AdditionalColumn { column_type: None }), version: ColumnDescVersion::Pr13707 as i32, }) } else { @@ -189,7 +189,7 @@ impl ProtobufParserConfig { column_id: *index, name: field_descriptor.name().to_string(), column_type: Some(field_type.to_protobuf()), - additional_columns: Some(AdditionalColumn { column_type: None }), + additional_column: Some(AdditionalColumn { column_type: None }), version: ColumnDescVersion::Pr13707 as i32, ..Default::default() }) diff --git a/src/connector/src/parser/unified/upsert.rs b/src/connector/src/parser/unified/upsert.rs index 8fbed9dc2ac5..9129f0d16d86 100644 --- a/src/connector/src/parser/unified/upsert.rs +++ b/src/connector/src/parser/unified/upsert.rs @@ -105,7 +105,7 @@ where } fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult { - match desc.additional_column_type.column_type { + match desc.additional_column.column_type { Some(AdditionalColumnType::Key(_)) => { if let Some(key_as_column_name) = &self.key_column_name && &desc.name == key_as_column_name diff --git a/src/connector/src/parser/upsert_parser.rs b/src/connector/src/parser/upsert_parser.rs index fbd82f6f3c16..4f4c23f881ec 100644 --- a/src/connector/src/parser/upsert_parser.rs +++ b/src/connector/src/parser/upsert_parser.rs @@ -54,7 +54,7 @@ async fn build_accessor_builder( pub fn get_key_column_name(columns: &[SourceColumnDesc]) -> Option { columns.iter().find_map(|column| { if matches!( - column.additional_column_type.column_type, + column.additional_column.column_type, Some(AdditionalColumnType::Key(_)) ) { Some(column.name.clone()) diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index 2a0ef8babe99..691590e361cd 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ b/src/connector/src/source/kafka/source/reader.rs @@ -198,7 +198,7 @@ impl CommonSplitReader for KafkaSplitReader { // ingest kafka message header can be expensive, do it only when required let require_message_header = self.parser_config.common.rw_columns.iter().any(|col_desc| { matches!( - col_desc.additional_column_type.column_type, + col_desc.additional_column.column_type, Some(AdditionalColumnType::Headers(_) | AdditionalColumnType::HeaderInner(_)) ) }); diff --git a/src/connector/src/source/manager.rs b/src/connector/src/source/manager.rs index 049515d6091a..a5584f6af83d 100644 --- a/src/connector/src/source/manager.rs +++ b/src/connector/src/source/manager.rs @@ -37,10 +37,10 @@ pub struct SourceColumnDesc { /// `is_hidden_addition_col` is used to indicate whether the column is a hidden addition column. pub is_hidden_addition_col: bool, - /// `additional_column_type` and `column_type` are orthogonal - /// `additional_column_type` is used to indicate the column is from which part of the message + /// `additional_column` and `column_type` are orthogonal + /// `additional_column` is used to indicate the column is from which part of the message /// `column_type` is used to indicate the type of the column, only used in cdc scenario - pub additional_column_type: AdditionalColumn, + pub additional_column: AdditionalColumn, } /// `SourceColumnType` is used to indicate the type of a column emitted by the Source. @@ -91,7 +91,7 @@ impl SourceColumnDesc { column_type: SourceColumnType::Normal, is_pk: false, is_hidden_addition_col: false, - additional_column_type: AdditionalColumn { column_type: None }, + additional_column: AdditionalColumn { column_type: None }, } } @@ -131,7 +131,7 @@ impl From<&ColumnDesc> for SourceColumnDesc { column_type, is_pk: false, is_hidden_addition_col: false, - additional_column_type: c.additional_columns.clone(), + additional_column: c.additional_column.clone(), } } } @@ -146,7 +146,7 @@ impl From<&SourceColumnDesc> for ColumnDesc { type_name: "".to_string(), generated_or_default_column: None, description: None, - additional_columns: s.additional_column_type.clone(), + additional_column: s.additional_column.clone(), version: ColumnDescVersion::Pr13707, } } diff --git a/src/connector/src/source/reader/desc.rs b/src/connector/src/source/reader/desc.rs index e049be8bbe94..79ba66a0f7e3 100644 --- a/src/connector/src/source/reader/desc.rs +++ b/src/connector/src/source/reader/desc.rs @@ -139,7 +139,7 @@ impl SourceDescBuilder { // Check if partition/file/offset columns are included explicitly. for col in &self.columns { - match col.column_desc.as_ref().unwrap().get_additional_columns() { + match col.column_desc.as_ref().unwrap().get_additional_column() { Ok(AdditionalColumn { column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)), }) => { diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs index a50eec922143..7cd9eab4c1be 100644 --- a/src/frontend/src/binder/expr/mod.rs +++ b/src/frontend/src/binder/expr/mod.rs @@ -703,7 +703,7 @@ pub fn bind_struct_field(column_def: &StructField) -> Result { type_name: "".to_string(), generated_or_default_column: None, description: None, - additional_columns: AdditionalColumn { column_type: None }, + additional_column: AdditionalColumn { column_type: None }, version: ColumnDescVersion::Pr13707, }) } diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 37e79664c666..965105e1e710 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -694,7 +694,7 @@ mod tests { type_name: ".test.Country".to_string(), description: None, generated_or_default_column: None, - additional_columns: AdditionalColumn { column_type: None }, + additional_column: AdditionalColumn { column_type: None }, version: ColumnDescVersion::Pr13707, }, is_hidden: false diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 5f25d12650f0..abe046f08fce 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -685,7 +685,7 @@ pub(crate) async fn bind_source_pk( // return the key column names if exists columns.iter().find_map(|catalog| { if matches!( - catalog.column_desc.additional_columns.column_type, + catalog.column_desc.additional_column.column_type, Some(AdditionalColumnType::Key(_)) ) { Some(catalog.name().to_string()) @@ -697,7 +697,7 @@ pub(crate) async fn bind_source_pk( let additional_column_names = columns .iter() .filter_map(|col| { - if col.column_desc.additional_columns.column_type.is_some() { + if col.column_desc.additional_column.column_type.is_some() { Some(col.name().to_string()) } else { None @@ -848,7 +848,7 @@ fn check_and_add_timestamp_column( if is_kafka_connector(with_properties) { if columns.iter().any(|col| { matches!( - col.column_desc.additional_columns.column_type, + col.column_desc.additional_column.column_type, Some(AdditionalColumnType::Timestamp(_)) ) }) { diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index ee871bc68702..b69d2e5cebbd 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -214,7 +214,7 @@ pub fn bind_sql_columns(column_defs: &[ColumnDef]) -> Result> type_name: "".to_string(), generated_or_default_column: None, description: None, - additional_columns: AdditionalColumn { column_type: None }, + additional_column: AdditionalColumn { column_type: None }, version: ColumnDescVersion::Pr13707, }, is_hidden: false, diff --git a/src/storage/src/row_serde/mod.rs b/src/storage/src/row_serde/mod.rs index 2668d75bc1b9..4cb6696cce93 100644 --- a/src/storage/src/row_serde/mod.rs +++ b/src/storage/src/row_serde/mod.rs @@ -95,7 +95,7 @@ mod test { type_name: "", generated_or_default_column: None, description: None, - additional_columns: AdditionalColumn { + additional_column: AdditionalColumn { column_type: None, }, version: Pr13707, @@ -108,7 +108,7 @@ mod test { type_name: "", generated_or_default_column: None, description: None, - additional_columns: AdditionalColumn { + additional_column: AdditionalColumn { column_type: None, }, version: Pr13707, @@ -141,7 +141,7 @@ mod test { type_name: "", generated_or_default_column: None, description: None, - additional_columns: AdditionalColumn { + additional_column: AdditionalColumn { column_type: None, }, version: Pr13707, @@ -154,7 +154,7 @@ mod test { type_name: "", generated_or_default_column: None, description: None, - additional_columns: AdditionalColumn { + additional_column: AdditionalColumn { column_type: None, }, version: Pr13707, diff --git a/src/stream/src/executor/source/mod.rs b/src/stream/src/executor/source/mod.rs index 7df7cc2ea837..8cdba698bec1 100644 --- a/src/stream/src/executor/source/mod.rs +++ b/src/stream/src/executor/source/mod.rs @@ -71,7 +71,7 @@ pub fn get_split_offset_col_idx( let mut split_idx = None; let mut offset_idx = None; for (idx, column) in column_descs.iter().enumerate() { - match column.additional_column_type { + match column.additional_column { AdditionalColumn { column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)), } => { diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index a99be097e881..28d923ffb69c 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -96,7 +96,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { // the column is from a legacy version && desc.version == ColumnDescVersion::Unspecified as i32 { - desc.additional_columns = Some(AdditionalColumn { + desc.additional_column = Some(AdditionalColumn { column_type: Some(AdditionalColumnType::Key( AdditionalColumnKey {}, )), @@ -110,7 +110,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { { // compatible code: handle legacy column `_rw_kafka_timestamp` // the column is auto added for all kafka source to empower batch query on source - // solution: rewrite the column `additional_columns` to Timestamp + // solution: rewrite the column `additional_column` to Timestamp let _ = source_columns.iter_mut().map(|c| { let _ = c.column_desc.as_mut().map(|desc| { @@ -125,7 +125,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { // the column is from a legacy version && desc.version == ColumnDescVersion::Unspecified as i32 { - desc.additional_columns = Some(AdditionalColumn { + desc.additional_column = Some(AdditionalColumn { column_type: Some(AdditionalColumnType::Timestamp( AdditionalColumnTimestamp {}, )),