diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs index c1be4b811cd0d..f93906898b2c4 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -346,6 +346,11 @@ where debug_assert!(builders.values().all(|b| b.is_empty())); let (_, snapshot) = backfill_stream.into_inner(); let mut remaining_vnodes = vnode_bitset.clone(); + // Records which were purely used to update the current pos, + // to persist the tombstone iteration progress. + // They need to be buffered in case the snapshot read completes, + // Then we must yield them downstream. + let mut skip_buffer = Vec::with_capacity(1000); let mut yielded = false; #[for_await] @@ -358,6 +363,20 @@ where // End of the snapshot read stream. // We let the barrier handling logic take care of upstream updates. // But we still want to exit backfill loop, so we mark snapshot read complete. + for (vnode, row) in skip_buffer { + let builder = builders.get_mut(&vnode).unwrap(); + if let Some(chunk) = builder.append_one_row(row) { + yield Message::Chunk(Self::handle_snapshot_chunk( + chunk, + vnode, + &pk_in_output_indices, + &mut backfill_state, + &mut cur_barrier_snapshot_processed_rows, + &mut total_snapshot_processed_rows, + &self.output_indices, + )?); + } + } snapshot_read_complete = true; break; } @@ -400,6 +419,7 @@ where } yielded = true; } else { + skip_buffer.push((vnode, row.clone())); let new_pos = row.project(&pk_in_output_indices); assert_eq!(new_pos.len(), pk_in_output_indices.len()); backfill_state.update_progress( @@ -415,6 +435,7 @@ where break; } } else { + skip_buffer.push((vnode, row.clone())); continue; } }