Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): read exactly 1 row if no snapshot read per barrier for arrangement backfill #14842

Merged
merged 8 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 137 additions & 57 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ use futures::stream::{select_all, select_with_strategy};
use futures::{stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::OwnedRow;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::StateStore;
Expand All @@ -34,17 +34,19 @@ use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
#[cfg(debug_assertions)]
use crate::executor::backfill::utils::METADATA_STATE_LEN;
use crate::executor::backfill::utils::{
compute_bounds, create_builder, get_progress_per_vnode, iter_chunks, mapping_chunk,
mapping_message, mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode,
update_pos_by_vnode, BackfillProgressPerVnode, BackfillState,
compute_bounds, create_builder, get_progress_per_vnode, mapping_chunk, mapping_message,
mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode, update_pos_by_vnode,
BackfillProgressPerVnode, BackfillState,
};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
expect_first_barrier, Barrier, BoxedExecutor, BoxedMessageStream, Executor, ExecutorInfo,
Message, PkIndicesRef, StreamExecutorError,
HashMap, Message, PkIndicesRef, StreamExecutorError, StreamExecutorResult,
};
use crate::task::{ActorId, CreateMviewProgress};

type Builders = HashMap<VirtualNode, DataChunkBuilder>;

/// Similar to [`super::no_shuffle_backfill::BackfillExecutor`].
/// Main differences:
/// - [`ArrangementBackfillExecutor`] can reside on a different CN, so it can be scaled
Expand Down Expand Up @@ -131,17 +133,18 @@ where
.iter()
.map(|field| field.data_type.clone())
.collect_vec();
let mut builders = upstream_table
let mut builders: Builders = upstream_table
.vnodes()
.iter_vnodes()
.map(|_| {
create_builder(
.map(|vnode| {
let builder = create_builder(
self.rate_limit,
self.chunk_size,
snapshot_data_types.clone(),
)
);
(vnode, builder)
})
.collect_vec();
.collect();

let mut upstream = self.upstream.execute();

Expand Down Expand Up @@ -219,6 +222,8 @@ where
'backfill_loop: loop {
let mut cur_barrier_snapshot_processed_rows: u64 = 0;
let mut cur_barrier_upstream_processed_rows: u64 = 0;
let mut snapshot_read_complete = false;
let mut has_snapshot_read = false;

// NOTE(kwannoel): Scope it so that immutable reference to `upstream_table` can be
// dropped. Then we can write to `upstream_table` on barrier in the
Expand All @@ -229,20 +234,19 @@ where
let right_snapshot = pin!(Self::make_snapshot_stream(
&upstream_table,
backfill_state.clone(), // FIXME: Use mutable reference instead.
&mut builders,
paused,
)
.map(Either::Right));

// Prefer to select upstream, so we can stop snapshot stream as soon as the
// barrier comes.
let backfill_stream =
let mut backfill_stream =
select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
stream::PollNext::Left
});

#[for_await]
for either in backfill_stream {
for either in &mut backfill_stream {
match either {
// Upstream
Either::Left(msg) => {
Expand All @@ -268,8 +272,24 @@ where
}
// Snapshot read
Either::Right(msg) => {
has_snapshot_read = true;
match msg? {
None => {
// Consume remaining rows in the builder.
for (vnode, builder) in &mut builders {
if let Some(data_chunk) = builder.consume_all() {
yield Self::handle_snapshot_chunk(
data_chunk,
*vnode,
&pk_in_output_indices,
&mut backfill_state,
&mut cur_barrier_snapshot_processed_rows,
&mut total_snapshot_processed_rows,
&self.output_indices,
)?;
}
}

// End of the snapshot read stream.
// We should not mark the chunk anymore,
// otherwise, we will ignore some rows
Expand All @@ -288,28 +308,61 @@ where

break 'backfill_loop;
}
Some((vnode, chunk)) => {
let chunk_cardinality = chunk.cardinality() as u64;
Some((vnode, row)) => {
let builder = builders.get_mut(&vnode).unwrap();
if let Some(chunk) = builder.append_one_row(row) {
yield Self::handle_snapshot_chunk(
chunk,
vnode,
&pk_in_output_indices,
&mut backfill_state,
&mut cur_barrier_snapshot_processed_rows,
&mut total_snapshot_processed_rows,
&self.output_indices,
)?;
}
}
}
}
}
}

// Raise the current position.
// As snapshot read streams are ordered by pk, so we can
// just use the last row to update `current_pos`.
update_pos_by_vnode(
// Before processing barrier, if did not snapshot read,
// do a snapshot read first.
// This is so we don't lose the tombstone iteration progress.
// If paused, we also can't read any snapshot records.
if !has_snapshot_read && !paused {
// If we have not snapshot read, builders must all be empty.
debug_assert!(builders.values().all(|b| b.is_empty()));
let (_, snapshot) = backfill_stream.into_inner();
#[for_await]
for msg in snapshot {
let Either::Right(msg) = msg else {
bail!("BUG: snapshot_read contains upstream messages");
};
match msg? {
None => {
// End of the snapshot read stream.
// We let the barrier handling logic take care of upstream updates.
// But we still want to exit backfill loop, so we mark snapshot read complete.
snapshot_read_complete = true;
break;
}
Some((vnode, row)) => {
let builder = builders.get_mut(&vnode).unwrap();
if let Some(chunk) = builder.append_one_row(row) {
yield Self::handle_snapshot_chunk(
chunk,
vnode,
&chunk,
&pk_in_output_indices,
&mut backfill_state,
chunk_cardinality,
)?;

cur_barrier_snapshot_processed_rows += chunk_cardinality;
total_snapshot_processed_rows += chunk_cardinality;
let chunk = Message::Chunk(mapping_chunk(
chunk,
&mut cur_barrier_snapshot_processed_rows,
&mut total_snapshot_processed_rows,
&self.output_indices,
));
yield chunk;
)?;
}

break;
}
}
}
Expand Down Expand Up @@ -348,19 +401,20 @@ where
// consume snapshot rows left in builder.
// NOTE(kwannoel): `zip_eq_debug` does not work here,
// we encounter "higher-ranked lifetime error".
for (vnode, chunk) in vnodes.iter_vnodes().zip_eq(builders.iter_mut().map(|b| {
b.consume_all().map(|chunk| {
for (vnode, chunk) in builders.iter_mut().map(|(vnode, b)| {
let chunk = b.consume_all().map(|chunk| {
let ops = vec![Op::Insert; chunk.capacity()];
StreamChunk::from_parts(ops, chunk)
})
})) {
});
(vnode, chunk)
}) {
if let Some(chunk) = chunk {
let chunk_cardinality = chunk.cardinality() as u64;
// Raise the current position.
// As snapshot read streams are ordered by pk, so we can
// just use the last row to update `current_pos`.
update_pos_by_vnode(
vnode,
*vnode,
&chunk,
&pk_in_output_indices,
&mut backfill_state,
Expand Down Expand Up @@ -451,6 +505,10 @@ where
yield Message::Barrier(barrier);

// We will switch snapshot at the start of the next iteration of the backfill loop.
// Unless snapshot read is already completed.
if snapshot_read_complete {
break 'backfill_loop;
}
}
}

Expand Down Expand Up @@ -528,11 +586,10 @@ where
}
}

#[try_stream(ok = Option<(VirtualNode, StreamChunk)>, error = StreamExecutorError)]
async fn make_snapshot_stream<'a>(
upstream_table: &'a ReplicatedStateTable<S, SD>,
#[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
async fn make_snapshot_stream(
upstream_table: &ReplicatedStateTable<S, SD>,
backfill_state: BackfillState,
builders: &'a mut [DataChunkBuilder],
paused: bool,
) {
if paused {
Expand All @@ -542,12 +599,41 @@ where
}
} else {
#[for_await]
for r in Self::snapshot_read_per_vnode(upstream_table, backfill_state, builders) {
for r in Self::snapshot_read_per_vnode(upstream_table, backfill_state) {
yield r?;
}
}
}

fn handle_snapshot_chunk(
chunk: DataChunk,
vnode: VirtualNode,
pk_in_output_indices: &[usize],
backfill_state: &mut BackfillState,
cur_barrier_snapshot_processed_rows: &mut u64,
total_snapshot_processed_rows: &mut u64,
output_indices: &[usize],
) -> StreamExecutorResult<Message> {
let chunk = StreamChunk::from_parts(vec![Op::Insert; chunk.capacity()], chunk);
// Raise the current position.
// As snapshot read streams are ordered by pk, so we can
// just use the last row to update `current_pos`.
let snapshot_row_count_delta = chunk.cardinality() as u64;
update_pos_by_vnode(
vnode,
&chunk,
pk_in_output_indices,
backfill_state,
snapshot_row_count_delta,
)?;

let chunk_cardinality = chunk.cardinality() as u64;
*cur_barrier_snapshot_processed_rows += chunk_cardinality;
*total_snapshot_processed_rows += chunk_cardinality;
let chunk = Message::Chunk(mapping_chunk(chunk, output_indices));
Ok(chunk)
}

/// Read snapshot per vnode.
/// These streams should be sorted in storage layer.
/// 1. Get row iterator / vnode.
Expand All @@ -572,18 +658,13 @@ where
/// remaining data in `builder` must be flushed manually.
/// Otherwise when we scan a new snapshot, it is possible the rows in the `builder` would be
/// present, Then when we flush we contain duplicate rows.
#[try_stream(ok = Option<(VirtualNode, StreamChunk)>, error = StreamExecutorError)]
async fn snapshot_read_per_vnode<'a>(
upstream_table: &'a ReplicatedStateTable<S, SD>,
#[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
async fn snapshot_read_per_vnode(
upstream_table: &ReplicatedStateTable<S, SD>,
backfill_state: BackfillState,
builders: &'a mut [DataChunkBuilder],
) {
let mut iterators = vec![];
for (vnode, builder) in upstream_table
.vnodes()
.iter_vnodes()
.zip_eq_debug(builders.iter_mut())
{
for vnode in upstream_table.vnodes().iter_vnodes() {
let backfill_progress = backfill_state.get_progress(&vnode)?;
let current_pos = match backfill_progress {
BackfillProgressPerVnode::NotStarted => None,
Expand Down Expand Up @@ -615,20 +696,19 @@ where

let vnode_row_iter = Box::pin(owned_row_iter(vnode_row_iter));

let vnode_chunk_iter =
iter_chunks(vnode_row_iter, builder).map_ok(move |chunk| (vnode, chunk));
let vnode_row_iter = vnode_row_iter.map_ok(move |row| (vnode, row));

let vnode_chunk_iter = Box::pin(vnode_chunk_iter);
let vnode_row_iter = Box::pin(vnode_row_iter);

iterators.push(vnode_chunk_iter);
iterators.push(vnode_row_iter);
}

let vnode_chunk_iter = select_all(iterators);
// TODO(kwannoel): We can provide an option between snapshot read in parallel vs serial.
let vnode_row_iter = select_all(iterators);

// This means we iterate serially rather than in parallel across vnodes.
#[for_await]
for chunk in vnode_chunk_iter {
yield Some(chunk?);
for vnode_and_row in vnode_row_iter {
yield Some(vnode_and_row?);
}
yield None;
return Ok(());
Expand Down
5 changes: 4 additions & 1 deletion src/stream/src/executor/backfill/no_shuffle_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,10 @@ where
// This is so we don't lose the tombstone iteration progress.
// If paused, we also can't read any snapshot records.
if !has_snapshot_read && !paused {
assert!(builder.is_empty());
assert!(
builder.is_empty(),
"Builder should be empty if no snapshot read"
);
let (_, snapshot) = backfill_stream.into_inner();
#[for_await]
for msg in snapshot {
Expand Down
2 changes: 0 additions & 2 deletions src/stream/src/executor/backfill/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,6 @@ pub(crate) fn mark_chunk_ref_by_vnode(
let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
// Use project to avoid allocation.
for row in data.rows() {
// TODO(kwannoel): Is this logic correct for computing vnode?
// I will revisit it again when arrangement_backfill is implemented e2e.
let vnode = VirtualNode::compute_row(row, pk_in_output_indices);
let v = match backfill_state.get_progress(&vnode)? {
// We want to just forward the row, if the vnode has finished backfill.
Expand Down
Loading