feat(meta): deprecate parallel unit #17523
Conversation
I suppose this is part of https://www.notion.so/risingwave-labs/From-Parallel-Unit-to-Actor-Group-b0b780a332f147be8ca469162c43d3e6?
Could you elaborate on this? IIUC it was previously used to map vnodes to parallel units (worker nodes).
What is a "WorkerSlot"? And what does it represent: a worker, or a unit of worker parallelism?
First, a `WorkerSlot` is identified by a worker id plus a slot index; for a worker with parallelism `P`, there are slots `(WorkerId, 0)`, `(WorkerId, 1)` and so on up to `(WorkerId, P-1)`. Therefore, we can also generate the worker slots dynamically from each worker's parallelism instead of persisting them.
Previously, we mainly used `ParallelUnit`, which conflated the two semantics of worker identity and worker parallelism.
In summary, we previously bound everything to `ParallelUnit`; now it is bound to `WorkerSlot`.
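To make that concrete, here is a minimal sketch (the types are illustrative, not the actual ones in this PR) of expanding a worker into its slots:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct WorkerSlotId {
    worker_id: u32,
    slot_idx: u32,
}

/// Expand a worker into one slot per unit of parallelism:
/// (worker_id, 0), (worker_id, 1), ..., (worker_id, parallelism - 1).
fn worker_slots(worker_id: u32, parallelism: u32) -> Vec<WorkerSlotId> {
    (0..parallelism)
        .map(|slot_idx| WorkerSlotId { worker_id, slot_idx })
        .collect()
}
```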
Some thoughts on the topic of "splitting PRs":
Since I'm not an expert in this area, these are just general ideas that immediately came to me, and they might not apply to this specific work. (Or is it just too late and troublesome to split now...?)
```rust
// Add the new integer `parallelism` column to `worker_property`.
manager
    .alter_table(
        Table::alter()
            .table(WorkerProperty::Table)
            .add_column(
                ColumnDef::new(WorkerProperty::Parallelism)
                    .integer()
                    .not_null(),
            )
            .to_owned(),
    )
    .await?;
```

```rust
// Drop the old `parallel_unit_ids` column from `worker_property`.
manager
    .alter_table(
        Table::alter()
            .table(WorkerProperty::Table)
            .drop_column(WorkerProperty::ParallelUnitIds)
            .to_owned(),
    )
    .await?;
```
I saw you talked about compatibility, but it seems the specific behavior here is not clearly discussed: we persisted parallel unit ids for tables, but this migration drops that column and adds new ones, so what exactly will happen to existing jobs?
During upgrades, the compute node will restart and re-trigger `add_worker`, so the `parallelism` field will be repopulated. We won't use the parallel unit ids of actors or the vnode mapping field of fragments; all the necessary information can be derived from existing data.
Just to be safe, I'll still manually populate the parallelism field.
However, it seems our current architecture doesn't support downgrading very well 🫠, so it's best to back up before upgrading.
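For illustration, the manual backfill could look something like the following sketch, reusing the `manager` from the migration quoted above and assuming `parallel_unit_ids` is stored as a Postgres integer array (the real storage format may differ):

```rust
// Hypothetical backfill: derive each worker's parallelism from the length
// of its old parallel-unit list, before that column is dropped.
manager
    .get_connection()
    .execute_unprepared(
        "UPDATE worker_property SET parallelism = array_length(parallel_unit_ids, 1)",
    )
    .await?;
```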
> We won't use the parallel unit id of actors and the vnode mapping field of fragments. All the necessary information can be derived from existing data.
I'm still a little confused. E.g., IIUC we can have a table scheduled to a given worker with a fixed parallelism.
Now that we've dropped the persisted info (the table's parallel units), how can we recover it?
BTW, I think I commented in the wrong place. I should have commented on `Fragments`.
We will read the `WorkerId` from the `ActorStatus` field, then determine its location. Previously, we would use the `ParallelUnitId` from `ActorStatus`.
Thanks, I didn't notice that `ActorStatus` is part of `TableFragments`, and I might have misunderstood how everything works.
The `ParallelUnit` in `ActorStatus` has not been removed because I haven't figured out how to handle backward compatibility elegantly yet. 🥵 We will only use the `WorkerId` field. If it's a new `ActorStatus`, we will fill `ParallelUnit` with `u32::MAX` as the id.
BTW, this might be worth writing in the proto comments.
This will be removed in the next PR. 🥵
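For readers following along, here is a rough sketch of the compatibility shim described above, assuming a legacy `ParallelUnit` shape with `id` and `worker_node_id` fields (names are illustrative, not the exact proto-generated ones):

```rust
// New ActorStatus entries carry a placeholder ParallelUnit whose id is
// u32::MAX; only the worker id is actually consulted.
const PLACEHOLDER_PARALLEL_UNIT_ID: u32 = u32::MAX;

struct ParallelUnit {
    id: u32,
    worker_node_id: u32,
}

fn placeholder_parallel_unit(worker_id: u32) -> ParallelUnit {
    ParallelUnit {
        id: PLACEHOLDER_PARALLEL_UNIT_ID, // never interpreted as a real id
        worker_node_id: worker_id,
    }
}
```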
src/frontend/src/catalog/system_catalog/rw_catalog/rw_parallel_units.rs
Haven't delved into this PR in detail, but I'm curious about where we maintain the mapping from vnode to worker node for each fragment in the new implementation. 🤔
- Do we persist the mapping?
- Do we directly use worker id?
- Is worker id reusable?
We will still store the worker id of the actor (through actor status), but the mapping from worker to vnode is dynamically generated and not stored; we only store the bitmap of each actor. The main reason is that the information in the actors' bitmaps is sufficient, so we can use the existing actor bitmaps to reconstruct and generate the mapping.
What does "worker id reusable" refer to? 🤔 This PR doesn't modify the handling of worker ids, so it remains the same as before.
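As an illustration of that reconstruction, a minimal sketch (the types and the vnode count are assumptions, not the actual implementation):

```rust
/// Number of virtual nodes; 256 is assumed here for illustration.
const VNODE_COUNT: usize = 256;

struct ActorInfo {
    worker_id: u32,
    /// vnode_bitmap[v] == true means this actor owns virtual node v.
    vnode_bitmap: Vec<bool>,
}

/// Rebuild a fragment's vnode -> worker mapping from its actors' bitmaps
/// and locations; nothing beyond the bitmaps needs to be persisted.
fn rebuild_vnode_to_worker(actors: &[ActorInfo]) -> Vec<u32> {
    let mut mapping = vec![u32::MAX; VNODE_COUNT];
    for actor in actors {
        for (vnode, owned) in actor.vnode_bitmap.iter().enumerate() {
            if *owned {
                // Each vnode is owned by exactly one actor, so no conflicts.
                mapping[vnode] = actor.worker_id;
            }
        }
    }
    mapping
}
```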
I pulled the PR locally and browsed some parts. I have a feeling that most parts are relatively trivial, i.e., they just change the data passed down. 🤣 (Well, this might indeed be the biggest trickiness of large PRs: we cannot distinguish the really important changes from the others.)
Perhaps you have some ideas about where the really important changes are that need to be carefully reviewed; you could comment inline to highlight them, or give some guidance about how you want the PR to be reviewed.
Yes, most of the modifications are limited to mechanical code changes of that kind, i.e., just changing the data passed down.
This sounds promising. 👍
For example, if a completely new set of compute nodes joins and replaces the original ones, do we need to rewrite the persisted worker id in each actor? Previously it was the parallel unit id that was persisted, so we only needed to update the mapping from parallel unit to worker node in other places.
If the workers change, we will trigger scaling or actor migration. In the case of scaling and migration, we only need to update the relevant fields: https://github.com/risingwavelabs/risingwave/pull/17523/files#diff-64e58f6fb6513a71def5d29b781d04f146c4f594d2678b916e003cc1c00ded73L1449-L1451 In fact, in the future it may not be necessary to store the actors' locations, or even to store actor information at all. If the algorithm is stable enough, each meta startup recovery will recalculate the correct actor information.
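For illustration, a hypothetical sketch of applying a worker-slot migration plan to actor locations (all names are made up for the example):

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct WorkerSlot {
    worker_id: u32,
    slot_idx: u32,
}

/// Rewrite each actor's location according to an old-slot -> new-slot plan,
/// instead of remapping parallel units as before.
fn migrate_actor_locations(
    actor_slots: &mut HashMap<u32, WorkerSlot>, // actor_id -> current slot
    plan: &HashMap<WorkerSlot, WorkerSlot>,     // old slot -> new slot
) {
    for slot in actor_slots.values_mut() {
        if let Some(new_slot) = plan.get(slot) {
            *slot = *new_slot;
        }
    }
}
```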
…sm fields & simplify logic
Refactor `scale_service.rs` and remove `num_traits::Signed` from `scale.rs`. Update rescheduling logic, test module, & integration tests.
… dynamically based on need
LGTM! Over the past two days, @shanicky has continuously tested this PR and hasn't found any issues, so I feel it can be merged first. @BugenZhao WDYT?
Let's merge this PR and roll it back if there are any critical issues.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
This PR is a massive one, and the specific features still need to be verified. If possible, it may be split into multiple smaller PRs in the future. Due to the need to consider backwards compatibility, more testing is required. Currently, we need to ensure that it can consistently pass CI tests.
This PR made the following modifications:
- Removed `ParallelUnit`, replacing it with dynamic, temporary `WorkerSlot`s. Previously, `ParallelUnit` had two semantics: worker and worker parallelism. So for each worker with parallelism `P`, we generated `(WorkerId, 0)`, `(WorkerId, 1)` and so on to replace it. The vnode mapping in the communication between meta and the frontend also uses worker slots, which should be simplified to a `WorkerMapping` in the future.
- Removed the `VnodeMapping` of `Fragment`. Since `VnodeMapping` was implemented through `ParallelUnitMapping`, we can derive `VnodeMapping` from `ActorBitmaps` and `ActorStatus`, so a persistent `VnodeMapping` is not needed. Moreover, binding fragments to workers is actually quite strange.
- The reschedule format was previously `{fragment_id}-[parallel_unit]+[parallel_unit]`; it has now been changed to `{fragment_id}-[worker_id:count]+[worker_id:count]` and `{fragment_id}:[worker_id:diff]` (a toy illustration follows this list). Reschedule is now defined by modifying the number of allocations on workers. The simulation tests have all been updated to adapt.
- Considering compatibility, `rw_parallel_units` is still retained, changed to the form of `(slot_id, worker_id)`.
- Considering compatibility issues, when creating a streaming job the maximum parallelism is still limited to the sum of worker parallelism. However, it is possible to manually alter the parallelism to a huge value.
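As referenced in the list above, here is a toy parser for the `{fragment_id}:[worker_id:diff]` notation (the exact grammar used by the simulation tests may differ):

```rust
use std::collections::BTreeMap;

/// Parse e.g. "2:[1:+1,3:-1]" -> (fragment 2, {worker 1: +1, worker 3: -1}),
/// i.e., fragment 2 gains one actor on worker 1 and loses one on worker 3.
fn parse_reschedule(s: &str) -> Option<(u32, BTreeMap<u32, i32>)> {
    let (fragment, rest) = s.split_once(':')?;
    let fragment_id: u32 = fragment.parse().ok()?;
    let body = rest.strip_prefix('[')?.strip_suffix(']')?;
    let mut diffs = BTreeMap::new();
    for part in body.split(',') {
        let (worker, diff) = part.split_once(':')?;
        diffs.insert(worker.parse().ok()?, diff.parse().ok()?);
    }
    Some((fragment_id, diffs))
}
```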
You can find more specific reasons in the comments of #17523.
The following is an AI-generated summary.
Summary of Changes
This extensive PR contains a series of updates to both the Protocol Buffers definitions and Rust source code. These changes aim to streamline and simplify the management of worker nodes in our system.
Protocol Buffers
Changes to `proto/common.proto` and `proto/meta.proto`:
- The `parallel_units` field in the `WorkerNode` message has been removed; its field number is now reserved.
- A new field, `uint32 parallelism`, has been introduced to the `WorkerNode` message, enhancing its descriptive power.
- In the `TableFragments`, `MigrationPlan`, and `ListActorStatesResponse` messages:
  - Removed `vnode_mapping` in `TableFragments`.
  - Replaced `parallel_unit_migration_plan` in `MigrationPlan` with a new `worker_slot_migration_plan` field.
  - Removed `parallel_unit_id` in `ListActorStatesResponse`, substituting it with a new `worker_id` field.
- The `Reschedule` message is now more appropriately named `WorkerReschedule`, with changes to the map types for `added_parallel_units` and `removed_parallel_units`.
- The `RescheduleRequest` message has been simplified by removing the `reschedules` field and including `worker_reschedules`.
- Removed the `GetReschedulePlanRequest` and `GetReschedulePlanResponse` messages, alongside their corresponding RPC method in `ScaleService` (i.e., `GetReschedulePlan`), to reflect the new design.
) to reflect the new design.Rust Source Code
Updates in `src/batch/src/executor/join/local_lookup_join.rs`:
- Changed `worker.parallel_units.len()` to `worker.parallelism as usize`.
Refactor in `src/batch/src/worker_manager/worker_node_manager.rs`:
- Removed reliance on `parallel_units`, utilizing a new structure based on the worker's `id` and `parallelism`.
- Adjusted the `total_parallelism` function to compute using the new `parallelism` field rather than the length of `parallel_units`, as sketched below.
Adjustments in `src/common/src/hash/consistent_hash/mapping.rs`:
- `WorkerSlotId` now uses a custom implementation of `fmt` for its `Debug` output, as sketched below.
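A sketch of what such a custom `Debug` might look like, assuming the id packs the worker id and slot index into a single `u64` (an assumption about the layout):

```rust
use std::fmt;

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct WorkerSlotId(u64);

impl WorkerSlotId {
    fn new(worker_id: u32, slot_idx: u32) -> Self {
        Self(((worker_id as u64) << 32) | slot_idx as u64)
    }
    fn worker_id(&self) -> u32 {
        (self.0 >> 32) as u32
    }
    fn slot_idx(&self) -> u32 {
        self.0 as u32
    }
}

impl fmt::Debug for WorkerSlotId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Render as `[worker_id:slot_idx]` rather than the raw packed u64.
        write!(f, "[{}:{}]", self.worker_id(), self.slot_idx())
    }
}
```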
Checklist
- `./risedev check` (or alias, `./risedev c`)