Skip to content

Commit

Permalink
fix(backfill): fix arrangement backfill's target size for i2o_mapping…
Browse files Browse the repository at this point in the history
… of replicated rows (#15697)
  • Loading branch information
kwannoel authored Mar 18, 2024
1 parent af79c28 commit 23f2531
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
7 changes: 6 additions & 1 deletion src/common/src/util/column_index_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ impl ColIndexMapping {
/// `(0..target_size)`. Each subscript is mapped to the corresponding element.
pub fn new(map: Vec<Option<usize>>, target_size: usize) -> Self {
if let Some(target_max) = map.iter().filter_map(|x| *x).max_by_key(|x| *x) {
assert!(target_max < target_size)
assert!(
target_max < target_size,
"target_max: {}, target_size: {}",
target_max,
target_size
);
};
Self { target_size, map }
}
Expand Down
7 changes: 4 additions & 3 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,15 +409,16 @@ where
.collect_vec();

// Compute i2o mapping
// Note that this can be a partial mapping, since we use the i2o mapping to get
// any 1 of the output columns, and use that to fill the input column.
let mut i2o_mapping = vec![None; columns.len()];
let mut output_column_indices = vec![];
for (i, column) in columns.iter().enumerate() {
if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
i2o_mapping[i] = Some(*pos);
output_column_indices.push(i);
}
}
let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_indices.len());
// We can prune any duplicate column indices
let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

// Compute output indices
let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);
Expand Down

0 comments on commit 23f2531

Please sign in to comment.