From 181680b58fdbbadf49f8c672d3fc8d43fe84d6c0 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 21 Dec 2022 21:46:55 +0800 Subject: [PATCH] fix option item and progress Signed-off-by: Bugen Zhao --- src/stream/src/executor/backfill.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/stream/src/executor/backfill.rs b/src/stream/src/executor/backfill.rs index 414b4ce1511df..9607dd62345b3 100644 --- a/src/stream/src/executor/backfill.rs +++ b/src/stream/src/executor/backfill.rs @@ -18,7 +18,7 @@ use std::ops::Bound; use async_stack_trace::StackTrace; use either::Either; use futures::stream::select_with_strategy; -use futures::{pin_mut, stream, StreamExt}; +use futures::{pin_mut, stream, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::BitmapBuilder; @@ -119,13 +119,15 @@ where let is_snapshot_empty: bool = { let snapshot = Self::snapshot_read(&self.table, init_epoch, None, false); pin_mut!(snapshot); - snapshot.next().await.is_none() + snapshot.try_next().await?.unwrap().is_none() }; let to_backfill = is_create_mv && !is_snapshot_empty; if !to_backfill { - // Directly finish the progress. For recovery, this is a no-op. - self.progress.finish(first_barrier.epoch.curr); + if is_create_mv { + // Directly finish the progress. For recovery, this is a no-op. + self.progress.finish(first_barrier.epoch.curr); + } // The first barrier message should be propagated. yield Message::Barrier(first_barrier);