feat(connector): partition source reader batch_size by rate_limit
#13800
@@ -0,0 +1,44 @@
# This test checks that barrier latency does not spike
# when there is a rate limit on the source.
# The downstream side should backpressure the source reader,
# but still allow barriers to flow through.

statement ok
SET STREAMING_PARALLELISM=2;

statement ok
SET STREAMING_RATE_LIMIT=1;

statement ok
CREATE TABLE source_table (i1 int)
WITH (
     connector = 'datagen',
     fields.i1.start = '1',
     fields.i1.end = '5',
     datagen.rows.per.second = '10000'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SINK sink AS
SELECT x.i1 as i1 FROM source_table x
JOIN source_table s1 ON x.i1 = s1.i1
JOIN source_table s2 ON x.i1 = s2.i1
JOIN source_table s3 ON x.i1 = s3.i1
WITH (connector = 'blackhole');

# The following sequence of FLUSH statements should be fast, since barriers should be able to bypass the sink.
# Otherwise, these FLUSH statements will take a long time to complete and trigger a timeout.
statement ok
flush;

statement ok
flush;

statement ok
flush;

statement ok
drop sink sink;

statement ok
drop table source_table;
@@ -56,7 +56,7 @@ use crate::schema::schema_registry::SchemaRegistryAuth;
 use crate::source::monitor::GLOBAL_SOURCE_METRICS;
 use crate::source::{
     extract_source_struct, BoxSourceStream, ChunkSourceStream, SourceColumnDesc, SourceColumnType,
-    SourceContext, SourceContextRef, SourceEncode, SourceFormat, SourceMeta,
+    SourceContext, SourceContextRef, SourceEncode, SourceFormat, SourceMessage, SourceMeta,
 };

 pub mod additional_columns;
@@ -556,6 +556,21 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
     }
 }

+#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
+async fn ensure_largest_at_rate_limit(stream: BoxSourceStream, rate_limit: u32) {
+    #[for_await]
+    for batch in stream {
+        let mut batch = batch?;
+        let mut start = 0;
+        let end = batch.len();
+        while start < end {
+            let next = std::cmp::min(start + rate_limit as usize, end);
+            yield std::mem::take(&mut batch[start..next].as_mut()).to_vec();
+            start = next;
+        }
+    }
+}
+
 #[easy_ext::ext(SourceParserIntoStreamExt)]
 impl<P: ByteStreamSourceParser> P {
     /// Parse a data stream of one source split into a stream of [`StreamChunk`].
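In plain terms, the helper above re-chunks each upstream batch so that no yielded batch exceeds `rate_limit` messages; batch boundaries shrink, but no message is dropped. Here is a minimal standalone sketch of the same splitting logic, with a hypothetical `split_batch` over `Vec<u32>` standing in for the PR's stream of `Vec<SourceMessage>` (the `max(1)` guard is an addition for the sketch, not in the PR):

// Sketch only: `split_batch` is illustrative and not part of the PR.
// It mirrors the `while start < end` loop above using `slice::chunks`.
fn split_batch(batch: Vec<u32>, rate_limit: u32) -> Vec<Vec<u32>> {
    batch
        .chunks(rate_limit.max(1) as usize) // guard: `chunks` panics on 0
        .map(|chunk| chunk.to_vec())
        .collect()
}

fn main() {
    // A 10-message batch under rate_limit = 4 becomes batches of 4, 4, and 2.
    let batches = split_batch((0..10).collect(), 4);
    assert_eq!(batches.iter().map(Vec::len).collect::<Vec<_>>(), [4, 4, 2]);
}

The PR's version does the same thing lazily inside a `#[try_stream]`, so downstream executors see at most `rate_limit` messages per yielded batch.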
@@ -568,9 +583,15 @@ impl<P: ByteStreamSourceParser> P {
     ///
     /// A [`ChunkSourceStream`] which is a stream of parsed messages.
     pub fn into_stream(self, data_stream: BoxSourceStream) -> impl ChunkSourceStream {
         // Enable tracing to provide more information for parsing failures.
         let source_info = self.source_ctx().source_info.clone();

+        // Ensure each yielded batch is no larger than the rate limit.
+        let data_stream = if let Some(rate_limit) = &self.source_ctx().source_ctrl_opts.rate_limit {
+            Box::pin(ensure_largest_at_rate_limit(data_stream, *rate_limit))
+        } else {
+            data_stream
+        };

         // The parser stream will be long-lived. We use `instrument_with` here to create
         // a new span for the polling of each chunk.
         into_chunk_stream(self, data_stream).instrument_with(move || {

Review thread on this change (some comments are truncated in the page capture):

- Do all source connectors go through this code path? This looks nice and clear.
- I think so, I chose to partition here because it seems to be. cc @tabVersion, is this correct?
- Most of them, except …
- I recommend waiting for …
- Hmm, but doesn't …
- An alternative is to always include offset columns, but mark them as hidden.
- Yeah, we can automatically add this hidden column for a source or table with a connector, and prune it afterwards.
- To be exact, Datagen & Nexmark sources do not go through this path.
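One detail worth noting in the `if let` branch above: both arms must produce the same stream type, which is why the wrapped arm is re-boxed with `Box::pin`. The following is a minimal sketch of that pattern under assumed names (`maybe_split`, a `Vec<u32>` payload, and plain `futures` streams rather than the PR's `BoxSourceStream`):

use futures::stream::{self, BoxStream, StreamExt};

// Sketch only: conditionally wrap a boxed stream in a re-chunking adapter.
// Boxing the wrapped arm keeps both branches at the same type, `BoxStream`.
fn maybe_split(
    input: BoxStream<'static, Vec<u32>>,
    rate_limit: Option<u32>,
) -> BoxStream<'static, Vec<u32>> {
    match rate_limit {
        Some(limit) => input
            .flat_map(move |batch| {
                // Re-emit each incoming batch as chunks of at most `limit` items.
                let chunks: Vec<Vec<u32>> = batch
                    .chunks(limit.max(1) as usize)
                    .map(<[u32]>::to_vec)
                    .collect();
                stream::iter(chunks)
            })
            .boxed(),
        // No limit configured: pass the stream through untouched.
        None => input,
    }
}

With `rate_limit = Some(4)`, a 10-element batch comes out as batches of 4, 4, and 2; with `None`, the stream passes through unchanged.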
A final review note on the new test: on the latest main it gets stuck, while after this PR it completes in a timely way.