fix(source): rate limit fail when size of chunk ends with Update is limit + 1 (#16465)

Signed-off-by: Richard Chien <[email protected]>
stdrc authored Apr 24, 2024
1 parent 97e1bd0 commit 3181be4
Showing 5 changed files with 41 additions and 14 deletions.
2 changes: 2 additions & 0 deletions src/common/src/array/data_chunk.rs
@@ -140,11 +140,13 @@ impl DataChunk {
         self.columns.len()
     }
 
+    // TODO(rc): shall we rename this to `visible_size`? I sometimes confuse this with `capacity`.
     /// `cardinality` returns the number of visible tuples
     pub fn cardinality(&self) -> usize {
         self.visibility.count_ones()
     }
 
+    // TODO(rc): shall we rename this to `size`?
     /// `capacity` returns physical length of any chunk column
     pub fn capacity(&self) -> usize {
         self.visibility.len()
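The distinction between these two methods is at the heart of this fix: `cardinality()` counts only the visible rows, while `capacity()` is the physical number of rows including invisible ones, and the rate limiter below is switched from the former to the latter. A minimal, hypothetical sketch of the two notions (not RisingWave's actual `DataChunk`, which uses a real bitmap type):

```rust
// Hypothetical, simplified illustration of "cardinality" vs. "capacity".
struct MiniChunk {
    visibility: Vec<bool>, // stand-in for the visibility bitmap
}

impl MiniChunk {
    /// Number of *visible* rows (what `cardinality()` reports).
    fn cardinality(&self) -> usize {
        self.visibility.iter().filter(|v| **v).count()
    }

    /// Physical number of rows, visible or not (what `capacity()` reports).
    fn capacity(&self) -> usize {
        self.visibility.len()
    }
}

fn main() {
    let chunk = MiniChunk { visibility: vec![true, false, true] };
    assert_eq!(chunk.cardinality(), 2); // visible rows only
    assert_eq!(chunk.capacity(), 3);    // physical length
}
```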
3 changes: 2 additions & 1 deletion src/common/src/array/stream_chunk.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;
 use std::{fmt, mem};
 
 use either::Either;
+use enum_as_inner::EnumAsInner;
 use itertools::Itertools;
 use rand::prelude::SmallRng;
 use rand::{Rng, SeedableRng};
@@ -40,7 +41,7 @@ use crate::types::{DataType, DefaultOrdered, ToText};
 /// but always appear in pairs to represent an update operation.
 /// For example, table source, aggregation and outer join can generate updates by themselves,
 /// while most of the other operators only pass through updates with best effort.
-#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
+#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, EnumAsInner)]
 pub enum Op {
     Insert,
     Delete,
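The `EnumAsInner` derive is what enables the `is_update_delete()` call in the next file: `enum_as_inner` generates an `is_*` predicate (among other helpers) for each variant. A rough sketch of the equivalent hand-written code for the two update variants, assuming only the predicates are needed:

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Op {
    Insert,
    Delete,
    UpdateDelete,
    UpdateInsert,
}

// Roughly what `#[derive(EnumAsInner)]` provides for unit variants:
// an `is_<variant>()` predicate per variant.
impl Op {
    pub fn is_update_delete(&self) -> bool {
        matches!(self, Op::UpdateDelete)
    }

    pub fn is_update_insert(&self) -> bool {
        matches!(self, Op::UpdateInsert)
    }
}

fn main() {
    let op = Op::UpdateDelete;
    assert!(op.is_update_delete());
    assert!(!op.is_update_insert());
}
```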
2 changes: 1 addition & 1 deletion src/common/src/array/stream_chunk_builder.rs
@@ -82,7 +82,7 @@ impl StreamChunkBuilder {
         // should be avoided, so when the last one is `UpdateDelete`, we delay the chunk until
         // `UpdateInsert` comes. This means the output chunk size may exceed the given `chunk_size`,
         // and theoretically reach at most `chunk_size + 1` if inputs are consistent.
-        if self.size >= self.capacity && self.ops[self.ops.len() - 1] != Op::UpdateDelete {
+        if self.size >= self.capacity && !self.ops[self.ops.len() - 1].is_update_delete() {
             self.take()
         } else {
             None
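To see why a chunk can end up with `chunk_size + 1` rows, consider a builder that refuses to cut between `U-` and `U+`. The sketch below is a simplified model of that rule only (hypothetical `MiniBuilder`, not the real `StreamChunkBuilder`, and it tracks just the ops):

```rust
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Op { Insert, Delete, UpdateDelete, UpdateInsert }

// Simplified model: only the ops matter for the chunking decision.
struct MiniBuilder {
    capacity: usize,
    ops: Vec<Op>,
}

impl MiniBuilder {
    fn new(capacity: usize) -> Self {
        Self { capacity, ops: Vec::new() }
    }

    /// Append one op; return a finished "chunk" (just its ops here) when full.
    /// A chunk is never cut right after `U-`, so it may grow to `capacity + 1`.
    fn append(&mut self, op: Op) -> Option<Vec<Op>> {
        self.ops.push(op);
        if self.ops.len() >= self.capacity && *self.ops.last().unwrap() != Op::UpdateDelete {
            Some(std::mem::take(&mut self.ops))
        } else {
            None
        }
    }
}

fn main() {
    let mut builder = MiniBuilder::new(4);
    let input = [Op::Insert, Op::Insert, Op::Insert, Op::UpdateDelete, Op::UpdateInsert];
    let mut chunks = Vec::new();
    for op in input {
        if let Some(chunk) = builder.append(op) {
            chunks.push(chunk);
        }
    }
    // The 4th op is `U-`, so the cut is delayed until the matching `U+` arrives,
    // producing a single chunk of size 5 == capacity + 1.
    assert_eq!(chunks.len(), 1);
    assert_eq!(chunks[0].len(), 5);
}
```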
13 changes: 8 additions & 5 deletions src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
@@ -131,9 +131,9 @@ impl UpstreamTableRead for UpstreamTableReader<ExternalStorageTable> {
         #[for_await]
         for chunk in chunk_stream {
             let chunk = chunk?;
-            let cardinality = chunk.cardinality();
+            let chunk_size = chunk.capacity();
 
-            if args.rate_limit_rps.is_none() || cardinality == 0 {
+            if args.rate_limit_rps.is_none() || chunk_size == 0 {
                 // no limit, or empty chunk
                 yield Some(chunk);
                 continue;
@@ -142,12 +142,15 @@ impl UpstreamTableRead for UpstreamTableReader<ExternalStorageTable> {
             // Apply rate limit, see `risingwave_stream::executor::source::apply_rate_limit` for more.
             // Maybe this should be refactored into a common function later.
             let limiter = limiter.as_ref().unwrap();
-            let limit = args.rate_limit_rps.unwrap();
-            assert!(cardinality <= limit as usize); // because we produce chunks with limited-sized data chunk builder.
+            let limit = args.rate_limit_rps.unwrap() as usize;
+
+            // Because we produce chunks with a limited-size data chunk builder and all rows
+            // are `Insert`s, the chunk size should never exceed the limit.
+            assert!(chunk_size <= limit);
 
             // `InsufficientCapacity` should never happen because we have checked the chunk size above.
             limiter
-                .until_n_ready(NonZeroU32::new(cardinality as u32).unwrap())
+                .until_n_ready(NonZeroU32::new(chunk_size as u32).unwrap())
                 .await
                 .unwrap();
             yield Some(chunk);
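The failure this commit fixes comes from asking the limiter for more permits than its maximum burst in a single call. Assuming the limiter is backed by something like the `governor` crate (used here purely as an illustration of the `until_n_ready` / `InsufficientCapacity` behavior, not necessarily RisingWave's exact wrapper), requesting `limit + 1` permits fails immediately rather than waiting:

```rust
use std::num::NonZeroU32;

use governor::{Quota, RateLimiter};

#[tokio::main]
async fn main() {
    // Burst (and rate) of 256 permits per second, mirroring a rate_limit_rps of 256.
    let limit = NonZeroU32::new(256).unwrap();
    let limiter = RateLimiter::direct(Quota::per_second(limit));

    // Asking for exactly `limit` permits succeeds (waiting if necessary).
    limiter.until_n_ready(limit).await.unwrap();

    // Asking for `limit + 1` permits can never succeed: the call fails fast with
    // `InsufficientCapacity` instead of waiting. This is what happened when a chunk
    // of size `limit + 1` (ending with a `U-`/`U+` pair) reached the limiter.
    let too_many = NonZeroU32::new(257).unwrap();
    assert!(limiter.until_n_ready(too_many).await.is_err());
}
```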
35 changes: 28 additions & 7 deletions src/stream/src/executor/source/mod.rs
@@ -133,28 +133,49 @@ pub async fn apply_rate_limit(stream: BoxChunkSourceStream, rate_limit_rps: Opti
         )
     });
 
+    fn compute_chunk_permits(chunk: &StreamChunk, limit: usize) -> usize {
+        let chunk_size = chunk.capacity();
+        let ends_with_update = if chunk_size >= 2 {
+            // Note: we have to check whether the 2nd-last op is `U-` to be consistent with `StreamChunkBuilder`.
+            // If something inconsistent happens in the stream, we may not have `U+` after this `U-`.
+            chunk.ops()[chunk_size - 2].is_update_delete()
+        } else {
+            false
+        };
+        if chunk_size == limit + 1 && ends_with_update {
+            // If the chunk size exceeds the limit because of the trailing `Update` operation,
+            // subtract 1 to make sure the permits consumed stay within the limit (max burst).
+            chunk_size - 1
+        } else {
+            chunk_size
+        }
+    }
+
     #[for_await]
     for chunk in stream {
         let chunk = chunk?;
-        let cardinality = chunk.cardinality();
+        let chunk_size = chunk.capacity();
 
-        if rate_limit_rps.is_none() || cardinality == 0 {
+        if rate_limit_rps.is_none() || chunk_size == 0 {
            // no limit, or empty chunk
            yield chunk;
            continue;
        }
 
        let limiter = limiter.as_ref().unwrap();
-        let limit = rate_limit_rps.unwrap();
-        if cardinality <= limit as usize {
-            let n = NonZeroU32::new(cardinality as u32).unwrap();
+        let limit = rate_limit_rps.unwrap() as usize;
+
+        let required_permits = compute_chunk_permits(&chunk, limit);
+        if required_permits <= limit {
+            let n = NonZeroU32::new(required_permits as u32).unwrap();
            // `InsufficientCapacity` should never happen because we have checked the permits above.
            limiter.until_n_ready(n).await.unwrap();
            yield chunk;
        } else {
            // Cut the chunk into smaller chunks
-            for chunk in chunk.split(limit as usize) {
-                let n = NonZeroU32::new(chunk.cardinality() as u32).unwrap();
+            for chunk in chunk.split(limit) {
+                let n = NonZeroU32::new(compute_chunk_permits(&chunk, limit) as u32).unwrap();
                // chunks split should have effective chunk size <= limit
                limiter.until_n_ready(n).await.unwrap();
                yield chunk;
            }
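A quick way to convince yourself of the new permit accounting: a chunk of `limit + 1` ops whose second-to-last op is `U-` is charged only `limit` permits, while any other shape is charged its full physical size. The standalone sketch below re-implements the same logic over a plain slice of ops (hypothetical `ops_permits` helper, not the real function, which takes a `StreamChunk`):

```rust
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Op { Insert, Delete, UpdateDelete, UpdateInsert }

/// Same accounting as `compute_chunk_permits`, but over a plain op slice.
fn ops_permits(ops: &[Op], limit: usize) -> usize {
    let chunk_size = ops.len();
    // Mirror the builder: the cut is decided by the 2nd-last op being `U-`.
    let ends_with_update = chunk_size >= 2 && ops[chunk_size - 2] == Op::UpdateDelete;
    if chunk_size == limit + 1 && ends_with_update {
        chunk_size - 1 // charge only `limit` permits for the over-full update chunk
    } else {
        chunk_size
    }
}

fn main() {
    let limit = 4;

    // limit + 1 rows ending with `U-`/`U+`: charged `limit` permits.
    let update_tail = [Op::Insert, Op::Insert, Op::Insert, Op::UpdateDelete, Op::UpdateInsert];
    assert_eq!(ops_permits(&update_tail, limit), 4);

    // limit + 1 plain inserts (shouldn't happen with a consistent builder): charged in full.
    let plain = [Op::Insert; 5];
    assert_eq!(ops_permits(&plain, limit), 5);

    // At or below the limit: charged its size.
    let small = [Op::Insert, Op::Delete, Op::Insert];
    assert_eq!(ops_permits(&small, limit), 3);
}
```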
