Skip to content

Commit

Permalink
optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jun 16, 2024
1 parent 0072373 commit f51e190
Showing 1 changed file with 21 additions and 0 deletions.
21 changes: 21 additions & 0 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,11 @@ where
debug_assert!(builders.values().all(|b| b.is_empty()));
let (_, snapshot) = backfill_stream.into_inner();
let mut remaining_vnodes = vnode_bitset.clone();
// Records which were purely used to update the current pos,
// to persist the tombstone iteration progress.
// They need to be buffered in case the snapshot read completes,
// Then we must yield them downstream.
let mut skip_buffer = Vec::with_capacity(1000);
let mut yielded = false;

#[for_await]
Expand All @@ -358,6 +363,20 @@ where
// End of the snapshot read stream.
// We let the barrier handling logic take care of upstream updates.
// But we still want to exit backfill loop, so we mark snapshot read complete.
for (vnode, row) in skip_buffer {
let builder = builders.get_mut(&vnode).unwrap();
if let Some(chunk) = builder.append_one_row(row) {
yield Message::Chunk(Self::handle_snapshot_chunk(
chunk,
vnode,
&pk_in_output_indices,
&mut backfill_state,
&mut cur_barrier_snapshot_processed_rows,
&mut total_snapshot_processed_rows,
&self.output_indices,
)?);
}
}
snapshot_read_complete = true;
break;
}
Expand Down Expand Up @@ -400,6 +419,7 @@ where
}
yielded = true;
} else {
skip_buffer.push((vnode, row.clone()));
let new_pos = row.project(&pk_in_output_indices);
assert_eq!(new_pos.len(), pk_in_output_indices.len());
backfill_state.update_progress(
Expand All @@ -415,6 +435,7 @@ where
break;
}
} else {
skip_buffer.push((vnode, row.clone()));
continue;
}
}
Expand Down

0 comments on commit f51e190

Please sign in to comment.