Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed May 19, 2023
1 parent 006e783 commit c33d8d5
Showing 1 changed file with 21 additions and 13 deletions.
34 changes: 21 additions & 13 deletions src/stream/src/executor/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ where
async fn execute_inner(mut self) {
// The primary key columns, in the output columns of the upstream_table scan.
let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap();
let state_len = pk_in_output_indices.len() + 2; // +1 for backfill_finished, +1 for vnode key.

let pk_order = self.upstream_table.pk_serializer().get_order_types();

Expand Down Expand Up @@ -175,7 +176,12 @@ where
// `None` means it starts from the beginning.
let mut current_pos: Option<OwnedRow> = None;

// Use this to track old persisted state.
// Use these to persist state.
// They contain the backfill position,
// as well as the progress.
// However, they do not contain the vnode mapping at index 0.
// That is filled in when we flush the state table.
let mut current_state: Vec<Datum> = vec![None; state_len];
let mut old_state: Option<Vec<Datum>> = None;

// Keep track of rows from the snapshot.
Expand Down Expand Up @@ -286,6 +292,7 @@ where
false,
&current_pos,
&mut old_state,
&mut current_state,
)
.await?;

Expand Down Expand Up @@ -361,6 +368,7 @@ where
true,
&current_pos,
&mut old_state,
&mut current_state,
)
.await?;
yield msg;
Expand Down Expand Up @@ -503,12 +511,13 @@ where
is_finished: bool,
current_pos: &Option<OwnedRow>,
old_state: &mut Option<Vec<Datum>>,
current_state: &mut [Datum],
) -> StreamExecutorResult<()> {
if let Some(current_pos_inner) = current_pos {
// state w/o vnodes.
let current_partial_state = Self::build_temporary_state(is_finished, current_pos_inner);
Self::flush_data(table, epoch, old_state, current_partial_state.clone()).await?;
*old_state = Some(current_partial_state);
Self::build_temporary_state(current_state, is_finished, current_pos_inner);
Self::flush_data(table, epoch, old_state, current_state).await?;
*old_state = Some(current_state.into());
} else {
table.commit_no_data_expected(epoch);
}
Expand All @@ -526,20 +535,21 @@ where
async fn flush_data(
table: &mut StateTable<S>,
epoch: EpochPair,
old_state: &Option<Vec<Datum>>,
mut current_partial_state: Vec<Datum>,
old_state: &mut Option<Vec<Datum>>,
current_partial_state: &mut [Datum],
) -> StreamExecutorResult<()> {
let vnodes = table.vnodes().clone();
if let Some(old_state) = old_state {
// There are updates to existing state, persist.
if *old_state != current_partial_state {
if old_state[1..] != current_partial_state[1..] {
vnodes.iter_ones().for_each(|vnode| {
let datum = Some((vnode as i16).into());
// fill the state
current_partial_state[0] = datum;
current_partial_state[0] = datum.clone();
old_state[0] = datum;
table.write_record(Record::Update {
old_row: &old_state[..],
new_row: &current_partial_state[..],
new_row: &(*current_partial_state),
})
});
} else {
Expand All @@ -554,7 +564,7 @@ where
// fill the state
current_partial_state[0] = datum;
table.write_record(Record::Insert {
new_row: &current_partial_state[..],
new_row: &(*current_partial_state),
})
});
}
Expand All @@ -563,11 +573,9 @@ where

// We want to avoid building a row for every vnode.
// Instead we can just modify a single row, and dispatch it to state table to write.
fn build_temporary_state(is_finished: bool, current_pos: &OwnedRow) -> Vec<Datum> {
let mut row_state = vec![None; current_pos.len() + 2];
fn build_temporary_state(row_state: &mut [Datum], is_finished: bool, current_pos: &OwnedRow) {
row_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
row_state[current_pos.len() + 1] = Some(is_finished.into());
row_state
}

fn update_pos(chunk: &StreamChunk, pk_in_output_indices: &[usize]) -> Option<OwnedRow> {
Expand Down

0 comments on commit c33d8d5

Please sign in to comment.