diff --git a/src/common/src/array/data_chunk.rs b/src/common/src/array/data_chunk.rs
index 849979abc515..cdb012a3185c 100644
--- a/src/common/src/array/data_chunk.rs
+++ b/src/common/src/array/data_chunk.rs
@@ -140,11 +140,13 @@ impl DataChunk {
         self.columns.len()
     }
 
+    // TODO(rc): shall we rename this to `visible_size`? I sometimes confuse it with `capacity`.
     /// `cardinality` returns the number of visible tuples
     pub fn cardinality(&self) -> usize {
         self.visibility.count_ones()
     }
 
+    // TODO(rc): shall we rename this to `size`?
     /// `capacity` returns physical length of any chunk column
     pub fn capacity(&self) -> usize {
         self.visibility.len()
diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs
index 157e98a42954..b2aa6d0ae07e 100644
--- a/src/common/src/array/stream_chunk.rs
+++ b/src/common/src/array/stream_chunk.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;
 use std::{fmt, mem};
 
 use either::Either;
+use enum_as_inner::EnumAsInner;
 use itertools::Itertools;
 use rand::prelude::SmallRng;
 use rand::{Rng, SeedableRng};
@@ -40,7 +41,7 @@ use crate::types::{DataType, DefaultOrdered, ToText};
 /// but always appear in pairs to represent an update operation.
 /// For example, table source, aggregation and outer join can generate updates by themselves,
 /// while most of the other operators only pass through updates with best effort.
-#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
+#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, EnumAsInner)]
 pub enum Op {
     Insert,
     Delete,
diff --git a/src/common/src/array/stream_chunk_builder.rs b/src/common/src/array/stream_chunk_builder.rs
index 2d70c925d696..48d1bade0bb8 100644
--- a/src/common/src/array/stream_chunk_builder.rs
+++ b/src/common/src/array/stream_chunk_builder.rs
@@ -82,7 +82,7 @@ impl StreamChunkBuilder {
         // should be avoided, so when the last one is `UpdateDelete`, we delay the chunk until
         // `UpdateInsert` comes. This means the output chunk size may exceed the given `chunk_size`,
         // and theoretically at most `chunk_size + 1` if inputs are consistent.
-        if self.size >= self.capacity && self.ops[self.ops.len() - 1] != Op::UpdateDelete {
+        if self.size >= self.capacity && !self.ops[self.ops.len() - 1].is_update_delete() {
             self.take()
         } else {
             None
diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
index 939d0a6477de..bee582d4142e 100644
--- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
+++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
@@ -131,9 +131,9 @@ impl UpstreamTableRead for UpstreamTableReader {
         #[for_await]
         for chunk in chunk_stream {
             let chunk = chunk?;
-            let cardinality = chunk.cardinality();
+            let chunk_size = chunk.capacity();
 
-            if args.rate_limit_rps.is_none() || cardinality == 0 {
+            if args.rate_limit_rps.is_none() || chunk_size == 0 {
                 // no limit, or empty chunk
                 yield Some(chunk);
                 continue;
@@ -142,12 +142,15 @@
             // Apply rate limit, see `risingwave_stream::executor::source::apply_rate_limit` for more.
             // May be should be refactored to a common function later.
             let limiter = limiter.as_ref().unwrap();
-            let limit = args.rate_limit_rps.unwrap();
-            assert!(cardinality <= limit as usize); // because we produce chunks with limited-sized data chunk builder.
+            let limit = args.rate_limit_rps.unwrap() as usize;
+
+            // Because we produce chunks with a limited-size data chunk builder and all rows
+            // are `Insert`s, the chunk size should never exceed the limit.
+            assert!(chunk_size <= limit);
 
             // `InsufficientCapacity` should never happen because we have check the cardinality
             limiter
-                .until_n_ready(NonZeroU32::new(cardinality as u32).unwrap())
+                .until_n_ready(NonZeroU32::new(chunk_size as u32).unwrap())
                 .await
                 .unwrap();
             yield Some(chunk);
diff --git a/src/stream/src/executor/source/mod.rs b/src/stream/src/executor/source/mod.rs
index ada64725d97d..cab3527a32f2 100644
--- a/src/stream/src/executor/source/mod.rs
+++ b/src/stream/src/executor/source/mod.rs
@@ -133,28 +133,49 @@ pub async fn apply_rate_limit(stream: BoxChunkSourceStream, rate_limit_rps: Opti
         )
     });
 
+    fn compute_chunk_permits(chunk: &StreamChunk, limit: usize) -> usize {
+        let chunk_size = chunk.capacity();
+        let ends_with_update = if chunk_size >= 2 {
+            // Note we have to check whether the 2nd last op is `U-` to be consistent with `StreamChunkBuilder`.
+            // If something inconsistent happens in the stream, we may not have `U+` after this `U-`.
+            chunk.ops()[chunk_size - 2].is_update_delete()
+        } else {
+            false
+        };
+        if chunk_size == limit + 1 && ends_with_update {
+            // If the chunk size exceeds the limit because of the last `Update` operation,
+            // we should subtract 1 to make sure the permits consumed stay within the limit (max burst).
+            chunk_size - 1
+        } else {
+            chunk_size
+        }
+    }
+
     #[for_await]
     for chunk in stream {
         let chunk = chunk?;
-        let cardinality = chunk.cardinality();
+        let chunk_size = chunk.capacity();
 
-        if rate_limit_rps.is_none() || cardinality == 0 {
+        if rate_limit_rps.is_none() || chunk_size == 0 {
             // no limit, or empty chunk
             yield chunk;
             continue;
         }
 
         let limiter = limiter.as_ref().unwrap();
-        let limit = rate_limit_rps.unwrap();
-        if cardinality <= limit as usize {
-            let n = NonZeroU32::new(cardinality as u32).unwrap();
+        let limit = rate_limit_rps.unwrap() as usize;
+
+        let required_permits = compute_chunk_permits(&chunk, limit);
+        if required_permits <= limit {
+            let n = NonZeroU32::new(required_permits as u32).unwrap();
             // `InsufficientCapacity` should never happen because we have check the cardinality
             limiter.until_n_ready(n).await.unwrap();
             yield chunk;
         } else {
             // Cut the chunk into smaller chunks
-            for chunk in chunk.split(limit as usize) {
-                let n = NonZeroU32::new(chunk.cardinality() as u32).unwrap();
+            for chunk in chunk.split(limit) {
+                let n = NonZeroU32::new(compute_chunk_permits(&chunk, limit) as u32).unwrap();
+                // each split chunk should have an effective chunk size <= limit
                 limiter.until_n_ready(n).await.unwrap();
                 yield chunk;
             }
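
The core switch in this diff is from `cardinality()` (visible rows only) to `capacity()` (physical rows), since the rate limiter should charge for the physical chunk size that the builder produces. A minimal standalone sketch of the distinction, modeling visibility as a plain `Vec<bool>` rather than RisingWave's bitmap type:

```rust
// Illustrative only: simplified stand-in for DataChunk's visibility handling.
struct Chunk {
    visibility: Vec<bool>,
}

impl Chunk {
    /// Number of visible rows (what `cardinality()` returns).
    fn cardinality(&self) -> usize {
        self.visibility.iter().filter(|&&v| v).count()
    }

    /// Physical number of rows, visible or not (what `capacity()` returns).
    fn capacity(&self) -> usize {
        self.visibility.len()
    }
}

fn main() {
    let chunk = Chunk {
        visibility: vec![true, false, true, true],
    };
    assert_eq!(chunk.cardinality(), 3);
    // The rate limiter in this PR charges permits by the physical size.
    assert_eq!(chunk.capacity(), 4);
}
```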
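The `EnumAsInner` derive added to `Op` is what makes the `is_update_delete()` calls in `StreamChunkBuilder` and `compute_chunk_permits` compile. A minimal sketch of the pattern on a simplified stand-in enum (not the real `risingwave_common` type), assuming the `enum_as_inner` crate is a dependency:

```rust
use enum_as_inner::EnumAsInner;

// The derive generates `is_*` predicates for unit variants, e.g. `is_update_delete()`.
#[derive(Debug, EnumAsInner)]
enum Op {
    Insert,
    Delete,
    UpdateDelete,
    UpdateInsert,
}

fn main() {
    let ops = [Op::Insert, Op::UpdateDelete, Op::UpdateInsert];
    // Equivalent to comparing with `Op::UpdateDelete`, but reads better at call sites.
    assert!(ops[1].is_update_delete());
    assert!(!ops.last().unwrap().is_update_delete());
}
```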
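The interplay between `StreamChunkBuilder` and `compute_chunk_permits` is the crux of the fix: the builder may emit a chunk of `chunk_size + 1` rows when a `U-`/`U+` pair straddles the size boundary, and charging `limit + 1` permits would exceed the limiter's max burst and trip `InsufficientCapacity`. A standalone sketch of the same computation over a plain slice of ops (names and types here are illustrative, not the actual executor code):

```rust
#[derive(Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
enum Op {
    Insert,
    Delete,
    UpdateDelete,
    UpdateInsert,
}

fn compute_chunk_permits(ops: &[Op], limit: usize) -> usize {
    let chunk_size = ops.len();
    // Check the second-to-last op, mirroring `StreamChunkBuilder`: the builder only
    // overflows by one row when a `U-`/`U+` pair straddles the `chunk_size` boundary.
    let ends_with_update = chunk_size >= 2 && ops[chunk_size - 2] == Op::UpdateDelete;
    if chunk_size == limit + 1 && ends_with_update {
        // Charge only `limit` permits so the request never exceeds the max burst.
        chunk_size - 1
    } else {
        chunk_size
    }
}

fn main() {
    let limit = 4;
    // 3 inserts plus a trailing `U-`/`U+` pair: 5 rows, but only 4 permits are charged.
    let ops = [
        Op::Insert,
        Op::Insert,
        Op::Insert,
        Op::UpdateDelete,
        Op::UpdateInsert,
    ];
    assert_eq!(compute_chunk_permits(&ops, limit), 4);
    // A plain 3-row chunk of inserts consumes exactly 3 permits.
    assert_eq!(compute_chunk_permits(&ops[..3], limit), 3);
}
```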