diff --git a/src/frontend/src/optimizer/property/distribution.rs b/src/frontend/src/optimizer/property/distribution.rs index 679c7b4709da6..bea2090886ce6 100644 --- a/src/frontend/src/optimizer/property/distribution.rs +++ b/src/frontend/src/optimizer/property/distribution.rs @@ -80,7 +80,7 @@ pub enum Distribution { /// [`Distribution::HashShard`], but may have different vnode mapping. /// /// It exists because the upstream MV can be scaled independently. So we use - /// `UpstreamHashShard` to force an exchange is inserted. + /// `UpstreamHashShard` to **force an exchange to be inserted**. /// /// Alternatively, [`Distribution::SomeShard`] can also be used to insert an exchange, but /// `UpstreamHashShard` contains distribution keys, which might be useful in some cases, e.g., diff --git a/src/stream/src/executor/backfill.rs b/src/stream/src/executor/backfill.rs index 52cae97769309..7bef075a8bd2c 100644 --- a/src/stream/src/executor/backfill.rs +++ b/src/stream/src/executor/backfill.rs @@ -130,7 +130,7 @@ where // Poll the upstream to get the first barrier. let first_barrier = expect_first_barrier(&mut upstream).await?; - let init_epoch = first_barrier.epoch.curr; + let init_epoch = first_barrier.epoch.prev; self.state_table.init_epoch(first_barrier.epoch); // If the barrier is a conf change of creating this mview, we follow the procedure of @@ -149,17 +149,6 @@ where if to_create_mv && is_snapshot_empty { // Directly finish the progress as the snapshot is empty. self.progress.finish(first_barrier.epoch.curr); - // Persist state on barrier - let mut current_pos = vec![None; pk_in_output_indices.len()]; - current_pos.push(Some(false.into())); - let current_pos = OwnedRow::new(current_pos); - Self::flush_data( - &mut self.state_table, - first_barrier.epoch, - None, - ¤t_pos, - ) - .await?; } // The first barrier message should be propagated.