
feat(connector): partition source reader batch_size by rate_limit #13800

Merged 11 commits on Feb 23, 2024
44 changes: 44 additions & 0 deletions e2e_test/streaming/rate_limit/upstream_amplification.slt
@@ -0,0 +1,44 @@
# This test checks that barrier latency does not spike
# when there is a rate limit on the source.
# The upstream side should backpressure the source reader,
# but still allow barriers to flow through.

statement ok
SET STREAMING_PARALLELISM=2;

statement ok
SET STREAMING_RATE_LIMIT=1;

statement ok
CREATE TABLE source_table (i1 int)
WITH (
    connector = 'datagen',
    fields.i1.start = '1',
    fields.i1.end = '5',
    datagen.rows.per.second = '10000'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SINK sink AS
SELECT x.i1 as i1 FROM source_table x
JOIN source_table s1 ON x.i1 = s1.i1
JOIN source_table s2 ON x.i1 = s2.i1
JOIN source_table s3 ON x.i1 = s3.i1
WITH (connector = 'blackhole');

# The following sequence of FLUSHes should be fast, since barriers should be able to bypass the sink.
# Otherwise, these FLUSHes will take a long time to complete and trigger a timeout.
statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
drop sink sink;

statement ok
drop table source_table;
Contributor (Author):

On the latest main, this test gets stuck.

After this PR, it completes in a timely way.

1 change: 1 addition & 0 deletions src/batch/src/executor/source.rs
@@ -87,6 +87,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
        };
        let source_ctrl_opts = SourceCtrlOpts {
            chunk_size: source.context().get_config().developer.chunk_size,
            rate_limit: None,
        };

        let column_ids: Vec<_> = source_node
25 changes: 23 additions & 2 deletions src/connector/src/parser/mod.rs
@@ -56,7 +56,7 @@ use crate::schema::schema_registry::SchemaRegistryAuth;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
use crate::source::{
    extract_source_struct, BoxSourceStream, ChunkSourceStream, SourceColumnDesc, SourceColumnType,
    SourceContext, SourceContextRef, SourceEncode, SourceFormat, SourceMeta,
    SourceContext, SourceContextRef, SourceEncode, SourceFormat, SourceMessage, SourceMeta,
};

pub mod additional_columns;
@@ -556,6 +556,21 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
    }
}

/// Split each batch from the source stream into smaller batches of at most
/// `rate_limit` messages, so that a single yielded batch never exceeds the rate limit.
#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
async fn ensure_largest_at_rate_limit(stream: BoxSourceStream, rate_limit: u32) {
    #[for_await]
    for batch in stream {
        let mut batch = batch?;
        let mut start = 0;
        let end = batch.len();
        while start < end {
            let next = std::cmp::min(start + rate_limit as usize, end);
            // Move the messages in `[start, next)` out of the batch and yield them.
            yield std::mem::take(&mut batch[start..next].as_mut()).to_vec();
            start = next;
        }
    }
}

kwannoel marked this conversation as resolved.
#[easy_ext::ext(SourceParserIntoStreamExt)]
impl<P: ByteStreamSourceParser> P {
    /// Parse a data stream of one source split into a stream of [`StreamChunk`].
@@ -568,9 +583,15 @@
    ///
    /// A [`ChunkSourceStream`] which is a stream of parsed messages.
    pub fn into_stream(self, data_stream: BoxSourceStream) -> impl ChunkSourceStream {
        // Enable tracing to provide more information for parsing failures.
        let source_info = self.source_ctx().source_info.clone();

        // Ensure each yielded chunk is no larger than the rate limit.
        let data_stream = if let Some(rate_limit) = &self.source_ctx().source_ctrl_opts.rate_limit {
            Box::pin(ensure_largest_at_rate_limit(data_stream, *rate_limit))
Member:

Do all source connectors go through this code path? This looks nice and clear.

Contributor (Author):

I think so; I chose to partition here because it seems to be. cc @tabVersion, is this correct?

Contributor:

> Do all source connectors go through this code path? This looks nice and clear.

Most of them, except the ENCODE NATIVE ones, which come from datagen.

Contributor:

> I think so; I chose to partition here because it seems to be. cc @tabVersion, is this correct?

I recommend waiting for the `include as` syntax. Things can get tricky here.
For Kafka, the limit applies at the parallelism level, because we experimentally allow multiple splits to run in one split reader. But for Kinesis, the limit applies at the source-reader level: we spawn multiple readers (one split per reader) and join them together. (For illustration: one reader serving three Kafka splits shares a single limit across all three, while three Kinesis splits get three separate limits.)
So the real limit varies across connectors, which I don't think is the expected behavior.

Contributor:

`include as` can carry the offset column into the stream chunk, allowing the partitioning to happen anywhere downstream of the chunk.

Contributor (Author):

Hmm, but doesn't `include as` depend on the user specifying the offset column? By default it won't be included, and then the user can't rate limit at all.

Contributor (Author):

An alternative is to always include offset columns, but mark them as hidden.

Contributor:

> An alternative is to always include offset columns, but mark them as hidden.

Yeah, we can automatically add this hidden column for a source or table with a connector and prune it afterwards.

Contributor:

> Do all source connectors go through this code path?

To be exact, Datagen & Nexmark sources do not go through this path.

        } else {
            data_stream
        };

        // The parser stream will be long-lived. We use `instrument_with` here to create
        // a new span for the polling of each chunk.
        into_chunk_stream(self, data_stream).instrument_with(move || {
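The splitting loop in `ensure_largest_at_rate_limit` is easy to sanity-check in isolation. Below is a minimal, self-contained Rust sketch (an illustrative rewrite, not code from this PR) that applies the same arithmetic to a plain `Vec`, with `u64` standing in for `SourceMessage`:

    // Illustrative sketch only, not part of this PR: split one batch into
    // chunks of at most `rate_limit` items, mirroring ensure_largest_at_rate_limit.
    fn split_by_rate_limit<T>(mut batch: Vec<T>, rate_limit: u32) -> Vec<Vec<T>> {
        let mut chunks = Vec::new();
        while !batch.is_empty() {
            let take = std::cmp::min(rate_limit as usize, batch.len());
            let rest = batch.split_off(take); // tail beyond the limit
            chunks.push(batch); // head: at most `rate_limit` items
            batch = rest;
        }
        chunks
    }

    fn main() {
        // Ten messages with rate_limit = 3 are split into sizes [3, 3, 3, 1].
        let sizes: Vec<usize> = split_by_rate_limit((0u64..10).collect(), 3)
            .iter()
            .map(|chunk| chunk.len())
            .collect();
        assert_eq!(sizes, vec![3, 3, 3, 1]);
    }

Ten buffered messages with a rate limit of 3 are re-partitioned into four batches, none larger than the limit, which is exactly the property the e2e test above relies on.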
3 changes: 3 additions & 0 deletions src/connector/src/source/base.rs
@@ -133,12 +133,15 @@ pub struct SourceCtrlOpts {
    // comes from developer::stream_chunk_size in the stream scenario and
    // developer::batch_chunk_size in the batch scenario
    pub chunk_size: usize,
    /// Rate limit of the source
    pub rate_limit: Option<u32>,
}

impl Default for SourceCtrlOpts {
    fn default() -> Self {
        Self {
            chunk_size: MAX_CHUNK_SIZE,
            rate_limit: None,
        }
    }
}
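Taken together with `chunk_size`, the new field means a reader's effective batch size is the smaller of the two caps. A hypothetical helper (not part of this PR) that makes the relationship explicit:

    // Hypothetical helper, not in this PR: the largest batch a source reader
    // should emit, given both the configured chunk size and the rate limit.
    fn effective_batch_size(opts: &SourceCtrlOpts) -> usize {
        match opts.rate_limit {
            Some(limit) => opts.chunk_size.min(limit as usize),
            None => opts.chunk_size,
        }
    }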
1 change: 1 addition & 0 deletions src/stream/src/from_proto/source/fs_fetch.rs
@@ -61,6 +61,7 @@ impl ExecutorBuilder for FsFetchExecutorBuilder {
        );
        let source_ctrl_opts = SourceCtrlOpts {
            chunk_size: params.env.config().developer.chunk_size,
            rate_limit: source.rate_limit.map(|x| x as _),
        };

        let source_column_ids: Vec<_> = source_desc_builder
1 change: 1 addition & 0 deletions src/stream/src/from_proto/source/trad_source.rs
@@ -157,6 +157,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {

        let source_ctrl_opts = SourceCtrlOpts {
            chunk_size: params.env.config().developer.chunk_size,
            rate_limit: source.rate_limit.map(|x| x as _),
        };

let source_column_ids: Vec<_> = source_desc_builder
Expand Down