diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs index ace8946ef7d29..b12146a65683b 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -17,13 +17,12 @@ use std::sync::Arc; use either::Either; use futures::stream::{select_all, select_with_strategy}; -use futures::{stream, StreamExt, TryStreamExt}; +use futures::{stream, StreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::{DataChunk, Op, StreamChunk}; use risingwave_common::bail; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; -use risingwave_common::row::OwnedRow; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_storage::row_serde::value_serde::ValueRowSerde; use risingwave_storage::store::PrefetchOptions; @@ -34,8 +33,8 @@ use crate::common::table::state_table::{ReplicatedStateTable, StateTable}; use crate::executor::backfill::utils::METADATA_STATE_LEN; use crate::executor::backfill::utils::{ compute_bounds, create_builder, get_progress_per_vnode, mapping_chunk, mapping_message, - mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode, update_pos_by_vnode, - BackfillProgressPerVnode, BackfillState, + mark_chunk_ref_by_vnode, owned_row_iter_with_vnode, persist_state_per_vnode, + update_pos_by_vnode, BackfillProgressPerVnode, BackfillState, RowAndVnodeStreamItem, }; use crate::executor::monitor::StreamingMetrics; use crate::executor::{ @@ -323,7 +322,7 @@ where .inc_by(cur_barrier_upstream_processed_rows); break 'backfill_loop; } - Some((vnode, row)) => { + Some(RowAndVnodeStreamItem::Record { vnode, row }) => { let builder = builders.get_mut(&vnode).unwrap(); if let Some(chunk) = builder.append_one_row(row) { yield Self::handle_snapshot_chunk( @@ -337,6 +336,26 @@ where )?; } } + Some(RowAndVnodeStreamItem::Finished { vnode }) => { + // Consume remaining rows in the builder. + let builder = builders.get_mut(&vnode).unwrap(); + if let Some(data_chunk) = builder.consume_all() { + yield Self::handle_snapshot_chunk( + data_chunk, + vnode, + &pk_in_output_indices, + &mut backfill_state, + &mut cur_barrier_snapshot_processed_rows, + &mut total_snapshot_processed_rows, + &self.output_indices, + )?; + } + // Update state for that vnode to finished. + backfill_state.finish_progress( + vnode, + upstream_table.pk_indices().len(), + ); + } } } } @@ -360,10 +379,13 @@ where // End of the snapshot read stream. // We let the barrier handling logic take care of upstream updates. // But we still want to exit backfill loop, so we mark snapshot read complete. + // Note that we don't have to drain builders here, + // since we did not do snapshot read at all, + // so there should be no rows in the builders. snapshot_read_complete = true; break; } - Some((vnode, row)) => { + Some(RowAndVnodeStreamItem::Record { vnode, row }) => { let builder = builders.get_mut(&vnode).unwrap(); if let Some(chunk) = builder.append_one_row(row) { yield Self::handle_snapshot_chunk( @@ -379,6 +401,13 @@ where break; } + Some(RowAndVnodeStreamItem::Finished { vnode }) => { + // Update state for that vnode to finished. + backfill_state + .finish_progress(vnode, upstream_table.pk_indices().len()); + // But some other vnodes may not be finished yet. + continue; + } } } } @@ -584,7 +613,7 @@ where } } - #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)] + #[try_stream(ok = Option, error = StreamExecutorError)] async fn make_snapshot_stream( upstream_table: &ReplicatedStateTable, backfill_state: BackfillState, @@ -656,7 +685,7 @@ where /// remaining data in `builder` must be flushed manually. /// Otherwise when we scan a new snapshot, it is possible the rows in the `builder` would be /// present, Then when we flush we contain duplicate rows. - #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)] + #[try_stream(ok = Option, error = StreamExecutorError)] async fn snapshot_read_per_vnode( upstream_table: &ReplicatedStateTable, backfill_state: BackfillState, @@ -695,12 +724,7 @@ where ) .await?; - let vnode_row_iter = Box::pin(owned_row_iter(vnode_row_iter)); - - let vnode_row_iter = vnode_row_iter.map_ok(move |row| (vnode, row)); - - let vnode_row_iter = Box::pin(vnode_row_iter); - + let vnode_row_iter = Box::pin(owned_row_iter_with_vnode(vnode, vnode_row_iter)); iterators.push(vnode_row_iter); } diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs index 93fd7c652cc2d..9324da10e585c 100644 --- a/src/stream/src/executor/backfill/utils.rs +++ b/src/stream/src/executor/backfill/utils.rs @@ -666,7 +666,7 @@ pub(crate) enum RowAndVnodeStreamItem { } #[try_stream(ok = RowAndVnodeStreamItem, error = StreamExecutorError)] -pub(crate) async fn owned_row_iter_with_vnode(storage_iter: S, vnode: VirtualNode) +pub(crate) async fn owned_row_iter_with_vnode(vnode: VirtualNode, storage_iter: S) where StreamExecutorError: From, S: Stream, E>>,