diff --git a/src/stream/src/executor/backfill/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc_backfill.rs index 2f522ae8eeb0c..b383f2f5bc790 100644 --- a/src/stream/src/executor/backfill/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc_backfill.rs @@ -227,7 +227,9 @@ impl CdcBackfillExecutor { #[allow(unused_variables)] let mut total_snapshot_processed_rows: u64 = 0; - let mut last_binlog_offset: Option; + // Read the current binlog offset as a low watermark + let mut last_binlog_offset: Option = + upstream_table_reader.current_binlog_offset().await?; let mut consumed_binlog_offset: Option = None; @@ -251,7 +253,6 @@ impl CdcBackfillExecutor { // // Once the backfill loop ends, we forward the upstream directly to the downstream. if to_backfill { - last_binlog_offset = upstream_table_reader.current_binlog_offset().await?; // drive the upstream changelog first to ensure we can receive timely changelog event, // otherwise the upstream changelog may be blocked by the snapshot read stream let _ = Pin::new(&mut upstream).peek().await; @@ -441,18 +442,23 @@ impl CdcBackfillExecutor { } } } - } else { + } else if is_snapshot_empty { + tracing::info!( + upstream_table_id, + initial_binlog_offset = ?last_binlog_offset, + "upstream snapshot is empty, mark backfill is done and persist current binlog offset"); + Self::write_backfill_state( &mut self.source_state_handler, upstream_table_id, &split_id, &mut cdc_split, - None, + last_binlog_offset, ) .await?; } - tracing::debug!( + tracing::info!( actor = self.actor_id, "CdcBackfill has already finished and forward messages directly to the downstream" ); @@ -485,6 +491,11 @@ impl CdcBackfillExecutor { cdc_split: &mut Option, last_binlog_offset: Option, ) -> StreamExecutorResult<()> { + assert!( + last_binlog_offset.is_some(), + "last binlog offset cannot be None" + ); + if let Some(split_id) = split_id.as_ref() { let mut key = split_id.to_string(); key.push_str(BACKFILL_STATE_KEY_SUFFIX);