feat(connector): partition source reader batch_size by rate_limit
#13800
drop sink sink;

statement ok
drop table source_table;
With this test, on latest main, it will be stuck.
After this PR, this test can complete in a timely way.
Didn't quite get that. 👀 As we always prefer the barrier arm in the source, doesn't it mean that throttling the output of the source executor is essentially throttling the data side? The only uncovered edge cases I can think of are those involving transactions. But there seems to be no solution, as we must guarantee the atomicity of these records.
Oh, that's true. That's part of this PR. The other part is to also support rate limiting on chunks which are larger than the rate limit. For example, with a chunk size of 256 and a rate limit of 100, in our current approach, rate limiting on anything less than 256 won't quite work to reduce barrier latency, because although chunks will be split into 2, the amount of data being processed within an epoch will still be the same. So here in this PR we also do source read message partitioning based on rate limit, so that before the streams (barrier and source read) get merged in source executor, the max cardinality will be of size This will only apply to non-transaction source messages.
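The 256-versus-100 case above can be sketched in a few lines. This is only an illustration under assumptions, not the PR's code: `Batch`, its `in_transaction` flag, and `partition_by_rate_limit` are hypothetical names, and rows are plain integers rather than parsed source messages.

```rust
/// A batch of source messages as yielded by a reader (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
struct Batch {
    rows: Vec<u32>,
    /// True when the rows belong to one upstream transaction and must stay together.
    in_transaction: bool,
}

/// Split `batch` into pieces of at most `rate_limit` rows.
/// Transactional batches are returned unchanged to preserve atomicity.
/// A zero limit is passed through here because `chunks(0)` would panic;
/// how a zero limit should behave is outside the scope of this sketch.
fn partition_by_rate_limit(batch: Batch, rate_limit: usize) -> Vec<Batch> {
    if batch.in_transaction || rate_limit == 0 || batch.rows.len() <= rate_limit {
        return vec![batch];
    }
    batch
        .rows
        .chunks(rate_limit)
        .map(|rows| Batch {
            rows: rows.to_vec(),
            in_transaction: false,
        })
        .collect()
}
```

With 256 rows and a limit of 100, this yields pieces of 100, 100 and 56 rows, so each piece merged with the barrier stream stays within the limit; a batch flagged `in_transaction` comes back as a single piece regardless of its size.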
We can just break it IMO. Currently we also have So an alternative solution is to support dynamically changing the chunk size that the source reader yields. If setting a rate limit on the source breaks the transaction size, it can just provide a warning.
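A rough sketch of that alternative, assuming the reader can be told a chunk size up front. Both function names and the wording of the warning are hypothetical; nothing here is taken from the RisingWave codebase.

```rust
/// Chunk size the reader should yield under an optional rate limit.
/// A missing or zero limit leaves the configured size untouched.
fn effective_chunk_size(configured: usize, rate_limit: Option<usize>) -> usize {
    match rate_limit {
        Some(limit) if limit > 0 => configured.min(limit),
        _ => configured,
    }
}

/// Returns a warning when a transaction of `txn_rows` rows cannot fit
/// into a single rate-limited chunk and would therefore be broken up.
fn transaction_warning(txn_rows: usize, rate_limit: Option<usize>) -> Option<String> {
    match rate_limit {
        Some(limit) if limit > 0 && txn_rows > limit => Some(format!(
            "rate limit {} is smaller than transaction size {}; the transaction will be split",
            limit, txn_rows
        )),
        _ => None,
    }
}
```

For a configured chunk size of 256 and a rate limit of 100, `effective_chunk_size` returns 100, and a 300-row transaction produces a warning instead of an error.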
Wow, I don't expect the rate limit to be set to less than a single chunk. 🤣 Things will get really tricky in this case, that is to say, the logic for rate limit will invade into several modules, which I think contradicts our initial idea of a quick and dirty workaround. For example and if I understand correctly, the following logic in backfill should also be aware of the rate limit: risingwave/src/stream/src/executor/backfill/no_shuffle_backfill.rs Lines 313 to 319 in 3fd4a57
Yeah, to handle the case where we want to rate limit smaller than chunk size. Maybe has to be done within the
True that it will need to handle rate limit in modules, iff the rate limit is smaller than the specified chunk sizes. Most of the design will not need to be changed, probably only:
Changes required would be:
rate_limit
rate_limit
rate_limit
Codecov Report
Attention:
Additional details and impacted files
@@ Coverage Diff @@
## main #13800 +/- ##
==========================================
+ Coverage 68.10% 68.12% +0.01%
==========================================
Files 1529 1529
Lines 264150 264158 +8
==========================================
+ Hits 179892 179946 +54
+ Misses 84258 84212 -46
The overall design LGTM; partitioning within the parser is a doable way. But as described above, there will be a more consistent way to achieve this.
As some corner cases (Datagen & Nexmark) are not covered in parser, I suggest we move the logic to exec part (SourceExecutor/FsSourceExecutor/FetchExecutor).
For example, here in SourceExecutor:
risingwave/src/stream/src/executor/source/source_executor.rs
Lines 608 to 610 in a0c9644
    .inc_by(chunk.cardinality() as u64);
yield Message::Chunk(chunk);
self.try_flush_data().await?;
@@ -487,7 +499,11 @@ impl<P: ByteStreamSourceParser> P {
    actor_id = source_info.actor_id,
    source_id = source_info.source_id.table_id()
);

let data_stream = if let Some(rate_limit) = &self.source_ctx().source_ctrl_opts.rate_limit {
    Box::pin(ensure_largest_at_rate_limit(data_stream, *rate_limit))
Do all source connectors go through this code path?
To be exact, Datagen & Nexmark sources do not go through this path.
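One way to cover connectors that bypass the parser is to apply the cap where chunks are yielded, independent of how they were produced. The sketch below uses a plain `Iterator` of `Vec<u64>` batches in place of the async chunk stream; `cap_batches` is a hypothetical helper and is not the signature of `ensure_largest_at_rate_limit`.

```rust
/// Re-chunk any iterator of row batches so that no yielded batch has more
/// than `limit` rows. Empty input batches produce no output.
fn cap_batches<I>(batches: I, limit: usize) -> impl Iterator<Item = Vec<u64>>
where
    I: Iterator<Item = Vec<u64>>,
{
    // `chunks(0)` panics, so reject a zero limit up front.
    assert!(limit > 0, "limit must be positive");
    batches.flat_map(move |batch| {
        batch
            .chunks(limit)
            .map(|part| part.to_vec())
            .collect::<Vec<_>>()
    })
}
```

Because the adaptor only sees batches, it behaves the same for every upstream source: feeding it batches of 256 and 40 rows with a limit of 100 yields batches of 100, 100, 56 and 40 rows.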
LGTM. Please take a look at #13800 (comment) before merging.
LGTM
a0c9644 to 01a0958
GitGuardian id | GitGuardian status | Secret | Commit | Filename | |
---|---|---|---|---|---|
9425213 | Triggered | Generic Password | a405147 | ci/scripts/regress-test.sh | View secret |
🛠 Guidelines to remediate hardcoded secrets
- Understand the implications of revoking this secret by investigating where it is used in your code.
- Replace and store your secret safely. Learn here the best practices.
- Revoke and rotate this secret.
- If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.
To avoid such incidents in the future, consider:
- following these best practices for managing and storing secrets, including API keys and other credentials
- installing secret detection on pre-commit to catch secrets before they leave your machine and ease remediation.
01a0958 to a405147
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Closes #13799
rate_limit is done on stream job initialization; it cannot be altered yet. Need a further PR for it.
Checklist
./risedev check (or alias, ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.