diff --git a/src/common/src/util/column_index_mapping.rs b/src/common/src/util/column_index_mapping.rs index 3a6785f64e0ad..a4b3cf50d9015 100644 --- a/src/common/src/util/column_index_mapping.rs +++ b/src/common/src/util/column_index_mapping.rs @@ -36,7 +36,12 @@ impl ColIndexMapping { /// `(0..target_size)`. Each subscript is mapped to the corresponding element. pub fn new(map: Vec>, target_size: usize) -> Self { if let Some(target_max) = map.iter().filter_map(|x| *x).max_by_key(|x| *x) { - assert!(target_max < target_size) + assert!( + target_max < target_size, + "target_max: {}, target_size: {}", + target_max, + target_size + ); }; Self { target_size, map } } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 4a2fc89c80468..eaead15d11e4d 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -409,15 +409,16 @@ where .collect_vec(); // Compute i2o mapping + // Note that this can be a partial mapping, since we use the i2o mapping to get + // any 1 of the output columns, and use that to fill the input column. let mut i2o_mapping = vec![None; columns.len()]; - let mut output_column_indices = vec![]; for (i, column) in columns.iter().enumerate() { if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) { i2o_mapping[i] = Some(*pos); - output_column_indices.push(i); } } - let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_indices.len()); + // We can prune any duplicate column indices + let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len()); // Compute output indices let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);