From 9eaf8fb0b01535a905a22fba6fc8e21394990e16 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 19 May 2023 18:17:33 +0800 Subject: [PATCH] fix --- src/stream/src/executor/backfill.rs | 68 ++++++++++++----------------- 1 file changed, 29 insertions(+), 39 deletions(-) diff --git a/src/stream/src/executor/backfill.rs b/src/stream/src/executor/backfill.rs index 3b3a9e134c99a..08385572f4de7 100644 --- a/src/stream/src/executor/backfill.rs +++ b/src/stream/src/executor/backfill.rs @@ -24,11 +24,11 @@ use futures::{pin_mut, stream, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use risingwave_common::array::stream_record::Record; use risingwave_common::array::{Op, StreamChunk}; -use risingwave_common::buffer::{Bitmap, BitmapBuilder}; +use risingwave_common::buffer::{BitmapBuilder}; use risingwave_common::catalog::Schema; -use risingwave_common::hash::VirtualNode; + use risingwave_common::row::{self, OwnedRow, Row, RowExt}; -use risingwave_common::types::{Datum, ToOwnedDatum}; +use risingwave_common::types::{Datum}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{cmp_datum, OrderType}; @@ -132,7 +132,7 @@ where let pk_order = self.upstream_table.pk_serializer().get_order_types(); - let dist_key_in_pk = self.dist_key_in_pk; + let _dist_key_in_pk = self.dist_key_in_pk; let upstream_table_id = self.upstream_table.table_id().table_id; @@ -184,7 +184,7 @@ where let mut current_pos: Option = None; // Use this to track old persisted state. - let mut old_state: Option = None; + let mut old_state: Option> = None; // Keep track of rows from the snapshot. let mut total_snapshot_processed_rows: u64 = 0; @@ -292,7 +292,6 @@ where barrier.epoch, &mut self.state_table, false, - &dist_key_in_pk, ¤t_pos, &mut old_state, ) @@ -368,7 +367,6 @@ where barrier.epoch, &mut self.state_table, true, - &dist_key_in_pk, ¤t_pos, &mut old_state, ) @@ -511,15 +509,14 @@ where epoch: EpochPair, table: &mut StateTable, is_finished: bool, - dist_key_in_pk: &[usize], current_pos: &Option, - old_state: &mut Option, + old_state: &mut Option>, ) -> StreamExecutorResult<()> { if let Some(current_pos_inner) = current_pos { // state w/o vnodes. let current_partial_state = Self::build_temporary_state(is_finished, current_pos_inner); - Self::flush_data(table, epoch, old_state, current_partial_state).await?; - // *old_state = Some(current_state); + Self::flush_data(table, epoch, old_state, current_partial_state.clone()).await?; + *old_state = Some(current_partial_state); } else { table.commit_no_data_expected(epoch); } @@ -528,29 +525,34 @@ where /// For `current_pos` and `old_pos` are just pk of upstream. /// They should be strictly increasing. - /// FIXME(kwannoel): Currently state table expects a Primary Key. - /// For that we use vnode, so we can update position per vnode to support hash-distributed - /// backfill. - /// However this also means that it computes a new `vnode` partition for each vnode. - /// State table interface should be updated, such that it can reuse this `vnode` - /// as both `PRIMARY KEY` and `vnode`. + // FIXME(kwannoel): Currently state table expects a Primary Key. + // For that we use vnode, so we can update position per vnode to support hash-distributed + // backfill. + // However this also means that it computes a new `vnode` partition for each vnode. + // State table interface should be updated, such that it can reuse this `vnode` + // as both `PRIMARY KEY` and `vnode`. async fn flush_data( table: &mut StateTable, epoch: EpochPair, - old_state: &Option, + old_state: &Option>, mut current_partial_state: Vec, ) -> StreamExecutorResult<()> { let vnodes = table.vnodes().clone(); if let Some(old_state) = old_state { - vnodes.iter_ones().for_each(|vnode| { - let datum = Some((vnode as i16).into()); - // fill the state - current_partial_state[0] = datum; - table.write_record(Record::Update { - old_row: old_state.as_inner(), - new_row: ¤t_partial_state[..], - }) - }); + if *old_state != current_partial_state { + vnodes.iter_ones().for_each(|vnode| { + let datum = Some((vnode as i16).into()); + // fill the state + current_partial_state[0] = datum; + table.write_record(Record::Update { + old_row: &old_state[..], + new_row: ¤t_partial_state[..], + }) + }); + } else { + table.commit_no_data_expected(epoch); + return Ok(()); + } } else { vnodes.iter_ones().for_each(|vnode| { let datum = Some((vnode as i16).into()); @@ -573,18 +575,6 @@ where row_state } - // /// Iterator through all vnodes - // fn build_new_state<'a>( - // vnodes: &Bitmap, - // temporary_state: &'a mut [Datum], - // ) -> impl Iterator { - // vnodes.iter_ones().map(|vnode| { - // let datum = Some((vnode as i16).into()); - // temporary_state[0] = datum; - // temporary_state - // }) - // } - fn update_pos(chunk: &StreamChunk, pk_in_output_indices: &[usize]) -> Option { Some( chunk