From 5d45f27e10d20e2fc13fd8e909d4ba303a446283 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Sat, 13 May 2023 12:48:35 +0800 Subject: [PATCH] fix initial epoch + write to state store if not backfill --- src/stream/src/executor/backfill.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/stream/src/executor/backfill.rs b/src/stream/src/executor/backfill.rs index 1eb179a5ff5b0..7c5da221ea1bc 100644 --- a/src/stream/src/executor/backfill.rs +++ b/src/stream/src/executor/backfill.rs @@ -131,7 +131,7 @@ where // Poll the upstream to get the first barrier. let first_barrier = expect_first_barrier(&mut upstream).await?; - let init_epoch = first_barrier.epoch.prev; + let init_epoch = first_barrier.epoch.curr; self.state_table.init_epoch(first_barrier.epoch); // If the barrier is a conf change of creating this mview, we follow the procedure of @@ -150,6 +150,21 @@ where if to_create_mv && is_snapshot_empty { // Directly finish the progress as the snapshot is empty. self.progress.finish(first_barrier.epoch.curr); + // Persist state on barrier + if let Some(current_pos) = ¤t_pos { + let mut current_pos: Vec = current_pos.as_inner().into(); + let mut current_pos = Vec![None; pk_indices.len()]; + current_pos.push(Some(false.into())); + let current_pos = OwnedRow::new(current_pos); + Self::flush_data( + &mut self.state_table, + barrier.epoch, + old_pos.as_ref(), + ¤t_pos, + ) + .await?; + old_pos = Some(current_pos); + } } // The first barrier message should be propagated. @@ -229,6 +244,7 @@ where #[for_await] for either in backfill_stream { match either { + // Upstream Either::Left(msg) => { match msg? { Message::Barrier(barrier) => { @@ -305,6 +321,7 @@ where } } } + // Snapshot read Either::Right(msg) => { match msg? { None => {