Skip to content

Commit

Permalink
no need to split chunks in cdc snapshot read
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Apr 23, 2024
1 parent cae6488 commit 3c552b4
Showing 1 changed file with 8 additions and 13 deletions.
21 changes: 8 additions & 13 deletions src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,14 @@ impl UpstreamTableRead for UpstreamTableReader<ExternalStorageTable> {
// Maybe this should be refactored into a common function later.
let limiter = limiter.as_ref().unwrap();
let limit = args.rate_limit_rps.unwrap();
if cardinality <= limit as usize {
let n = NonZeroU32::new(cardinality as u32).unwrap();
// `InsufficientCapacity` should never happen because we have checked the cardinality
limiter.until_n_ready(n).await.unwrap();
yield Some(chunk);
} else {
// Cut the chunk into smaller chunks
for chunk in chunk.split(limit as usize) {
let n = NonZeroU32::new(chunk.cardinality() as u32).unwrap();
limiter.until_n_ready(n).await.unwrap();
yield Some(chunk);
}
}
assert!(cardinality <= limit as usize); // because we produce chunks with a limited-size data chunk builder.

// `InsufficientCapacity` should never happen because we have checked the cardinality
limiter
.until_n_ready(NonZeroU32::new(cardinality as u32).unwrap())
.await
.unwrap();
yield Some(chunk);
}

yield None;
Expand Down

0 comments on commit 3c552b4

Please sign in to comment.