feat(connector): partition source reader batch_size by rate_limit (#13800)

Co-authored-by: kwannoel <[email protected]>
kwannoel authored Feb 23, 2024
1 parent fb6dbe2 commit 2cec344
Showing 7 changed files with 73 additions and 2 deletions.
File renamed without changes.
44 changes: 44 additions & 0 deletions e2e_test/streaming/rate_limit/upstream_amplification.slt
@@ -0,0 +1,44 @@
# This test verifies that barrier latency does not spike
# when a rate limit is set on the source.
# The upstream side should backpressure the source reader,
# but still allow barriers to flow through.

statement ok
SET STREAMING_PARALLELISM=2;

statement ok
SET STREAMING_RATE_LIMIT=1;

statement ok
CREATE TABLE source_table (i1 int)
WITH (
connector = 'datagen',
fields.i1.start = '1',
fields.i1.end = '5',
datagen.rows.per.second = '10000'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SINK sink AS
SELECT x.i1 as i1 FROM source_table x
JOIN source_table s1 ON x.i1 = s1.i1
JOIN source_table s2 ON x.i1 = s2.i1
JOIN source_table s3 ON x.i1 = s3.i1
WITH (connector = 'blackhole');

# The following sequence of FLUSHes should be fast, since barriers should be able to bypass the sink.
# Otherwise, these FLUSHes will take a long time to complete and trigger a timeout.
statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
drop sink sink;

statement ok
drop table source_table;
1 change: 1 addition & 0 deletions src/batch/src/executor/source.rs
@@ -87,6 +87,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
};
let source_ctrl_opts = SourceCtrlOpts {
chunk_size: source.context().get_config().developer.chunk_size,
rate_limit: None,
};

let column_ids: Vec<_> = source_node
25 changes: 23 additions & 2 deletions src/connector/src/parser/mod.rs
@@ -56,7 +56,7 @@ use crate::schema::schema_registry::SchemaRegistryAuth;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
use crate::source::{
extract_source_struct, BoxSourceStream, ChunkSourceStream, SourceColumnDesc, SourceColumnType,
SourceContext, SourceContextRef, SourceEncode, SourceFormat, SourceMeta,
SourceContext, SourceContextRef, SourceEncode, SourceFormat, SourceMessage, SourceMeta,
};

pub mod additional_columns;
@@ -556,6 +556,21 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
}
}

#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
async fn ensure_largest_at_rate_limit(stream: BoxSourceStream, rate_limit: u32) {
#[for_await]
for batch in stream {
let mut batch = batch?;
let mut start = 0;
let end = batch.len();
while start < end {
let next = std::cmp::min(start + rate_limit as usize, end);
yield std::mem::take(&mut batch[start..next].as_mut()).to_vec();
start = next;
}
}
}

#[easy_ext::ext(SourceParserIntoStreamExt)]
impl<P: ByteStreamSourceParser> P {
/// Parse a data stream of one source split into a stream of [`StreamChunk`].
@@ -568,9 +583,15 @@ impl<P: ByteStreamSourceParser> P {
///
/// A [`ChunkSourceStream`] which is a stream of parsed messages.
pub fn into_stream(self, data_stream: BoxSourceStream) -> impl ChunkSourceStream {
// Enable tracing to provide more information for parsing failures.
let source_info = self.source_ctx().source_info.clone();

// Ensure each batch passed to the parser is no larger than the rate limit
let data_stream = if let Some(rate_limit) = &self.source_ctx().source_ctrl_opts.rate_limit {
Box::pin(ensure_largest_at_rate_limit(data_stream, *rate_limit))
} else {
data_stream
};

// The parser stream will be long-lived. We use `instrument_with` here to create
// a new span for the polling of each chunk.
into_chunk_stream(self, data_stream).instrument_with(move || {
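A note on the new `ensure_largest_at_rate_limit` stream above: it re-yields each upstream `Vec<SourceMessage>` in slices of at most `rate_limit` messages, so no batch handed to the parser can exceed the limit. Below is a minimal, self-contained sketch of that slicing (a hypothetical `split_by_rate_limit` helper, not part of this commit), with the async-stream machinery stripped away:

fn split_by_rate_limit<T>(batch: Vec<T>, rate_limit: u32) -> Vec<Vec<T>> {
    let mut out = Vec::new();
    let mut iter = batch.into_iter().peekable();
    while iter.peek().is_some() {
        // `by_ref` lets `take` consume at most `rate_limit` items per pass
        // without consuming the iterator itself.
        out.push(iter.by_ref().take(rate_limit as usize).collect());
    }
    out
}

fn main() {
    // With rate_limit = 2, a 5-message batch is re-yielded as 2 + 2 + 1,
    // matching the `while start < end` loop in the diff above.
    let parts = split_by_rate_limit((0..5).collect::<Vec<i32>>(), 2);
    let sizes: Vec<usize> = parts.iter().map(|p| p.len()).collect();
    assert_eq!(sizes, vec![2, 2, 1]);
}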
3 changes: 3 additions & 0 deletions src/connector/src/source/base.rs
@@ -133,12 +133,15 @@ pub struct SourceCtrlOpts {
// comes from developer::stream_chunk_size in stream scenario and developer::batch_chunk_size
// in batch scenario
pub chunk_size: usize,
/// Rate limit of source
pub rate_limit: Option<u32>,
}

impl Default for SourceCtrlOpts {
fn default() -> Self {
Self {
chunk_size: MAX_CHUNK_SIZE,
rate_limit: None,
}
}
}
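For context, here is a self-contained sketch of the struct after this change, with a hypothetical construction mirroring the executor builders below (the `chunk_size` value and `Some(1)` are illustrative, e.g. after `SET STREAMING_RATE_LIMIT=1`):

#[derive(Debug)]
pub struct SourceCtrlOpts {
    pub chunk_size: usize,
    pub rate_limit: Option<u32>,
}

fn main() {
    // chunk_size comes from the developer config in practice;
    // the rate limit value here is illustrative.
    let opts = SourceCtrlOpts { chunk_size: 256, rate_limit: Some(1) };
    println!("{opts:?}");
}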
1 change: 1 addition & 0 deletions src/stream/src/from_proto/source/fs_fetch.rs
@@ -61,6 +61,7 @@ impl ExecutorBuilder for FsFetchExecutorBuilder {
);
let source_ctrl_opts = SourceCtrlOpts {
chunk_size: params.env.config().developer.chunk_size,
rate_limit: source.rate_limit.map(|x| x as _),
};

let source_column_ids: Vec<_> = source_desc_builder
1 change: 1 addition & 0 deletions src/stream/src/from_proto/source/trad_source.rs
@@ -157,6 +157,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {

let source_ctrl_opts = SourceCtrlOpts {
chunk_size: params.env.config().developer.chunk_size,
rate_limit: source.rate_limit.map(|x| x as _),
};

let source_column_ids: Vec<_> = source_desc_builder
