Skip to content

Commit

Permalink
fix(cdc-backfill): fix recovery of cdc table that subscribed to an em…
Browse files Browse the repository at this point in the history
…pty table (#12773)
  • Loading branch information
StrikeW authored Oct 11, 2023
1 parent d776876 commit e0059df
Showing 1 changed file with 16 additions and 5 deletions.
21 changes: 16 additions & 5 deletions src/stream/src/executor/backfill/cdc_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,9 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
#[allow(unused_variables)]
let mut total_snapshot_processed_rows: u64 = 0;

let mut last_binlog_offset: Option<CdcOffset>;
// Read the current binlog offset as a low watermark
let mut last_binlog_offset: Option<CdcOffset> =
upstream_table_reader.current_binlog_offset().await?;

let mut consumed_binlog_offset: Option<CdcOffset> = None;

Expand All @@ -251,7 +253,6 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
//
// Once the backfill loop ends, we forward the upstream directly to the downstream.
if to_backfill {
last_binlog_offset = upstream_table_reader.current_binlog_offset().await?;
// drive the upstream changelog first to ensure we can receive timely changelog event,
// otherwise the upstream changelog may be blocked by the snapshot read stream
let _ = Pin::new(&mut upstream).peek().await;
Expand Down Expand Up @@ -441,18 +442,23 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
}
}
}
} else {
} else if is_snapshot_empty {
tracing::info!(
upstream_table_id,
initial_binlog_offset = ?last_binlog_offset,
"upstream snapshot is empty, mark backfill is done and persist current binlog offset");

Self::write_backfill_state(
&mut self.source_state_handler,
upstream_table_id,
&split_id,
&mut cdc_split,
None,
last_binlog_offset,
)
.await?;
}

tracing::debug!(
tracing::info!(
actor = self.actor_id,
"CdcBackfill has already finished and forward messages directly to the downstream"
);
Expand Down Expand Up @@ -485,6 +491,11 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
cdc_split: &mut Option<SplitImpl>,
last_binlog_offset: Option<CdcOffset>,
) -> StreamExecutorResult<()> {
assert!(
last_binlog_offset.is_some(),
"last binlog offset cannot be None"
);

if let Some(split_id) = split_id.as_ref() {
let mut key = split_id.to_string();
key.push_str(BACKFILL_STATE_KEY_SUFFIX);
Expand Down

0 comments on commit e0059df

Please sign in to comment.