add more trace logs + fix progress.finish bug + improve test
kwannoel committed Nov 2, 2023
1 parent 3ac2895 commit f316af9
Showing 3 changed files with 74 additions and 18 deletions.
29 changes: 24 additions & 5 deletions src/stream/src/executor/backfill/cdc/cdc_backfill.rs
@@ -464,11 +464,6 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
if let Message::Barrier(barrier) = &msg {
// persist the backfill state
state_impl.commit_state(barrier.epoch).await?;

// mark progress as finished
if let Some(progress) = self.progress.as_mut() {
progress.finish(barrier.epoch.curr, total_snapshot_processed_rows);
}
yield msg;
// break after the state has been saved
break;
@@ -477,6 +472,30 @@
}
}

// NOTE(kwannoel):
// Progress can only be finished after at least 1 barrier.
// This makes sure the downstream state table is flushed
// before the mview is made visible;
// otherwise the mview could be inconsistent with upstream.
// We also cannot call `finish()` immediately after yielding that barrier
// using its current epoch, since by then that epoch has become the previous one.
// The local barrier manager always notifies the global barrier manager with the
// current epoch, so it would never observe the `finished` state,
// and the stream job would never finish.
// Instead we must wait for the next barrier, and finish the progress there.
if let Some(progress) = self.progress.as_mut() {
while let Some(Ok(msg)) = upstream.next().await {
if let Message::Barrier(barrier) = &msg {
let epoch = barrier.epoch;
progress.finish(epoch.curr, total_snapshot_processed_rows);
yield msg;
break;
} else {
yield msg;
}
}
}

// After the backfill progress is finished,
// we can forward messages directly to the downstream,
// since backfill is complete.
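To make the control flow above easier to follow outside the diff, here is a minimal, self-contained sketch of the "finish progress on the next barrier" pattern. The `Epoch`, `Barrier`, `Message`, and `Progress` types below are simplified stand-ins rather than the actual RisingWave definitions, and `drain_until_finished` collects into a `Vec` instead of yielding from an async stream.

// Simplified stand-ins for the executor types; illustration only.
#[derive(Debug, Clone, Copy)]
struct Epoch {
    curr: u64,
}

#[derive(Debug)]
struct Barrier {
    epoch: Epoch,
}

#[derive(Debug)]
enum Message {
    Barrier(Barrier),
    Chunk(Vec<u64>),
}

#[derive(Default)]
struct Progress {
    finished_at: Option<u64>,
}

impl Progress {
    fn finish(&mut self, epoch: u64, _total_rows: u64) {
        // The real executor notifies the barrier manager here;
        // we only record the epoch for illustration.
        self.finished_at = Some(epoch);
    }
}

// Forward messages until the first barrier after the committed state,
// finish the progress with that barrier's epoch, then stop.
fn drain_until_finished(
    upstream: impl IntoIterator<Item = Message>,
    progress: &mut Progress,
    total_rows: u64,
) -> Vec<Message> {
    let mut out = Vec::new();
    for msg in upstream {
        // Extract the epoch first so `msg` can be moved afterwards.
        let barrier_epoch = match &msg {
            Message::Barrier(barrier) => Some(barrier.epoch.curr),
            Message::Chunk(_) => None,
        };
        match barrier_epoch {
            Some(epoch) => {
                // Finishing only on this *next* barrier guarantees the state
                // committed on the previous barrier is flushed before the
                // mview is made visible.
                progress.finish(epoch, total_rows);
                out.push(msg);
                break;
            }
            None => out.push(msg),
        }
    }
    out
}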
54 changes: 42 additions & 12 deletions src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -301,6 +301,11 @@
// just use the last row to update `current_pos`.
current_pos =
Some(get_new_pos(&chunk, &pk_in_output_indices));
tracing::trace!(
epoch = snapshot_read_epoch,
?current_pos,
"snapshot_read: update current_pos"
);

let chunk_cardinality = chunk.cardinality() as u64;
cur_barrier_snapshot_processed_rows += chunk_cardinality;
@@ -338,6 +343,11 @@
// As snapshot read streams are ordered by pk, so we can
// just use the last row to update `current_pos`.
current_pos = Some(get_new_pos(&chunk, &pk_in_output_indices));
tracing::trace!(
epoch = snapshot_read_epoch,
?current_pos,
"snapshot_read_before_barrier: update current_pos"
);

let chunk_cardinality = chunk.cardinality() as u64;
cur_barrier_snapshot_processed_rows += chunk_cardinality;
@@ -372,6 +382,11 @@
let ops = vec![Op::Insert; chunk.capacity()];
let chunk = StreamChunk::from_parts(ops, chunk);
current_pos = Some(get_new_pos(&chunk, &pk_in_output_indices));
tracing::trace!(
epoch = ?barrier.epoch,
?current_pos,
"barrier: update current_pos from residual snapshot rows"
);

cur_barrier_snapshot_processed_rows += chunk_cardinality;
total_snapshot_processed_rows += chunk_cardinality;
@@ -456,6 +471,7 @@ where
if let Some(msg) = mapping_message(msg, &self.output_indices) {
// If not finished then we need to update state, otherwise no need.
if let Message::Barrier(barrier) = &msg {
let epoch = barrier.epoch;
if is_finished {
// If already finished, no need persist any state.
} else {
@@ -476,7 +492,7 @@
debug_assert_ne!(current_pos, None);

Self::persist_state(
barrier.epoch,
epoch,
&mut self.state_table,
true,
&current_pos,
@@ -486,29 +502,43 @@
)
.await?;
tracing::trace!(
epoch = ?barrier.epoch,
?epoch,
?current_pos,
total_snapshot_processed_rows,
"Backfill position persisted after completion"
);
}

// For both a backfill that finished before recovery
// and one that has just finished, we need to update the mview tracker,
// since it does not persist this information.
self.progress
.finish(barrier.epoch.curr, total_snapshot_processed_rows);
tracing::trace!(
epoch = ?barrier.epoch,
"Updated CreateMaterializedTracker"
);
yield msg;
break;
}
yield msg;
}
}

// NOTE(kwannoel):
// Progress can only be finished after at least 1 barrier.
// This makes sure the downstream state table is flushed
// before the mview is made visible;
// otherwise the mview could be inconsistent with upstream.
// We also cannot call `finish()` immediately after yielding that barrier
// using its current epoch, since by then that epoch has become the previous one.
// The local barrier manager always notifies the global barrier manager with the
// current epoch, so it would never observe the `finished` state,
// and the stream job would never finish.
// Instead we must wait for the next barrier, and finish the progress there.
while let Some(Ok(msg)) = upstream.next().await {
if let Message::Barrier(barrier) = &msg {
let epoch = barrier.epoch;
self.progress
.finish(epoch.curr, total_snapshot_processed_rows);
tracing::trace!(?epoch, "Updated CreateMaterializedTracker");
yield msg;
break;
} else {
yield msg;
}
}

tracing::trace!(
"Backfill has already finished and forward messages directly to the downstream"
);
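The new trace points in both executors use `tracing`'s structured-field shorthand: `?name` records the field via its `Debug` form, and a bare name records it as a plain value. As a quick reference, here is an equivalent call in isolation; the subscriber setup is illustrative only and assumes the `tracing` and `tracing-subscriber` crates, not the compute node's own logging configuration.

use tracing::trace;

fn main() {
    // Illustrative subscriber so TRACE-level events are visible;
    // the real executors rely on the subscriber installed by the compute node.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();

    let epoch: u64 = 42;
    let current_pos = Some(vec![7_u64]);
    let total_snapshot_processed_rows: u64 = 1_000;

    // `?epoch` and `?current_pos` are recorded via Debug;
    // `total_snapshot_processed_rows` is recorded as a plain value.
    trace!(
        ?epoch,
        ?current_pos,
        total_snapshot_processed_rows,
        "Backfill position persisted"
    );
}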
@@ -95,7 +95,9 @@ async fn test_background_mv_barrier_recovery() -> Result<()> {
kill_cn_and_wait_recover(&cluster).await;

// Send some upstream updates.
session.run(SEED_TABLE_500).await?;
session
.run("INSERT INTO t SELECT generate_series FROM generate_series(501, 1000);")
.await?;
session.flush().await?;

kill_and_wait_recover(&cluster).await;
@@ -107,6 +109,11 @@

let mv1_count = session.run("SELECT COUNT(v1) FROM mv1").await?;

let missing_rows = session
.run("SELECT v1 FROM t WHERE v1 NOT IN (SELECT v1 FROM mv1)")
.await?;
tracing::debug!(missing_rows);

assert_eq!(t_count, mv1_count);

// Make sure that if MV killed and restarted
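For context on the test change, here is a minimal, self-contained sketch of the diagnostic being added: when the base table and the mview disagree, list the rows missing from the mview before asserting on the counts. The `Session` type below is a stub standing in for the simulation harness handle, and `anyhow` is assumed only for the sketch.

// Stub standing in for the simulation test harness session; illustration only.
struct Session;

impl Session {
    async fn run(&mut self, sql: &str) -> anyhow::Result<String> {
        // The real harness sends `sql` to the cluster and returns the result text.
        let _ = sql;
        Ok(String::new())
    }
}

async fn report_backfill_gap(session: &mut Session) -> anyhow::Result<()> {
    let t_count = session.run("SELECT COUNT(v1) FROM t").await?;
    let mv1_count = session.run("SELECT COUNT(v1) FROM mv1").await?;

    if t_count != mv1_count {
        // List the rows present in the base table but absent from the mview,
        // so a bare count mismatch becomes actionable in the logs.
        let missing_rows = session
            .run("SELECT v1 FROM t WHERE v1 NOT IN (SELECT v1 FROM mv1)")
            .await?;
        tracing::debug!(%missing_rows, "rows missing from mv1");
    }

    assert_eq!(t_count, mv1_count);
    Ok(())
}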
