Skip to content

Commit

Permalink
use row iter with vnode
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Mar 20, 2024
1 parent 8edc97c commit 981ba8a
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 15 deletions.
52 changes: 38 additions & 14 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ use std::sync::Arc;

use either::Either;
use futures::stream::{select_all, select_with_strategy};
use futures::{stream, StreamExt, TryStreamExt};
use futures::{stream, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::OwnedRow;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::PrefetchOptions;
Expand All @@ -34,8 +33,8 @@ use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
use crate::executor::backfill::utils::METADATA_STATE_LEN;
use crate::executor::backfill::utils::{
compute_bounds, create_builder, get_progress_per_vnode, mapping_chunk, mapping_message,
mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode, update_pos_by_vnode,
BackfillProgressPerVnode, BackfillState,
mark_chunk_ref_by_vnode, owned_row_iter_with_vnode, persist_state_per_vnode,
update_pos_by_vnode, BackfillProgressPerVnode, BackfillState, RowAndVnodeStreamItem,
};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
Expand Down Expand Up @@ -323,7 +322,7 @@ where
.inc_by(cur_barrier_upstream_processed_rows);
break 'backfill_loop;
}
Some((vnode, row)) => {
Some(RowAndVnodeStreamItem::Record { vnode, row }) => {
let builder = builders.get_mut(&vnode).unwrap();
if let Some(chunk) = builder.append_one_row(row) {
yield Self::handle_snapshot_chunk(
Expand All @@ -337,6 +336,26 @@ where
)?;
}
}
Some(RowAndVnodeStreamItem::Finished { vnode }) => {
// Consume remaining rows in the builder.
let builder = builders.get_mut(&vnode).unwrap();
if let Some(data_chunk) = builder.consume_all() {
yield Self::handle_snapshot_chunk(
data_chunk,
vnode,
&pk_in_output_indices,
&mut backfill_state,
&mut cur_barrier_snapshot_processed_rows,
&mut total_snapshot_processed_rows,
&self.output_indices,
)?;
}
// Update state for that vnode to finished.
backfill_state.finish_progress(
vnode,
upstream_table.pk_indices().len(),
);
}
}
}
}
Expand All @@ -360,10 +379,13 @@ where
// End of the snapshot read stream.
// We let the barrier handling logic take care of upstream updates.
// But we still want to exit backfill loop, so we mark snapshot read complete.
// Note that we don't have to drain builders here,
// since we did not do snapshot read at all,
// so there should be no rows in the builders.
snapshot_read_complete = true;
break;
}
Some((vnode, row)) => {
Some(RowAndVnodeStreamItem::Record { vnode, row }) => {
let builder = builders.get_mut(&vnode).unwrap();
if let Some(chunk) = builder.append_one_row(row) {
yield Self::handle_snapshot_chunk(
Expand All @@ -379,6 +401,13 @@ where

break;
}
Some(RowAndVnodeStreamItem::Finished { vnode }) => {
// Update state for that vnode to finished.
backfill_state
.finish_progress(vnode, upstream_table.pk_indices().len());
// But some other vnodes may not be finished yet.
continue;
}
}
}
}
Expand Down Expand Up @@ -584,7 +613,7 @@ where
}
}

#[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
#[try_stream(ok = Option<RowAndVnodeStreamItem>, error = StreamExecutorError)]
async fn make_snapshot_stream(
upstream_table: &ReplicatedStateTable<S, SD>,
backfill_state: BackfillState,
Expand Down Expand Up @@ -656,7 +685,7 @@ where
/// remaining data in `builder` must be flushed manually.
/// Otherwise, when we scan a new snapshot, it is possible that rows from the previous
/// snapshot are still present in the `builder`. Then, when we flush, we would emit duplicate rows.
#[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
#[try_stream(ok = Option<RowAndVnodeStreamItem>, error = StreamExecutorError)]
async fn snapshot_read_per_vnode(
upstream_table: &ReplicatedStateTable<S, SD>,
backfill_state: BackfillState,
Expand Down Expand Up @@ -695,12 +724,7 @@ where
)
.await?;

let vnode_row_iter = Box::pin(owned_row_iter(vnode_row_iter));

let vnode_row_iter = vnode_row_iter.map_ok(move |row| (vnode, row));

let vnode_row_iter = Box::pin(vnode_row_iter);

let vnode_row_iter = Box::pin(owned_row_iter_with_vnode(vnode, vnode_row_iter));
iterators.push(vnode_row_iter);
}

Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/backfill/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ pub(crate) enum RowAndVnodeStreamItem {
}

#[try_stream(ok = RowAndVnodeStreamItem, error = StreamExecutorError)]
pub(crate) async fn owned_row_iter_with_vnode<S, E>(storage_iter: S, vnode: VirtualNode)
pub(crate) async fn owned_row_iter_with_vnode<S, E>(vnode: VirtualNode, storage_iter: S)
where
StreamExecutorError: From<E>,
S: Stream<Item = Result<KeyedRow<Bytes>, E>>,
Expand Down

0 comments on commit 981ba8a

Please sign in to comment.