Work Stealing with multi-GPU Systems #44

Open
mtjrider opened this issue May 8, 2019 · 20 comments

@mtjrider

mtjrider commented May 8, 2019

When running workloads, tests, etc., Dask will often engage in intermittent work stealing. This typically results in undefined behavior, as a GPU ends up processing more than its intended volume of work.

As a work-around, we've used

export DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING=False
export DASK_DISTRIBUTED__SCHEDULER__BANDWIDTH=1

This disables work stealing and discourages the scheduler from shuffling data (which may cause a GPU to go out of memory).

I'm wondering how we should address this in a more general way, one that does not require us (or a user) to set environment variables.

One thought I had would be to have Dask, when dealing with GPUs, simply not steal work or shuffle data unless explicitly directed by the user in the form of an API call.

@mrocklin
Contributor

mrocklin commented May 8, 2019

I acknowledge the frustration. I'd rather invest time in solving the underlying issues than in encoding workarounds, if possible. So, what are the underlying issues?

It sounds like you're running into out-of-memory issues. This might be solvable short term by #35? Also, as always, I encourage you not to pack things too tightly.

For the short term you might also consider creating a configuration file with your desired options and placing it wherever you're running.

@mrocklin
Contributor

mrocklin commented May 8, 2019

If you're interested in making a config file: http://docs.dask.org/en/latest/configuration.html
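
Equivalently, a minimal sketch of doing the same thing programmatically with dask.config.set (the keys mirror the environment variables above; this needs to run before the cluster is created):

import dask

# Disable work stealing and pin the scheduler's bandwidth estimate,
# mirroring DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING and
# DASK_DISTRIBUTED__SCHEDULER__BANDWIDTH.
dask.config.set({
    "distributed.scheduler.work-stealing": False,
    "distributed.scheduler.bandwidth": 1,
})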

@mtjrider
Author

mtjrider commented May 8, 2019

I don't think memory spilling is the issue. The packing is generally not tight. Do we have any new criteria set up for stealing work with Dask + CUDA?

From what I'm seeing, GPUs with an even split of data (~30% each) seem to arbitrarily steal work in a pipeline of tasks. This impacts follow-on tasks in the following way:

  1. A GPU worker steals a pipelined task from another worker
  2. The thief processes that data, and the victim continues on its way
  3. Dependents of the pipelined task in (1) now fall to the thief
  4. The thief must now run operations on twice as much data as under optimal load balancing

If one of the tasks involves a concatenate, then the thief will be expected to use 120% of its available memory to accomplish the task.

Based on my benchmarking, work stealing is actually leading to inefficiencies and inflated benchmark times.

Memory spilling would get the above sequence to run, but not optimally. In some cases, I start a job with 8 GPUs and only 5 of them are actually doing meaningful work: nvidia-smi reports 5 GPUs with utilization, while the other 3 show none, with no memory used apart from Dask's process overhead.

@mtjrider
Author

mtjrider commented May 8, 2019

Point being, I think the right thing to do would be to clarify general criteria for Dask + CUDA work stealing. As best I can see, we're just using what's here: http://distributed.dask.org/en/latest/work-stealing.html

@mrocklin
Contributor

mrocklin commented May 8, 2019

The thief must now run operations on twice as much data as under optimal load balancing

This would only be true if you have only one task per worker. Typical workloads have 10x or 100x as many tasks as workers, so the occasional steal doesn't affect things much.

Memory spilling would get the above sequence to run, but not optimally.

Once a worker started slowing down from writing to disk, presumably other workers would take from it.

work stealing is actually leading to inefficiencies and inflated benchmark times.

Sometimes. Sometimes it's absolutely critical to performance.

I think the right thing to do would be to clarify general criteria for Dask + CUDA work stealing

I'm not sure I see any particular difference when using CUDA code. My guess is that the issues you're running into have more to do with the particular benchmark you're running than with the fact that we're using CUDA.

@mrocklin
Contributor

mrocklin commented May 8, 2019

Or, maybe I should ask, what about using CUDA code would make us want different stealing practices?

My guess is that the difficulties you're running into have more to do with using very few chunks than with using a GPU. I could be wrong though. I don't know GPUs as well as many others.

@mtjrider
Author

mtjrider commented May 8, 2019

Sometimes. Sometimes it's absolutely critical to performance.

I have not encountered a situation involving Dask + GPUs where work stealing improved performance. I've made benchmark runs in Dask-cuML and Dask-XGBoost, using Dask-cuDF, and really anything here: https://github.com/rapidsai/notebooks

Or, maybe I should ask, what about using CUDA code would make us want different stealing practices?

It's not really the CUDA code that's the issue. It's that the latency associated with moving memory to and from GPUs is far more costly than the mirrored process in CPU-based workloads. While you can amortize the cost of these transactions over many operations, this relies on the assumption that within such a workload, you'll have num_transactions << num_tasks. In reality, from my experience, even when there are a large number of tasks, Dask will simply engage in work stealing more frequently. Not only that, but work stolen is work lost by a worker.

When work is not local to the worker, you have two options:

  1. Move that data back to the original worker
  2. Compute it where it is

Dask + CUDA engages in (2). That wouldn't be a problem if work stealing weren't so prevalent. The issue is that these cases frequently lead to a scenario whereby one GPU ends up with slightly too much work (even with modest packing, and good load-balancing to start), it dies, and is forced to restart. If it can't restart, then it idles.

My guess is that the difficulties you're running into have more to do with using very few chunks than with using a GPU. I could be wrong though. I don't know GPUs as well as many others.

There are hundreds of thousands of chunks in these workloads, and half the task streams are just solid red. Can we avoid making guesses about workloads? I don't think that's constructive.

@mrocklin
Contributor

mrocklin commented May 8, 2019

It's that the latency associated with moving memory to and from GPUs is far more costly than the mirrored process in CPU-based workloads

Right, so if we change our estimates of bandwidth then things should improve. You may be interested in dask/distributed#2658 which tries to learn bandwidth over time.

Also, I would hope that this would reverse once UCX shows up.

When work is not local to the worker, you have two options:

I'm not sure what you mean by this. You mean having to move the dependencies to the thief machine? This cost is taken into account in the work stealing process (though apparently with errant bandwidth numbers).

The issue is that these cases frequently lead to a scenario whereby one GPU ends up with slightly too much work (even with modest packing, and good load-balancing to start), it dies, and is forced to restart

Why would it die if it has only slightly too much work? Are your chunks very large, or are you running very close to the memory limit? I wouldn't recommend either of these approaches.

@mtjrider
Author

mtjrider commented May 8, 2019

Right, so if we change our estimates of bandwidth then things should improve. You may be interested in dask/distributed#2658 which tries to learn bandwidth over time.

Very interesting! I think this would help tremendously in the short term. The latency and effective bandwidth of data paths in GPU systems are likely contributing to suboptimal work stealing because this information isn't properly represented.

Also, I would hope that this would reverse once UCX shows up.

Yes. UCX will certainly mitigate much of what I'm seeing. I was mostly looking for an automated short-term solution that was amenable to our long term goals.

I'm not sure what you mean by this. You mean having to move the dependencies to the thief machine? This cost is taken into account in the work stealing process (though apparently with errant bandwidth numbers).

This wasn't clear. The thief would own the new tasks because they're local to the thief, and the victim would have lost work. In many of our workloads, we have something like a concat function. This means that if a worker breaks the 50% memory utilization line, the worker will go out of memory because concat incurs a 2X memory overhead.

The recovery process in the event that this worker dies leads to another worker picking up that data, and dying. Dask isn't smart enough to say "Oh, I went OOM because concat; therefore, I will send these chunks/partitions to many different workers and avoid using more than 50% of the worker's memory". It's a cascade failure. [1]

Why would it die if it has only slightly too much work? Are your chunks very large, or are you running very close to the memory limit? I wouldn't recommend either of these approaches.

I tried to answer this in [1]. It's not that the chunks themselves are large, it's that there are often many chunks grouped together, being concatenated. This is one canonical case, but realistically speaking, Dask doesn't understand there is an intrinsic memory overhead when grabbing the tasks from a victim and placing them on a thief. A worker with 45% memory utilization looks like it can handle a few more chunks. Other functions have alternate overheads.

It sounds like I need to do some experimenting to determine what the optimal bandwidth settings are for this case, and implement them in the form of a configuration file as you originally suggested. The long-term solution would be a learned bandwidth metric + UCX.

Is there a way to help Dask understand the memory overhead associated with a particular function call before it permits stealing? Suppose we ask Dask to compute a single chunk, have it grab input memory and output memory, and use the ratio to guide stealing practices. Would that be feasible? Or something similar?
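
A purely hypothetical sketch of what I mean (memory_ratio is not an existing Dask API; dask.sizeof is only used here as a convenient size estimate):

from dask.sizeof import sizeof

def memory_ratio(func, sample_chunk):
    # Compute one representative chunk and compare output size to input size.
    input_bytes = sizeof(sample_chunk)
    output_bytes = sizeof(func(sample_chunk))
    return output_bytes / max(input_bytes, 1)

# Peak footprint for the task is roughly input + output, i.e. about
# (1 + ratio) times the input; for concatenate-style steps (ratio ~ 1) that
# is the 2X overhead described above, which stealing decisions could respect.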

@mrocklin
Contributor

mrocklin commented May 8, 2019

I was mostly looking for an automated short-term solution that was amenable to our long term goals

I think that specifying config as you are now is the most appropriate short term solution.

Long term I think that things like UCX and learning bandwidth will help.

This wasn't clear. The thief would own the new tasks because they're local to the thief, and the victim would have lost work. In many of our workloads, we have something like a concat function. This means that if a worker breaks the 50% memory utilization line, the worker will go out of memory because concat incurs a 2X memory overhead.

Having a single task that takes up half of memory sounds problematic to me. It might be worth reconsidering this approach.

For your particular workload I suspect that this is dask-xgboost. Maybe the solution proposed in that project is suboptimal for this reason. Maybe long term we should try to find another approach.

@mtjrider
Author

mtjrider commented May 8, 2019

Having a single task that takes up half of memory sounds problematic to me. It might be worth reconsidering this approach.

It's not a single task. A worker was assigned many tasks, and before they're fed into XGBoost training (for example) they need to be concatenated. Perhaps I'm not following what you mean by tasks.

Suppose I staged the concat over many chunks (as many different tasks); see the sketch after this list. Assume each chunk is 100 MB, and that there are 10 chunks.

  1. concat the first two chunks: inputs = 100 MB + 100 MB, utilization = 400 MB
  2. concat the next chunk with the result: inputs = 100 MB + 200 MB, utilization = 600 MB
  3. concat the next chunk with the result: inputs = 100 MB + 300 MB, utilization = 800 MB
    ...
    The utilization will eventually reach 2000 MB, or 2X the aggregate, regardless of the number of tasks. It's currently unavoidable.
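
To make the arithmetic concrete, here is a small book-keeping model of the sequence above (plain Python, sizes in MB; it assumes a step's inputs can only be freed once its output exists):

chunk_mb, n_chunks = 100, 10

result_mb = chunk_mb                      # hold the first chunk
peak_mb = 0
for _ in range(n_chunks - 1):
    inputs_mb = result_mb + chunk_mb      # running result + next chunk
    output_mb = inputs_mb                 # concatenation copies the data
    peak_mb = max(peak_mb, inputs_mb + output_mb)
    result_mb = output_mb                 # only now can the inputs be freed

print(peak_mb)                            # 2000 MB, i.e. 2X the 1000 MB aggregate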

For your particular workload I suspect that this is dask-xgboost. Maybe the solution proposed in that project is suboptimal for this reason. Maybe long term we should try to find another approach.

Dask-XGBoost is one workload from the list above, but this applies to any workload that uses concatenate, or any function that incurs a transactional memory overhead. Another example would be calling a constructor out-of-place, which would be pertinent for interoperating with other libraries. Say, cuDF <-> cuML, cuML <-> FAISS, etc.

@mrocklin
Contributor

mrocklin commented May 8, 2019

It's not a single task. A worker was assigned many tasks, and before they're fed into XGBoost training (for example) they need to be concatenated. Perhaps I'm not following what you mean by tasks.

There is a bit of computation that we're running that expects to be able to use a large fraction of the available memory at once. This is somewhat atypical in normal dask workloads, though I acknowledge that it's how dask-xgboost is currently designed.

Dask-XGBoost is one workload from the list above, but this applies to any workload that uses concatenate, or any function that incurs a transactional memory overhead

Yes, it's somewhat odd (from a current Dask perspective) to bring in a bunch of data at once and claim a large amount of RAM.

To be clear, I acknowledge that there is an issue here. It's not clear how to handle workloads like what dask-xgboost proposes, that want to claim a lot of RAM at once. This isn't specific to CUDA workloads I don't think. I don't have a good solution for you at the moment.

@mtjrider
Author

mtjrider commented May 8, 2019

Yes, it's somewhat odd (from a current Dask perspective) to bring in a bunch of data at once and claim a large amount of RAM.

I think we're liable to run into many situations like this, where Dask needs to perform some kind of hand-off to another library which will, by virtue of its design, claim large memory. The real issue I foresee is that many of these workloads will look very much like a black box to Dask, e.g. the workload passes a task off to another computational framework with a distinct communication layer that will consume memory in a manner that is immoral from Dask's perspective.

To be clear, I acknowledge that there is an issue here. It's not clear how to handle workloads like what dask-xgboost proposes, that want to claim a lot of RAM at once. This isn't specific to CUDA workloads I don't think. I don't have a good solution for you at the moment.

Agree that it's not necessarily specific to CUDA. I believe GPUs exacerbate the issue because they're memory constrained. Adding a GPU means adding a worker and increasing available memory, but if a worker fails, that leads to problems that you wouldn't otherwise have to deal with on the CPU (where nodes can be constructed with large amounts of RAM, and disk spill-over is available).

I'd love to be a part of any discussion which aims itself at addressing the aforementioned issue. Thanks for your many timely responses @mrocklin

@mrocklin
Contributor

@ayushdg and I took a look at one of his workflows. It looks like Dask was seeing the size of a pyarrow.Table as 48 bytes, and so was making poor decisions.

I've raised an issue upstream here: https://issues.apache.org/jira/browse/ARROW-6926

In the meantime, we can add a workaround here: https://github.com/dask/dask/blob/master/dask/sizeof.py. Hopefully the examples in that file are informative enough to add new sizeof methods for relevant Arrow objects.

@mrocklin
Contributor

mrocklin commented Oct 21, 2019

@rjzamora do you have time to implement a sizeof method for the commonly used PyArrow types in dask/dask?

@mrocklin
Contributor

In particular, here we care about pyarrow.Table. I took a brief look at things and couldn't immediately find a simple way of doing this (but I didn't look for too long). If we can't find an exact solution, then an approximate solution based on dtypes, length, and some guesswork would also be good.
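
For what it's worth, a rough sketch of what such a registration might look like (not a tested implementation; it assumes a pyarrow version where Table.columns yields ChunkedArrays), following the dispatch pattern already used in dask/sizeof.py:

import pyarrow as pa
from dask.sizeof import sizeof

@sizeof.register(pa.Table)
def sizeof_pyarrow_table(table):
    # Sum the sizes of the underlying Arrow buffers (validity, offsets, data)
    # across every chunk of every column; approximate, but close enough for
    # scheduling decisions.
    total = 0
    for column in table.columns:
        for chunk in column.chunks:
            for buf in chunk.buffers():
                if buf is not None:
                    total += buf.size
    return int(total)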

@rjzamora
Member

@rjzamora do you have time to implement a sizeof method for the commonly used PyArrow types in dask/dask?

Yes - I can certainly look into this

@TomAugspurger

This may be fixed now that dask/dask#5522 is in distributed master.

@github-actions

This issue has been marked rotten due to no recent activity in the past 90d. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed.

@github-actions

This issue has been marked stale due to no recent activity in the past 30d. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be marked rotten if there is no activity in the next 60d.

@caryr35 caryr35 added this to dask-cuda Dec 8, 2022