diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs
index 0afea6720cad..fa39c87d74ea 100644
--- a/src/stream/src/executor/backfill/arrangement_backfill.rs
+++ b/src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -20,12 +20,12 @@ use futures::stream::{select_all, select_with_strategy};
 use futures::{stream, StreamExt, TryStreamExt};
 use futures_async_stream::try_stream;
 use itertools::Itertools;
-use risingwave_common::array::{DataChunk, Op, StreamChunk};
+use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::bail;
 use risingwave_common::catalog::Schema;
 use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
-use risingwave_common::row::OwnedRow;
 use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
+use risingwave_common::util::iter_util::ZipEqDebug;
 use risingwave_storage::row_serde::value_serde::ValueRowSerde;
 use risingwave_storage::store::PrefetchOptions;
 use risingwave_storage::StateStore;
@@ -34,19 +34,17 @@ use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
 #[cfg(debug_assertions)]
 use crate::executor::backfill::utils::METADATA_STATE_LEN;
 use crate::executor::backfill::utils::{
-    compute_bounds, create_builder, get_progress_per_vnode, mapping_chunk, mapping_message,
-    mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode, update_pos_by_vnode,
-    BackfillProgressPerVnode, BackfillState,
+    compute_bounds, create_builder, get_progress_per_vnode, iter_chunks, mapping_chunk,
+    mapping_message, mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode,
+    update_pos_by_vnode, BackfillProgressPerVnode, BackfillState,
 };
 use crate::executor::monitor::StreamingMetrics;
 use crate::executor::{
     expect_first_barrier, Barrier, BoxedExecutor, BoxedMessageStream, Executor, ExecutorInfo,
-    HashMap, Message, PkIndicesRef, StreamExecutorError, StreamExecutorResult,
+    Message, PkIndicesRef, StreamExecutorError,
 };
 use crate::task::{ActorId, CreateMviewProgress};
 
-type Builders = HashMap<VirtualNode, DataChunkBuilder>;
-
 /// Similar to [`super::no_shuffle_backfill::BackfillExecutor`].
 /// Main differences:
 /// - [`ArrangementBackfillExecutor`] can reside on a different CN, so it can be scaled
@@ -133,18 +131,17 @@ where
             .iter()
             .map(|field| field.data_type.clone())
             .collect_vec();
-        let mut builders: Builders = upstream_table
+        let mut builders = upstream_table
             .vnodes()
             .iter_vnodes()
-            .map(|vnode| {
-                let builder = create_builder(
+            .map(|_| {
+                create_builder(
                     self.rate_limit,
                     self.chunk_size,
                     snapshot_data_types.clone(),
-                );
-                (vnode, builder)
+                )
             })
-            .collect();
+            .collect_vec();
 
         let mut upstream = self.upstream.execute();
 
@@ -222,8 +219,6 @@ where
         'backfill_loop: loop {
             let mut cur_barrier_snapshot_processed_rows: u64 = 0;
             let mut cur_barrier_upstream_processed_rows: u64 = 0;
-            let mut snapshot_read_complete = false;
-            let mut has_snapshot_read = false;
 
             // NOTE(kwannoel): Scope it so that immutable reference to `upstream_table` can be
             // dropped. Then we can write to `upstream_table` on barrier in the
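The hunks above drop the `Builders` alias (`HashMap<VirtualNode, DataChunkBuilder>`) in favor of a plain `Vec` of builders, relying on `iter_vnodes()` yielding vnodes in a stable order so vnodes and builders can be re-associated positionally. A minimal sketch of that pattern, not part of the patch, with `u8` standing in for `VirtualNode` and `Vec<&str>` for `DataChunkBuilder`:

```rust
// Stand-in for the vnode bitmap: a stable, ordered iteration over active vnodes.
fn iter_vnodes() -> impl Iterator<Item = u8> {
    [1u8, 4, 7].into_iter()
}

fn main() {
    // Before: HashMap<Vnode, Builder>. After: one builder per vnode, positionally.
    let mut builders: Vec<Vec<&str>> = iter_vnodes().map(|_| Vec::new()).collect();

    // On use, zip the vnodes with the builders again; because the iteration
    // order is stable, the positional pairing is equivalent to a keyed lookup.
    for (vnode, builder) in iter_vnodes().zip(builders.iter_mut()) {
        builder.push("row");
        println!("vnode {vnode}: {} buffered row(s)", builder.len());
    }
}
```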
@@ -234,19 +229,20 @@ where
                 let right_snapshot = pin!(Self::make_snapshot_stream(
                     &upstream_table,
                     backfill_state.clone(), // FIXME: Use mutable reference instead.
+                    &mut builders,
                     paused,
                 )
                 .map(Either::Right));
 
                 // Prefer to select upstream, so we can stop snapshot stream as soon as the
                 // barrier comes.
-                let mut backfill_stream =
+                let backfill_stream =
                     select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
                         stream::PollNext::Left
                     });
 
                 #[for_await]
-                for either in &mut backfill_stream {
+                for either in backfill_stream {
                     match either {
                         // Upstream
                         Either::Left(msg) => {
@@ -272,24 +268,8 @@ where
                         }
                         // Snapshot read
                         Either::Right(msg) => {
-                            has_snapshot_read = true;
                             match msg? {
                                 None => {
-                                    // Consume remaining rows in the builder.
-                                    for (vnode, builder) in &mut builders {
-                                        if let Some(data_chunk) = builder.consume_all() {
-                                            yield Self::handle_snapshot_chunk(
-                                                data_chunk,
-                                                *vnode,
-                                                &pk_in_output_indices,
-                                                &mut backfill_state,
-                                                &mut cur_barrier_snapshot_processed_rows,
-                                                &mut total_snapshot_processed_rows,
-                                                &self.output_indices,
-                                            )?;
-                                        }
-                                    }
-
                                     // End of the snapshot read stream.
                                     // We should not mark the chunk anymore,
                                     // otherwise, we will ignore some rows
@@ -308,61 +288,28 @@ where
                                     break 'backfill_loop;
                                 }
-                                Some((vnode, row)) => {
-                                    let builder = builders.get_mut(&vnode).unwrap();
-                                    if let Some(chunk) = builder.append_one_row(row) {
-                                        yield Self::handle_snapshot_chunk(
-                                            chunk,
-                                            vnode,
-                                            &pk_in_output_indices,
-                                            &mut backfill_state,
-                                            &mut cur_barrier_snapshot_processed_rows,
-                                            &mut total_snapshot_processed_rows,
-                                            &self.output_indices,
-                                        )?;
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
+                                Some((vnode, chunk)) => {
+                                    let chunk_cardinality = chunk.cardinality() as u64;
-                // Before processing barrier, if did not snapshot read,
-                // do a snapshot read first.
-                // This is so we don't lose the tombstone iteration progress.
-                // If paused, we also can't read any snapshot records.
-                if !has_snapshot_read && !paused {
-                    // If we have not snapshot read, builders must all be empty.
-                    debug_assert!(builders.values().all(|b| b.is_empty()));
-                    let (_, snapshot) = backfill_stream.into_inner();
-                    #[for_await]
-                    for msg in snapshot {
-                        let Either::Right(msg) = msg else {
-                            bail!("BUG: snapshot_read contains upstream messages");
-                        };
-                        match msg? {
-                            None => {
-                                // End of the snapshot read stream.
-                                // We let the barrier handling logic take care of upstream updates.
-                                // But we still want to exit backfill loop, so we mark snapshot read complete.
-                                snapshot_read_complete = true;
-                                break;
-                            }
-                            Some((vnode, row)) => {
-                                let builder = builders.get_mut(&vnode).unwrap();
-                                if let Some(chunk) = builder.append_one_row(row) {
-                                    yield Self::handle_snapshot_chunk(
-                                        chunk,
+                                    // Raise the current position.
+                                    // Since snapshot read streams are ordered by pk, we can
+                                    // just use the last row to update `current_pos`.
+                                    update_pos_by_vnode(
                                         vnode,
+                                        &chunk,
                                         &pk_in_output_indices,
                                         &mut backfill_state,
-                                        &mut cur_barrier_snapshot_processed_rows,
-                                        &mut total_snapshot_processed_rows,
-                                        &self.output_indices,
+                                        chunk_cardinality,
                                     )?;
-                                }
-                                break;
+                                    cur_barrier_snapshot_processed_rows += chunk_cardinality;
+                                    total_snapshot_processed_rows += chunk_cardinality;
+                                    let chunk = Message::Chunk(mapping_chunk(
+                                        chunk,
+                                        &self.output_indices,
+                                    ));
+                                    yield chunk;
+                                }
                             }
                         }
                     }
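For context on the biased select above: `select_with_strategy` with a strategy that always returns `PollNext::Left` polls the upstream side first on every wakeup, so a barrier is observed before any further snapshot rows are pulled. A runnable sketch of just that behavior, using only the `futures` crate, with integer streams standing in for the upstream and snapshot sides:

```rust
use futures::stream::{self, select_with_strategy, PollNext, StreamExt};

fn main() {
    futures::executor::block_on(async {
        let upstream = stream::iter([1, 2, 3]); // stand-in for upstream messages
        let snapshot = stream::iter([10, 20, 30]); // stand-in for snapshot rows
        // Always poll the left stream first; the right stream only makes
        // progress when the left one is pending or exhausted.
        let merged =
            select_with_strategy(upstream, snapshot, |_: &mut ()| PollNext::Left);
        let out: Vec<_> = merged.collect().await;
        // Both sides are always ready here, so the left side drains first.
        assert_eq!(out, vec![1, 2, 3, 10, 20, 30]);
    });
}
```

In the executor the left side is rarely exhausted; what matters is that a ready barrier wins the race against a ready snapshot chunk.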
@@ -401,20 +348,19 @@ where
                 // consume snapshot rows left in builder.
                 // NOTE(kwannoel): `zip_eq_debug` does not work here,
                 // we encounter "higher-ranked lifetime error".
-                for (vnode, chunk) in builders.iter_mut().map(|(vnode, b)| {
-                    let chunk = b.consume_all().map(|chunk| {
+                for (vnode, chunk) in vnodes.iter_vnodes().zip_eq(builders.iter_mut().map(|b| {
+                    b.consume_all().map(|chunk| {
                         let ops = vec![Op::Insert; chunk.capacity()];
                         StreamChunk::from_parts(ops, chunk)
-                    });
-                    (vnode, chunk)
-                }) {
+                    })
+                })) {
                     if let Some(chunk) = chunk {
                         let chunk_cardinality = chunk.cardinality() as u64;
                         // Raise the current position.
                         // As snapshot read streams are ordered by pk, so we can
                         // just use the last row to update `current_pos`.
                         update_pos_by_vnode(
-                            *vnode,
+                            vnode,
                             &chunk,
                             &pk_in_output_indices,
                             &mut backfill_state,
@@ -500,10 +446,6 @@ where
 
                 yield Message::Barrier(barrier);
 
                 // We will switch snapshot at the start of the next iteration of the backfill loop.
-                // Unless snapshot read is already completed.
-                if snapshot_read_complete {
-                    break 'backfill_loop;
-                }
             }
         }
 
@@ -581,10 +523,11 @@ where
         }
     }
 
-    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
-    async fn make_snapshot_stream(
-        upstream_table: &ReplicatedStateTable<S, SD>,
+    #[try_stream(ok = Option<(VirtualNode, StreamChunk)>, error = StreamExecutorError)]
+    async fn make_snapshot_stream<'a>(
+        upstream_table: &'a ReplicatedStateTable<S, SD>,
         backfill_state: BackfillState,
+        builders: &'a mut [DataChunkBuilder],
         paused: bool,
     ) {
         if paused {
@@ -594,41 +537,12 @@ where
             }
         } else {
             #[for_await]
-            for r in Self::snapshot_read_per_vnode(upstream_table, backfill_state) {
+            for r in Self::snapshot_read_per_vnode(upstream_table, backfill_state, builders) {
                 yield r?;
             }
         }
     }
 
-    fn handle_snapshot_chunk(
-        chunk: DataChunk,
-        vnode: VirtualNode,
-        pk_in_output_indices: &[usize],
-        backfill_state: &mut BackfillState,
-        cur_barrier_snapshot_processed_rows: &mut u64,
-        total_snapshot_processed_rows: &mut u64,
-        output_indices: &[usize],
-    ) -> StreamExecutorResult<Message> {
-        let chunk = StreamChunk::from_parts(vec![Op::Insert; chunk.capacity()], chunk);
-        // Raise the current position.
-        // As snapshot read streams are ordered by pk, so we can
-        // just use the last row to update `current_pos`.
-        let snapshot_row_count_delta = chunk.cardinality() as u64;
-        update_pos_by_vnode(
-            vnode,
-            &chunk,
-            pk_in_output_indices,
-            backfill_state,
-            snapshot_row_count_delta,
-        )?;
-
-        let chunk_cardinality = chunk.cardinality() as u64;
-        *cur_barrier_snapshot_processed_rows += chunk_cardinality;
-        *total_snapshot_processed_rows += chunk_cardinality;
-        let chunk = Message::Chunk(mapping_chunk(chunk, output_indices));
-        Ok(chunk)
-    }
-
     /// Read snapshot per vnode.
     /// These streams should be sorted in storage layer.
    /// 1. Get row iterator / vnode.
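The `consume_all` loop above (and the removed `handle_snapshot_chunk` path) exist because a `DataChunkBuilder` buffers rows until it has a full chunk: at a barrier, any partially filled builder must be drained manually, or its rows would linger into the next snapshot scan and be emitted twice. A simplified sketch of that discipline, with a hypothetical `ChunkBuilder` over `u64` rows rather than the real RisingWave types:

```rust
/// Hypothetical builder: buffers rows, emits a chunk once `cap` rows are queued.
struct ChunkBuilder {
    cap: usize,
    rows: Vec<u64>,
}

impl ChunkBuilder {
    fn new(cap: usize) -> Self {
        Self { cap, rows: Vec::new() }
    }

    /// Buffer one row; return a full chunk once capacity is reached.
    fn append_one_row(&mut self, row: u64) -> Option<Vec<u64>> {
        self.rows.push(row);
        (self.rows.len() >= self.cap).then(|| std::mem::take(&mut self.rows))
    }

    /// Drain whatever is buffered: the "consume snapshot rows left in builder" step.
    fn consume_all(&mut self) -> Option<Vec<u64>> {
        (!self.rows.is_empty()).then(|| std::mem::take(&mut self.rows))
    }
}

fn main() {
    let mut builder = ChunkBuilder::new(2);
    let mut chunks = Vec::new();
    for row in 0..5 {
        if let Some(chunk) = builder.append_one_row(row) {
            chunks.push(chunk);
        }
    }
    // Barrier: flush the partial chunk so row 4 is not re-read (and duplicated)
    // when the next snapshot is scanned.
    if let Some(rest) = builder.consume_all() {
        chunks.push(rest);
    }
    assert_eq!(chunks, vec![vec![0, 1], vec![2, 3], vec![4]]);
}
```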
@@ -653,13 +567,18 @@ where
     /// remaining data in `builder` must be flushed manually.
     /// Otherwise when we scan a new snapshot, it is possible the rows in the `builder` would be
     /// present. Then when we flush, the chunk would contain duplicate rows.
-    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
-    async fn snapshot_read_per_vnode(
-        upstream_table: &ReplicatedStateTable<S, SD>,
+    #[try_stream(ok = Option<(VirtualNode, StreamChunk)>, error = StreamExecutorError)]
+    async fn snapshot_read_per_vnode<'a>(
+        upstream_table: &'a ReplicatedStateTable<S, SD>,
         backfill_state: BackfillState,
+        builders: &'a mut [DataChunkBuilder],
     ) {
         let mut iterators = vec![];
-        for vnode in upstream_table.vnodes().iter_vnodes() {
+        for (vnode, builder) in upstream_table
+            .vnodes()
+            .iter_vnodes()
+            .zip_eq_debug(builders.iter_mut())
+        {
             let backfill_progress = backfill_state.get_progress(&vnode)?;
             let current_pos = match backfill_progress {
                 BackfillProgressPerVnode::NotStarted => None,
@@ -691,19 +610,20 @@ where
 
             let vnode_row_iter = Box::pin(owned_row_iter(vnode_row_iter));
 
-            let vnode_row_iter = vnode_row_iter.map_ok(move |row| (vnode, row));
+            let vnode_chunk_iter =
+                iter_chunks(vnode_row_iter, builder).map_ok(move |chunk| (vnode, chunk));
 
-            let vnode_row_iter = Box::pin(vnode_row_iter);
+            let vnode_chunk_iter = Box::pin(vnode_chunk_iter);
 
-            iterators.push(vnode_row_iter);
+            iterators.push(vnode_chunk_iter);
         }
 
-        // TODO(kwannoel): We can provide an option between snapshot read in parallel vs serial.
-        let vnode_row_iter = select_all(iterators);
+        let vnode_chunk_iter = select_all(iterators);
+        // This means we iterate serially rather than in parallel across vnodes.
 
         #[for_await]
-        for vnode_and_row in vnode_row_iter {
-            yield Some(vnode_and_row?);
+        for chunk in vnode_chunk_iter {
+            yield Some(chunk?);
         }
         yield None;
         return Ok(());
diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
index 824bc51e13b0..6d95a3ceba52 100644
--- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs
+++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -332,10 +332,7 @@ where
             // This is so we don't lose the tombstone iteration progress.
             // If paused, we also can't read any snapshot records.
             if !has_snapshot_read && !paused {
-                assert!(
-                    builder.is_empty(),
-                    "Builder should be empty if no snapshot read"
-                );
+                assert!(builder.is_empty());
                 let (_, snapshot) = backfill_stream.into_inner();
                 #[for_await]
                 for msg in snapshot {
diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs
index c11dd6847125..bde6abc11fdf 100644
--- a/src/stream/src/executor/backfill/utils.rs
+++ b/src/stream/src/executor/backfill/utils.rs
@@ -341,6 +341,8 @@ pub(crate) fn mark_chunk_ref_by_vnode(
     let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
     // Use project to avoid allocation.
     for row in data.rows() {
+        // TODO(kwannoel): Is this logic correct for computing vnode?
+        // I will revisit it again when arrangement_backfill is implemented e2e.
         let vnode = VirtualNode::compute_row(row, pk_in_output_indices);
         let v = match backfill_state.get_progress(&vnode)? {
             // We want to just forward the row, if the vnode has finished backfill.
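Overall, the arrangement backfill snapshot path now yields `(vnode, chunk)` pairs instead of `(vnode, row)` pairs: each per-vnode row stream is grouped into bounded-size chunks (`iter_chunks` with that vnode's `DataChunkBuilder`) and the per-vnode streams are merged with `select_all`. A rough standalone analogue, assuming only the `futures` crate, where `StreamExt::chunks` stands in for `iter_chunks` and integers stand in for rows:

```rust
use futures::stream::{self, select_all, StreamExt};

fn main() {
    futures::executor::block_on(async {
        // One stream per "vnode": rows grouped into chunks of at most 2,
        // each chunk tagged with its vnode, all merged into one stream.
        let streams = (0u32..3).map(|vnode| {
            stream::iter(0u64..5)
                .chunks(2)
                .map(move |chunk| (vnode, chunk))
                .boxed()
        });
        let merged = select_all(streams);
        let out: Vec<(u32, Vec<u64>)> = merged.collect().await;
        // 3 vnodes x ceil(5 / 2) = 9 chunks in total.
        assert_eq!(out.len(), 9);
        println!("{out:?}");
    });
}
```

One difference from the real code: `StreamExt::chunks` flushes its trailing partial chunk automatically when the inner stream ends, whereas a `DataChunkBuilder` holds residual rows until they are consumed explicitly, which is exactly why the barrier path in this patch drains the builders by hand.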