diff --git a/e2e_test/streaming/rate_limit.slt b/e2e_test/streaming/rate_limit/basic.slt similarity index 100% rename from e2e_test/streaming/rate_limit.slt rename to e2e_test/streaming/rate_limit/basic.slt diff --git a/e2e_test/streaming/rate_limit/upstream_amplification.slt b/e2e_test/streaming/rate_limit/upstream_amplification.slt new file mode 100644 index 0000000000000..71be801a78fc2 --- /dev/null +++ b/e2e_test/streaming/rate_limit/upstream_amplification.slt @@ -0,0 +1,44 @@ +# This test will test that barrier latency does not spike +# when there's rate limit on source. +# The upstream side should backpressure the source reader, +# but still allow barriers to flow through. + +statement ok +SET STREAMING_PARALLELISM=2; + +statement ok +SET STREAMING_RATE_LIMIT=1; + +statement ok +CREATE TABLE source_table (i1 int) +WITH ( + connector = 'datagen', + fields.i1.start = '1', + fields.i1.end = '5', + datagen.rows.per.second = '10000' +) FORMAT PLAIN ENCODE JSON; + +statement ok +CREATE SINK sink AS + SELECT x.i1 as i1 FROM source_table x + JOIN source_table s1 ON x.i1 = s1.i1 + JOIN source_table s2 ON x.i1 = s2.i1 + JOIN source_table s3 ON x.i1 = s3.i1 + WITH (connector = 'blackhole'); + +# The following sequence of FLUSH should be fast, since barrier should be able to bypass sink. +# Otherwise, these FLUSH will take a long time to complete, and trigger timeout. +statement ok +flush; + +statement ok +flush; + +statement ok +flush; + +statement ok +drop sink sink; + +statement ok +drop table source_table; \ No newline at end of file diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs index 2714d5335b906..93fc4bdc395b3 100644 --- a/src/batch/src/executor/source.rs +++ b/src/batch/src/executor/source.rs @@ -87,6 +87,7 @@ impl BoxedExecutorBuilder for SourceExecutor { }; let source_ctrl_opts = SourceCtrlOpts { chunk_size: source.context().get_config().developer.chunk_size, + rate_limit: None, }; let column_ids: Vec<_> = source_node diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 1c165b45660e9..2daf45be1db04 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -55,8 +55,8 @@ use crate::schema::schema_registry::SchemaRegistryAuth; use crate::source::monitor::GLOBAL_SOURCE_METRICS; use crate::source::{ extract_source_struct, BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext, - SourceContextRef, SourceEncode, SourceFormat, SourceMeta, SourceWithStateStream, SplitId, - StreamChunkWithState, + SourceContextRef, SourceEncode, SourceFormat, SourceMessage, SourceMeta, SourceWithStateStream, + SplitId, StreamChunkWithState, }; pub mod additional_columns; @@ -515,6 +515,21 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static { } } +#[try_stream(ok = Vec, error = anyhow::Error)] +async fn ensure_largest_at_rate_limit(stream: BoxSourceStream, rate_limit: u32) { + #[for_await] + for batch in stream { + let mut batch = batch?; + let mut start = 0; + let end = batch.len(); + while start < end { + let next = std::cmp::min(start + rate_limit as usize, end); + yield std::mem::take(&mut batch[start..next].as_mut()).to_vec(); + start = next; + } + } +} + #[easy_ext::ext(SourceParserIntoStreamExt)] impl P { /// Parse a data stream of one source split into a stream of [`StreamChunk`]. @@ -534,7 +549,11 @@ impl P { actor_id = source_info.actor_id, source_id = source_info.source_id.table_id() ); - + let data_stream = if let Some(rate_limit) = &self.source_ctx().source_ctrl_opts.rate_limit { + Box::pin(ensure_largest_at_rate_limit(data_stream, *rate_limit)) + } else { + data_stream + }; into_chunk_stream(self, data_stream).instrument(span) } } diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index e3c309b0cb410..926f3966723ca 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -131,12 +131,15 @@ pub struct SourceCtrlOpts { // comes from developer::stream_chunk_size in stream scenario and developer::batch_chunk_size // in batch scenario pub chunk_size: usize, + /// Rate limit of source + pub rate_limit: Option, } impl Default for SourceCtrlOpts { fn default() -> Self { Self { chunk_size: MAX_CHUNK_SIZE, + rate_limit: None, } } } diff --git a/src/stream/src/from_proto/source/fs_fetch.rs b/src/stream/src/from_proto/source/fs_fetch.rs index dc6b718f566ea..77f1d6f9521d3 100644 --- a/src/stream/src/from_proto/source/fs_fetch.rs +++ b/src/stream/src/from_proto/source/fs_fetch.rs @@ -60,6 +60,7 @@ impl ExecutorBuilder for FsFetchExecutorBuilder { ); let source_ctrl_opts = SourceCtrlOpts { chunk_size: params.env.config().developer.chunk_size, + rate_limit: source.rate_limit.map(|x| x as _), }; let source_column_ids: Vec<_> = source diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index 0538dbf1535ee..348143fe1e3c6 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -123,6 +123,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { let source_ctrl_opts = SourceCtrlOpts { chunk_size: params.env.config().developer.chunk_size, + rate_limit: source.rate_limit.map(|x| x as _), }; let source_column_ids: Vec<_> = source_columns