diff --git a/e2e_test/streaming/rate_limit.slt b/e2e_test/streaming/rate_limit/basic.slt
similarity index 100%
rename from e2e_test/streaming/rate_limit.slt
rename to e2e_test/streaming/rate_limit/basic.slt
diff --git a/e2e_test/streaming/rate_limit/upstream_amplification.slt b/e2e_test/streaming/rate_limit/upstream_amplification.slt
new file mode 100644
index 000000000000..71be801a78fc
--- /dev/null
+++ b/e2e_test/streaming/rate_limit/upstream_amplification.slt
@@ -0,0 +1,44 @@
+# This test verifies that barrier latency does not spike
+# when there is a rate limit on the source.
+# The upstream side should backpressure the source reader,
+# but still allow barriers to flow through.
+
+statement ok
+SET STREAMING_PARALLELISM=2;
+
+statement ok
+SET STREAMING_RATE_LIMIT=1;
+
+statement ok
+CREATE TABLE source_table (i1 int)
+WITH (
+    connector = 'datagen',
+    fields.i1.start = '1',
+    fields.i1.end = '5',
+    datagen.rows.per.second = '10000'
+) FORMAT PLAIN ENCODE JSON;
+
+statement ok
+CREATE SINK sink AS
+    SELECT x.i1 as i1 FROM source_table x
+    JOIN source_table s1 ON x.i1 = s1.i1
+    JOIN source_table s2 ON x.i1 = s2.i1
+    JOIN source_table s3 ON x.i1 = s3.i1
+    WITH (connector = 'blackhole');
+
+# The following sequence of FLUSHes should be fast, since barriers should be able to bypass the sink.
+# Otherwise, these FLUSHes will take a long time to complete and trigger a timeout.
+statement ok
+flush;
+
+statement ok
+flush;
+
+statement ok
+flush;
+
+statement ok
+drop sink sink;
+
+statement ok
+drop table source_table;
\ No newline at end of file
diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs
index e67d4b26f850..fe053ff63dfc 100644
--- a/src/batch/src/executor/source.rs
+++ b/src/batch/src/executor/source.rs
@@ -87,6 +87,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
         };
         let source_ctrl_opts = SourceCtrlOpts {
             chunk_size: source.context().get_config().developer.chunk_size,
+            rate_limit: None,
         };
 
         let column_ids: Vec<_> = source_node
diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index c5b470db966a..952ccd9774d3 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -56,7 +56,7 @@ use crate::schema::schema_registry::SchemaRegistryAuth;
 use crate::source::monitor::GLOBAL_SOURCE_METRICS;
 use crate::source::{
     extract_source_struct, BoxSourceStream, ChunkSourceStream, SourceColumnDesc, SourceColumnType,
-    SourceContext, SourceContextRef, SourceEncode, SourceFormat, SourceMeta,
+    SourceContext, SourceContextRef, SourceEncode, SourceFormat, SourceMessage, SourceMeta,
 };
 
 pub mod additional_columns;
@@ -556,6 +556,21 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
     }
 }
 
+#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
+async fn ensure_largest_at_rate_limit(stream: BoxSourceStream, rate_limit: u32) {
+    #[for_await]
+    for batch in stream {
+        let mut batch = batch?;
+        let mut start = 0;
+        let end = batch.len();
+        while start < end {
+            let next = std::cmp::min(start + rate_limit as usize, end);
+            yield std::mem::take(&mut batch[start..next].as_mut()).to_vec();
+            start = next;
+        }
+    }
+}
+
 #[easy_ext::ext(SourceParserIntoStreamExt)]
 impl<P: ByteStreamSourceParser> P {
     /// Parse a data stream of one source split into a stream of [`StreamChunk`].
@@ -568,9 +583,15 @@ impl<P: ByteStreamSourceParser> P {
     ///
     /// A [`ChunkSourceStream`] which is a stream of parsed messages.
     pub fn into_stream(self, data_stream: BoxSourceStream) -> impl ChunkSourceStream {
-        // Enable tracing to provide more information for parsing failures.
         let source_info = self.source_ctx().source_info.clone();
 
+        // Ensure the batch size does not exceed the rate limit.
+        let data_stream = if let Some(rate_limit) = &self.source_ctx().source_ctrl_opts.rate_limit {
+            Box::pin(ensure_largest_at_rate_limit(data_stream, *rate_limit))
+        } else {
+            data_stream
+        };
+
         // The parser stream will be long-lived. We use `instrument_with` here to create
        // a new span for the polling of each chunk.
         into_chunk_stream(self, data_stream).instrument_with(move || {
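To make the re-chunking above concrete: `ensure_largest_at_rate_limit` never drops or delays messages itself; it only splits each upstream batch so that no emitted batch exceeds `rate_limit` messages, presumably leaving the actual throttling to the downstream rate limiter. Below is a minimal, self-contained sketch of that splitting logic; the `split_at_rate_limit` helper and the `Msg` type parameter are illustrative stand-ins (the real function operates on `Vec<SourceMessage>` inside a `try_stream`), and like the code in the diff it assumes `rate_limit >= 1`.

```rust
// Illustrative sketch of the batch splitting in `ensure_largest_at_rate_limit`,
// with the async-stream machinery removed. Assumes `rate_limit >= 1`.
fn split_at_rate_limit<Msg>(mut batch: Vec<Msg>, rate_limit: u32) -> Vec<Vec<Msg>> {
    let mut out = Vec::new();
    while !batch.is_empty() {
        // Keep at most `rate_limit` messages per emitted batch, so every
        // chunk handed to the parser stays within the rate limiter's budget.
        let take = std::cmp::min(rate_limit as usize, batch.len());
        let rest = batch.split_off(take);
        out.push(batch);
        batch = rest;
    }
    out
}

fn main() {
    // With rate_limit = 2, a batch of 5 messages is re-chunked as [2, 2, 1].
    let chunks = split_at_rate_limit(vec![1, 2, 3, 4, 5], 2);
    assert_eq!(chunks.iter().map(Vec::len).collect::<Vec<_>>(), vec![2, 2, 1]);
}
```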
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index fed8e0263aac..e26bc2dbcb40 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -133,12 +133,15 @@ pub struct SourceCtrlOpts {
     // comes from developer::stream_chunk_size in stream scenario and developer::batch_chunk_size
     // in batch scenario
     pub chunk_size: usize,
+    /// Rate limit of source
+    pub rate_limit: Option<u32>,
 }
 
 impl Default for SourceCtrlOpts {
     fn default() -> Self {
         Self {
             chunk_size: MAX_CHUNK_SIZE,
+            rate_limit: None,
         }
     }
 }
diff --git a/src/stream/src/from_proto/source/fs_fetch.rs b/src/stream/src/from_proto/source/fs_fetch.rs
index 97cf26129d8d..2cedce5a8cd0 100644
--- a/src/stream/src/from_proto/source/fs_fetch.rs
+++ b/src/stream/src/from_proto/source/fs_fetch.rs
@@ -61,6 +61,7 @@ impl ExecutorBuilder for FsFetchExecutorBuilder {
         );
         let source_ctrl_opts = SourceCtrlOpts {
             chunk_size: params.env.config().developer.chunk_size,
+            rate_limit: source.rate_limit.map(|x| x as _),
         };
 
         let source_column_ids: Vec<_> = source_desc_builder
diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs
index 28d923ffb69c..142b4ad9e155 100644
--- a/src/stream/src/from_proto/source/trad_source.rs
+++ b/src/stream/src/from_proto/source/trad_source.rs
@@ -157,6 +157,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
 
         let source_ctrl_opts = SourceCtrlOpts {
             chunk_size: params.env.config().developer.chunk_size,
+            rate_limit: source.rate_limit.map(|x| x as _),
         };
 
         let source_column_ids: Vec<_> = source_desc_builder
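For reference, a hedged sketch of how the new `rate_limit` field composes across these call sites: the streaming executor builders thread the proto's optional rate limit through (`source.rate_limit.map(|x| x as _)`), while the batch executor and the `Default` impl leave it as `None`, meaning no source-side re-chunking. The `MAX_CHUNK_SIZE` value below is an illustrative placeholder, not the crate's actual constant.

```rust
#[derive(Debug)]
struct SourceCtrlOpts {
    chunk_size: usize,
    rate_limit: Option<u32>,
}

// Illustrative placeholder; the real constant lives in the connector crate.
const MAX_CHUNK_SIZE: usize = 1024;

impl Default for SourceCtrlOpts {
    fn default() -> Self {
        Self {
            chunk_size: MAX_CHUNK_SIZE,
            rate_limit: None,
        }
    }
}

fn main() {
    // Streaming executor builders pass the proto's optional rate limit
    // through, while the batch executor always passes `None` and is
    // therefore never re-chunked or throttled.
    let streaming = SourceCtrlOpts {
        chunk_size: 256,
        rate_limit: Some(1),
    };
    let batch = SourceCtrlOpts::default();
    println!("streaming: {streaming:?}, batch: {batch:?}");
}
```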