diff --git a/src/stream/src/executor/backfill.rs b/src/stream/src/executor/backfill.rs index 21746bd355487..229ac599076b5 100644 --- a/src/stream/src/executor/backfill.rs +++ b/src/stream/src/executor/backfill.rs @@ -136,10 +136,29 @@ where let init_epoch = first_barrier.epoch.prev; self.state_table.init_epoch(first_barrier.epoch); - // If the internal persisted state is "finished" for this executor, we are done, no need - // to_create_mv - // FIXME(kwannoel): This is unimplemented. TODO: Remove the line below once complete. - let to_create_mv = first_barrier.is_newly_added(self.actor_id); + // If the internal persisted state is "finished" for this executor, + // we are done, no need to_create_mv + // We can just check 1 vnode, since we currently don't permit + // partially complete backfill. + // TODO(kwannoel): For background ddl, we need to consider case where + // some vnodes are partially complete, and others are finished. + let arbitrary_vnode = self + .state_table + .vnodes() + .iter_ones() + .next() + .expect("All executors should have vnode"); + let key: &[Datum] = &[Some((arbitrary_vnode as i16).into())]; + let row = self.state_table.get_row(key).await?; + let is_finished = if let Some(row) = row + && let Some(is_finished) = row.datum_at(state_len -1) + { + println!("is_finished? {:?}", is_finished); + is_finished.into_bool() + } else { + false + }; + // If the snapshot is empty, we don't need to backfill. let is_snapshot_empty: bool = { let snapshot = Self::snapshot_read(&self.upstream_table, init_epoch, None, false); @@ -147,9 +166,9 @@ where snapshot.try_next().await?.unwrap().is_none() }; // Whether we still need to backfill - let to_backfill = to_create_mv && !is_snapshot_empty; + let to_backfill = !is_finished && !is_snapshot_empty; - if to_create_mv && is_snapshot_empty { + if !to_backfill { // Directly finish the progress as the snapshot is empty. self.progress.finish(first_barrier.epoch.curr); }