From 840aacd3f0ee172a4a37cd6a86e87cbe17a6a74a Mon Sep 17 00:00:00 2001 From: StrikeW Date: Fri, 9 Aug 2024 23:05:50 +0800 Subject: [PATCH] fix --- src/connector/src/source/cdc/source/reader.rs | 2 +- .../src/executor/backfill/cdc/cdc_backfill.rs | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index 135daa5e0480..b29ef1312bbd 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -161,7 +161,7 @@ impl SplitReader for CdcSplitReader { } }; if !inited { - bail!("failed to start cdc connector"); + bail!("failed to start cdc connector.\nHINT: increase `cdc_source_wait_streaming_start_timeout` session variable to a large value and retry."); } } tracing::info!(?source_id, "cdc connector started"); diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 943059355f05..2b24e57af1a0 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -343,8 +343,17 @@ impl CdcBackfillExecutor { && *new_rate_limit != self.rate_limit_rps { self.rate_limit_rps = *new_rate_limit; - // rebuild the new reader stream with new rate limit - continue 'backfill_loop; + // staging the barrier for post processing + pending_barrier = Some(barrier); + tracing::info!( + table_id, + ?current_pk_pos, + ?snapshot_read_row_cnt, + "Break snapshot loop to apply rate limit" + ); + // break the snapshopt read loop, + // so that we can rebuild the new snapshot stream with the new rate limit + break; } } Mutation::Update(UpdateMutation {