Skip to content

Commit

Permalink
cherry pick #13800
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Feb 23, 2024
1 parent 02ee186 commit beab581
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 3 deletions.
File renamed without changes.
44 changes: 44 additions & 0 deletions e2e_test/streaming/rate_limit/upstream_amplification.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# This test will test that barrier latency does not spike
# when there's rate limit on source.
# The upstream side should backpressure the source reader,
# but still allow barriers to flow through.

statement ok
SET STREAMING_PARALLELISM=2;

statement ok
SET STREAMING_RATE_LIMIT=1;

statement ok
CREATE TABLE source_table (i1 int)
WITH (
connector = 'datagen',
fields.i1.start = '1',
fields.i1.end = '5',
datagen.rows.per.second = '10000'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SINK sink AS
SELECT x.i1 as i1 FROM source_table x
JOIN source_table s1 ON x.i1 = s1.i1
JOIN source_table s2 ON x.i1 = s2.i1
JOIN source_table s3 ON x.i1 = s3.i1
WITH (connector = 'blackhole');

# The following sequence of FLUSH should be fast, since barrier should be able to bypass sink.
# Otherwise, these FLUSH will take a long time to complete, and trigger timeout.
statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
drop sink sink;

statement ok
drop table source_table;
1 change: 1 addition & 0 deletions src/batch/src/executor/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
};
let source_ctrl_opts = SourceCtrlOpts {
chunk_size: source.context().get_config().developer.chunk_size,
rate_limit: None,
};

let column_ids: Vec<_> = source_node
Expand Down
25 changes: 22 additions & 3 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ use crate::schema::schema_registry::SchemaRegistryAuth;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
use crate::source::{
extract_source_struct, BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext,
SourceContextRef, SourceEncode, SourceFormat, SourceMeta, SourceWithStateStream, SplitId,
StreamChunkWithState,
SourceContextRef, SourceEncode, SourceFormat, SourceMessage, SourceMeta, SourceWithStateStream,
SplitId, StreamChunkWithState,
};

pub mod additional_columns;
Expand Down Expand Up @@ -515,6 +515,21 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
}
}

#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
async fn ensure_largest_at_rate_limit(stream: BoxSourceStream, rate_limit: u32) {
#[for_await]
for batch in stream {
let mut batch = batch?;
let mut start = 0;
let end = batch.len();
while start < end {
let next = std::cmp::min(start + rate_limit as usize, end);
yield std::mem::take(&mut batch[start..next].as_mut()).to_vec();
start = next;
}
}
}

#[easy_ext::ext(SourceParserIntoStreamExt)]
impl<P: ByteStreamSourceParser> P {
/// Parse a data stream of one source split into a stream of [`StreamChunk`].
Expand All @@ -534,7 +549,11 @@ impl<P: ByteStreamSourceParser> P {
actor_id = source_info.actor_id,
source_id = source_info.source_id.table_id()
);

let data_stream = if let Some(rate_limit) = &self.source_ctx().source_ctrl_opts.rate_limit {
Box::pin(ensure_largest_at_rate_limit(data_stream, *rate_limit))
} else {
data_stream
};
into_chunk_stream(self, data_stream).instrument(span)
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,15 @@ pub struct SourceCtrlOpts {
// comes from developer::stream_chunk_size in stream scenario and developer::batch_chunk_size
// in batch scenario
pub chunk_size: usize,
/// Rate limit of source
pub rate_limit: Option<u32>,
}

impl Default for SourceCtrlOpts {
fn default() -> Self {
Self {
chunk_size: MAX_CHUNK_SIZE,
rate_limit: None,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/from_proto/source/fs_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl ExecutorBuilder for FsFetchExecutorBuilder {
);
let source_ctrl_opts = SourceCtrlOpts {
chunk_size: params.env.config().developer.chunk_size,
rate_limit: source.rate_limit.map(|x| x as _),
};

let source_column_ids: Vec<_> = source
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/from_proto/source/trad_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {

let source_ctrl_opts = SourceCtrlOpts {
chunk_size: params.env.config().developer.chunk_size,
rate_limit: source.rate_limit.map(|x| x as _),
};

let source_column_ids: Vec<_> = source_columns
Expand Down

0 comments on commit beab581

Please sign in to comment.