Skip to content

Commit

Permalink
Support non-p2p configuration when initializing the comms (#4543)
Browse files (browse the repository at this point in the history)
closes #4490

Authors:
  - Joseph Nke (https://github.com/jnke2016)
  - Ralph Liu (https://github.com/nv-rliu)
  - Chuck Hastings (https://github.com/ChuckHastings)

Approvers:
  - Chuck Hastings (https://github.com/ChuckHastings)
  - Rick Ratzel (https://github.com/rlratzel)

URL: #4543
  • Loading branch information
jnke2016 authored Jul 31, 2024
1 parent 8f7fec9 commit c941748
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 5 deletions.
2 changes: 0 additions & 2 deletions python/cugraph/cugraph/dask/comms/comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,6 @@ def initialize(comms=None, p2p=False, prows=None, pcols=None, partition_type=1):
__default_handle = None
if comms is None:
# Initialize communicator
if not p2p:
raise Exception("Set p2p to True for running mnmg algorithms")
__instance = raftComms(comms_p2p=p2p)
__instance.init()
# Initialize subcommunicator
Expand Down
8 changes: 6 additions & 2 deletions python/cugraph/cugraph/testing/mg_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
# Copyright (c) 2022-2024, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -35,6 +35,7 @@ def start_dask_client(
jit_unspill=False,
worker_class=None,
device_memory_limit=0.8,
p2p=True,
):
"""
Creates a new dask client, and possibly also a cluster, and returns them as
Expand Down Expand Up @@ -95,6 +96,9 @@ def start_dask_client(
dask_cuda.LocalCUDACluster for details. This parameter is ignored if
the env var SCHEDULER_FILE is set which implies the dask cluster has
already been created.
p2p : bool, optional (default=True)
Initialize UCX endpoints if True.
"""
dask_scheduler_file = os.environ.get("SCHEDULER_FILE")
dask_local_directory = os.getenv("DASK_LOCAL_DIRECTORY")
Expand Down Expand Up @@ -164,7 +168,7 @@ def start_dask_client(
# FIXME: use proper logging, INFO or DEBUG level
print("\nDask client/cluster created using LocalCUDACluster")

Comms.initialize(p2p=True)
Comms.initialize(p2p=p2p)

return (client, cluster)

Expand Down
17 changes: 16 additions & 1 deletion python/cugraph/cugraph/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
# Copyright (c) 2021-2024, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -52,6 +52,21 @@ def dask_client():
stop_dask_client(dask_client, dask_cluster)


# FIXME: Add tests leveraging this fixture
@pytest.fixture(scope="module")
def dask_client_non_p2p():
    """
    Module-scoped fixture yielding a dask client started with p2p=False.

    The client (and possibly a cluster) is created once per test module
    via start_dask_client and handed to stop_dask_client when the module's
    tests are done.
    """
    # start_dask_client honors the SCHEDULER_FILE and DASK_WORKER_DEVICES
    # env vars when they are set, and it also initializes the Comms
    # singleton - here with p2p disabled.
    client, cluster = start_dask_client(
        p2p=False, worker_class=IncreasedCloseTimeoutNanny
    )

    yield client

    # Teardown: runs after the last test in the module that used the fixture.
    stop_dask_client(client, cluster)


@pytest.fixture(scope="module")
def scratch_dir():
# This should always be set if doing MG testing, since temporary
Expand Down

0 comments on commit c941748

Please sign in to comment.