Tracking: online scaling in compute node #3750
Comments
Isn't this already covered by […]?
Exactly. However, they're only used for creating and dropping materialized views. We should reuse them to support creating and dropping parallel units.
What should the interface be for this? I guess you are referring to having a high-level interface in meta like:

```
// in meta/..
fragment.add_parallel_units(par_unit_ids: &[usize])    // -> create_actors, delete_actors, add outputs to upstream, stop prev outputs from upstream
fragment.remove_parallel_units(par_unit_ids: &[usize])
```

Currently, we need to wait until new actors are created before we can send […].

I guess for the time being, whenever new compute nodes are added, we can scale up to the new parallel units. We can have more fine-grained per-fragment control of parallel units in the future once we decide on a scaling and placement policy.

Furthermore, what should the behaviour be for stateful operators? Should it always replace all of the current actors, or should we allow existing actors to continue? In the latter case, we need an […]. Some of the keys (for scale-up) may no longer be relevant to the node. We could rely on LRU to evict these unused keys, or explicitly evict them by iterating through all the keys in the application cache and evicting the ones whose vnodes are in the removed_vnodes set. After the resume barrier, we can make use of the new vnodes in the state table.

I guess for the time being we can stop all actors and create new ones for the entire fragment. It's unproven whether it's necessary to reuse existing actors. Actor startup should be fast, and the cache can be populated easily from the block cache.
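For illustration only, here is a minimal sketch of the explicit-eviction idea above, assuming a hypothetical per-actor `ApplicationCache` keyed by state-table keys and a stand-in `vnode_of` helper; neither of these names is the actual RisingWave API.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical per-actor cache keyed by state-table keys (illustrative only).
struct ApplicationCache {
    entries: HashMap<Vec<u8>, Vec<u8>>,
}

/// Stand-in for the real vnode computation (hash of the distribution key
/// modulo the virtual-node count); the actual hash function differs.
fn vnode_of(key: &[u8], vnode_count: usize) -> usize {
    let hash = key
        .iter()
        .fold(0usize, |acc, &b| acc.wrapping_mul(31).wrapping_add(b as usize));
    hash % vnode_count
}

impl ApplicationCache {
    /// Explicitly drop cached entries whose vnodes were moved away from this
    /// actor during scaling, instead of waiting for LRU to evict them.
    fn evict_removed_vnodes(&mut self, removed_vnodes: &HashSet<usize>, vnode_count: usize) {
        self.entries
            .retain(|key, _| !removed_vnodes.contains(&vnode_of(key, vnode_count)));
    }
}
```

Whether this explicit pass beats plain LRU eviction depends on how much of the cache belongs to the moved vnodes, which is exactly the trade-off discussed in the comment.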
Exactly.
Yep. This is very similar to creating materialized views: currently, we first build the actors on the worker nodes and then issue a command to the global barrier manager, so consistency is maintained.
Yes. This will be included in the step "Utilize parallel units of newly joined compute nodes."
In the original design by @fuyufjh, we would drop all of the current actors for simplicity. However, after we've unified the state interface with […], reusing the existing actors may become an option.
Not sure how much it will cost; keeping the cache in the original actors would always be better, though.
Agree. Actually, my initial design is to reuse the existing actor and ignore the data that it no longer owns, which will not affect anything and will be evicted sooner or later in theory. But please feel free to simplify the design.
To support scaling in our system, we've decided to generally follow the design in Re-Introduce Configuration Change based on Pause Barrier. Now that consistent hash has been utilized in most of the critical places in our system (#3543), it's high time we started doing this!
This task can be roughly divided into several steps below.
Support creating and dropping actors in existing fragments.
The minimal scaling granularity will be a single fragment. This lets us support finer-grained scaling requests and makes the implementation easier: any complex scaling can be composed of a series of fragment scalings.
Create and drop actors in existing fragments with the UpdateOutputs mutation. #3752
Improve parallel unit management in meta service.
Based on the step above, the compute node is able to scale the number of actors in each fragment. However, whether a fragment can scale or where the new actor will be scheduled should be determined by the parallel unit management in meta service. Currently, we use a fixed number of parallel units for each compute node and always utilize all of them for parallelized fragments, which should be tweaked to be more flexible.
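As a rough sketch of the bookkeeping this step implies (all names are hypothetical, not the actual meta-service code), the manager could record which parallel units each worker owns and hand out only a subset for a given fragment, instead of unconditionally using every unit in the cluster:

```rust
use std::collections::HashMap;

type WorkerId = u32;
type ParallelUnitId = u32;

/// Hypothetical registry of parallel units, keyed by the worker that hosts them.
#[derive(Default)]
struct ParallelUnitManager {
    units: HashMap<WorkerId, Vec<ParallelUnitId>>,
}

impl ParallelUnitManager {
    /// Register the parallel units of a newly joined compute node.
    fn add_worker(&mut self, worker: WorkerId, units: Vec<ParallelUnitId>) {
        self.units.insert(worker, units);
    }

    /// Pick at most `parallelism` parallel units, spread round-robin across
    /// workers, rather than always using every unit in the cluster.
    fn pick_units(&self, parallelism: usize) -> Vec<ParallelUnitId> {
        let mut picked = Vec::new();
        let mut iters: Vec<_> = self.units.values().map(|v| v.iter()).collect();
        while picked.len() < parallelism {
            let mut progressed = false;
            for it in &mut iters {
                if let Some(&unit) = it.next() {
                    picked.push(unit);
                    progressed = true;
                    if picked.len() == parallelism {
                        break;
                    }
                }
            }
            if !progressed {
                break; // fewer units available than requested
            }
        }
        picked
    }
}
```

A real implementation would also need to persist this mapping and coordinate it with fragment scheduling, which is what this step is about.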
Really take advantage of scaled actors with correct data distribution.
Needless to say, the ultimate goal of scaling is to harness the computing power of a distributed system to fit the user-required throughput. So it's not enough to be able to create or drop actors independently; we also need to distribute data correctly and evenly under the new configuration. Thanks to consistent hashing and the unified storage access layer of the state table, this is not that hard to implement. However, due to Hummock's special transaction design and the introduction of concurrent checkpoints, we still have to handle consistency carefully.
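For intuition only (this is not the actual rebalancing algorithm), redistributing a vnode-to-actor mapping after a scale-out could work as follows: keep as many vnodes as possible on their current owners and hand the surplus to the least-loaded actors, so that each actor ends up with roughly `VIRTUAL_NODE_COUNT / actor_count` vnodes.

```rust
use std::collections::HashMap;

// Fixed virtual-node count; the value here is illustrative.
const VIRTUAL_NODE_COUNT: usize = 2048;

type ActorId = u32;

/// Recompute the vnode -> actor mapping for a fragment after scaling,
/// moving as few vnodes as possible (a sketch, not the real algorithm).
fn rebalance(old: &[ActorId], actors: &[ActorId]) -> Vec<ActorId> {
    assert_eq!(old.len(), VIRTUAL_NODE_COUNT);
    assert!(!actors.is_empty());
    let target = VIRTUAL_NODE_COUNT / actors.len(); // ignore the remainder for brevity
    let mut load: HashMap<ActorId, usize> = actors.iter().map(|&a| (a, 0)).collect();
    let mut mapping = Vec::with_capacity(VIRTUAL_NODE_COUNT);

    // First pass: keep a vnode on its current owner if that owner still
    // exists and is not yet above its target share.
    for &owner in old {
        if load.get(&owner).map_or(false, |&n| n < target) {
            *load.get_mut(&owner).unwrap() += 1;
            mapping.push(Some(owner));
        } else {
            mapping.push(None);
        }
    }

    // Second pass: hand the remaining vnodes to the least-loaded actors.
    mapping
        .into_iter()
        .map(|slot| {
            slot.unwrap_or_else(|| {
                let (&actor, count) = load.iter_mut().min_by_key(|(_, n)| **n).unwrap();
                *count += 1;
                actor
            })
        })
        .collect()
}
```

The design choice worth noting is minimizing vnode movement: the fewer vnodes that change owners, the less state and cache locality is lost during the configuration change.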
Unify other configuration changes with the scaling pipeline.
We may add fail-over and actor migration support before this task is done. However, some of these configuration changes can be treated as a specific form of scaling. For example, migrating an actor from one node to another is simply a scale-in and scale-out within a single barrier, where the partitions of other actors are untouched.
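As a toy illustration (these types are made up for the sketch, not the real meta structures), expressing migration in terms of the same per-fragment primitive keeps the command surface small: a migration is one reschedule that removes a parallel unit on the source worker and adds one on the target, applied at a single barrier.

```rust
type ParallelUnitId = u32;
type FragmentId = u32;

/// Hypothetical per-fragment reschedule request: the only primitive needed.
struct Reschedule {
    fragment_id: FragmentId,
    added_parallel_units: Vec<ParallelUnitId>,
    removed_parallel_units: Vec<ParallelUnitId>,
}

/// Actor migration is just a scale-in plus a scale-out bundled into one
/// reschedule, so it rides on the same pause-barrier pipeline as plain scaling.
fn migrate(fragment_id: FragmentId, from: ParallelUnitId, to: ParallelUnitId) -> Reschedule {
    Reschedule {
        fragment_id,
        added_parallel_units: vec![to],
        removed_parallel_units: vec![from],
    }
}
```

This also matches the observation above that other actors' partitions stay untouched: their parallel units never appear in either list.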
Manual debugging tools.
Online scaling is intended to be invoked by the cloud manager. However, there are likely some edge cases in scaling, so manual tests are necessary. We may integrate these tools with risectl.
feat(ctl): show cluster info / parallel unit matrix in risectl #4252
[…] risectl. #5163
Improvements and fixes.
Here we record some significant improvements and fixes that need to be done, some of which should be discussed first.
Interface with cloud manager.
After online scaling is fully implemented, we can interface with the cloud manager and automate the whole process with scaling policies.
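Purely as a sketch of what such a policy could look like (the real policy belongs to the cloud manager and is not specified in this issue), a threshold-based rule might map observed utilization to a desired parallelism:

```rust
/// Toy scaling policy (illustrative only): compare observed utilization
/// against thresholds and emit a desired parallelism for the cluster.
fn desired_parallelism(current: usize, cpu_utilization: f64) -> usize {
    const SCALE_OUT_THRESHOLD: f64 = 0.8;
    const SCALE_IN_THRESHOLD: f64 = 0.3;
    if cpu_utilization > SCALE_OUT_THRESHOLD {
        current * 2
    } else if cpu_utilization < SCALE_IN_THRESHOLD && current > 1 {
        current / 2
    } else {
        current
    }
}
```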