Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make Nats jetstream Source work in parallel #19529

Merged
merged 30 commits into from
Dec 3, 2024

Conversation

tabVersion
Copy link
Contributor

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

tracked by #18876

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@tabVersion

This comment was marked as outdated.

@tabVersion
Copy link
Contributor Author

perf test using EC2

setup:

  • Nats JS on EC2_1 (5,000,000 messages, file storage, GP3)
  • RisingWave on EC2_2

test against main (b4bca57)

  • ack all
create table t ( a int ) with ( connector = 'nats', server_url='172.31.46.172:4222', subject='testsubject', connect_mode='plain', consumer.durable_name = 'main_all_3', consumer.ack_policy = 'all', stream='teststream', consumer.max_ack_pending = '1000000') format plain encode json;
image
  • ack explicit
create table t ( a int ) with ( connector = 'nats', server_url='172.31.46.172:4222', subject='testsubject', connect_mode='plain', consumer.durable_name = 'main_explicit_4', consumer.ack_policy = 'explicit', stream='teststream', consumer.max_ack_pending = '1000000') format plain encode json;
image
  • ack none
create table t ( a int ) with ( connector = 'nats', server_url='172.31.46.172:4222', subject='testsubject', connect_mode='plain', consumer.durable_name = 'main_none_4', consumer.ack_policy = 'none', stream='teststream', consumer.max_ack_pending = '1000000') format plain encode json;
image
  • 4 parallelism + ack all
set streaming_parallelism to 4;
create table t ( a int ) with ( connector = 'nats', server_url='172.31.46.172:4222', subject='testsubject', connect_mode='plain', consumer.durable_name = 'branch_all_3', consumer.ack_policy = 'all', stream='teststream', consumer.max_ack_pending = '1000000') format plain encode json;
image
  • 4 parallelism + ack explicit
set streaming_parallelism to 4;
create table t ( a int ) with ( connector = 'nats', server_url='172.31.46.172:4222', subject='testsubject', connect_mode='plain', consumer.durable_name = 'branch_explicit_4', consumer.ack_policy = 'explicit', stream='teststream', consumer.max_ack_pending = '1000000') format plain encode json;
image
  • 4 parallelism + ack none
set streaming_parallelism to 4;
create table t ( a int ) with ( connector = 'nats', server_url='172.31.46.172:4222', subject='testsubject', connect_mode='plain', consumer.durable_name = 'branch_none_3', consumer.ack_policy = 'none', stream='teststream', consumer.max_ack_pending = '1000000') format plain encode json;
image

@tabVersion tabVersion marked this pull request as ready for review November 27, 2024 08:40
@tabVersion tabVersion requested review from hzxa21 and fuyufjh November 27, 2024 08:40
@@ -520,13 +535,17 @@ impl<T: SplitMetaData + Clone> Ord for ActorSplitsAssignment<T> {
#[derive(Debug)]
struct SplitDiffOptions {
enable_scale_in: bool,

/// For most connectors, this should be false. When enabled, RisingWave will not track any progress.
enable_adaptive: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we assert that sources with adaptive enabled must also have scale-in enabled?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about renaming enable_scale_in to enable_drop_splits?

src/meta/src/stream/source_manager.rs Outdated Show resolved Hide resolved
src/connector/src/source/util.rs Outdated Show resolved Hide resolved
@graphite-app graphite-app bot requested a review from a team December 3, 2024 05:35
@tabVersion tabVersion added this pull request to the merge queue Dec 3, 2024
Merged via the queue into main with commit 84566dc Dec 3, 2024
31 of 32 checks passed
@tabVersion tabVersion deleted the tab/adaptive-parallelism-nats branch December 3, 2024 06:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants