support partial backfill finish
kwannoel committed Jun 17, 2024
1 parent b2bd5a9 commit 8241328
Showing 2 changed files with 66 additions and 14 deletions.
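What changed, in brief: the per-vnode snapshot streams no longer yield bare (vnode, row) pairs. They now yield a RowAndVnodeStreamItem, which is either a Record or a trailing Finished marker, so the executor can tell when one vnode's snapshot is exhausted even while other vnodes are still streaming. The marker is needed because the per-vnode streams are merged with select_all, and a merged stream only terminates once every input stream is done, so per-vnode completion has to be signalled in-band. A minimal, self-contained sketch of that pattern follows (toy Item enum and integer rows, not RisingWave code; assumes the futures and tokio crates):

use futures::stream::{self, select_all, StreamExt};

#[derive(Debug)]
enum Item {
    Record { vnode: u32, row: i64 },
    Finished { vnode: u32 },
}

#[tokio::main]
async fn main() {
    // Two toy "vnode" streams; each yields its rows, then an in-band Finished marker.
    let per_vnode = (0..2u32).map(|vnode| {
        stream::iter(
            (0..3i64)
                .map(move |row| Item::Record { vnode, row })
                .chain(std::iter::once(Item::Finished { vnode })),
        )
        .boxed()
    });

    // The merged stream interleaves items and only ends after the last vnode finishes.
    let mut merged = select_all(per_vnode);
    while let Some(item) = merged.next().await {
        match item {
            Item::Record { vnode, row } => println!("vnode {vnode}: row {row}"),
            // Here the real executor drains that vnode's chunk builder and
            // marks its backfill progress as finished.
            Item::Finished { vnode } => println!("vnode {vnode}: done"),
        }
    }
}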
58 changes: 44 additions & 14 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -15,8 +15,9 @@
use std::collections::HashMap;

use either::Either;
use futures::stream;
use futures::stream::{select_all, select_with_strategy};
use futures::{stream, TryStreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::bail;
@@ -30,8 +31,9 @@ use crate::common::table::state_table::ReplicatedStateTable;
use crate::executor::backfill::utils::METADATA_STATE_LEN;
use crate::executor::backfill::utils::{
compute_bounds, create_builder, create_limiter, get_progress_per_vnode, mapping_chunk,
mapping_message, mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode,
mapping_message, mark_chunk_ref_by_vnode, owned_row_iter_with_vnode, persist_state_per_vnode,
update_pos_by_vnode, BackfillProgressPerVnode, BackfillRateLimiter, BackfillState,
RowAndVnodeStreamItem,
};
use crate::executor::prelude::*;
use crate::task::CreateMviewProgress;
@@ -309,7 +311,7 @@ where
.inc_by(cur_barrier_upstream_processed_rows);
break 'backfill_loop;
}
Some((vnode, row)) => {
Some(RowAndVnodeStreamItem::Record { vnode, row }) => {
let builder = builders.get_mut(&vnode).unwrap();
if let Some(chunk) = builder.append_one_row(row) {
yield Message::Chunk(Self::handle_snapshot_chunk(
@@ -323,6 +325,26 @@
)?);
}
}
Some(RowAndVnodeStreamItem::Finished { vnode }) => {
// Consume remaining rows in the builder.
let builder = builders.get_mut(&vnode).unwrap();
if let Some(data_chunk) = builder.consume_all() {
yield Message::Chunk(Self::handle_snapshot_chunk(
data_chunk,
vnode,
&pk_in_output_indices,
&mut backfill_state,
&mut cur_barrier_snapshot_processed_rows,
&mut total_snapshot_processed_rows,
&self.output_indices,
)?);
}
// Update state for that vnode to finished.
backfill_state.finish_progress(
vnode,
upstream_table.pk_indices().len(),
);
}
}
}
}
Expand Down Expand Up @@ -354,10 +376,13 @@ where
// End of the snapshot read stream.
// We let the barrier handling logic take care of upstream updates.
// But we still want to exit backfill loop, so we mark snapshot read complete.
// Note that we don't have to drain builders here,
// since we did not do snapshot read at all,
// so there should be no rows in the builders.
snapshot_read_complete = true;
break;
}
Some((vnode, row)) => {
Some(RowAndVnodeStreamItem::Record { vnode, row }) => {
let builder = builders.get_mut(&vnode).unwrap();
if let Some(chunk) = builder.append_one_row(row) {
yield Message::Chunk(Self::handle_snapshot_chunk(
@@ -373,6 +398,13 @@

break;
}
Some(RowAndVnodeStreamItem::Finished { vnode }) => {
// Update state for that vnode to finished.
backfill_state
.finish_progress(vnode, upstream_table.pk_indices().len());
// But some other vnodes may not be finished yet.
continue;
}
}
}
}
@@ -605,7 +637,7 @@ where
}
}

#[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
#[try_stream(ok = Option<RowAndVnodeStreamItem>, error = StreamExecutorError)]
async fn make_snapshot_stream<'a>(
upstream_table: &'a ReplicatedStateTable<S, SD>,
backfill_state: BackfillState,
Expand Down Expand Up @@ -682,7 +714,7 @@ where
/// remaining data in `builder` must be flushed manually.
/// Otherwise, when we scan a new snapshot, rows may still be present in the `builder`;
/// then when we flush, the chunk would contain duplicate rows.
#[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
#[try_stream(ok = Option<RowAndVnodeStreamItem>, error = StreamExecutorError)]
async fn snapshot_read_per_vnode(
upstream_table: &ReplicatedStateTable<S, SD>,
backfill_state: BackfillState,
@@ -692,8 +724,11 @@ where
let backfill_progress = backfill_state.get_progress(&vnode)?;
let current_pos = match backfill_progress {
BackfillProgressPerVnode::NotStarted => None,
BackfillProgressPerVnode::Completed { current_pos, .. }
| BackfillProgressPerVnode::InProgress { current_pos, .. } => {
BackfillProgressPerVnode::Completed { .. } => {
// If completed, we don't need to continue reading snapshot.
continue;
}
BackfillProgressPerVnode::InProgress { current_pos, .. } => {
Some(current_pos.clone())
}
};
@@ -718,12 +753,7 @@
)
.await?;

let vnode_row_iter = Box::pin(owned_row_iter(vnode_row_iter));

let vnode_row_iter = vnode_row_iter.map_ok(move |row| (vnode, row));

let vnode_row_iter = Box::pin(vnode_row_iter);

let vnode_row_iter = Box::pin(owned_row_iter_with_vnode(vnode, vnode_row_iter));
iterators.push(vnode_row_iter);
}

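A note on snapshot_read_per_vnode above: a vnode whose progress is already Completed is now skipped with continue, so no snapshot iterator is opened for it at all; only NotStarted and InProgress vnodes are scanned. That is what lets a partially finished backfill resume without re-reading vnodes that were already marked done. A hedged sketch of that selection step (hypothetical Progress enum and vnodes_to_scan helper, with current_pos simplified to an integer instead of a pk row):

enum Progress {
    NotStarted,
    InProgress { current_pos: u64 },
    Completed,
}

/// Returns (vnode, resume position) for every vnode that still needs scanning.
fn vnodes_to_scan(progress: &[(u32, Progress)]) -> Vec<(u32, Option<u64>)> {
    progress
        .iter()
        .filter_map(|(vnode, p)| match p {
            // Never scanned: read the whole vnode range.
            Progress::NotStarted => Some((*vnode, None)),
            // Resume strictly after this position (the real code builds an
            // excluded pk bound from current_pos).
            Progress::InProgress { current_pos } => Some((*vnode, Some(*current_pos))),
            // Already backfilled: open no iterator for this vnode.
            Progress::Completed => None,
        })
        .collect()
}

fn main() {
    let progress = vec![
        (0, Progress::Completed),
        (1, Progress::InProgress { current_pos: 42 }),
        (2, Progress::NotStarted),
    ];
    for (vnode, resume_after) in vnodes_to_scan(&progress) {
        println!("vnode {vnode}: resume after {resume_after:?}");
    }
}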
22 changes: 22 additions & 0 deletions src/stream/src/executor/backfill/utils.rs
@@ -666,6 +666,28 @@ pub(crate) fn compute_bounds(
}
}

pub(crate) enum RowAndVnodeStreamItem {
Finished { vnode: VirtualNode },
Record { vnode: VirtualNode, row: OwnedRow },
}

#[try_stream(ok = RowAndVnodeStreamItem, error = StreamExecutorError)]
pub(crate) async fn owned_row_iter_with_vnode<S, E>(vnode: VirtualNode, storage_iter: S)
where
StreamExecutorError: From<E>,
S: Stream<Item = Result<KeyedRow<Bytes>, E>>,
{
pin_mut!(storage_iter);
while let Some(row) = storage_iter.next().await {
let row = row?;
yield RowAndVnodeStreamItem::Record {
vnode,
row: row.into_owned_row(),
}
}
yield RowAndVnodeStreamItem::Finished { vnode }
}

#[try_stream(ok = OwnedRow, error = StreamExecutorError)]
pub(crate) async fn owned_row_iter<S, E>(storage_iter: S)
where
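One more detail worth calling out: rows are buffered per vnode in a chunk builder, and a full chunk is only emitted once the builder reaches capacity. That is why the Finished arm in arrangement_backfill.rs above calls consume_all before marking the vnode done; whatever is still buffered must be flushed, since the next snapshot scan would otherwise produce duplicate rows. A self-contained sketch of the buffer-and-flush pattern (hypothetical ChunkBuilder, with i64 standing in for OwnedRow; the real executor keeps one data-chunk builder per vnode):

struct ChunkBuilder {
    capacity: usize,
    rows: Vec<i64>, // stand-in for buffered OwnedRows
}

impl ChunkBuilder {
    fn new(capacity: usize) -> Self {
        Self { capacity, rows: Vec::new() }
    }

    /// Buffers one row; returns a chunk only once `capacity` rows have accumulated.
    fn append_one_row(&mut self, row: i64) -> Option<Vec<i64>> {
        self.rows.push(row);
        (self.rows.len() >= self.capacity).then(|| std::mem::take(&mut self.rows))
    }

    /// Drains whatever is buffered; called when the vnode's snapshot finishes.
    fn consume_all(&mut self) -> Option<Vec<i64>> {
        (!self.rows.is_empty()).then(|| std::mem::take(&mut self.rows))
    }
}

fn main() {
    let mut builder = ChunkBuilder::new(3);
    for row in 0..5i64 {
        if let Some(chunk) = builder.append_one_row(row) {
            println!("full chunk: {chunk:?}");
        }
    }
    // Finished arrived for this vnode: flush the two remaining rows.
    if let Some(chunk) = builder.consume_all() {
        println!("final chunk: {chunk:?}");
    }
}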
