Skip to content

Commit

Permalink
cherry-pick #13800
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Feb 26, 2024
1 parent 3aeec37 commit fda7de0
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 2 deletions.
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ steps:
config: ci/docker-compose.yml
environment:
- CODECOV_TOKEN
timeout_in_minutes: 22
timeout_in_minutes: 23
retry: *auto-retry

- label: "check"
Expand Down
File renamed without changes.
44 changes: 44 additions & 0 deletions e2e_test/streaming/rate_limit/upstream_amplification.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Verifies that barrier latency does not spike when a rate limit is
# set on the source. The upstream side should backpressure the source
# reader, but still allow barriers to flow through.

statement ok
SET STREAMING_PARALLELISM=2;

# Throttle the source to 1 row/s so the reader is much slower than datagen.
statement ok
SET STREAMING_RATE_LIMIT=1;

# datagen emits i1 in [1, 5] at 10000 rows/s, far above the rate limit.
statement ok
CREATE TABLE source_table (i1 int)
WITH (
connector = 'datagen',
fields.i1.start = '1',
fields.i1.end = '5',
datagen.rows.per.second = '10000'
) FORMAT PLAIN ENCODE JSON;

# The triple self-join amplifies every upstream row, stressing the
# backpressure path between the source and the sink.
statement ok
CREATE SINK sink AS
SELECT x.i1 as i1 FROM source_table x
JOIN source_table s1 ON x.i1 = s1.i1
JOIN source_table s2 ON x.i1 = s2.i1
JOIN source_table s3 ON x.i1 = s3.i1
WITH (connector = 'blackhole');

# The following sequence of FLUSH should be fast, since barriers should be
# able to bypass the sink's backlog. Otherwise, these FLUSH will take a long
# time to complete, and trigger timeout.
statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
drop sink sink;

statement ok
drop table source_table;
1 change: 1 addition & 0 deletions src/batch/src/executor/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
};
let source_ctrl_opts = SourceCtrlOpts {
chunk_size: source.context().get_config().developer.chunk_size,
rate_limit: None,
};

let column_ids: Vec<_> = source_node
Expand Down
22 changes: 21 additions & 1 deletion src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use crate::schema::schema_registry::SchemaRegistryAuth;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
use crate::source::{
extract_source_struct, BoxSourceStream, ChunkSourceStream, SourceColumnDesc, SourceColumnType,
SourceContext, SourceContextRef, SourceEncode, SourceFormat, SourceMeta,
SourceContext, SourceContextRef, SourceEncode, SourceFormat, SourceMessage, SourceMeta,
};

pub mod additional_columns;
Expand Down Expand Up @@ -556,6 +556,21 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
}
}

/// Re-chunks `stream` so that each yielded batch contains at most `rate_limit`
/// messages. This lets the downstream consumer throttle at message
/// granularity instead of being forced to take an oversized batch at once.
///
/// # Errors
/// Propagates any error produced by the underlying `stream`.
#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
async fn ensure_largest_at_rate_limit(stream: BoxSourceStream, rate_limit: u32) {
    #[for_await]
    for batch in stream {
        let mut batch = batch?;
        while !batch.is_empty() {
            // Clamp to at least 1 so a zero `rate_limit` cannot cause an
            // infinite loop (the step below would otherwise never advance).
            let chunk_len = (rate_limit as usize).max(1).min(batch.len());
            // Move messages out instead of cloning them: `split_off` keeps the
            // first `chunk_len` messages in `batch` and returns the remainder,
            // so each yielded `Vec` takes ownership of its chunk.
            let rest = batch.split_off(chunk_len);
            yield std::mem::replace(&mut batch, rest);
        }
    }
}

#[easy_ext::ext(SourceParserIntoStreamExt)]
impl<P: ByteStreamSourceParser> P {
/// Parse a data stream of one source split into a stream of [`StreamChunk`].
Expand All @@ -573,6 +588,11 @@ impl<P: ByteStreamSourceParser> P {

// The parser stream will be long-lived. We use `instrument_with` here to create
// a new span for the polling of each chunk.
let data_stream = if let Some(rate_limit) = &self.source_ctx().source_ctrl_opts.rate_limit {
Box::pin(ensure_largest_at_rate_limit(data_stream, *rate_limit))
} else {
data_stream
};
into_chunk_stream(self, data_stream).instrument_with(move || {
tracing::info_span!(
"source_parse_chunk",
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,15 @@ pub struct SourceCtrlOpts {
// comes from developer::stream_chunk_size in stream scenario and developer::batch_chunk_size
// in batch scenario
pub chunk_size: usize,
/// Rate limit of source
pub rate_limit: Option<u32>,
}

impl Default for SourceCtrlOpts {
fn default() -> Self {
Self {
chunk_size: MAX_CHUNK_SIZE,
rate_limit: None,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/from_proto/source/fs_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl ExecutorBuilder for FsFetchExecutorBuilder {
);
let source_ctrl_opts = SourceCtrlOpts {
chunk_size: params.env.config().developer.chunk_size,
rate_limit: source.rate_limit.map(|x| x as _),
};

let source_column_ids: Vec<_> = source_desc_builder
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/from_proto/source/trad_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {

let source_ctrl_opts = SourceCtrlOpts {
chunk_size: params.env.config().developer.chunk_size,
rate_limit: source.rate_limit.map(|x| x as _),
};

let source_column_ids: Vec<_> = source_desc_builder
Expand Down

0 comments on commit fda7de0

Please sign in to comment.