diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index a10d859c11ef2..5edb050a5e484 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -284,11 +284,16 @@ pub fn source_add_partition_offset_cols( skip_col_id: bool, ) -> ([bool; 2], [ColumnDesc; 2]) { let mut columns_exist = [false; 2]; - let mut last_column_id = if skip_col_id { - // col id will be filled outside later. Here just use a placeholder. - ColumnId::new(0) - } else { - max_column_id(columns) + + let mut last_column_id = max_column_id(columns); + let mut assign_col_id = || { + if skip_col_id { + // col id will be filled outside later. Here just use a placeholder. + ColumnId::placeholder() + } else { + last_column_id = last_column_id.next(); + last_column_id + } }; let additional_columns: Vec<_> = { @@ -298,11 +303,10 @@ pub fn source_add_partition_offset_cols( ["partition", "file", "offset"] .iter() .filter_map(|col_type| { - last_column_id = last_column_id.next(); if compat_col_types.contains(col_type) { Some( build_additional_column_desc( - last_column_id, + assign_col_id(), connector_name, col_type, None, diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 49fdaf71a1c67..2fd87cc4dcae6 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -1548,7 +1548,7 @@ pub async fn bind_create_source_or_table_with_connector( let (columns_exist, additional_columns) = source_add_partition_offset_cols( &columns, &with_properties.get_connector().unwrap(), - true, + true, // col_id filled below at col_id_gen.generate ); for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) { if !existed {