We are currently unable to have multiple workers access a cuGraph Graph in parallel, such as in a distributed training workflow with PyTorch, because cuGraph comms (really RAFT comms) cannot be initialized from the spawned worker processes. This previously worked, but was found to be broken while debugging a different issue in a cuGraph example training workflow.
We want to support a broad variety of workflows (one of the main goals of the upcoming 24.04 release), so resolving this issue is critical. If possible, it should be fixed in release 24.02 so that the fix makes it into the 24.03 DLFW container, which will add a new round of cuGraph example workflows.
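For reference, the usual single-process pattern for initializing cuGraph comms looks roughly like the sketch below (same APIs as in the reproducer, but with no mp.spawn and a single Dask client owned by the driver); the failure reported here only shows up when each spawned PyTorch rank creates its own client and tries to initialize comms:

from dask_cuda import LocalCUDACluster
from dask.distributed import Client
from cugraph.dask.comms import comms as Comms

if __name__ == "__main__":
    cluster = LocalCUDACluster(protocol="tcp")
    client = Client(cluster)
    client.wait_for_workers(n_workers=len(cluster.workers))

    # Single comms handle, owned by the driver process.
    Comms.initialize(p2p=True)
    # ... run MG cuGraph algorithms here ...
    Comms.destroy()

    client.close()
    cluster.close()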
Reproducer:
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import os

os.environ['RAPIDS_NO_INITIALIZE'] = '1'
os.environ['CUDF_SPILL'] = '1'


def start_dask_cluster():
    # Start a local Dask-CUDA cluster in the parent process and enable cuDF
    # spilling on every worker.
    from cugraph.testing.mg_utils import enable_spilling
    from dask_cuda import LocalCUDACluster

    cluster = LocalCUDACluster(
        protocol="tcp",
        rmm_pool_size=None,
        memory_limit=None,
    )

    from dask.distributed import Client
    client = Client(cluster)
    client.wait_for_workers(n_workers=len(cluster.workers))
    client.run(enable_spilling)
    print("Dask Cluster Setup Complete")
    return client, cluster


def init_pytorch_worker(rank, world_size):
    # One spawned process per GPU: bind CuPy and PyTorch to this rank's device
    # and join the NCCL process group.
    import cupy
    cupy.cuda.Device(rank).use()

    from rmm.allocators.cupy import rmm_cupy_allocator
    cupy.cuda.set_allocator(rmm_cupy_allocator)

    from cugraph.testing.mg_utils import enable_spilling
    enable_spilling()

    torch.cuda.set_device(rank)

    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group('nccl', rank=rank, world_size=world_size)


def main(rank, world_size, scheduler_address):
    init_pytorch_worker(rank, world_size)

    # Each rank connects its own Dask client to the shared scheduler.
    from dask.distributed import Client
    client = Client(address=scheduler_address)
    print(f'rank {rank} successfully created a dask client: {str(client)}', flush=True)

    # Sanity check that dask-cudf works from the spawned process.
    import cudf
    import dask_cudf
    df = cudf.Series([1, 3, 5, 6, 7])
    ddf = dask_cudf.from_cudf(df, npartitions=3)
    assert ddf.compute().values_host.tolist() == df.values_host.tolist()
    print(f'dask-cudf test successful on rank {rank}')

    # Serialize comms initialization across ranks with a distributed lock so
    # that only one rank calls Comms.initialize at a time.
    from dask.distributed import Lock
    lock = Lock('comms_init')

    dist.barrier()
    if lock.acquire(timeout=100):
        try:
            from cugraph.dask.comms import comms as Comms
            Comms.initialize(p2p=True)
            print(f'cugraph comms initialized on rank {rank}')
        finally:
            lock.release()
    else:
        raise TimeoutError("Failed to acquire lock to initialize comms")


if __name__ == '__main__':
    client, cluster = start_dask_cluster()
    print(cluster.scheduler_address)

    # Spawn one PyTorch process per visible GPU.
    world_size = torch.cuda.device_count()
    mp.spawn(
        main,
        args=(world_size, cluster.scheduler_address,),
        nprocs=world_size,
        join=True,
    )

    client.close()
    cluster.close()
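Running the script directly (it is saved as timeout_repro.py, per the traceback paths below) on a machine with five visible GPUs produces the output that follows; the five spawned ranks match the processes=5 reported by the Dask client.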
Output:
Dask Cluster Setup Complete
tcp://127.0.0.1:33305
rank 1 successfully created a dask client: <Client: 'tcp://127.0.0.1:33305' processes=5 threads=5>
rank 0 successfully created a dask client: <Client: 'tcp://127.0.0.1:33305' processes=5 threads=5>
rank 3 successfully created a dask client: <Client: 'tcp://127.0.0.1:33305' processes=5 threads=5>
rank 4 successfully created a dask client: <Client: 'tcp://127.0.0.1:33305' processes=5 threads=5>
rank 2 successfully created a dask client: <Client: 'tcp://127.0.0.1:33305' processes=5 threads=5>
dask-cudf test successful on rank 2
dask-cudf test successful on rank 3
dask-cudf test successful on rank 1
dask-cudf test successful on rank 0
dask-cudf test successful on rank 4
2024-01-11 12:15:59,792 - distributed.worker - WARNING - Run Failed
Function: _func_init_all
args: (b"\x91\xae\xda\xeb\xfe\xb1A}\x83'\xbdf\xf0\xecm7", b'\xf3\xdd?\xfe(\xb3j|\x02\x00\xcb\xbd\n!\xe1\xa9\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00@\x18\x81$B\x7f\x00\x00 f\xc1\x16B\x7f\x00\x00\xb0\xd2\xf9\xedB\x7f\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00P\xba\xec\xa5WU\x00\x00\x00\x00\x00\x00V:\xbf]`\xbe\xda\xa5WU\x00\x00@\x18\x81$B\x7f\x00\x00\x00\x90\x915\xdb\x99\xee\x84\xc0\xf5C$B\x7f\x00\x00\xb0\xd2\xf9\xedB\x7f\x00', True, {'tcp://127.0.0.1:32793': {'rank': 4, 'port': 52000}, 'tcp://127.0.0.1:35165': {'rank': 1, 'port': 47041}, 'tcp://127.0.0.1:35203': {'rank': 0, 'port': 45080}, 'tcp://127.0.0.1:38909': {'rank': 3, 'port': 42314}, 'tcp://127.0.0.1:41665': {'rank': 2, 'port': 51892}}, False, 0)
kwargs: {'dask_worker': <Worker 'tcp://127.0.0.1:32793', name: 4, status: running, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>}
Traceback (most recent call last):
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
return fut.result()
asyncio.exceptions.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/distributed/worker.py", line 3182, in run
result = await function(*args, **kwargs)
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/comms.py", line 463, in _func_init_all
await _func_ucp_create_endpoints(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/comms.py", line 632, in _func_ucp_create_endpoints
ep = await get_ucx(dask_worker=dask_worker).get_endpoint(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/ucx.py", line 69, in get_endpoint
ep = await self._create_endpoint(ip, port)
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/ucx.py", line 60, in _create_endpoint
ep = await ucp.create_endpoint(ip, port)
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 1016, in create_endpoint
return await _get_ctx().create_endpoint(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 328, in create_endpoint
peer_info = await exchange_peer_info(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 60, in exchange_peer_info
await asyncio.wait_for(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/asyncio/tasks.py", line 458, in wait_for
raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
2024-01-11 12:15:59,791 - distributed.worker - WARNING - Run Failed
Function: _func_init_all
args: (b"\x91\xae\xda\xeb\xfe\xb1A}\x83'\xbdf\xf0\xecm7", b'\xf3\xdd?\xfe(\xb3j|\x02\x00\xcb\xbd\n!\xe1\xa9\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00@\x18\x81$B\x7f\x00\x00 f\xc1\x16B\x7f\x00\x00\xb0\xd2\xf9\xedB\x7f\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00P\xba\xec\xa5WU\x00\x00\x00\x00\x00\x00V:\xbf]`\xbe\xda\xa5WU\x00\x00@\x18\x81$B\x7f\x00\x00\x00\x90\x915\xdb\x99\xee\x84\xc0\xf5C$B\x7f\x00\x00\xb0\xd2\xf9\xedB\x7f\x00', True, {'tcp://127.0.0.1:32793': {'rank': 4, 'port': 52000}, 'tcp://127.0.0.1:35165': {'rank': 1, 'port': 47041}, 'tcp://127.0.0.1:35203': {'rank': 0, 'port': 45080}, 'tcp://127.0.0.1:38909': {'rank': 3, 'port': 42314}, 'tcp://127.0.0.1:41665': {'rank': 2, 'port': 51892}}, False, 0)
kwargs: {'dask_worker': <Worker 'tcp://127.0.0.1:35165', name: 1, status: running, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>}
Traceback (most recent call last):
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
return fut.result()
asyncio.exceptions.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/distributed/worker.py", line 3182, in run
result = await function(*args, **kwargs)
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/comms.py", line 463, in _func_init_all
await _func_ucp_create_endpoints(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/comms.py", line 632, in _func_ucp_create_endpoints
ep = await get_ucx(dask_worker=dask_worker).get_endpoint(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/ucx.py", line 69, in get_endpoint
ep = await self._create_endpoint(ip, port)
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/ucx.py", line 60, in _create_endpoint
ep = await ucp.create_endpoint(ip, port)
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 1016, in create_endpoint
return await _get_ctx().create_endpoint(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 328, in create_endpoint
peer_info = await exchange_peer_info(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 60, in exchange_peer_info
await asyncio.wait_for(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/asyncio/tasks.py", line 458, in wait_for
raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
Task exception was never retrieved
future: <Task finished name='Task-3103' coro=<_listener_handler_coroutine() done, defined at /home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py:140> exception=TimeoutError()>
Traceback (most recent call last):
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
return fut.result()
asyncio.exceptions.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 155, in _listener_handler_coroutine
peer_info = await exchange_peer_info(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 51, in exchange_peer_info
await asyncio.wait_for(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/asyncio/tasks.py", line 458, in wait_for
raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
Traceback (most recent call last):
File "/home/nfs/abarghi/timeout_repro.py", line 78, in <module>
mp.spawn(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/torch/multiprocessing/spawn.py", line 239, in spawn
return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/torch/multiprocessing/spawn.py", line 197, in start_processes
while not context.join():
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/torch/multiprocessing/spawn.py", line 160, in join
raise ProcessRaisedException(msg, error_index, failed_process.pid)
torch.multiprocessing.spawn.ProcessRaisedException:
-- Process 1 terminated with the following error:
Traceback (most recent call last):
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
return fut.result()
asyncio.exceptions.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/torch/multiprocessing/spawn.py", line 69, in _wrap
fn(i, *args)
File "/home/nfs/abarghi/timeout_repro.py", line 66, in main
Comms.initialize(p2p=True)
File "/home/nfs/abarghi/cugraph6/python/cugraph/cugraph/dask/comms/comms.py", line 152, in initialize
__instance.init()
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/comms.py", line 201, in init
self.client.run(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/distributed/client.py", line 2998, in run
return self.sync(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/distributed/utils.py", line 358, in sync
return sync(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/distributed/utils.py", line 434, in sync
raise error
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/distributed/utils.py", line 408, in f
result = yield future
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/tornado/gen.py", line 767, in run
value = future.result()
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/distributed/client.py", line 2903, in _run
raise exc
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/comms.py", line 463, in _func_init_all
await _func_ucp_create_endpoints(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/comms.py", line 632, in _func_ucp_create_endpoints
ep = await get_ucx(dask_worker=dask_worker).get_endpoint(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/ucx.py", line 69, in get_endpoint
ep = await self._create_endpoint(ip, port)
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/ucx.py", line 60, in _create_endpoint
ep = await ucp.create_endpoint(ip, port)
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 1016, in create_endpoint
return await _get_ctx().create_endpoint(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 328, in create_endpoint
peer_info = await exchange_peer_info(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 60, in exchange_peer_info
await asyncio.wait_for(
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/asyncio/tasks.py", line 458, in wait_for
raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
2024-01-11 12:16:06,160 - distributed.nanny - WARNING - Worker process still alive after 3.199997253417969 seconds, killing
2024-01-11 12:16:06,160 - distributed.nanny - WARNING - Worker process still alive after 3.199999389648438 seconds, killing
2024-01-11 12:16:06,161 - distributed.nanny - WARNING - Worker process still alive after 3.1999995422363288 seconds, killing
2024-01-11 12:16:06,161 - distributed.nanny - WARNING - Worker process still alive after 3.1999995422363288 seconds, killing
2024-01-11 12:16:06,161 - distributed.nanny - WARNING - Worker process still alive after 3.199999389648438 seconds, killing
2024-01-11 12:16:06,497 - distributed.scheduler - ERROR - broadcast to tcp://127.0.0.1:32793 failed: CommClosedError: in <TCP (closed) Scheduler Broadcast local=tcp://127.0.0.1:33174 remote=tcp://127.0.0.1:32793>: Stream is closed
2024-01-11 12:16:06,533 - distributed.scheduler - ERROR - broadcast to tcp://127.0.0.1:35165 failed: CommClosedError: in <TCP (closed) Scheduler Broadcast local=tcp://127.0.0.1:60924 remote=tcp://127.0.0.1:35165>: Stream is closed
2024-01-11 12:16:06,960 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7f4215483010>>, <Task finished name='Task-12445' coro=<SpecCluster._correct_state_internal() done, defined at /home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/distributed/deploy/spec.py:346> exception=TimeoutError()>)
Traceback (most recent call last):
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
return fut.result()
asyncio.exceptions.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/tornado/ioloop.py", line 738, in _run_callback
ret = callback()
File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/tornado/ioloop.py", line 762, in _discard_future_result
future.result()
asyncio.exceptions.TimeoutError
2024-01-11 12:16:06,998 - distributed.process - WARNING - [<AsyncProcess Dask Worker process (from Nanny)>] process 443571 exit status was already read will report exitcode 255
2024-01-11 12:16:07,070 - distributed.process - WARNING - [<AsyncProcess Dask Worker process (from Nanny)>] process 443558 exit status was already read will report exitcode 255