From c33d8d578d8b9dfd8338e1a8c22e35963bdbde09 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 19 May 2023 19:03:23 +0800 Subject: [PATCH] fmt --- src/stream/src/executor/backfill.rs | 34 ++++++++++++++++++----------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/src/stream/src/executor/backfill.rs b/src/stream/src/executor/backfill.rs index bfdf7e3e4e13..21746bd35548 100644 --- a/src/stream/src/executor/backfill.rs +++ b/src/stream/src/executor/backfill.rs @@ -123,6 +123,7 @@ where async fn execute_inner(mut self) { // The primary key columns, in the output columns of the upstream_table scan. let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap(); + let state_len = pk_in_output_indices.len() + 2; // +1 for backfill_finished, +1 for vnode key. let pk_order = self.upstream_table.pk_serializer().get_order_types(); @@ -175,7 +176,12 @@ where // `None` means it starts from the beginning. let mut current_pos: Option = None; - // Use this to track old persisted state. + // Use these to persist state. + // They contain the backfill position, + // as well as the progress. + // However, they do not contain the vnode mapping at index 0. + // That is filled in when we flush the state table. + let mut current_state: Vec = vec![None; state_len]; let mut old_state: Option> = None; // Keep track of rows from the snapshot. @@ -286,6 +292,7 @@ where false, ¤t_pos, &mut old_state, + &mut current_state, ) .await?; @@ -361,6 +368,7 @@ where true, ¤t_pos, &mut old_state, + &mut current_state, ) .await?; yield msg; @@ -503,12 +511,13 @@ where is_finished: bool, current_pos: &Option, old_state: &mut Option>, + current_state: &mut [Datum], ) -> StreamExecutorResult<()> { if let Some(current_pos_inner) = current_pos { // state w/o vnodes. - let current_partial_state = Self::build_temporary_state(is_finished, current_pos_inner); - Self::flush_data(table, epoch, old_state, current_partial_state.clone()).await?; - *old_state = Some(current_partial_state); + Self::build_temporary_state(current_state, is_finished, current_pos_inner); + Self::flush_data(table, epoch, old_state, current_state).await?; + *old_state = Some(current_state.into()); } else { table.commit_no_data_expected(epoch); } @@ -526,20 +535,21 @@ where async fn flush_data( table: &mut StateTable, epoch: EpochPair, - old_state: &Option>, - mut current_partial_state: Vec, + old_state: &mut Option>, + current_partial_state: &mut [Datum], ) -> StreamExecutorResult<()> { let vnodes = table.vnodes().clone(); if let Some(old_state) = old_state { // There are updates to existing state, persist. - if *old_state != current_partial_state { + if old_state[1..] != current_partial_state[1..] { vnodes.iter_ones().for_each(|vnode| { let datum = Some((vnode as i16).into()); // fill the state - current_partial_state[0] = datum; + current_partial_state[0] = datum.clone(); + old_state[0] = datum; table.write_record(Record::Update { old_row: &old_state[..], - new_row: ¤t_partial_state[..], + new_row: &(*current_partial_state), }) }); } else { @@ -554,7 +564,7 @@ where // fill the state current_partial_state[0] = datum; table.write_record(Record::Insert { - new_row: ¤t_partial_state[..], + new_row: &(*current_partial_state), }) }); } @@ -563,11 +573,9 @@ where // We want to avoid building a row for every vnode. // Instead we can just modify a single row, and dispatch it to state table to write. - fn build_temporary_state(is_finished: bool, current_pos: &OwnedRow) -> Vec { - let mut row_state = vec![None; current_pos.len() + 2]; + fn build_temporary_state(row_state: &mut [Datum], is_finished: bool, current_pos: &OwnedRow) { row_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner()); row_state[current_pos.len() + 1] = Some(is_finished.into()); - row_state } fn update_pos(chunk: &StreamChunk, pk_in_output_indices: &[usize]) -> Option {