add append one row
kwannoel committed Jun 14, 2024
1 parent 3e61c44 commit f9dd01e
Showing 1 changed file with 39 additions and 8 deletions.
47 changes: 39 additions & 8 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -345,6 +345,7 @@ where
debug_assert!(builders.values().all(|b| b.is_empty()));
let (_, snapshot) = backfill_stream.into_inner();
let mut remaining_vnodes = vnodes.as_ref().clone();
let mut yielded = false;

#[for_await]
for msg in snapshot {
@@ -369,14 +370,44 @@ where
// for that vnode once we have read at least 1 record from it.
let vnode_idx = vnode.to_index();
if !remaining_vnodes.is_set(vnode_idx) {
let new_pos = row.project(&pk_in_output_indices);
assert_eq!(new_pos.len(), pk_in_output_indices.len());
backfill_state.update_progress(
vnode,
new_pos.to_owned_row(),
false,
0,
)?;
// FIXME(kwannoel): This is not needed.
// The main point of reading the snapshot here is to skip over long
// tombstone ranges, not to guarantee that the snapshot read makes progress.
// Because skipping over long tombstone ranges is a transient step,
// once we skip over the initial run, we don't need to do it again.
//
// If we reach this code section, it means snapshot read is really slow.
// Even if we yield a record here, backfill progress will still be slow,
// since we only return one record per epoch.
// Consider a barrier interval of 1s and an upstream with 1M records:
// backfill would then take 1M seconds, which is about 11.57 days.
// That is effectively the same as never finishing backfill.
if !yielded {
let builder = builders.get_mut(&vnode).unwrap();
if let Some(chunk) = builder.append_one_row(row) {
yield Message::Chunk(Self::handle_snapshot_chunk(
chunk,
vnode,
&pk_in_output_indices,
&mut backfill_state,
&mut cur_barrier_snapshot_processed_rows,
&mut total_snapshot_processed_rows,
&self.output_indices,
)?);
}
yielded = true;
} else {
let new_pos = row.project(&pk_in_output_indices);
assert_eq!(new_pos.len(), pk_in_output_indices.len());
backfill_state.update_progress(
vnode,
new_pos.to_owned_row(),
false,
0,
)?;
}

remaining_vnodes.set_bit(vnode_idx);
if remaining_vnodes.is_empty() {
break;
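The hunk above emits at most one row per vnode while draining the remaining snapshot: the first row read from a vnode goes through the chunk builder and may be yielded downstream, while later rows only advance the backfill progress marker. Below is a minimal, self-contained sketch of that control flow; the `RowBuilder` type and its `append_one_row` method are hypothetical stand-ins for the executor's actual chunk-builder types, not the RisingWave API.

// Hypothetical stand-in for the chunk builder: it buffers rows and hands
// back a full chunk once `capacity` rows have been appended.
struct RowBuilder {
    capacity: usize,
    rows: Vec<Vec<i64>>,
}

impl RowBuilder {
    fn new(capacity: usize) -> Self {
        Self { capacity, rows: Vec::new() }
    }

    /// Buffer one row; return a finished chunk when the builder is full.
    fn append_one_row(&mut self, row: Vec<i64>) -> Option<Vec<Vec<i64>>> {
        self.rows.push(row);
        if self.rows.len() >= self.capacity {
            Some(std::mem::take(&mut self.rows))
        } else {
            None
        }
    }
}

fn main() {
    let mut builder = RowBuilder::new(1);
    let mut yielded = false;
    for row in [vec![1], vec![2], vec![3]] {
        if !yielded {
            // First row seen: append it, and emit a chunk if the builder
            // fills up (capacity 1 here, so it fills immediately).
            if let Some(chunk) = builder.append_one_row(row) {
                println!("yield chunk: {chunk:?}");
            }
            yielded = true;
        } else {
            // Later rows would only update the progress marker; the sketch
            // just logs them.
            println!("update progress only for row {row:?}");
        }
    }
}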
