Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed May 15, 2023
1 parent 8b70f38 commit 28bc128
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/property/distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub enum Distribution {
/// [`Distribution::HashShard`], but may have different vnode mapping.
///
/// It exists because the upstream MV can be scaled independently. So we use
/// `UpstreamHashShard` to force an exchange is inserted.
/// `UpstreamHashShard` to **force an exchange to be inserted**.
///
/// Alternatively, [`Distribution::SomeShard`] can also be used to insert an exchange, but
/// `UpstreamHashShard` contains distribution keys, which might be useful in some cases, e.g.,
Expand Down
13 changes: 1 addition & 12 deletions src/stream/src/executor/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ where

// Poll the upstream to get the first barrier.
let first_barrier = expect_first_barrier(&mut upstream).await?;
let init_epoch = first_barrier.epoch.curr;
let init_epoch = first_barrier.epoch.prev;
self.state_table.init_epoch(first_barrier.epoch);

// If the barrier is a conf change of creating this mview, we follow the procedure of
Expand All @@ -150,17 +150,6 @@ where
if to_create_mv && is_snapshot_empty {
// Directly finish the progress as the snapshot is empty.
self.progress.finish(first_barrier.epoch.curr);
// Persist state on barrier
let mut current_pos = vec![None; pk_in_output_indices.len()];
current_pos.push(Some(false.into()));
let current_pos = OwnedRow::new(current_pos);
Self::flush_data(
&mut self.state_table,
first_barrier.epoch,
None,
&current_pos,
)
.await?;
}

// The first barrier message should be propagated.
Expand Down

0 comments on commit 28bc128

Please sign in to comment.