Skip to content

Commit

Permalink
fix logic if we receive some other msg instead of barrier
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed May 15, 2023
1 parent 9238c0b commit 44ea97a
Showing 1 changed file with 31 additions and 20 deletions.
51 changes: 31 additions & 20 deletions src/stream/src/executor/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,29 +357,40 @@ where
"Backfill has already finished and forward messages directly to the downstream"
);

// Set persist state to finish on next barrier.
if let Some(Ok(msg)) = upstream.next().await {
if let Some(barrier) = msg.as_barrier() {
self.progress.finish(barrier.epoch.curr);

// Since we are done, we should persist this to
// upstream.
let mut current_pos: Vec<Datum> = current_pos.clone().unwrap().as_inner().into();
current_pos.push(Some(true.into()));
let current_pos = OwnedRow::new(current_pos);
Self::flush_data(
&mut self.state_table,
barrier.epoch,
old_pos.as_ref(),
&current_pos,
)
.await?;
// Wait for first barrier after backfill finish to come
// so we can update our progress + persist the status.
while let Some(Ok(msg)) = upstream.next().await {
match &msg {
// Set persist state to finish on next barrier.
Message::Barrier(barrier) => {
self.progress.finish(barrier.epoch.curr);
let mut current_pos: Vec<Datum> =
current_pos.clone().unwrap().as_inner().into();
current_pos.push(Some(true.into()));
let current_pos = OwnedRow::new(current_pos);
Self::flush_data(
&mut self.state_table,
barrier.epoch,
old_pos.as_ref(),
&current_pos,
)
.await?;
yield msg;
break;
}

// If it's not a barrier, just forward
Message::Chunk(_) | Message::Watermark(_) => {
if let Some(msg) = Self::mapping_message(msg, &self.output_indices) {
yield msg;
}
}
}
yield msg;
}

// Can forward messages directly to the downstream,
// Backfill is finished.
// After progress finished + state persisted,
// we can forward messages directly to the downstream,
// as backfill is finished.
#[for_await]
for msg in upstream {
if let Some(msg) = Self::mapping_message(msg?, &self.output_indices) {
Expand Down

0 comments on commit 44ea97a

Please sign in to comment.