perf(backfill): mark finished partitions early in arrangement backfill #15823

Draft · wants to merge 1 commit into main
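The change in a nutshell: each per-vnode snapshot stream now yields its rows and then a trailing `Finished` marker, so the backfill executor can flush that vnode's chunk builder and mark the partition finished as soon as its stream ends, instead of waiting for the whole merged snapshot stream to terminate. Below is a minimal, self-contained sketch of the pattern, not the executor code: `Item` is a simplified stand-in for `RowAndVnodeStreamItem`, rows are plain integers, and `tokio` plus `futures` are assumed as dependencies.

use std::collections::HashMap;

use futures::stream::{self, StreamExt};

// Simplified stand-in for `RowAndVnodeStreamItem` in the diff below.
#[derive(Debug)]
enum Item {
    Record { vnode: u8, row: u64 },
    Finished { vnode: u8 },
}

#[tokio::main]
async fn main() {
    // One stream per vnode, each terminated by a `Finished` marker.
    let per_vnode = (0u8..3).map(|vnode| {
        stream::iter(
            (0u64..4)
                .map(move |row| Item::Record { vnode, row })
                .chain(std::iter::once(Item::Finished { vnode })),
        )
        .boxed()
    });
    let mut merged = stream::select_all(per_vnode);

    // Per-vnode buffers, standing in for the executor's chunk builders.
    let mut buffers: HashMap<u8, Vec<u64>> = HashMap::new();
    while let Some(item) = merged.next().await {
        match item {
            Item::Record { vnode, row } => buffers.entry(vnode).or_default().push(row),
            Item::Finished { vnode } => {
                // Flush the buffer and mark this partition done immediately,
                // without waiting for the other vnodes.
                let rows = buffers.remove(&vnode).unwrap_or_default();
                println!("vnode {vnode} done, flushed {} buffered rows", rows.len());
            }
        }
    }
}

The real diff applies the same shape to `snapshot_read_per_vnode`, whose per-vnode iterators are collected and merged (the file imports `select_all`).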
src/stream/src/executor/backfill/arrangement_backfill.rs (44 additions, 14 deletions)
@@ -15,8 +15,9 @@
use std::collections::HashMap;

use either::Either;
use futures::stream;
use futures::stream::{select_all, select_with_strategy};
use futures::{stream, TryStreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::bail;
@@ -30,8 +31,9 @@ use crate::common::table::state_table::ReplicatedStateTable;
use crate::executor::backfill::utils::METADATA_STATE_LEN;
use crate::executor::backfill::utils::{
compute_bounds, create_builder, create_limiter, get_progress_per_vnode, mapping_chunk,
mapping_message, mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode,
mapping_message, mark_chunk_ref_by_vnode, owned_row_iter_with_vnode, persist_state_per_vnode,
update_pos_by_vnode, BackfillProgressPerVnode, BackfillRateLimiter, BackfillState,
RowAndVnodeStreamItem,
};
use crate::executor::prelude::*;
use crate::task::CreateMviewProgress;
@@ -309,7 +311,7 @@ where
.inc_by(cur_barrier_upstream_processed_rows);
break 'backfill_loop;
}
Some((vnode, row)) => {
Some(RowAndVnodeStreamItem::Record { vnode, row }) => {
let builder = builders.get_mut(&vnode).unwrap();
if let Some(chunk) = builder.append_one_row(row) {
yield Message::Chunk(Self::handle_snapshot_chunk(
@@ -323,6 +325,26 @@
)?);
}
}
Some(RowAndVnodeStreamItem::Finished { vnode }) => {
// Consume remaining rows in the builder.
let builder = builders.get_mut(&vnode).unwrap();
if let Some(data_chunk) = builder.consume_all() {
yield Message::Chunk(Self::handle_snapshot_chunk(
data_chunk,
vnode,
&pk_in_output_indices,
&mut backfill_state,
&mut cur_barrier_snapshot_processed_rows,
&mut total_snapshot_processed_rows,
&self.output_indices,
)?);
}
// Update state for that vnode to finished.
backfill_state.finish_progress(
vnode,
upstream_table.pk_indices().len(),
);
}
}
}
}
@@ -354,10 +376,13 @@ where
// End of the snapshot read stream.
// We let the barrier handling logic take care of upstream updates.
// But we still want to exit backfill loop, so we mark snapshot read complete.
// Note that we don't have to drain builders here,
// since we did not do snapshot read at all,
// so there should be no rows in the builders.
snapshot_read_complete = true;
break;
}
Some((vnode, row)) => {
Some(RowAndVnodeStreamItem::Record { vnode, row }) => {
let builder = builders.get_mut(&vnode).unwrap();
if let Some(chunk) = builder.append_one_row(row) {
yield Message::Chunk(Self::handle_snapshot_chunk(
@@ -373,6 +398,13 @@

break;
}
Some(RowAndVnodeStreamItem::Finished { vnode }) => {
// Update state for that vnode to finished.
backfill_state
.finish_progress(vnode, upstream_table.pk_indices().len());
// But some other vnodes may not be finished yet.
continue;
}
}
}
}
@@ -605,7 +637,7 @@ where
}
}

#[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
#[try_stream(ok = Option<RowAndVnodeStreamItem>, error = StreamExecutorError)]
async fn make_snapshot_stream<'a>(
upstream_table: &'a ReplicatedStateTable<S, SD>,
backfill_state: BackfillState,
@@ -682,7 +714,7 @@ where
/// remaining data in `builder` must be flushed manually.
Otherwise, when we scan a new snapshot, rows may still be present in the `builder`;
then, when we flush, we would emit duplicate rows.
#[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
#[try_stream(ok = Option<RowAndVnodeStreamItem>, error = StreamExecutorError)]
async fn snapshot_read_per_vnode(
upstream_table: &ReplicatedStateTable<S, SD>,
backfill_state: BackfillState,
@@ -692,8 +724,11 @@
let backfill_progress = backfill_state.get_progress(&vnode)?;
let current_pos = match backfill_progress {
BackfillProgressPerVnode::NotStarted => None,
BackfillProgressPerVnode::Completed { current_pos, .. }
| BackfillProgressPerVnode::InProgress { current_pos, .. } => {
BackfillProgressPerVnode::Completed { .. } => {
// If completed, we don't need to continue reading the snapshot.
continue;
}
BackfillProgressPerVnode::InProgress { current_pos, .. } => {
Some(current_pos.clone())
}
};
@@ -718,12 +753,7 @@
)
.await?;

let vnode_row_iter = Box::pin(owned_row_iter(vnode_row_iter));

let vnode_row_iter = vnode_row_iter.map_ok(move |row| (vnode, row));

let vnode_row_iter = Box::pin(vnode_row_iter);

let vnode_row_iter = Box::pin(owned_row_iter_with_vnode(vnode, vnode_row_iter));
iterators.push(vnode_row_iter);
}

src/stream/src/executor/backfill/utils.rs (22 additions, 0 deletions)
@@ -666,6 +666,28 @@ pub(crate) fn compute_bounds(
}
}

pub(crate) enum RowAndVnodeStreamItem {
Finished { vnode: VirtualNode },
Record { vnode: VirtualNode, row: OwnedRow },
}

#[try_stream(ok = RowAndVnodeStreamItem, error = StreamExecutorError)]
pub(crate) async fn owned_row_iter_with_vnode<S, E>(vnode: VirtualNode, storage_iter: S)
where
StreamExecutorError: From<E>,
S: Stream<Item = Result<KeyedRow<Bytes>, E>>,
{
pin_mut!(storage_iter);
while let Some(row) = storage_iter.next().await {
let row = row?;
yield RowAndVnodeStreamItem::Record {
vnode,
row: row.into_owned_row(),
}
}
yield RowAndVnodeStreamItem::Finished { vnode }
}

#[try_stream(ok = OwnedRow, error = StreamExecutorError)]
pub(crate) async fn owned_row_iter<S, E>(storage_iter: S)
where
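The `utils.rs` half of the diff is where the trailing marker is produced: `owned_row_iter_with_vnode` drains a storage iterator for one vnode, tags each row with that vnode, and emits a single `Finished { vnode }` at the end. For intuition, the same shape can be built from plain `futures` combinators; this is a sketch under simplified hypothetical types with error handling elided, whereas the real function is a `#[try_stream]` over `KeyedRow<Bytes>`.

use futures::stream::{self, Stream, StreamExt};

// Hypothetical simplified stand-ins for the real `VirtualNode` / `OwnedRow`.
type VirtualNode = u8;
type OwnedRow = Vec<u64>;

enum RowAndVnodeStreamItem {
    Finished { vnode: VirtualNode },
    Record { vnode: VirtualNode, row: OwnedRow },
}

// Same shape as `owned_row_iter_with_vnode`, built from combinators:
// tag every row with its vnode, then chain exactly one `Finished` marker.
fn with_finished_marker(
    vnode: VirtualNode,
    rows: impl Stream<Item = OwnedRow>,
) -> impl Stream<Item = RowAndVnodeStreamItem> {
    rows.map(move |row| RowAndVnodeStreamItem::Record { vnode, row })
        .chain(stream::once(async move {
            RowAndVnodeStreamItem::Finished { vnode }
        }))
}

With the marker in place, `snapshot_read_per_vnode` can skip any vnode whose progress is already `Completed`, so a partition that finished earlier is never rescanned.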