diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs index 7920e8dceee80..7ff93258d5829 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -15,8 +15,9 @@ use std::collections::HashMap; use either::Either; +use futures::stream; use futures::stream::{select_all, select_with_strategy}; -use futures::{stream, TryStreamExt}; +use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::{DataChunk, Op}; use risingwave_common::bail; @@ -30,8 +31,9 @@ use crate::common::table::state_table::ReplicatedStateTable; use crate::executor::backfill::utils::METADATA_STATE_LEN; use crate::executor::backfill::utils::{ compute_bounds, create_builder, create_limiter, get_progress_per_vnode, mapping_chunk, - mapping_message, mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode, + mapping_message, mark_chunk_ref_by_vnode, owned_row_iter_with_vnode, persist_state_per_vnode, update_pos_by_vnode, BackfillProgressPerVnode, BackfillRateLimiter, BackfillState, + RowAndVnodeStreamItem, }; use crate::executor::prelude::*; use crate::task::CreateMviewProgress; @@ -309,7 +311,7 @@ where .inc_by(cur_barrier_upstream_processed_rows); break 'backfill_loop; } - Some((vnode, row)) => { + Some(RowAndVnodeStreamItem::Record { vnode, row }) => { let builder = builders.get_mut(&vnode).unwrap(); if let Some(chunk) = builder.append_one_row(row) { yield Message::Chunk(Self::handle_snapshot_chunk( @@ -323,6 +325,26 @@ where )?); } } + Some(RowAndVnodeStreamItem::Finished { vnode }) => { + // Consume remaining rows in the builder. + let builder = builders.get_mut(&vnode).unwrap(); + if let Some(data_chunk) = builder.consume_all() { + yield Message::Chunk(Self::handle_snapshot_chunk( + data_chunk, + vnode, + &pk_in_output_indices, + &mut backfill_state, + &mut cur_barrier_snapshot_processed_rows, + &mut total_snapshot_processed_rows, + &self.output_indices, + )?); + } + // Update state for that vnode to finished. + backfill_state.finish_progress( + vnode, + upstream_table.pk_indices().len(), + ); + } } } } @@ -354,10 +376,13 @@ where // End of the snapshot read stream. // We let the barrier handling logic take care of upstream updates. // But we still want to exit backfill loop, so we mark snapshot read complete. + // Note that we don't have to drain builders here, + // since we did not do snapshot read at all, + // so there should be no rows in the builders. snapshot_read_complete = true; break; } - Some((vnode, row)) => { + Some(RowAndVnodeStreamItem::Record { vnode, row }) => { let builder = builders.get_mut(&vnode).unwrap(); if let Some(chunk) = builder.append_one_row(row) { yield Message::Chunk(Self::handle_snapshot_chunk( @@ -373,6 +398,13 @@ where break; } + Some(RowAndVnodeStreamItem::Finished { vnode }) => { + // Update state for that vnode to finished. + backfill_state + .finish_progress(vnode, upstream_table.pk_indices().len()); + // But some other vnodes may not be finished yet. + continue; + } } } } @@ -605,7 +637,7 @@ where } } - #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)] + #[try_stream(ok = Option, error = StreamExecutorError)] async fn make_snapshot_stream<'a>( upstream_table: &'a ReplicatedStateTable, backfill_state: BackfillState, @@ -682,7 +714,7 @@ where /// remaining data in `builder` must be flushed manually. /// Otherwise when we scan a new snapshot, it is possible the rows in the `builder` would be /// present, Then when we flush we contain duplicate rows. - #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)] + #[try_stream(ok = Option, error = StreamExecutorError)] async fn snapshot_read_per_vnode( upstream_table: &ReplicatedStateTable, backfill_state: BackfillState, @@ -692,8 +724,11 @@ where let backfill_progress = backfill_state.get_progress(&vnode)?; let current_pos = match backfill_progress { BackfillProgressPerVnode::NotStarted => None, - BackfillProgressPerVnode::Completed { current_pos, .. } - | BackfillProgressPerVnode::InProgress { current_pos, .. } => { + BackfillProgressPerVnode::Completed { .. } => { + // If completed, we don't need to continue reading snapshot. + continue; + } + BackfillProgressPerVnode::InProgress { current_pos, .. } => { Some(current_pos.clone()) } }; @@ -718,12 +753,7 @@ where ) .await?; - let vnode_row_iter = Box::pin(owned_row_iter(vnode_row_iter)); - - let vnode_row_iter = vnode_row_iter.map_ok(move |row| (vnode, row)); - - let vnode_row_iter = Box::pin(vnode_row_iter); - + let vnode_row_iter = Box::pin(owned_row_iter_with_vnode(vnode, vnode_row_iter)); iterators.push(vnode_row_iter); } diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs index f67442e723198..dc6fd13482542 100644 --- a/src/stream/src/executor/backfill/utils.rs +++ b/src/stream/src/executor/backfill/utils.rs @@ -666,6 +666,28 @@ pub(crate) fn compute_bounds( } } +pub(crate) enum RowAndVnodeStreamItem { + Finished { vnode: VirtualNode }, + Record { vnode: VirtualNode, row: OwnedRow }, +} + +#[try_stream(ok = RowAndVnodeStreamItem, error = StreamExecutorError)] +pub(crate) async fn owned_row_iter_with_vnode(vnode: VirtualNode, storage_iter: S) +where + StreamExecutorError: From, + S: Stream, E>>, +{ + pin_mut!(storage_iter); + while let Some(row) = storage_iter.next().await { + let row = row?; + yield RowAndVnodeStreamItem::Record { + vnode, + row: row.into_owned_row(), + } + } + yield RowAndVnodeStreamItem::Finished { vnode } +} + #[try_stream(ok = OwnedRow, error = StreamExecutorError)] pub(crate) async fn owned_row_iter(storage_iter: S) where