Skip to content

Commit

Permalink
Revert "feat(stream): read exactly 1 row if no snapshot read per barr…
Browse files Browse the repository at this point in the history
…ier for arrangement backfill (#14842)"

This reverts commit f21fb9b.
  • Loading branch information
StrikeW committed Feb 4, 2024
1 parent 96e61c3 commit 37dbd85
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 141 deletions.
194 changes: 57 additions & 137 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ use futures::stream::{select_all, select_with_strategy};
use futures::{stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::OwnedRow;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::StateStore;
Expand All @@ -34,19 +34,17 @@ use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
#[cfg(debug_assertions)]
use crate::executor::backfill::utils::METADATA_STATE_LEN;
use crate::executor::backfill::utils::{
compute_bounds, create_builder, get_progress_per_vnode, mapping_chunk, mapping_message,
mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode, update_pos_by_vnode,
BackfillProgressPerVnode, BackfillState,
compute_bounds, create_builder, get_progress_per_vnode, iter_chunks, mapping_chunk,
mapping_message, mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode,
update_pos_by_vnode, BackfillProgressPerVnode, BackfillState,
};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
expect_first_barrier, Barrier, BoxedExecutor, BoxedMessageStream, Executor, ExecutorInfo,
HashMap, Message, PkIndicesRef, StreamExecutorError, StreamExecutorResult,
Message, PkIndicesRef, StreamExecutorError,
};
use crate::task::{ActorId, CreateMviewProgress};

type Builders = HashMap<VirtualNode, DataChunkBuilder>;

/// Similar to [`super::no_shuffle_backfill::BackfillExecutor`].
/// Main differences:
/// - [`ArrangementBackfillExecutor`] can reside on a different CN, so it can be scaled
Expand Down Expand Up @@ -133,18 +131,17 @@ where
.iter()
.map(|field| field.data_type.clone())
.collect_vec();
let mut builders: Builders = upstream_table
let mut builders = upstream_table
.vnodes()
.iter_vnodes()
.map(|vnode| {
let builder = create_builder(
.map(|_| {
create_builder(
self.rate_limit,
self.chunk_size,
snapshot_data_types.clone(),
);
(vnode, builder)
)
})
.collect();
.collect_vec();

let mut upstream = self.upstream.execute();

Expand Down Expand Up @@ -222,8 +219,6 @@ where
'backfill_loop: loop {
let mut cur_barrier_snapshot_processed_rows: u64 = 0;
let mut cur_barrier_upstream_processed_rows: u64 = 0;
let mut snapshot_read_complete = false;
let mut has_snapshot_read = false;

// NOTE(kwannoel): Scope it so that immutable reference to `upstream_table` can be
// dropped. Then we can write to `upstream_table` on barrier in the
Expand All @@ -234,19 +229,20 @@ where
let right_snapshot = pin!(Self::make_snapshot_stream(
&upstream_table,
backfill_state.clone(), // FIXME: Use mutable reference instead.
&mut builders,
paused,
)
.map(Either::Right));

// Prefer to select upstream, so we can stop snapshot stream as soon as the
// barrier comes.
let mut backfill_stream =
let backfill_stream =
select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
stream::PollNext::Left
});

#[for_await]
for either in &mut backfill_stream {
for either in backfill_stream {
match either {
// Upstream
Either::Left(msg) => {
Expand All @@ -272,24 +268,8 @@ where
}
// Snapshot read
Either::Right(msg) => {
has_snapshot_read = true;
match msg? {
None => {
// Consume remaining rows in the builder.
for (vnode, builder) in &mut builders {
if let Some(data_chunk) = builder.consume_all() {
yield Self::handle_snapshot_chunk(
data_chunk,
*vnode,
&pk_in_output_indices,
&mut backfill_state,
&mut cur_barrier_snapshot_processed_rows,
&mut total_snapshot_processed_rows,
&self.output_indices,
)?;
}
}

// End of the snapshot read stream.
// We should not mark the chunk anymore,
// otherwise, we will ignore some rows
Expand All @@ -308,61 +288,28 @@ where

break 'backfill_loop;
}
Some((vnode, row)) => {
let builder = builders.get_mut(&vnode).unwrap();
if let Some(chunk) = builder.append_one_row(row) {
yield Self::handle_snapshot_chunk(
chunk,
vnode,
&pk_in_output_indices,
&mut backfill_state,
&mut cur_barrier_snapshot_processed_rows,
&mut total_snapshot_processed_rows,
&self.output_indices,
)?;
}
}
}
}
}
}
Some((vnode, chunk)) => {
let chunk_cardinality = chunk.cardinality() as u64;

// Before processing barrier, if did not snapshot read,
// do a snapshot read first.
// This is so we don't lose the tombstone iteration progress.
// If paused, we also can't read any snapshot records.
if !has_snapshot_read && !paused {
// If we have not snapshot read, builders must all be empty.
debug_assert!(builders.values().all(|b| b.is_empty()));
let (_, snapshot) = backfill_stream.into_inner();
#[for_await]
for msg in snapshot {
let Either::Right(msg) = msg else {
bail!("BUG: snapshot_read contains upstream messages");
};
match msg? {
None => {
// End of the snapshot read stream.
// We let the barrier handling logic take care of upstream updates.
// But we still want to exit backfill loop, so we mark snapshot read complete.
snapshot_read_complete = true;
break;
}
Some((vnode, row)) => {
let builder = builders.get_mut(&vnode).unwrap();
if let Some(chunk) = builder.append_one_row(row) {
yield Self::handle_snapshot_chunk(
chunk,
// Raise the current position.
// Snapshot read streams are ordered by pk, so we can
// just use the last row to update `current_pos`.
update_pos_by_vnode(
vnode,
&chunk,
&pk_in_output_indices,
&mut backfill_state,
&mut cur_barrier_snapshot_processed_rows,
&mut total_snapshot_processed_rows,
&self.output_indices,
chunk_cardinality,
)?;
}

break;
cur_barrier_snapshot_processed_rows += chunk_cardinality;
total_snapshot_processed_rows += chunk_cardinality;
let chunk = Message::Chunk(mapping_chunk(
chunk,
&self.output_indices,
));
yield chunk;
}
}
}
}
Expand Down Expand Up @@ -401,20 +348,19 @@ where
// consume snapshot rows left in builder.
// NOTE(kwannoel): `zip_eq_debug` does not work here,
// we encounter "higher-ranked lifetime error".
for (vnode, chunk) in builders.iter_mut().map(|(vnode, b)| {
let chunk = b.consume_all().map(|chunk| {
for (vnode, chunk) in vnodes.iter_vnodes().zip_eq(builders.iter_mut().map(|b| {
b.consume_all().map(|chunk| {
let ops = vec![Op::Insert; chunk.capacity()];
StreamChunk::from_parts(ops, chunk)
});
(vnode, chunk)
}) {
})
})) {
if let Some(chunk) = chunk {
let chunk_cardinality = chunk.cardinality() as u64;
// Raise the current position.
// Snapshot read streams are ordered by pk, so we can
// just use the last row to update `current_pos`.
update_pos_by_vnode(
*vnode,
vnode,
&chunk,
&pk_in_output_indices,
&mut backfill_state,
Expand Down Expand Up @@ -500,10 +446,6 @@ where
yield Message::Barrier(barrier);

// We will switch snapshot at the start of the next iteration of the backfill loop.
// Unless snapshot read is already completed.
if snapshot_read_complete {
break 'backfill_loop;
}
}
}

Expand Down Expand Up @@ -581,10 +523,11 @@ where
}
}

#[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
async fn make_snapshot_stream(
upstream_table: &ReplicatedStateTable<S, SD>,
#[try_stream(ok = Option<(VirtualNode, StreamChunk)>, error = StreamExecutorError)]
async fn make_snapshot_stream<'a>(
upstream_table: &'a ReplicatedStateTable<S, SD>,
backfill_state: BackfillState,
builders: &'a mut [DataChunkBuilder],
paused: bool,
) {
if paused {
Expand All @@ -594,41 +537,12 @@ where
}
} else {
#[for_await]
for r in Self::snapshot_read_per_vnode(upstream_table, backfill_state) {
for r in Self::snapshot_read_per_vnode(upstream_table, backfill_state, builders) {
yield r?;
}
}
}

/// Turns a snapshot `DataChunk` read for `vnode` into an output `Message::Chunk`.
///
/// Every row is tagged as `Op::Insert`, the backfill position for `vnode` is
/// raised via `update_pos_by_vnode`, both processed-row counters are bumped by
/// the chunk's cardinality, and the chunk is finally passed through
/// `mapping_chunk` with `output_indices`.
///
/// # Errors
///
/// Propagates any error returned by `update_pos_by_vnode`.
fn handle_snapshot_chunk(
    chunk: DataChunk,
    vnode: VirtualNode,
    pk_in_output_indices: &[usize],
    backfill_state: &mut BackfillState,
    cur_barrier_snapshot_processed_rows: &mut u64,
    total_snapshot_processed_rows: &mut u64,
    output_indices: &[usize],
) -> StreamExecutorResult<Message> {
    // Snapshot rows are always emitted as inserts.
    let ops = vec![Op::Insert; chunk.capacity()];
    let stream_chunk = StreamChunk::from_parts(ops, chunk);
    let row_count = stream_chunk.cardinality() as u64;

    // Snapshot read streams are ordered by pk, so the last row of the chunk
    // is enough to raise `current_pos` for this vnode.
    update_pos_by_vnode(
        vnode,
        &stream_chunk,
        pk_in_output_indices,
        backfill_state,
        row_count,
    )?;

    *cur_barrier_snapshot_processed_rows += row_count;
    *total_snapshot_processed_rows += row_count;

    Ok(Message::Chunk(mapping_chunk(stream_chunk, output_indices)))
}

/// Read snapshot per vnode.
/// These streams should be sorted in storage layer.
/// 1. Get row iterator / vnode.
Expand All @@ -653,13 +567,18 @@ where
/// remaining data in `builder` must be flushed manually.
/// Otherwise, when we scan a new snapshot, rows may still be present in the `builder`,
/// and flushing it would then yield duplicate rows.
#[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
async fn snapshot_read_per_vnode(
upstream_table: &ReplicatedStateTable<S, SD>,
#[try_stream(ok = Option<(VirtualNode, StreamChunk)>, error = StreamExecutorError)]
async fn snapshot_read_per_vnode<'a>(
upstream_table: &'a ReplicatedStateTable<S, SD>,
backfill_state: BackfillState,
builders: &'a mut [DataChunkBuilder],
) {
let mut iterators = vec![];
for vnode in upstream_table.vnodes().iter_vnodes() {
for (vnode, builder) in upstream_table
.vnodes()
.iter_vnodes()
.zip_eq_debug(builders.iter_mut())
{
let backfill_progress = backfill_state.get_progress(&vnode)?;
let current_pos = match backfill_progress {
BackfillProgressPerVnode::NotStarted => None,
Expand Down Expand Up @@ -691,19 +610,20 @@ where

let vnode_row_iter = Box::pin(owned_row_iter(vnode_row_iter));

let vnode_row_iter = vnode_row_iter.map_ok(move |row| (vnode, row));
let vnode_chunk_iter =
iter_chunks(vnode_row_iter, builder).map_ok(move |chunk| (vnode, chunk));

let vnode_row_iter = Box::pin(vnode_row_iter);
let vnode_chunk_iter = Box::pin(vnode_chunk_iter);

iterators.push(vnode_row_iter);
iterators.push(vnode_chunk_iter);
}

// TODO(kwannoel): We can provide an option between snapshot read in parallel vs serial.
let vnode_row_iter = select_all(iterators);
let vnode_chunk_iter = select_all(iterators);

// This means we iterate serially rather than in parallel across vnodes.
#[for_await]
for vnode_and_row in vnode_row_iter {
yield Some(vnode_and_row?);
for chunk in vnode_chunk_iter {
yield Some(chunk?);
}
yield None;
return Ok(());
Expand Down
5 changes: 1 addition & 4 deletions src/stream/src/executor/backfill/no_shuffle_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,7 @@ where
// This is so we don't lose the tombstone iteration progress.
// If paused, we also can't read any snapshot records.
if !has_snapshot_read && !paused {
assert!(
builder.is_empty(),
"Builder should be empty if no snapshot read"
);
assert!(builder.is_empty());
let (_, snapshot) = backfill_stream.into_inner();
#[for_await]
for msg in snapshot {
Expand Down
2 changes: 2 additions & 0 deletions src/stream/src/executor/backfill/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,8 @@ pub(crate) fn mark_chunk_ref_by_vnode(
let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
// Use project to avoid allocation.
for row in data.rows() {
// TODO(kwannoel): Is this logic correct for computing vnode?
// I will revisit it again when arrangement_backfill is implemented e2e.
let vnode = VirtualNode::compute_row(row, pk_in_output_indices);
let v = match backfill_state.get_progress(&vnode)? {
// We want to just forward the row, if the vnode has finished backfill.
Expand Down

0 comments on commit 37dbd85

Please sign in to comment.