
feat(connector): partition source reader batch_size by rate_limit #13800

Merged: 11 commits merged into main on Feb 23, 2024

Conversation

@kwannoel (Contributor) commented Dec 5, 2023

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Closes #13799

  • Regarding the granularity of the rate limit: currently, only non-transaction sources can be rate limited at a granularity smaller than the chunk size (see the sketch after this list).
  • For transaction sources (e.g. Debezium), the granularity remains the chunk size, so we don't break transaction semantics. Extending this feature to them would require dynamic configuration of the transaction size.
  • rate_limit is configured at stream job initialization and cannot be altered yet; a follow-up PR is needed for that.
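
To illustrate the core idea, here is a minimal, self-contained sketch of partitioning a chunk by the rate limit. The types and the helper name `partition_by_rate_limit` are made up for illustration; this is not the PR's actual code, which operates on RisingWave's chunk types.

```rust
/// Split a chunk into pieces whose cardinality never exceeds `rate_limit`.
fn partition_by_rate_limit<T>(chunk: Vec<T>, rate_limit: usize) -> Vec<Vec<T>> {
    assert!(rate_limit > 0, "rate_limit must be positive");
    let mut parts = Vec::new();
    let mut current = Vec::with_capacity(rate_limit.min(chunk.len()));
    for row in chunk {
        current.push(row);
        if current.len() == rate_limit {
            // Yield a full partition and start a new one.
            parts.push(std::mem::take(&mut current));
        }
    }
    if !current.is_empty() {
        parts.push(current); // Trailing partial partition.
    }
    parts
}

fn main() {
    // With a chunk size of 256 and a rate limit of 100, the pieces have 100, 100 and 56 rows.
    let chunk: Vec<u32> = (0..256).collect();
    let sizes: Vec<usize> = partition_by_rate_limit(chunk, 100)
        .iter()
        .map(|p| p.len())
        .collect();
    assert_eq!(sizes, vec![100, 100, 56]);
}
```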

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@kwannoel changed the title from "feat(stream): do not block barrier for rate limit" to "feat(connector): do not block barrier for rate limit" on Dec 5, 2023
    drop sink sink;

    statement ok
    drop table source_table;

@kwannoel (Contributor, Author) commented on the test above:

With this test, the stream gets stuck on the latest main.

After this PR, the test completes in a timely way.

@BugenZhao (Member) commented:

Didn't quite get that. 👀 As we always prefer the barrier arm in the source, doesn't it mean that throttling the output of the source executor is essentially throttling the data side? The only uncovered edge cases I can think of are those involving transactions. But there seems to be no solution, as we must guarantee the atomicity of these records.

@kwannoel (Contributor, Author) commented Dec 6, 2023:

> Didn't quite get that. 👀 As we always prefer the barrier arm in the source, doesn't it mean that throttling the output of the source executor is essentially throttling the data side? The only uncovered edge cases I can think of are those involving transactions. But there seems to be no solution, as we must guarantee the atomicity of these records.

Oh, that's true. That's part of this PR. The other part is to support rate limiting on chunks that are larger than the rate limit.

For example, with a chunk size of 256 and a rate limit of 100, rate limiting to anything less than 256 won't do much to reduce barrier latency under the current approach: although each chunk will be split in two, the amount of data processed within an epoch stays the same.

So in this PR we also partition source read messages based on the rate limit, so that before the two streams (barrier and source read) are merged in the source executor, the maximum cardinality of a chunk is rate_limit, and chunks won't block the barrier (see the sketch below).

This only applies to non-transaction source messages.
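
As a toy illustration of why the bounded cardinality matters, here is a sketch of merging a barrier stream with an already-partitioned data stream using `select_with_strategy` from the `futures` crate, always polling the barrier arm first (as the source executor does). The `Message` enum and the numbers are made up for the example:

```rust
use futures::stream::{self, PollNext, StreamExt};

#[derive(Debug)]
enum Message {
    Barrier(u64),  // barrier epoch
    Chunk(usize),  // chunk cardinality
}

#[tokio::main]
async fn main() {
    let barriers = stream::iter([Message::Barrier(1), Message::Barrier(2)]);
    // After partitioning, no chunk exceeds the rate limit of 100 rows.
    let chunks = stream::iter([Message::Chunk(100), Message::Chunk(100), Message::Chunk(56)]);

    // Prefer the barrier arm: a ready barrier is always yielded before more data,
    // and each data item now carries at most `rate_limit` rows of downstream work.
    let merged = stream::select_with_strategy(barriers, chunks, |_: &mut ()| PollNext::Left);
    merged
        .for_each(|msg| async move { println!("{msg:?}") })
        .await;
}
```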

@kwannoel (Contributor, Author) commented Dec 6, 2023:

> The only uncovered edge cases I can think of are those involving transactions. But there seems to be no solution, as we must guarantee the atomicity of these records.

We can just break it, IMO. We already have MAX_TRANSACTION_SIZE, which breaks these semantics.

An alternative is to support dynamically changing the chunk size that the source reader yields. If setting a rate limit on a source breaks the transaction size, we can just emit a warning.

@BugenZhao (Member) commented:

> with a chunk size of 256, and rate limit of 100

Wow, I didn't expect the rate limit to be set to less than a single chunk. 🤣 Things get really tricky in this case: the rate-limit logic will invade several modules, which I think contradicts our initial idea of a quick-and-dirty workaround.

For example, if I understand correctly, the following logic in backfill would also have to be aware of the rate limit:

    // Before processing barrier, if did not snapshot read,
    // do a snapshot read first.
    // This is so we don't lose the tombstone iteration progress.
    if !has_snapshot_read {
        let (_, snapshot) = backfill_stream.into_inner();
        #[for_await]
        for msg in snapshot {

@kwannoel (Contributor, Author) commented Dec 6, 2023:

> For example, if I understand correctly, the following logic in backfill would also have to be aware of the rate limit:

Yeah, that handles the case where we want to rate limit below the chunk size.

It may have to be done within the snapshot_read logic.

@kwannoel (Contributor, Author) commented Dec 6, 2023:

> the rate-limit logic will invade several modules, which I think contradicts our initial idea of a quick-and-dirty workaround.

True, it will need to handle the rate limit inside those modules, if and only if the rate limit is smaller than the configured chunk sizes.

Most of the design will not need to change; probably only:

  • snapshot read
  • source read

The changes required would be (see the sketch after this list):

  • Support partitioning of the snapshot read / source read messages.
  • Support dynamically altering the partition sizes (slightly trickier for source read).
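
This PR does not implement dynamic altering (as noted in the description, rate_limit is fixed at stream job initialization), but as a rough sketch of one possible shape, the reader could re-read a tokio watch channel before carving off each batch. `read_partitioned` and the `u32` row type are hypothetical:

```rust
use tokio::sync::watch;

/// Emit batches whose size tracks the latest value in `rate_limit_rx`.
async fn read_partitioned(rate_limit_rx: watch::Receiver<usize>, rows: Vec<u32>) {
    let mut remaining = rows.as_slice();
    while !remaining.is_empty() {
        // Pick up the most recent rate limit before carving off the next batch.
        let limit = (*rate_limit_rx.borrow()).max(1);
        let take = limit.min(remaining.len());
        let (batch, rest) = remaining.split_at(take);
        println!("emit batch of {} rows", batch.len());
        remaining = rest;
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(100usize);
    // E.g. a future ALTER-style command lowers the limit at runtime.
    tx.send(50).unwrap();
    read_partitioned(rx, (0..256).collect()).await;
}
```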

@kwannoel changed the title from "feat(connector): do not block barrier for rate limit" to "feat(connector): partition source message by rate_limit" on Dec 6, 2023
@kwannoel changed the title from "feat(connector): partition source message by rate_limit" to "feat(connector): partition source reader batch_size by rate_limit" on Dec 6, 2023
codecov bot commented Dec 11, 2023

Codecov Report

Attention: 7 lines in your changes are missing coverage. Please review.

Comparison is base (a3c71aa) 68.10% compared to head (a0c9644) 68.12%.

File                                              Patch %   Missing lines
src/connector/src/parser/mod.rs                   0.00%     4 ⚠️
src/batch/src/executor/source.rs                  0.00%     1 ⚠️
src/stream/src/from_proto/source/fs_fetch.rs      0.00%     1 ⚠️
src/stream/src/from_proto/source/trad_source.rs   0.00%     1 ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main   #13800      +/-   ##
==========================================
+ Coverage   68.10%   68.12%   +0.01%     
==========================================
  Files        1529     1529              
  Lines      264150   264158       +8     
==========================================
+ Hits       179892   179946      +54     
+ Misses      84258    84212      -46     
Flag Coverage Δ
rust 68.12% <12.50%> (+0.01%) ⬆️

Flags with carried forward coverage won't be shown.


@tabVersion (Contributor) commented:

The overall design LGTM; partitioning within the parser is a workable approach. But as described above, there may be a more consistent way to achieve this.

@Rossil2012 (Contributor) left a comment:

As some corner cases (Datagen & Nexmark) are not covered by the parser, I suggest we move the logic to the executor side (SourceExecutor / FsSourceExecutor / FetchExecutor).

For example, here in SourceExecutor:

    .inc_by(chunk.cardinality() as u64);
    yield Message::Chunk(chunk);
    self.try_flush_data().await?;

    @@ -487,7 +499,11 @@ impl<P: ByteStreamSourceParser> P {
            actor_id = source_info.actor_id,
            source_id = source_info.source_id.table_id()
        );

        let data_stream = if let Some(rate_limit) = &self.source_ctx().source_ctrl_opts.rate_limit {
            Box::pin(ensure_largest_at_rate_limit(data_stream, *rate_limit))
A reviewer (Contributor) asked: Do all source connectors go through this code path?

Reply: To be exact, Datagen & Nexmark sources do not go through this path.
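
For context on the `ensure_largest_at_rate_limit` call in the diff above, a chunk-splitting stream adapter could look roughly like the following. This is a hedged sketch, with `Vec<u32>` standing in for the real chunk type; the PR's actual implementation may differ:

```rust
use futures::stream::{self, Stream, StreamExt};

/// Guarantee that no yielded chunk exceeds `rate_limit` rows.
fn ensure_largest_at_rate_limit_sketch(
    data: impl Stream<Item = Vec<u32>>,
    rate_limit: usize,
) -> impl Stream<Item = Vec<u32>> {
    let rate_limit = rate_limit.max(1); // `chunks` panics on a zero size.
    data.flat_map(move |chunk| {
        // Split each incoming chunk into pieces of at most `rate_limit` rows.
        let pieces: Vec<Vec<u32>> = chunk.chunks(rate_limit).map(|p| p.to_vec()).collect();
        stream::iter(pieces)
    })
}
```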

src/connector/src/parser/mod.rs (resolved comment thread)
@fuyufjh (Member) left a comment:

LGTM. Please take a look at #13800 (comment) before merging.

@tabVersion (Contributor) left a comment:

LGTM

@kwannoel force-pushed the kwannoel/test-rate-limit branch from a0c9644 to 01a0958 on February 23, 2024 07:09
gitguardian bot commented Feb 23, 2024

⚠️ GitGuardian has uncovered 1 secret following the scan of your pull request.

Please consider investigating the findings and remediating the incidents. Failure to do so may lead to compromising the associated services or software components.

🔎 Detected hardcoded secret in your pull request:

GitGuardian id: 9425213 | Status: Triggered | Secret: Generic Password | Commit: a405147 | Filename: ci/scripts/regress-test.sh
🛠 Guidelines to remediate hardcoded secrets
  1. Understand the implications of revoking this secret by investigating where it is used in your code.
  2. Replace and store your secret safely.
  3. Revoke and rotate this secret.
  4. If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.


@kwannoel enabled auto-merge on February 23, 2024 07:10
@kwannoel force-pushed the kwannoel/test-rate-limit branch from 01a0958 to a405147 on February 23, 2024 07:11
@kwannoel added this pull request to the merge queue on Feb 23, 2024
Merged via the queue into main with commit 2cec344 Feb 23, 2024
27 of 28 checks passed
@kwannoel deleted the kwannoel/test-rate-limit branch on February 23, 2024 08:06
kwannoel added a commit that referenced this pull request Feb 23, 2024
zwang28 pushed a commit that referenced this pull request Feb 25, 2024
kwannoel added a commit that referenced this pull request Feb 26, 2024
Successfully merging this pull request may close these issues:

Do not block barrier when rate limiting with smaller than chunk_size (#13799)