Skip to content

Commit

Permalink
fix initial epoch + write to state store if not backfill
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed May 15, 2023
1 parent 44ea97a commit 5d45f27
Showing 1 changed file with 18 additions and 1 deletion.
19 changes: 18 additions & 1 deletion src/stream/src/executor/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ where

// Poll the upstream to get the first barrier.
let first_barrier = expect_first_barrier(&mut upstream).await?;
let init_epoch = first_barrier.epoch.prev;
let init_epoch = first_barrier.epoch.curr;
self.state_table.init_epoch(first_barrier.epoch);

// If the barrier is a conf change of creating this mview, we follow the procedure of
Expand All @@ -150,6 +150,21 @@ where
if to_create_mv && is_snapshot_empty {
// Directly finish the progress as the snapshot is empty.
self.progress.finish(first_barrier.epoch.curr);
// Persist state on barrier
if let Some(current_pos) = &current_pos {
let mut current_pos: Vec<Datum> = current_pos.as_inner().into();
let mut current_pos = Vec![None; pk_indices.len()];
current_pos.push(Some(false.into()));
let current_pos = OwnedRow::new(current_pos);
Self::flush_data(
&mut self.state_table,
barrier.epoch,
old_pos.as_ref(),
&current_pos,
)
.await?;
old_pos = Some(current_pos);
}
}

// The first barrier message should be propagated.
Expand Down Expand Up @@ -229,6 +244,7 @@ where
#[for_await]
for either in backfill_stream {
match either {
// Upstream
Either::Left(msg) => {
match msg? {
Message::Barrier(barrier) => {
Expand Down Expand Up @@ -305,6 +321,7 @@ where
}
}
}
// Snapshot read
Either::Right(msg) => {
match msg? {
None => {
Expand Down

0 comments on commit 5d45f27

Please sign in to comment.