diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs
index 76f9271862e29..f39b9ab6d40e4 100644
--- a/src/stream/src/executor/backfill/arrangement_backfill.rs
+++ b/src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -20,12 +20,12 @@ use futures::stream::{select_all, select_with_strategy};
 use futures::{stream, StreamExt, TryStreamExt};
 use futures_async_stream::try_stream;
 use itertools::Itertools;
-use risingwave_common::array::{Op, StreamChunk};
+use risingwave_common::array::{DataChunk, Op, StreamChunk};
 use risingwave_common::bail;
 use risingwave_common::catalog::Schema;
 use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
+use risingwave_common::row::OwnedRow;
 use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
-use risingwave_common::util::iter_util::ZipEqDebug;
 use risingwave_storage::row_serde::value_serde::ValueRowSerde;
 use risingwave_storage::store::PrefetchOptions;
 use risingwave_storage::StateStore;
@@ -34,17 +34,19 @@ use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
 #[cfg(debug_assertions)]
 use crate::executor::backfill::utils::METADATA_STATE_LEN;
 use crate::executor::backfill::utils::{
-    compute_bounds, create_builder, get_progress_per_vnode, iter_chunks, mapping_chunk,
-    mapping_message, mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode,
-    update_pos_by_vnode, BackfillProgressPerVnode, BackfillState,
+    compute_bounds, create_builder, get_progress_per_vnode, mapping_chunk, mapping_message,
+    mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode, update_pos_by_vnode,
+    BackfillProgressPerVnode, BackfillState,
 };
 use crate::executor::monitor::StreamingMetrics;
 use crate::executor::{
     expect_first_barrier, Barrier, BoxedExecutor, BoxedMessageStream, Executor, ExecutorInfo,
-    Message, PkIndicesRef, StreamExecutorError,
+    HashMap, Message, PkIndicesRef, StreamExecutorError, StreamExecutorResult,
 };
 use crate::task::{ActorId, CreateMviewProgress};
 
+type Builders = HashMap<VirtualNode, DataChunkBuilder>;
+
 /// Similar to [`super::no_shuffle_backfill::BackfillExecutor`].
 /// Main differences:
 /// - [`ArrangementBackfillExecutor`] can reside on a different CN, so it can be scaled
@@ -131,17 +133,18 @@ where
             .iter()
             .map(|field| field.data_type.clone())
             .collect_vec();
-        let mut builders = upstream_table
+        let mut builders: Builders = upstream_table
             .vnodes()
             .iter_vnodes()
-            .map(|_| {
-                create_builder(
+            .map(|vnode| {
+                let builder = create_builder(
                     self.rate_limit,
                     self.chunk_size,
                     snapshot_data_types.clone(),
-                )
+                );
+                (vnode, builder)
             })
-            .collect_vec();
+            .collect();
 
         let mut upstream = self.upstream.execute();
 
@@ -219,6 +222,8 @@ where
         'backfill_loop: loop {
             let mut cur_barrier_snapshot_processed_rows: u64 = 0;
             let mut cur_barrier_upstream_processed_rows: u64 = 0;
+            let mut snapshot_read_complete = false;
+            let mut has_snapshot_read = false;
 
             // NOTE(kwannoel): Scope it so that immutable reference to `upstream_table` can be
             // dropped. Then we can write to `upstream_table` on barrier in the
@@ -229,20 +234,19 @@ where
                 let right_snapshot = pin!(Self::make_snapshot_stream(
                     &upstream_table,
                     backfill_state.clone(), // FIXME: Use mutable reference instead.
-                    &mut builders,
                     paused,
                 )
                 .map(Either::Right));
 
                 // Prefer to select upstream, so we can stop snapshot stream as soon as the
                 // barrier comes.
-                let backfill_stream =
+                let mut backfill_stream =
                     select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
                         stream::PollNext::Left
                     });
 
                 #[for_await]
-                for either in backfill_stream {
+                for either in &mut backfill_stream {
                     match either {
                         // Upstream
                         Either::Left(msg) => {
@@ -268,8 +272,24 @@ where
                         }
                         // Snapshot read
                         Either::Right(msg) => {
+                            has_snapshot_read = true;
                             match msg? {
                                 None => {
+                                    // Consume remaining rows in the builders.
+                                    for (vnode, builder) in &mut builders {
+                                        if let Some(data_chunk) = builder.consume_all() {
+                                            yield Self::handle_snapshot_chunk(
+                                                data_chunk,
+                                                *vnode,
+                                                &pk_in_output_indices,
+                                                &mut backfill_state,
+                                                &mut cur_barrier_snapshot_processed_rows,
+                                                &mut total_snapshot_processed_rows,
+                                                &self.output_indices,
+                                            )?;
+                                        }
+                                    }
+
                                     // End of the snapshot read stream.
                                     // We should not mark the chunk anymore,
                                     // otherwise, we will ignore some rows
@@ -288,28 +308,61 @@ where
                                     break 'backfill_loop;
                                 }
-                                Some((vnode, chunk)) => {
-                                    let chunk_cardinality = chunk.cardinality() as u64;
+                                Some((vnode, row)) => {
+                                    let builder = builders.get_mut(&vnode).unwrap();
+                                    if let Some(chunk) = builder.append_one_row(row) {
+                                        yield Self::handle_snapshot_chunk(
+                                            chunk,
+                                            vnode,
+                                            &pk_in_output_indices,
+                                            &mut backfill_state,
+                                            &mut cur_barrier_snapshot_processed_rows,
+                                            &mut total_snapshot_processed_rows,
+                                            &self.output_indices,
+                                        )?;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
 
-                                    // Raise the current position.
-                                    // As snapshot read streams are ordered by pk, so we can
-                                    // just use the last row to update `current_pos`.
-                                    update_pos_by_vnode(
+                // Before processing the barrier, if we did not snapshot read,
+                // do a snapshot read first.
+                // This is so we don't lose the tombstone iteration progress.
+                // If paused, we also can't read any snapshot records.
+                if !has_snapshot_read && !paused {
+                    // If we have not done a snapshot read, all builders must be empty.
+                    debug_assert!(builders.values().all(|b| b.is_empty()));
+                    let (_, snapshot) = backfill_stream.into_inner();
+                    #[for_await]
+                    for msg in snapshot {
+                        let Either::Right(msg) = msg else {
+                            bail!("BUG: snapshot_read contains upstream messages");
+                        };
+                        match msg? {
+                            None => {
+                                // End of the snapshot read stream.
+                                // We let the barrier handling logic take care of upstream updates.
+                                // But we still want to exit the backfill loop, so we mark the
+                                // snapshot read as complete.
+                                snapshot_read_complete = true;
+                                break;
+                            }
+                            Some((vnode, row)) => {
+                                let builder = builders.get_mut(&vnode).unwrap();
+                                if let Some(chunk) = builder.append_one_row(row) {
+                                    yield Self::handle_snapshot_chunk(
+                                        chunk,
                                         vnode,
-                                        &chunk,
                                         &pk_in_output_indices,
                                         &mut backfill_state,
-                                        chunk_cardinality,
-                                    )?;
-
-                                    cur_barrier_snapshot_processed_rows += chunk_cardinality;
-                                    total_snapshot_processed_rows += chunk_cardinality;
-                                    let chunk = Message::Chunk(mapping_chunk(
-                                        chunk,
+                                        &mut cur_barrier_snapshot_processed_rows,
+                                        &mut total_snapshot_processed_rows,
                                         &self.output_indices,
-                                    ));
-                                    yield chunk;
+                                    )?;
                                 }
+
+                                break;
                             }
                         }
                     }
                 }
@@ -348,19 +401,20 @@ where
                     // consume snapshot rows left in builder.
                     // NOTE(kwannoel): `zip_eq_debug` does not work here,
                    // we encounter "higher-ranked lifetime error".
-                    for (vnode, chunk) in vnodes.iter_vnodes().zip_eq(builders.iter_mut().map(|b| {
-                        b.consume_all().map(|chunk| {
+                    for (vnode, chunk) in builders.iter_mut().map(|(vnode, b)| {
+                        let chunk = b.consume_all().map(|chunk| {
                             let ops = vec![Op::Insert; chunk.capacity()];
                             StreamChunk::from_parts(ops, chunk)
-                        })
-                    })) {
+                        });
+                        (vnode, chunk)
+                    }) {
                         if let Some(chunk) = chunk {
                             let chunk_cardinality = chunk.cardinality() as u64;
                             // Raise the current position.
                             // As snapshot read streams are ordered by pk, so we can
                             // just use the last row to update `current_pos`.
                             update_pos_by_vnode(
-                                vnode,
+                                *vnode,
                                 &chunk,
                                 &pk_in_output_indices,
                                 &mut backfill_state,
@@ -451,6 +505,10 @@ where
                     yield Message::Barrier(barrier);
 
                     // We will switch snapshot at the start of the next iteration of the backfill loop.
+                    // Unless the snapshot read has already completed.
+                    if snapshot_read_complete {
+                        break 'backfill_loop;
+                    }
                 }
             }
 
@@ -528,11 +586,10 @@ where
         }
     }
 
-    #[try_stream(ok = Option<(VirtualNode, StreamChunk)>, error = StreamExecutorError)]
-    async fn make_snapshot_stream<'a>(
-        upstream_table: &'a ReplicatedStateTable<S, SD>,
+    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
+    async fn make_snapshot_stream(
+        upstream_table: &ReplicatedStateTable<S, SD>,
         backfill_state: BackfillState,
-        builders: &'a mut [DataChunkBuilder],
         paused: bool,
     ) {
         if paused {
@@ -542,12 +599,41 @@ where
             }
         } else {
             #[for_await]
-            for r in Self::snapshot_read_per_vnode(upstream_table, backfill_state, builders) {
+            for r in Self::snapshot_read_per_vnode(upstream_table, backfill_state) {
                 yield r?;
             }
         }
     }
 
+    fn handle_snapshot_chunk(
+        chunk: DataChunk,
+        vnode: VirtualNode,
+        pk_in_output_indices: &[usize],
+        backfill_state: &mut BackfillState,
+        cur_barrier_snapshot_processed_rows: &mut u64,
+        total_snapshot_processed_rows: &mut u64,
+        output_indices: &[usize],
+    ) -> StreamExecutorResult<Message> {
+        let chunk = StreamChunk::from_parts(vec![Op::Insert; chunk.capacity()], chunk);
+        // Raise the current position.
+        // As snapshot read streams are ordered by pk, we can
+        // just use the last row to update `current_pos`.
+        let snapshot_row_count_delta = chunk.cardinality() as u64;
+        update_pos_by_vnode(
+            vnode,
+            &chunk,
+            pk_in_output_indices,
+            backfill_state,
+            snapshot_row_count_delta,
+        )?;
+
+        let chunk_cardinality = chunk.cardinality() as u64;
+        *cur_barrier_snapshot_processed_rows += chunk_cardinality;
+        *total_snapshot_processed_rows += chunk_cardinality;
+        let chunk = Message::Chunk(mapping_chunk(chunk, output_indices));
+        Ok(chunk)
+    }
+
     /// Read snapshot per vnode.
     /// These streams should be sorted in storage layer.
     /// 1. Get row iterator / vnode.
@@ -572,18 +658,13 @@ where
     /// remaining data in `builder` must be flushed manually.
     /// Otherwise when we scan a new snapshot, it is possible the rows in the `builder` would be
     /// present. Then when we flush, we would emit duplicate rows.
-    #[try_stream(ok = Option<(VirtualNode, StreamChunk)>, error = StreamExecutorError)]
-    async fn snapshot_read_per_vnode<'a>(
-        upstream_table: &'a ReplicatedStateTable<S, SD>,
+    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
+    async fn snapshot_read_per_vnode(
+        upstream_table: &ReplicatedStateTable<S, SD>,
         backfill_state: BackfillState,
-        builders: &'a mut [DataChunkBuilder],
     ) {
         let mut iterators = vec![];
-        for (vnode, builder) in upstream_table
-            .vnodes()
-            .iter_vnodes()
-            .zip_eq_debug(builders.iter_mut())
-        {
+        for vnode in upstream_table.vnodes().iter_vnodes() {
             let backfill_progress = backfill_state.get_progress(&vnode)?;
             let current_pos = match backfill_progress {
                 BackfillProgressPerVnode::NotStarted => None,
@@ -615,20 +696,19 @@ where
 
             let vnode_row_iter = Box::pin(owned_row_iter(vnode_row_iter));
 
-            let vnode_chunk_iter =
-                iter_chunks(vnode_row_iter, builder).map_ok(move |chunk| (vnode, chunk));
+            let vnode_row_iter = vnode_row_iter.map_ok(move |row| (vnode, row));
 
-            let vnode_chunk_iter = Box::pin(vnode_chunk_iter);
+            let vnode_row_iter = Box::pin(vnode_row_iter);
 
-            iterators.push(vnode_chunk_iter);
+            iterators.push(vnode_row_iter);
         }
 
-        let vnode_chunk_iter = select_all(iterators);
+        // TODO(kwannoel): We can provide an option to choose between parallel and serial snapshot reads.
+        let vnode_row_iter = select_all(iterators);
 
-        // This means we iterate serially rather than in parallel across vnodes.
         #[for_await]
-        for chunk in vnode_chunk_iter {
-            yield Some(chunk?);
+        for vnode_and_row in vnode_row_iter {
+            yield Some(vnode_and_row?);
         }
         yield None;
         return Ok(());
diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
index 6d95a3ceba522..824bc51e13b00 100644
--- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs
+++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -332,7 +332,10 @@ where
             // This is so we don't lose the tombstone iteration progress.
             // If paused, we also can't read any snapshot records.
             if !has_snapshot_read && !paused {
-                assert!(builder.is_empty());
+                assert!(
+                    builder.is_empty(),
+                    "Builder should be empty if no snapshot read"
+                );
                 let (_, snapshot) = backfill_stream.into_inner();
                 #[for_await]
                 for msg in snapshot {
diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs
index 8937d52607748..6f9e209b1e694 100644
--- a/src/stream/src/executor/backfill/utils.rs
+++ b/src/stream/src/executor/backfill/utils.rs
@@ -343,8 +343,6 @@ pub(crate) fn mark_chunk_ref_by_vnode(
     let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
     // Use project to avoid allocation.
     for row in data.rows() {
-        // TODO(kwannoel): Is this logic correct for computing vnode?
-        // I will revisit it again when arrangement_backfill is implemented e2e.
         let vnode = VirtualNode::compute_row(row, pk_in_output_indices);
         let v = match backfill_state.get_progress(&vnode)? {
             // We want to just forward the row, if the vnode has finished backfill.
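
The heart of this refactor is that the snapshot stream now yields individual (VirtualNode, OwnedRow) pairs, and the executor demultiplexes them into one DataChunkBuilder per vnode, yielding a chunk whenever a builder fills and draining leftovers when the snapshot stream ends or a barrier arrives. The following minimal, self-contained sketch illustrates that buffering pattern; VirtualNode, OwnedRow, and ChunkBuilder here are simplified stand-ins for the RisingWave types of the same names, not the real APIs.

use std::collections::HashMap;

/// Simplified stand-in for RisingWave's `VirtualNode`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct VirtualNode(u8);

/// Simplified stand-in for an owned row.
type OwnedRow = Vec<i64>;

/// Simplified stand-in for `DataChunkBuilder`: buffers rows and emits a
/// full chunk once `capacity` rows have been appended.
struct ChunkBuilder {
    capacity: usize,
    rows: Vec<OwnedRow>,
}

impl ChunkBuilder {
    fn new(capacity: usize) -> Self {
        Self { capacity, rows: Vec::new() }
    }

    /// Append one row; return a chunk if the builder just became full.
    fn append_one_row(&mut self, row: OwnedRow) -> Option<Vec<OwnedRow>> {
        self.rows.push(row);
        (self.rows.len() >= self.capacity).then(|| std::mem::take(&mut self.rows))
    }

    /// Drain whatever is buffered, if anything (used at snapshot end / barrier).
    fn consume_all(&mut self) -> Option<Vec<OwnedRow>> {
        (!self.rows.is_empty()).then(|| std::mem::take(&mut self.rows))
    }
}

fn main() {
    // One builder per vnode, mirroring `type Builders = HashMap<VirtualNode, DataChunkBuilder>`.
    let vnodes = [VirtualNode(0), VirtualNode(1)];
    let mut builders: HashMap<VirtualNode, ChunkBuilder> = vnodes
        .iter()
        .map(|&vnode| (vnode, ChunkBuilder::new(2)))
        .collect();

    // A merged snapshot stream of `(vnode, row)` pairs, as `select_all` would produce.
    let snapshot = vec![
        (VirtualNode(0), vec![1]),
        (VirtualNode(1), vec![2]),
        (VirtualNode(0), vec![3]), // fills vnode 0's builder, so a chunk is emitted
        (VirtualNode(1), vec![4]), // fills vnode 1's builder, so a chunk is emitted
        (VirtualNode(0), vec![5]), // stays buffered until the stream ends
    ];

    for (vnode, row) in snapshot {
        let builder = builders.get_mut(&vnode).unwrap();
        if let Some(chunk) = builder.append_one_row(row) {
            println!("yield full chunk for {vnode:?}: {chunk:?}");
        }
    }

    // End of snapshot: flush the remaining rows per vnode so none are lost,
    // mirroring the `consume_all` loop in the `None` arm of the diff.
    for (vnode, builder) in &mut builders {
        if let Some(chunk) = builder.consume_all() {
            println!("yield remainder for {vnode:?}: {chunk:?}");
        }
    }
}

As in the diff, a chunk is emitted as soon as any single vnode's builder fills, and the final consume_all pass guarantees that rows still sitting in a builder when the stream terminates are not dropped.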
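The other moving part is the biased merge: backfill_stream is built with select_with_strategy so the upstream side is always polled first, and it is now bound as mut so that, after the main loop, into_inner() can recover the snapshot half for the extra read that preserves tombstone iteration progress. The polling bias itself is ordinary futures machinery; here is a minimal sketch, assuming only the futures crate (the string streams are placeholders for upstream messages and snapshot rows):

use futures::executor::block_on;
use futures::stream::{self, select_with_strategy, PollNext, StreamExt};

fn main() {
    // Left stands in for upstream messages (e.g. barriers),
    // right stands in for snapshot rows.
    let left = stream::iter(["barrier"]);
    let right = stream::iter(["row1", "row2"]);

    // Always poll the left stream first: a ready "barrier" pre-empts
    // further snapshot reads, mirroring the diff's PollNext::Left strategy.
    let mut merged = select_with_strategy(left, right, |_: &mut ()| PollNext::Left);

    let collected: Vec<_> = block_on((&mut merged).collect());
    assert_eq!(collected, ["barrier", "row1", "row2"]);

    // Like the diff's `backfill_stream.into_inner()`, the two halves can be
    // recovered once we are done polling the merged stream.
    let (_left, _right) = merged.into_inner();
    println!("{collected:?}");
}

Because the strategy closure always returns PollNext::Left, a ready item on the left (upstream) stream is delivered before any further right-hand (snapshot) items, which is why a pending barrier stops the snapshot read promptly.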