Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed May 19, 2023
1 parent ce0aa3c commit 9eaf8fb
Showing 1 changed file with 29 additions and 39 deletions.
68 changes: 29 additions & 39 deletions src/stream/src/executor/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ use futures::{pin_mut, stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::{Bitmap, BitmapBuilder};
use risingwave_common::buffer::{BitmapBuilder};
use risingwave_common::catalog::Schema;
use risingwave_common::hash::VirtualNode;

use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::{Datum, ToOwnedDatum};
use risingwave_common::types::{Datum};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{cmp_datum, OrderType};
Expand Down Expand Up @@ -132,7 +132,7 @@ where

let pk_order = self.upstream_table.pk_serializer().get_order_types();

let dist_key_in_pk = self.dist_key_in_pk;
let _dist_key_in_pk = self.dist_key_in_pk;

let upstream_table_id = self.upstream_table.table_id().table_id;

Expand Down Expand Up @@ -184,7 +184,7 @@ where
let mut current_pos: Option<OwnedRow> = None;

// Use this to track old persisted state.
let mut old_state: Option<OwnedRow> = None;
let mut old_state: Option<Vec<Datum>> = None;

// Keep track of rows from the snapshot.
let mut total_snapshot_processed_rows: u64 = 0;
Expand Down Expand Up @@ -292,7 +292,6 @@ where
barrier.epoch,
&mut self.state_table,
false,
&dist_key_in_pk,
&current_pos,
&mut old_state,
)
Expand Down Expand Up @@ -368,7 +367,6 @@ where
barrier.epoch,
&mut self.state_table,
true,
&dist_key_in_pk,
&current_pos,
&mut old_state,
)
Expand Down Expand Up @@ -511,15 +509,14 @@ where
epoch: EpochPair,
table: &mut StateTable<S>,
is_finished: bool,
dist_key_in_pk: &[usize],
current_pos: &Option<OwnedRow>,
old_state: &mut Option<OwnedRow>,
old_state: &mut Option<Vec<Datum>>,
) -> StreamExecutorResult<()> {
if let Some(current_pos_inner) = current_pos {
// state w/o vnodes.
let current_partial_state = Self::build_temporary_state(is_finished, current_pos_inner);
Self::flush_data(table, epoch, old_state, current_partial_state).await?;
// *old_state = Some(current_state);
Self::flush_data(table, epoch, old_state, current_partial_state.clone()).await?;
*old_state = Some(current_partial_state);
} else {
table.commit_no_data_expected(epoch);
}
Expand All @@ -528,29 +525,34 @@ where

/// For `current_pos` and `old_pos` are just pk of upstream.
/// They should be strictly increasing.
/// FIXME(kwannoel): Currently state table expects a Primary Key.
/// For that we use vnode, so we can update position per vnode to support hash-distributed
/// backfill.
/// However this also means that it computes a new `vnode` partition for each vnode.
/// State table interface should be updated, such that it can reuse this `vnode`
/// as both `PRIMARY KEY` and `vnode`.
// FIXME(kwannoel): Currently state table expects a Primary Key.
// For that we use vnode, so we can update position per vnode to support hash-distributed
// backfill.
// However this also means that it computes a new `vnode` partition for each vnode.
// State table interface should be updated, such that it can reuse this `vnode`
// as both `PRIMARY KEY` and `vnode`.
async fn flush_data(
table: &mut StateTable<S>,
epoch: EpochPair,
old_state: &Option<OwnedRow>,
old_state: &Option<Vec<Datum>>,
mut current_partial_state: Vec<Datum>,
) -> StreamExecutorResult<()> {
let vnodes = table.vnodes().clone();
if let Some(old_state) = old_state {
vnodes.iter_ones().for_each(|vnode| {
let datum = Some((vnode as i16).into());
// fill the state
current_partial_state[0] = datum;
table.write_record(Record::Update {
old_row: old_state.as_inner(),
new_row: &current_partial_state[..],
})
});
if *old_state != current_partial_state {
vnodes.iter_ones().for_each(|vnode| {
let datum = Some((vnode as i16).into());
// fill the state
current_partial_state[0] = datum;
table.write_record(Record::Update {
old_row: &old_state[..],
new_row: &current_partial_state[..],
})
});
} else {
table.commit_no_data_expected(epoch);
return Ok(());
}
} else {
vnodes.iter_ones().for_each(|vnode| {
let datum = Some((vnode as i16).into());
Expand All @@ -573,18 +575,6 @@ where
row_state
}

// /// Iterator through all vnodes
// fn build_new_state<'a>(
// vnodes: &Bitmap,
// temporary_state: &'a mut [Datum],
// ) -> impl Iterator<Item = &mut [Datum]> {
// vnodes.iter_ones().map(|vnode| {
// let datum = Some((vnode as i16).into());
// temporary_state[0] = datum;
// temporary_state
// })
// }

fn update_pos(chunk: &StreamChunk, pk_in_output_indices: &[usize]) -> Option<OwnedRow> {
Some(
chunk
Expand Down

0 comments on commit 9eaf8fb

Please sign in to comment.