Skip to content

Commit

Permalink
fix warn
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jan 29, 2024
1 parent 9474e66 commit 87e2760
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use risingwave_common::catalog::Schema;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::OwnedRow;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::StateStore;
Expand All @@ -35,9 +34,9 @@ use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
#[cfg(debug_assertions)]
use crate::executor::backfill::utils::METADATA_STATE_LEN;
use crate::executor::backfill::utils::{
compute_bounds, create_builder, get_progress_per_vnode, iter_chunks, mapping_chunk,
mapping_message, mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode,
update_pos_by_vnode, BackfillProgressPerVnode, BackfillState,
compute_bounds, create_builder, get_progress_per_vnode, mapping_chunk, mapping_message,
mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode, update_pos_by_vnode,
BackfillProgressPerVnode, BackfillState,
};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
Expand Down Expand Up @@ -277,7 +276,7 @@ where
match msg? {
None => {
// Consume remaining rows in the builder.
for (vnode, builder) in builders.iter_mut() {
for (vnode, builder) in &mut builders {
if let Some(data_chunk) = builder.consume_all() {
yield Self::handle_snapshot_chunk(
data_chunk,
Expand Down Expand Up @@ -506,6 +505,10 @@ where
yield Message::Barrier(barrier);

// We will switch snapshot at the start of the next iteration of the backfill loop.
// Unless snapshot read is already completed.
if snapshot_read_complete {
break 'backfill_loop;
}
}
}

Expand Down Expand Up @@ -584,8 +587,8 @@ where
}

#[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
async fn make_snapshot_stream<'a>(
upstream_table: &'a ReplicatedStateTable<S, SD>,
async fn make_snapshot_stream(
upstream_table: &ReplicatedStateTable<S, SD>,
backfill_state: BackfillState,
paused: bool,
) {
Expand Down Expand Up @@ -649,8 +652,8 @@ where
/// Otherwise when we scan a new snapshot, it is possible the rows in the `builder` would be
/// present, Then when we flush we contain duplicate rows.
#[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
async fn snapshot_read_per_vnode<'a>(
upstream_table: &'a ReplicatedStateTable<S, SD>,
async fn snapshot_read_per_vnode(
upstream_table: &ReplicatedStateTable<S, SD>,
backfill_state: BackfillState,
) {
let mut iterators = vec![];
Expand Down

0 comments on commit 87e2760

Please sign in to comment.