Skip to content

Commit

Permalink
fix option item and progress
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Dec 21, 2022
1 parent c80435e commit 181680b
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions src/stream/src/executor/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::ops::Bound;
use async_stack_trace::StackTrace;
use either::Either;
use futures::stream::select_with_strategy;
use futures::{pin_mut, stream, StreamExt};
use futures::{pin_mut, stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::BitmapBuilder;
Expand Down Expand Up @@ -119,13 +119,15 @@ where
let is_snapshot_empty: bool = {
let snapshot = Self::snapshot_read(&self.table, init_epoch, None, false);
pin_mut!(snapshot);
snapshot.next().await.is_none()
snapshot.try_next().await?.unwrap().is_none()
};
let to_backfill = is_create_mv && !is_snapshot_empty;

if !to_backfill {
// Directly finish the progress. For recovery, this is a no-op.
self.progress.finish(first_barrier.epoch.curr);
if is_create_mv {
// Directly finish the progress. For recovery, this is a no-op.
self.progress.finish(first_barrier.epoch.curr);
}

// The first barrier message should be propagated.
yield Message::Barrier(first_barrier);
Expand Down

0 comments on commit 181680b

Please sign in to comment.