Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Aug 12, 2024
1 parent 00d066d commit 840aacd
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
}
};
if !inited {
bail!("failed to start cdc connector");
bail!("failed to start cdc connector.\nHINT: increase `cdc_source_wait_streaming_start_timeout` session variable to a large value and retry.");
}
}
tracing::info!(?source_id, "cdc connector started");
Expand Down
13 changes: 11 additions & 2 deletions src/stream/src/executor/backfill/cdc/cdc_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,17 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
&& *new_rate_limit != self.rate_limit_rps
{
self.rate_limit_rps = *new_rate_limit;
// rebuild the new reader stream with new rate limit
continue 'backfill_loop;
// staging the barrier for post processing
pending_barrier = Some(barrier);
tracing::info!(
table_id,
?current_pk_pos,
?snapshot_read_row_cnt,
"Break snapshot loop to apply rate limit"
);
// break the snapshopt read loop,
// so that we can rebuild the new snapshot stream with the new rate limit
break;
}
}
Mutation::Update(UpdateMutation {
Expand Down

0 comments on commit 840aacd

Please sign in to comment.