
feat(meta): deprecate parallel unit #17523

Merged: 19 commits merged into main on Jul 23, 2024

Conversation

@shanicky (Contributor) commented Jul 1, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

This PR is a massive one, and the specific features still need to be verified. If possible, it may be split into multiple smaller PRs in the future. Due to the need to consider backwards compatibility, more testing is required. Currently, we need to ensure that it can consistently pass CI tests.

This PR made the following modifications:

  1. Removed the dependency on persistent global ParallelUnits, replacing them with dynamic, temporary WorkerSlots. Previously, a ParallelUnit carried two semantics: the Worker and the Worker's parallelism. So for each Worker with parallelism P, we generate (WorkerId, 0), (WorkerId, 1), ..., (WorkerId, P-1) to replace it (see the sketch after this list). The vnode mapping used in communication between meta and the frontend also uses worker slots, which should be simplified to a WorkerMapping in the future.
  2. Removed the VnodeMapping of Fragment. Since VnodeMapping was implemented through ParallelUnitMapping, and we can derive it from the ActorBitmaps and ActorStatus, a persistent VnodeMapping is not needed. Moreover, binding Fragments to Workers is actually quite strange.
  3. Modified the Reschedule interface. The previous syntax was {fragment_id}-[parallel_unit]+[parallel_unit]; it has now been changed to {fragment_id}-[worker_id:count]+[worker_id:count], i.e. {fragment_id}:[worker_id:diff]. A reschedule is now defined by modifying the number of actor allocations on each worker. The simulation tests have all been updated accordingly.
  4. Considering compatibility, rw_parallel_units is still retained, now in the form of (slot_id, worker_id).
  5. The ParallelUnit in ActorStatus has not been removed because I haven't figured out how to handle backward compatibility elegantly yet. 🥵 We will only use the WorkerId field. For a new ActorStatus, we fill the ParallelUnit with u32::MAX as the id.
  6. The original interface for generating stable resize plans has been removed in this PR. With the introduction of the alter syntax and auto scaling, this feature is rarely used, and modifying it would make this PR even larger. Let's discuss it later.
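
To make item 1 concrete, here is a minimal sketch of the WorkerSlot generation, assuming a simplified `Worker` type (the real meta model differs):

```rust
// Minimal sketch of WorkerSlot generation; `Worker` is a simplified
// stand-in for the actual worker node type in meta.
struct Worker {
    id: u32,
    parallelism: u32,
}

/// A worker slot: (worker id, slot index within that worker).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct WorkerSlotId(u32, u32);

/// For each worker with parallelism P, generate (id, 0) ..= (id, P - 1).
fn worker_slots(workers: &[Worker]) -> Vec<WorkerSlotId> {
    workers
        .iter()
        .flat_map(|w| (0..w.parallelism).map(move |slot| WorkerSlotId(w.id, slot)))
        .collect()
}

fn main() {
    let workers = vec![
        Worker { id: 1, parallelism: 2 },
        Worker { id: 2, parallelism: 3 },
    ];
    // Prints: [WorkerSlotId(1, 0), WorkerSlotId(1, 1), WorkerSlotId(2, 0), WorkerSlotId(2, 1), WorkerSlotId(2, 2)]
    println!("{:?}", worker_slots(&workers));
}
```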

Considering compatibility issues, when creating a streaming job, the maximum parallelism is still limited to the sum of worker parallelism. However, it is possible to manually alter the parallelism to a huge value.


```sql
dev=> set streaming_parallelism = 20;
SET_VARIABLE
dev=> create table t(v int);
ERROR:  Failed to run the query

Caused by these errors (recent errors listed first):
  1: gRPC request to meta service failed: The service is currently unavailable
  2: Service unavailable: Not enough parallelism to schedule, required: 20, available: 12

dev=> set streaming_parallelism = 0;
SET_VARIABLE
dev=> create table t(v int);
CREATE_TABLE
dev=> alter table t set parallelism = 100;
ALTER_TABLE
dev=> select fragment_id, count(*) from rw_actors group by fragment_id;
 fragment_id | count
-------------+-------
           3 |   100
           4 |   100
(2 rows)
```

You can find more specific reasons in the Comment #17523 (comment).

The following is an AI-generated summary.

Summary of Changes

This extensive PR contains a series of updates to both the Protocol Buffers definitions and Rust source code. These changes aim to streamline and simplify the management of worker nodes in our system.

Protocol Buffers

Changes to proto/common.proto and proto/meta.proto

  • The parallel_units field in the WorkerNode message has been removed; its field number is now reserved.
  • A new field, uint32 parallelism, has been introduced to the WorkerNode message.
  • Several fields in the TableFragments, MigrationPlan, and ListActorStatesResponse messages were removed and their field numbers reserved:
    • Reserved vnode_mapping in TableFragments.
    • Replaced parallel_unit_migration_plan in MigrationPlan with a new worker_slot_migration_plan field.
    • Replaced parallel_unit_id in ListActorStatesResponse with a new worker_id field.
  • The Reschedule message is now more appropriately named WorkerReschedule, with changes to the map types for added_parallel_units and removed_parallel_units.
  • The RescheduleRequest message has been simplified by removing the reschedules field and including worker_reschedules.
  • Deletion of GetReschedulePlanRequest and GetReschedulePlanResponse messages alongside their corresponding RPC method in ScaleService (i.e., GetReschedulePlan) to reflect the new design.

Rust Source Code

Updates in src/batch/src/executor/join/local_lookup_join.rs

  • Reflected the Protocol Buffers changes with a switch from using worker.parallel_units.len() to worker.parallelism as usize.

Refactor in src/batch/src/worker_manager/worker_node_manager.rs

  • Redesigned the worker slot to worker node mapping system following the removal of parallel_units, utilizing a new structure based on the id and parallelism.
  • Modified the total_parallelism function to compute using the new parallelism field rather than the length of parallel_units.

Adjustments in src/common/src/hash/consistent_hash/mapping.rs

  • WorkerSlotId now uses a custom implementation of fmt for its `Debug` output.
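
As a sketch of what such a custom `Debug` might look like (the packed-u64 layout below is an assumption for illustration, not necessarily the PR's exact representation):

```rust
use std::fmt;

/// Sketch: a worker slot packed into a u64, with the worker id in the high
/// 32 bits and the slot index in the low 32 bits (layout is an assumption).
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct WorkerSlotId(u64);

impl WorkerSlotId {
    pub fn new(worker_id: u32, slot_idx: u32) -> Self {
        Self(((worker_id as u64) << 32) | slot_idx as u64)
    }
    pub fn worker_id(&self) -> u32 {
        (self.0 >> 32) as u32
    }
    pub fn slot_idx(&self) -> u32 {
        self.0 as u32
    }
}

impl fmt::Debug for WorkerSlotId {
    // Render as `[worker_id:slot_idx]` rather than the opaque packed integer.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "[{}:{}]", self.worker_id(), self.slot_idx())
    }
}
```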

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See [details]
  • All checks passed in ./risedev check (or alias, ./risedev c)

@kwannoel (Contributor) commented Jul 2, 2024

I suppose this is part of https://www.notion.so/risingwave-labs/From-Parallel-Unit-to-Actor-Group-b0b780a332f147be8ca469162c43d3e6?

> we can derive VnodeMapping from ActorBitmaps and ActorStatus

Could you elaborate on this? IIUC it was previously used to map vnodes to parallel units (worker nodes).
I suppose now it's mapped to Actor Group. And so ActorBitmaps and ActorStatus somehow can be used to provide this mapping.

> Removed the dependency on persistent global ParallelUnits, replacing them with dynamic, temporary WorkerSlots. Previously, a ParallelUnit carried two semantics: the Worker and the Worker's parallelism. So for each Worker with parallelism P, we generated (WorkerId, 0), (WorkerId, 1), and so on to replace it. The vnode mapping used in communication between meta and the frontend also uses worker slots, which should be simplified to a WorkerMapping in the future.

What is a "WorkerSlot"? And which does it represent? Worker or worker parallelism?

@shanicky (Contributor, author) commented Jul 2, 2024

> I suppose this is part of notion.so/risingwave-labs/From-Parallel-Unit-to-Actor-Group-b0b780a332f147be8ca469162c43d3e6?
>
> > we can derive VnodeMapping from ActorBitmaps and ActorStatus
>
> Could you elaborate on this? IIUC it was previously used to map vnodes to parallel units (worker nodes). I suppose now it's mapped to Actor Group. And so ActorBitmaps and ActorStatus somehow can be used to provide this mapping.
>
> > Removed the dependency on persistent global ParallelUnits, replacing them with dynamic, temporary WorkerSlots. Previously, a ParallelUnit carried two semantics: the Worker and the Worker's parallelism. So for each Worker with parallelism P, we generated (WorkerId, 0), (WorkerId, 1), and so on to replace it. The vnode mapping used in communication between meta and the frontend also uses worker slots, which should be simplified to a WorkerMapping in the future.
>
> What is a "WorkerSlot"? And which does it represent? Worker or worker parallelism?

First, the VnodeMapping previously used ParallelUnit as the key that maps to each Vnode. Combined with the Actor location in ActorStatus (which includes the ParallelUnitId and WorkerId), it is possible to derive the ActorMapping (mapping ActorId to Vnode), and the ActorMapping can be converted into a Bitmap for each Actor.

Therefore, we can also go the other way: generate the ActorMapping from each Actor's Bitmap (handling both the Single and Hash distribution cases for Fragments), and then generate the Fragment's VnodeMapping (i.e., the ParallelUnitMapping) using the ParallelUnit from ActorStatus.
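
A hedged sketch of the first half of that derivation, rebuilding the actor-to-vnode mapping from per-actor bitmaps (the types are simplified stand-ins; the real code uses proper bitmap types rather than `Vec<bool>`):

```rust
use std::collections::HashMap;

type ActorId = u32;

/// Sketch: rebuild an actor -> vnode mapping from per-actor vnode bitmaps.
/// `bitmaps[actor][vnode]` marks vnode ownership; the bitmaps are assumed
/// to be disjoint and to cover every vnode (the Hash-distribution case).
fn actor_mapping_from_bitmaps(
    bitmaps: &HashMap<ActorId, Vec<bool>>,
    vnode_count: usize,
) -> Vec<ActorId> {
    let mut mapping = vec![0; vnode_count];
    for (&actor, bitmap) in bitmaps {
        for (vnode, &owned) in bitmap.iter().enumerate() {
            if owned {
                mapping[vnode] = actor; // vnode -> owning actor
            }
        }
    }
    mapping
}
```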

WorkerSlot is a temporary stand-in for ParallelUnit. Theoretically, we should discard this intermediate layer completely, but there are too many dependencies, so at this stage we settled on a temporary solution that can still express the semantics of ParallelUnit. A WorkerSlot is a temporarily generated Id corresponding to each unit of parallelism of each Worker. For example, for a Worker with Id 1 and parallelism 5, we generate five WorkerSlots: (1,0), (1,1), (1,2), (1,3), (1,4).

Previously, we mainly used ParallelUnit to align the upstream and downstream of NoShuffle relationships through a shared ParallelUnitId (because they need a 1-to-1 correspondence and must be on the same Worker). This mainly appears in two scenarios (see the sketch after this list):

  1. During scheduling, we use WorkerSlots to ensure this correspondence, meaning the WorkerSlots of NoShuffle's upstream and downstream are consistent.
  2. During scaling, i.e. the rescheduling process, the original 1-to-1 relationship is retained in the actors' dispatchers, so we can rebuild based on this relationship without WorkerSlot alignment. For the upstream and downstream in cascading NoShuffle expansion, we use temporary WorkerSlots to solve it (same as scheduling).
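
A minimal sketch of the scheduling-time alignment in item 1, assuming the pairing information has already been extracted from the dispatchers (all names here are illustrative, not the PR's actual code):

```rust
use std::collections::HashMap;

type ActorId = u32;

/// (worker id, slot index), as in the sketches above.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct WorkerSlotId(u32, u32);

/// Sketch: align a NoShuffle downstream fragment with its upstream by giving
/// each downstream actor the same worker slot as its paired upstream actor,
/// which also pins both actors to the same worker.
fn align_no_shuffle(
    upstream_slots: &HashMap<ActorId, WorkerSlotId>,
    pairs: &[(ActorId, ActorId)], // (upstream actor, downstream actor)
) -> HashMap<ActorId, WorkerSlotId> {
    pairs
        .iter()
        .map(|&(up, down)| (down, upstream_slots[&up]))
        .collect()
}
```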

In summary, we previously bound everything to ParallelUnit, which created significant limitations; for instance, an Actor couldn't run without a ParallelUnit. By binding these concepts through other means, we can remove the dependency on ParallelUnit and downgrade it to temporary WorkerSlots that exist only during the scheduling process.

@shanicky shanicky force-pushed the peng/remove-pu-union branch from 9c57fe1 to 120fee5 on July 2, 2024 08:55
@xxchan (Member) commented Jul 3, 2024

> This PR is a massive one, and the specific features still need to be verified. If possible, it may be split into multiple smaller PRs in the future. Due to the need to consider backwards compatibility, more testing is required. Currently, we need to ensure that it can consistently pass CI tests.

Some thoughts on the topic of "splitting PR":

  1. We can have a "code deletion" PR first (e.g., GetReschedulePlan). I guess it's trivial to merge, and can reduce a lot of LoC of this PR.
  2. Another idea is to do the "concept mapping" (or abstraction) refactoring first: e.g., previously we had worker.parallel_units.len(). We could first refactor that into a method worker.parallelism() (without changing the implementation), and then this PR would only need to change the implementation. (But I'm not sure how many similar things we can do for this PR, and how much it would help.)
  3. Graphite can help a lot with working on the split PRs simultaneously! (Specifically, it automatically rebases later PRs when you make changes to prior ones.)

Since I'm not an expert in this area, these are just ideas on the general topic that immediately came to mind, and they might not apply to this specific work. (Or is it just too late and troublesome to split now...?)

Comment on lines 9 to 30
```rust
// Add the new `parallelism` column to the worker property table, then drop
// the deprecated `parallel_unit_ids` column.
manager
    .alter_table(
        Table::alter()
            .table(WorkerProperty::Table)
            .add_column(
                ColumnDef::new(WorkerProperty::Parallelism)
                    .integer()
                    .not_null(),
            )
            .to_owned(),
    )
    .await?;

manager
    .alter_table(
        Table::alter()
            .table(WorkerProperty::Table)
            .drop_column(WorkerProperty::ParallelUnitIds)
            .to_owned(),
    )
    .await?;
```
@xxchan (Member):

I saw you talked about compatibility, but it seems the specific behavior here is not clearly discussed: We persisted parallel unit ids for tables, but dropped columns and added new columns here, so what will happen exactly for existing jobs?

@shanicky (Contributor, author):

During upgrades, the compute nodes will restart and re-trigger add_worker, so the parallelism field will be repopulated. We won't use the parallel unit id of actors or the vnode mapping field of fragments; all the necessary information can be derived from existing data.

Just to be safe, I'll still manually populate the parallelism field.

However, it seems our current architecture doesn't support downgrading very well 🫠 , so it's best to backup before upgrading.

@xxchan (Member):

> We won't use the parallel unit id of actors or the vnode mapping field of fragments; all the necessary information can be derived from existing data.

I'm still a little confused. e.g., IIUC we can have a table scheduled to a given worker with a fixed parallelism.

Now we dropped the persisted info (the table's parallel units), how can we recover it?

BTW, I think I commented in the wrong place. I should have commented on Fragments

@shanicky (Contributor, author):

We will read the WorkerId from the ActorStatus field, then determine its location. Previously, we would use the ParallelUnitId from ActorStatus.

@xxchan (Member) commented Jul 4, 2024:

Thanks, I didn't notice ActorStatus is part of TableFragments, and might misunderstand how everything works.

> The ParallelUnit in ActorStatus has not been removed because I haven't figured out how to handle backward compatibility elegantly yet. 🥵 We will only use the WorkerId field. For a new ActorStatus, we fill the ParallelUnit with u32::MAX as the id.

BTW, this might be worth writing in the proto comments.

@shanicky (Contributor, author):

This will be removed in the next PR. 🥵

@BugenZhao (Member) left a comment:

Haven't delved into this PR in detail, but I'm curious about where we maintain the mapping from vnode to worker node for each fragment in the new implementation. 🤔

  • Do we persist the mapping?
  • Do we directly use worker id?
  • Is worker id reusable?

@shanicky (Contributor, author) commented Jul 3, 2024

> Haven't delved into this PR in detail, but I'm curious about where we maintain the mapping from vnode to worker node for each fragment in the new implementation. 🤔
>
> • Do we persist the mapping?
> • Do we directly use worker id?
> • Is worker id reusable?

We will still store the worker id of each actor (through ActorStatus), but the mapping from worker to vnode is dynamically generated and not stored; we only store each actor's bitmap. The main reason is that the information in the actors' bitmaps is sufficient, so we can use the existing actor bitmaps to reconstruct the ActorMapping. Then we use a stable (seemingly) algorithm to generate the WorkerSlotMapping, which essentially groups the fragment's actors by worker, sorts the workers by id in ascending order, and allocates WorkerSlotIds accordingly (a sketch follows below). So I still need your help to confirm whether there would be any problems with this approach. 🥹
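
A sketch of that allocation, under the stated assumption that iteration order is made deterministic (the helper below is illustrative, not the PR's actual code):

```rust
use std::collections::BTreeMap;

type ActorId = u32;
type WorkerId = u32;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct WorkerSlotId(WorkerId, u32);

/// Sketch of the stable allocation described above: group a fragment's
/// actors by worker, iterate workers in ascending id order (BTreeMap), and
/// hand out slot indices 0, 1, 2, ... per worker. Given the same actor
/// locations, this always produces the same actor -> slot assignment.
fn assign_worker_slots(actor_locations: &[(ActorId, WorkerId)]) -> Vec<(ActorId, WorkerSlotId)> {
    let mut by_worker: BTreeMap<WorkerId, Vec<ActorId>> = BTreeMap::new();
    for &(actor, worker) in actor_locations {
        by_worker.entry(worker).or_default().push(actor);
    }
    let mut assignment = Vec::new();
    for (worker, mut actors) in by_worker {
        actors.sort_unstable(); // deterministic order within each worker
        for (slot, actor) in actors.into_iter().enumerate() {
            assignment.push((actor, WorkerSlotId(worker, slot as u32)));
        }
    }
    assignment
}
```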

What does "worker id reusable" refer to? 🤔 This PR doesn't modify the handling of worker ids, so it remains the same as before.

@shanicky shanicky force-pushed the peng/remove-pu-union branch from 120fee5 to da5026f on July 3, 2024 09:57
@xxchan (Member) left a comment:

I pulled the PR locally and browsed some parts. I have a feeling that most parts are relatively trivial, i.e., they just change the data passed down. 🤣 (Well, this might indeed be the biggest trickiness of large PRs: we cannot distinguish the "really important" changes from the rest.)

Perhaps you have some ideas about where the really important changes that need careful review are; you could comment inline to highlight them, or give some guidance on how you'd like the PR to be reviewed.

@shanicky (Contributor, author) commented Jul 3, 2024

> I pulled the PR locally and browsed some parts. I have a feeling that most parts are relatively trivial, i.e., they just change the data passed down. 🤣 (Well, this might indeed be the biggest trickiness of large PRs: we cannot distinguish the "really important" changes from the rest.)
>
> Perhaps you have some ideas about where the really important changes that need careful review are; you could comment inline to highlight them, or give some guidance on how you'd like the PR to be reviewed.

Yes, most of the modifications are mechanical code changes like worker.parallel_units.len() -> worker.parallelism; the core modifications are in the files under stream/stream_graph, recovery.rs, and scale.rs.

@BugenZhao (Member):

> We will still store the worker id of each actor (through ActorStatus), but the mapping from worker to vnode is dynamically generated and not stored; we only store each actor's bitmap.

This sounds promising. 👍

> What does "worker id reusable" refer to? 🤔

For example, if a completely new set of compute nodes joins and replaces the original ones, do we need to rewrite the persisted worker id in the actors? Previously it was the parallel unit id that was persisted, so we only needed to update the mapping from parallel unit to worker node in other places.

@shanicky (Contributor, author) commented Jul 4, 2024

> For example, if a completely new set of compute nodes joins and replaces the original ones, do we need to rewrite the persisted worker id in the actors? Previously it was the parallel unit id that was persisted, so we only needed to update the mapping from parallel unit to worker node in other places.

If the worker set changes, we will trigger scaling or actor migration. In both cases, we only need to update the WorkerId in the ActorStatus field. Previously this was complicated because we also had to update the fragment mapping, but that's no longer necessary; we only need to change the actor's location.

scaling: https://github.com/risingwavelabs/risingwave/pull/17523/files#diff-64e58f6fb6513a71def5d29b781d04f146c4f594d2678b916e003cc1c00ded73L1449-L1451
migration: https://github.com/risingwavelabs/risingwave/pull/17523/files#diff-243aa8215b544fab44806f8910e08aab06133d79de2d84e4f65ed056bf413ac5R866-R868

In fact, in the future it may not be necessary to store the Actor's location, or even to store Actor information at all. If the algorithm is stable enough, each Meta startup recovery can recalculate the correct Actor information.

@shanicky shanicky force-pushed the peng/remove-pu-union branch 4 times, most recently from 777760b to 1483ce5 on July 19, 2024 07:59
shanicky added 19 commits July 22, 2024 13:13
Refactor `scale_service.rs` and remove `num_traits::Signed` from `scale.rs`.

Update rescheduling logic, test module, & integration tests.
@shanicky shanicky force-pushed the peng/remove-pu-union branch from 1483ce5 to 36f3c83 on July 22, 2024 05:26
@yezizp2012 (Member) left a comment:

LGTM! In the past two days, @shanicky has continuously tested this PR and hasn't found any issues, so I feel it can be merged first. @BugenZhao WDYT?

@shanicky shanicky added this pull request to the merge queue Jul 23, 2024
@shanicky (Contributor, author):

Let's merge this PR and roll it back if there are any critical issues.

Merged via the queue into main with commit 007e802 Jul 23, 2024
34 of 35 checks passed
@shanicky shanicky deleted the peng/remove-pu-union branch July 23, 2024 06:57