
Run out-of-band function 'initialize_rmm_pool' / Event loop was unresponsive #64

Closed
vilmara opened this issue May 31, 2019 · 13 comments

@vilmara

vilmara commented May 31, 2019

Hi all, I am trying to configure RAPIDS with Dask in multi-GPU, multi-node mode. Here is what I am doing and the errors I got; could you please assist in fixing these?

IP of the primary compute node hosting the dask-scheduler: <ip_primary_node>
Number of GPUs in the primary compute node: 4

IP of the secondary compute node: <ip_secondary_node>
Number of GPUs in the secondary compute node: 4
Total GPUs: 8

Step 1: Launch the dask-scheduler on the primary compute node (which has 4 GPUs):
dask-scheduler --port=8888 --bokeh-port 8786
output:

distributed.scheduler - INFO - Remove client Client-0616844c-83ba-11e9-8225-246e96b3e316
distributed.scheduler - INFO - Close client connection: Client-0616844c-83ba-11e9-8225-246e96b3e316
distributed.scheduler - INFO - Receive client connection: Client-9ad22140-83bd-11e9-823c-246e96b3e316
distributed.core - INFO - Starting established connection

Step 2: Launch dask-cuda-worker to start workers on the same machine as the scheduler
dask-cuda-worker tcp://<ip_primary_node>:8888
output:
..... a bunch of messages showing successful connections

Step 3: Launch dask-cuda-worker on the secondary compute node (with an additional 4 GPUs):
dask-cuda-worker tcp://<ip_primary_node>:8888
output:
..... a bunch of messages showing successful connections

Step 4: Run the Client Python API (Jupyter notebook) on the secondary compute node (using all compute nodes' GPUs in distributed mode)
client = Client('tcp://<ip_primary_node>:8888')
output:

Client
Scheduler: tcp://<ip_primary_node>:8888
Dashboard: http://<ip_primary_node>:8786/status
Cluster
Workers: 8
Cores: 8
Memory: 67.47 GB

Errors:
Primary compute node worker:

distributed.core - INFO - Starting established connection
distributed.worker - INFO - Run out-of-band function 'initialize_rmm_pool'
distributed.worker - INFO - Run out-of-band function 'initialize_rmm_pool'
distributed.worker - INFO - Run out-of-band function 'initialize_rmm_pool'
distributed.worker - INFO - Run out-of-band function 'initialize_rmm_pool'
distributed.core - INFO - Event loop was unresponsive in Worker for 5.24s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - INFO - Event loop was unresponsive in Worker for 5.28s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - INFO - Event loop was unresponsive in Worker for 5.29s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - INFO - Event loop was unresponsive in Worker for 5.34s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.worker - WARNING -  Compute Failed
Function:  execute_task
args:      ((<function apply at 0x7f2aaead7510>, <function run_gpu_workflow at 0x7f2a66c06ea0>, [], (<class 'dict'>, [['quarter', 1], ['year', 2000], ['perf_file', '/home/rapids/data/perf/Performance_2000Q1.txt']])))
kwargs:    {}
Exception: FileNotFoundError()

distributed.worker - WARNING -  Compute Failed
Function:  execute_task
args:      ((<function apply at 0x7f2aaead7510>, <function run_gpu_workflow at 0x7f2a8c4f66a8>, [], (<class 'dict'>, [['quarter', 2], ['year', 2000], ['perf_file', '/home/rapids/data/perf/Performance_2000Q2.txt']])))
kwargs:    {}
Exception: FileNotFoundError()

distributed.worker - WARNING -  Compute Failed
Function:  execute_task
args:      ((<function apply at 0x7f2aaead7510>, <function run_gpu_workflow at 0x7f2a9c329598>, [], (<class 'dict'>, [['quarter', 3], ['year', 2000], ['perf_file', '/home/rapids/data/perf/Performance_2000Q3.txt']])))
kwargs:    {}
Exception: FileNotFoundError()

distributed.worker - WARNING -  Compute Failed
Function:  execute_task
args:      ((<function apply at 0x7f2aaead7510>, <function run_gpu_workflow at 0x7f2a8c6437b8>, [], (<class 'dict'>, [['quarter', 4], ['year', 2000], ['perf_file', '/home/rapids/data/perf/Performance_2000Q4.txt']])))
kwargs:    {}
Exception: FileNotFoundError()

distributed.worker - INFO - Run out-of-band function 'finalize_rmm'
distributed.worker - INFO - Run out-of-band function 'finalize_rmm'
distributed.worker - INFO - Run out-of-band function 'finalize_rmm'
distributed.worker - INFO - Run out-of-band function 'finalize_rmm'
distributed.worker - INFO - Run out-of-band function 'initialize_rmm_no_pool'
distributed.worker - INFO - Run out-of-band function 'initialize_rmm_no_pool'
distributed.worker - INFO - Run out-of-band function 'initialize_rmm_no_pool'
distributed.worker - INFO - Run out-of-band function 'initialize_rmm_no_pool'
^Cdistributed.dask_worker - INFO - Exiting on signal 2
/conda/envs/rapids/lib/python3.7/site-packages/dask_cuda-0.0.0.dev0-py3.7.egg/dask_cuda/dask_cuda_worker.py:263: UserWarning: Worker._close has moved to Worker.close
  yield [n._close(timeout=2) for n in nannies]
distributed.nanny - INFO - Closing Nanny at 'tcp://<ip_primary_node>:41254'
distributed.nanny - INFO - Closing Nanny at 'tcp://<ip_primary_node>:42505'
distributed.nanny - INFO - Closing Nanny at 'tcp://<ip_primary_node>:33813'
distributed.nanny - INFO - Closing Nanny at 'tcp://<ip_primary_node>:41822'
distributed.dask_worker - INFO - End worker
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-2, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-3, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-1, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-4, started daemon)>

Secondary compute node worker:

distributed.worker - INFO - Run out-of-band function 'initialize_rmm_pool'
distributed.worker - INFO - Run out-of-band function 'initialize_rmm_pool'
distributed.worker - INFO - Run out-of-band function 'initialize_rmm_pool'
distributed.worker - INFO - Run out-of-band function 'initialize_rmm_pool'
distributed.worker - INFO - Run out-of-band function 'finalize_rmm'
distributed.worker - INFO - Run out-of-band function 'finalize_rmm'
distributed.worker - INFO - Run out-of-band function 'finalize_rmm'
distributed.worker - INFO - Run out-of-band function 'finalize_rmm'
distributed.worker - INFO - Run out-of-band function 'initialize_rmm_no_pool'
distributed.worker - INFO - Run out-of-band function 'initialize_rmm_no_pool'
distributed.worker - INFO - Run out-of-band function 'initialize_rmm_no_pool'
distributed.worker - INFO - Run out-of-band function 'initialize_rmm_no_pool'
@pentschev
Member

It looks like the first worker fails while trying to access some files, such as:

Function:  execute_task
args:      ((<function apply at 0x7f2aaead7510>, <function run_gpu_workflow at 0x7f2a66c06ea0>, [], (<class 'dict'>, [['quarter', 1], ['year', 2000], ['perf_file', '/home/rapids/data/perf/Performance_2000Q1.txt']])))
kwargs:    {}
Exception: FileNotFoundError()

Could you check that the files are indeed available when you try to access them? Please note that in a distributed system like the one you're running, these files need to be accessible by the workers.
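
For example, a minimal sketch (assuming the client object from Step 4, and reusing the path from the traceback) that asks every worker whether it can see one of the files:

import os

def perf_file_visible(path='/home/rapids/data/perf/Performance_2000Q1.txt'):
    # Runs on every worker; True means that worker can see the file
    return os.path.exists(path)

# `client` is the distributed.Client connected to the scheduler (Step 4)
print(client.run(perf_file_visible))  # {worker_address: True/False, ...}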

Also, are you explicitly enabling RMM? It's turned off by default, and the messages related to it are just informational; they don't seem to be causing any problems.

Besides that, I think it's going to be difficult to find out more without a minimal sample to reproduce the issue.

@vilmara
Author

vilmara commented May 31, 2019

Hi @pentschev, thanks for your prompt reply. In this case the primary worker didn't find the dataset files. In a distributed system, does the dataset need to be copied to each node, or only to the node hosting the dask-scheduler?

On the other hand, how can I explicitly enable RMM?

@pentschev
Member

The data needs to be accessible somehow; one way is to have network volumes that all workers can reach, such as NFS.

There are instructions on how to explicitly enable RMM here, but it isn't mandatory. Maybe you have some RMM handling in your notebook, and that's why you're seeing those info messages?
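
For reference, a minimal sketch of what explicitly enabling the pool could look like; this follows the pattern the RAPIDS notebooks used around 0.7, and the rmm_cfg.use_pool_allocator flag is assumed from that era's librmm_cffi config (later releases changed the API):

def initialize_rmm_pool():
    # Assumed 0.7-era API: switch RMM's config to the pool allocator, then re-initialize via cuDF
    from librmm_cffi import librmm_config as rmm_cfg
    rmm_cfg.use_pool_allocator = True
    import cudf
    return cudf.rmm.initialize()

# Run on every worker through the Dask client
client.run(initialize_rmm_pool)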

@vilmara
Author

vilmara commented May 31, 2019

@pentschev, for now I turned off the workers on the node that doesn't have access to the data and ran the distributed system with only the 4 workers on the secondary node, and got this RMM_ERROR_OUT_OF_MEMORY error (I didn't get a memory error when I ran the test on localhost only):

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
/conda/envs/rapids/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
  len(cache))
distributed.nanny - WARNING - Worker process 518 was killed by signal 15
distributed.nanny - WARNING - Restarting worker
distributed.worker - WARNING -  Compute Failed
Function:  execute_task
args:      ((<function apply at 0x7ff5838f2510>, <function run_gpu_workflow at 0x7ff57013cbf8>, [], (<class 'dict'>, [['quarter', 1], ['year', 2000], ['perf_file', '/home/dell/rapids/data/perf/Performance_2000Q1.txt']])))
kwargs:    {}
Exception: RuntimeError('RMM error encountered at: /rapids/cudf/cpp/src/join/joining.cu:318: 4 RMM_ERROR_OUT_OF_MEMORY')

@pentschev
Member

I assume localhost is your client host (as per your original post). Are you sure the worker GPU you're running on now has the same or more memory than the GPU on the host you referred to as localhost?

Also, it's worth noting that dask-cuda 0.8 will have a device<->host<->disk memory spilling mechanism, which would allow overcoming insufficient GPU memory for large datasets.

@vilmara
Author

vilmara commented May 31, 2019

@pentschev both machines have the same GPU memory (16 GB per GPU). Also, how can I check the dask-cuda version inside the container?

@pentschev
Member

You can check the version with:

conda list | grep dask-cuda

But it will normally be the same version as the RAPIDS container, so if you're on the latest RAPIDS stable (0.7), dask-cuda will probably be 0.7.0.

As for the out-of-memory error, you could try disabling RMM, as per the link I posted previously. It could also be that you're just at the limit of your GPU memory; memory consumption isn't deterministic for all problems, so it may fail sometimes and not others. If that's the case, one way to verify is to limit your problem size, perhaps by dropping some rows of your DataFrame, as in the sketch below.
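
As an illustration only, a sketch assuming the quarter's data ends up in a hypothetical cuDF DataFrame named gdf inside run_gpu_workflow:

# Hypothetical: gdf is the cuDF DataFrame loaded inside run_gpu_workflow
n_rows = 1_000_000       # pick a size comfortably below what fills GPU memory
gdf = gdf.head(n_rows)   # keep only the first n_rows rows for the test run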

@vilmara
Author

vilmara commented Aug 7, 2019

Thanks @pentschev, I have disabled RMM and increased the GPU memory limit with the --memory-limit flag.

@vilmara vilmara closed this as completed Aug 7, 2019
@pentschev
Member

@vilmara I'm glad this worked, and the feedback is much appreciated!

@vilmara
Author

vilmara commented Sep 13, 2019

Hi again @pentschev. I am now testing RAPIDS v0.9.1 and getting similar errors when using RMM. I would like to know how I can use the device<->host<->disk memory spilling mechanism you mentioned earlier, or any other suggestion to make RMM work. I am using the NYCTaxi-E2E sample, and below is the error I am getting at the training step:

Picking Training Set time ...
35.009541034698486
XGBoost Training ...
terminate called after throwing an instance of 'thrust::system::system_error'
  what():  device free failed: an illegal memory access was encountered
terminate called after throwing an instance of 'thrust::system::system_error'
  what():  device free failed: an illegal memory access was encountered
terminate called after throwing an instance of 'thrust::system::system_error'
  what():  device free failed: an illegal memory access was encountered
distributed.nanny - WARNING - Restarting worker

@pentschev
Member

pentschev commented Sep 16, 2019

To enable spilling, you have to pass a memory limit to dask-cuda-worker via --device-memory-limit (device<->host) and --memory-limit (host<->disk). This will only allow Dask-related tasks to spill memory (but not the internals of other RAPIDS libraries, such as cuDF). You can also try enabling RMM's managed memory (details here); your code would then look something like this:

# All imports here

def enable_rmm_managed_memory():
    from librmm_cffi import librmm_config as rmm_cfg
    import cudf
    cudf.rmm.finalize()
    rmm_cfg.use_managed_memory = True
    return cudf.rmm.initialize()

# Create Dask's client object

client.run(enable_rmm_managed_memory)

The advantage of the above is that managed memory will also be used by library internals (such as Thrust), and it's currently faster than Dask device<->host spilling. That said, I would suggest first trying to enable managed memory only, and if you still experience crashes, then enabling Dask spilling to host/disk.

Edit: You have to restart RMM; in the example above I show how to do it with cuDF, but if some other CUDA library is used, that would need to be adjusted too. A fully functional example can be found here.
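
For completeness, a sketch of what the worker launch with both limits could look like (the scheduler address matches the one used earlier in this issue; the 12GB/24GB values are placeholders to adjust for your hardware):

dask-cuda-worker tcp://<ip_primary_node>:8888 --device-memory-limit=12GB --memory-limit=24GB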

@vilmara
Author

vilmara commented Sep 16, 2019

Hi @pentschev, I am working with a multi-node configuration; how can I make sure each worker enables managed memory? I have implemented the code example you presented in my project, and I got these warnings:

At the dask scheduler: distributed.worker - INFO - Run out-of-band function 'start_tracker'
At each node: distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 4.75 GB -- Worker memory limit: 6.76 GB

I am running the system with 2 servers, each with 4x V100 16 GB GPUs.

@pentschev
Member

client.run executes that on all workers; see https://distributed.dask.org/en/latest/api.html?highlight=run#distributed.Client.run.

The memory warnings seem to be on the host side. It looks like each process has only 6.76 GB available, is that correct? Normal operation divides the host's memory equally, so if you have 4 GPUs (and thus 4 processes), that means the host has only about 6.76 * 4 ≈ 27 GB. If that is right, you have much less host memory than the GPUs combined, which may indeed be an issue overall, especially with managed memory (which will generally allocate as much memory on the host as is allocated on the GPU, plus any extra memory required for spilling). To double-check those per-worker numbers, see the sketch below.
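
If it helps to verify those per-worker numbers, here is a small sketch that relies on the documented dask_worker keyword of Client.run to report each worker's configured memory limit:

def report_memory_limit(dask_worker):
    # distributed injects the Worker object when the function accepts a `dask_worker` argument
    return dask_worker.memory_limit  # host memory budget for this worker process, in bytes

print(client.run(report_memory_limit))  # {worker_address: memory_limit, ...}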
