Skip to content

Commit

Permalink
add state check
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed May 19, 2023
1 parent c33d8d5 commit cbd8f08
Showing 1 changed file with 25 additions and 6 deletions.
31 changes: 25 additions & 6 deletions src/stream/src/executor/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,20 +136,39 @@ where
let init_epoch = first_barrier.epoch.prev;
self.state_table.init_epoch(first_barrier.epoch);

// If the internal persisted state is "finished" for this executor, we are done, no need
// to_create_mv
// FIXME(kwannoel): This is unimplemented. TODO: Remove the line below once complete.
let to_create_mv = first_barrier.is_newly_added(self.actor_id);
// If the internal persisted state is "finished" for this executor,
// we are done, no need to_create_mv
// We can just check 1 vnode, since we currently don't permit
// partially complete backfill.
// TODO(kwannoel): For background ddl, we need to consider case where
// some vnodes are partially complete, and others are finished.
let arbitrary_vnode = self
.state_table
.vnodes()
.iter_ones()
.next()
.expect("All executors should have vnode");
let key: &[Datum] = &[Some((arbitrary_vnode as i16).into())];
let row = self.state_table.get_row(key).await?;
let is_finished = if let Some(row) = row
&& let Some(is_finished) = row.datum_at(state_len -1)
{
println!("is_finished? {:?}", is_finished);
is_finished.into_bool()
} else {
false
};

// If the snapshot is empty, we don't need to backfill.
let is_snapshot_empty: bool = {
let snapshot = Self::snapshot_read(&self.upstream_table, init_epoch, None, false);
pin_mut!(snapshot);
snapshot.try_next().await?.unwrap().is_none()
};
// Whether we still need to backfill
let to_backfill = to_create_mv && !is_snapshot_empty;
let to_backfill = !is_finished && !is_snapshot_empty;

if to_create_mv && is_snapshot_empty {
if !to_backfill {
// Directly finish the progress as the snapshot is empty.
self.progress.finish(first_barrier.epoch.curr);
}
Expand Down

0 comments on commit cbd8f08

Please sign in to comment.