diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs
index 1cee14cd6eb52..2b51414df44b3 100644
--- a/src/stream/src/executor/backfill/arrangement_backfill.rs
+++ b/src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -345,6 +345,7 @@ where
                         debug_assert!(builders.values().all(|b| b.is_empty()));
                         let (_, snapshot) = backfill_stream.into_inner();
                         let mut remaining_vnodes = vnodes.as_ref().clone();
+                        let mut yielded = false;

                         #[for_await]
                         for msg in snapshot {
@@ -369,14 +370,44 @@ where
                                     // for that vnode once we have read at least 1 record from it.
                                     let vnode_idx = vnode.to_index();
                                     if !remaining_vnodes.is_set(vnode_idx) {
-                                        let new_pos = row.project(&pk_in_output_indices);
-                                        assert_eq!(new_pos.len(), pk_in_output_indices.len());
-                                        backfill_state.update_progress(
-                                            vnode,
-                                            new_pos.to_owned_row(),
-                                            false,
-                                            0,
-                                        )?;
+                                        // FIXME(kwannoel): This is not needed.
+                                        // The main point of reading the snapshot here is to
+                                        // skip over long tombstone ranges, not to guarantee that snapshot read makes progress.
+                                        // Because skipping over long tombstone ranges is a transient step,
+                                        // once we skip over the initial run, we don't need to do it again.
+                                        //
+                                        // If we hit this code section, it means snapshot read is really slow.
+                                        // Even if we yielded a record here, the backfill progress will still be slow,
+                                        // since we only return one record per epoch.
+                                        // Consider barrier interval at 1s.
+                                        // And upstream has 1M records.
+                                        // So we take 1M seconds to finish backfill, which is 11.57 days.
+                                        // So that's basically the same as never finishing backfill.
+                                        if !yielded {
+                                            let builder = builders.get_mut(&vnode).unwrap();
+                                            if let Some(chunk) = builder.append_one_row(row) {
+                                                yield Message::Chunk(Self::handle_snapshot_chunk(
+                                                    chunk,
+                                                    vnode,
+                                                    &pk_in_output_indices,
+                                                    &mut backfill_state,
+                                                    &mut cur_barrier_snapshot_processed_rows,
+                                                    &mut total_snapshot_processed_rows,
+                                                    &self.output_indices,
+                                                )?);
+                                            }
+                                            yielded = true;
+                                        } else {
+                                            let new_pos = row.project(&pk_in_output_indices);
+                                            assert_eq!(new_pos.len(), pk_in_output_indices.len());
+                                            backfill_state.update_progress(
+                                                vnode,
+                                                new_pos.to_owned_row(),
+                                                false,
+                                                0,
+                                            )?;
+                                        }
+
                                         remaining_vnodes.set_bit(vnode_idx);
                                         if remaining_vnodes.is_empty() {
                                             break;
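
The sketch below is an illustrative, standalone rendering of the control flow this hunk introduces, not RisingWave's executor code: while draining the snapshot after the builders are flushed, at most one row per barrier is emitted downstream (tracked by `yielded`), and rows from other not-yet-seen vnodes only advance the recorded progress. The types (`Vnode`, `Row`, `Progress`) and the helper `drain_until_all_vnodes_seen` are hypothetical stand-ins introduced only for this example.

```rust
// Minimal sketch, assuming simplified stand-in types; the real executor works on
// StreamChunk builders, a vnode Bitmap, and BackfillState instead.
use std::collections::{HashMap, HashSet};

type Vnode = u16;
type Row = Vec<i64>;

/// Latest primary-key position observed per vnode (stand-in for the backfill state).
#[derive(Default, Debug)]
struct Progress(HashMap<Vnode, Row>);

/// Drain `snapshot` until every vnode in `remaining` has produced at least one row.
/// Returns the single row (if any) that would be emitted downstream this barrier.
fn drain_until_all_vnodes_seen(
    snapshot: impl IntoIterator<Item = (Vnode, Row)>,
    remaining: &mut HashSet<Vnode>,
    progress: &mut Progress,
) -> Option<(Vnode, Row)> {
    let mut emitted = None;
    for (vnode, row) in snapshot {
        // Only the first row read from each vnode matters for skipping its tombstone run.
        if remaining.remove(&vnode) {
            if emitted.is_none() {
                // Emit one row so this barrier makes visible progress downstream.
                // (Per the FIXME in the patch: one row per ~1s barrier means 1M rows
                // would take ~1M seconds, roughly 11.57 days, so this alone cannot
                // drive backfill to completion.)
                emitted = Some((vnode, row.clone()));
            }
            // Either way, record how far this vnode's snapshot read has advanced.
            progress.0.insert(vnode, row);
            if remaining.is_empty() {
                break;
            }
        }
    }
    emitted
}

fn main() {
    let snapshot = vec![(0, vec![1]), (0, vec![2]), (1, vec![10]), (2, vec![20])];
    let mut remaining: HashSet<Vnode> = [0, 1, 2].into_iter().collect();
    let mut progress = Progress::default();
    let emitted = drain_until_all_vnodes_seen(snapshot, &mut remaining, &mut progress);
    // Expect: emitted = Some((0, [1])); progress covers vnodes 0, 1 and 2.
    println!("emitted = {:?}, progress = {:?}", emitted, progress);
}
```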