diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 943059355f054..2b24e57af1a0b 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -343,8 +343,17 @@ impl CdcBackfillExecutor { && *new_rate_limit != self.rate_limit_rps { self.rate_limit_rps = *new_rate_limit; - // rebuild the new reader stream with new rate limit - continue 'backfill_loop; + // staging the barrier for post processing + pending_barrier = Some(barrier); + tracing::info!( + table_id, + ?current_pk_pos, + ?snapshot_read_row_cnt, + "Break snapshot loop to apply rate limit" + ); + // break the snapshopt read loop, + // so that we can rebuild the new snapshot stream with the new rate limit + break; } } Mutation::Update(UpdateMutation {