Skip to content

Commit

Permalink
no ordered
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Dec 21, 2022
1 parent fa12413 commit c80435e
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions src/stream/src/executor/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ where
let is_create_mv = first_barrier.is_add_dispatcher(self.actor_id);
// If the snapshot is empty, we don't need to backfill.
let is_snapshot_empty: bool = {
let snapshot = Self::snapshot_read(&self.table, init_epoch, None);
let snapshot = Self::snapshot_read(&self.table, init_epoch, None, false);
pin_mut!(snapshot);
snapshot.next().await.is_none()
};
Expand Down Expand Up @@ -180,7 +180,7 @@ where
let left_upstream = upstream.by_ref().map(Either::Left);

let right_snapshot = Box::pin(
Self::snapshot_read(&self.table, snapshot_read_epoch, current_pos.clone())
Self::snapshot_read(&self.table, snapshot_read_epoch, current_pos.clone(), true)
.map(Either::Right),
);

Expand Down Expand Up @@ -294,7 +294,12 @@ where

#[expect(clippy::needless_lifetimes, reason = "code generated by try_stream")]
#[try_stream(ok = Option<StreamChunk>, error = StreamExecutorError)]
async fn snapshot_read(table: &StorageTable<S>, epoch: u64, current_pos: Option<OwnedRow>) {
async fn snapshot_read(
table: &StorageTable<S>,
epoch: u64,
current_pos: Option<OwnedRow>,
ordered: bool,
) {
// `current_pos` is None means it needs to scan from the beginning, so we use Unbounded to
// scan. Otherwise, use Excluded.
let range_bounds = if let Some(current_pos) = current_pos {
Expand All @@ -309,7 +314,7 @@ where
HummockReadEpoch::NoWait(epoch),
OwnedRow::empty(),
range_bounds,
true,
ordered,
)
.await?;

Expand Down

0 comments on commit c80435e

Please sign in to comment.