[BUG]: cuGraph MNMG Hangs #4037

Closed
VibhuJawa opened this issue Dec 4, 2023 · 1 comment · Fixed by #4046
Labels
bug Something isn't working

Comments


VibhuJawa commented Dec 4, 2023

Version

23.12

Which installation method(s) does this occur on?

Describe the bug.

We currently see hangs on large clusters (8+ nodes, 64 workers), and the hang appears to be non-deterministic.

@jnke2016 has been investigating this in depth and has produced the following MREs for us.

Graph Creation

import cugraph
from cugraph.testing.mg_utils import start_dask_client, stop_dask_client
import dask_cudf
import cugraph.dask as dask_cugraph
from dask.distributed import wait, default_client
import cudf
import cugraph.dask.comms.comms as Comms

if __name__ == "__main__":
    print("starting the cluster")
    setup_objs = start_dask_client()

    client = setup_objs[0]
    worker_list = list(client.scheduler_info()['workers'].keys())
    print("the number of workers = ", len(worker_list))
    graph_file="/home/nfs/jnke/reenable-mg-tests/debug_files/reproducer_dataset.csv"
    el = cudf.read_csv(
        graph_file,
        delimiter=" ",
        header=0,
        usecols=["0_src", "0_dst", "value", "1_src", "1_dst", "edge_id", "edge_type"],
        dtype={"0_src": "int32", "0_dst": "int32", "value": "float32", "1_src": "int32", "1_dst": "int32", "edge_id": "int32", "edge_type": "int32"},
    )

    num_workers = len(Comms.get_workers())
    el = dask_cudf.from_cudf(el, npartitions=num_workers)
    print("el = ", el.head())
    
    for i in range(200):

        print("creating graph - iter = ", i)
        G = cugraph.Graph(directed=True)
        G.from_dask_cudf_edgelist(
            el,
            source=["0_src", "1_src"],
            destination=["0_dst", "1_dst"],
            edge_attr=["value", "edge_id", "edge_type"],
        )
        print("done creating graph")

call_plc_two_hop_neighbors MRE.

import cugraph
from cugraph.testing.mg_utils import start_dask_client, stop_dask_client
import dask_cudf
import cugraph.dask as dask_cugraph
import time
import gc
import cugraph.dask.comms.comms as Comms

if __name__ == "__main__":
    print("starting the cluster")
    setup_objs = start_dask_client()

    client = setup_objs[0]
    worker_list = list(client.scheduler_info()['workers'].keys())
    print("the number of workers = ", len(worker_list))

    input_data_path = "path_to_/email-Eu-core.csv"

    chunksize = dask_cugraph.get_chunksize(input_data_path)
    e_list = dask_cudf.read_csv(
        input_data_path,
        delimiter=' ',
        blocksize=chunksize,
        names=['src', 'dst', 'wgt'],
        dtype=['int32', 'int32', 'float32'],
    )

    # create graph from input data
    G = cugraph.Graph(directed=False)
    G.from_dask_cudf_edgelist(e_list, source='src', destination='dst', weight='wgt', store_transposed=False)

    start_vertices = G.nodes().compute().to_arrow().to_pylist()[:5]
    for i in range(200):
        print("iteration - ", i)
        seed = i
        pairs = G.get_two_hop_neighbors(start_vertices=start_vertices)        

    stop_dask_client(*setup_objs)
VibhuJawa added the bug (Something isn't working) and ? - Needs Triage (Need team to review and classify) labels on Dec 4, 2023

VibhuJawa commented Dec 5, 2023

Current Hypothesis:

  1. The Dask scheduler assigns a PLC task for partition 0 to GPU 0.
  2. The Dask scheduler assigns a non-PLC task (e.g. pre-processing) for partition 1 to GPU 0 as well.
  3. The PLC task for partition 1 is never reached: partition 1's non-PLC task is stuck behind GPU 0's PLC task, which is itself still waiting for the remaining partitions' PLC tasks to be submitted.
  4. This leads to a hang.

Screenshot of the hang:

[screenshot omitted]

Potential workaround:

  1. Introduce a persist before any PLC task; this synchronizes the non-PLC tasks and prevents the race condition (see the sketch below). It can introduce memory overhead as well as some slowdown, though.
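A minimal sketch of the idea, using plain Dask calls rather than the exact cugraph code (the variable el stands for the dask_cudf edge list from the MRE above):

from dask.distributed import wait

# Persist the edge list so every partition is materialized on the workers,
# then block until that non-PLC preprocessing has finished. Only after this
# point would the per-worker PLC graph-creation tasks be submitted.
el = el.persist()
wait(el)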

TODO:

  1. Verify that this is the problem.
  2. Come up with a better solution.

P.S.: The patch below gets the reproducer through 200 iterations.

diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py
index f666900b2..0c17976ad 100644
--- a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py
+++ b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py
@@ -354,21 +354,45 @@ class simpleDistributedGraphImpl:
         workers = _client.scheduler_info()["workers"].keys()
         ddf_keys_ls = _chunk_lst(ddf_keys, len(workers))
 
-        delayed_tasks_d = {
-            w: delayed(simpleDistributedGraphImpl._make_plc_graph)(
-                Comms.get_session_id(),
-                edata,
-                graph_props,
-                src_col_name,
-                dst_col_name,
-                store_transposed,
-                self.vertex_type,
-                self.weight_type,
-                self.edge_id_type,
-                self.edge_type_id_type,
+        sync_non_plc_tasks=True
+        if sync_non_plc_tasks:
+            persisted_keys_d = persist_dask_df_equal_parts_per_worker(
+                ddf, _client, return_type="dict"
             )
-            for w, edata in zip(workers, ddf_keys_ls)
-        }
+            del ddf
+
+            delayed_tasks_d = {
+                        w: delayed(simpleDistributedGraphImpl._make_plc_graph)(
+                            Comms.get_session_id(),
+                            edata,
+                            graph_props,
+                            src_col_name,
+                            dst_col_name,
+                            store_transposed,
+                            self.vertex_type,
+                            self.weight_type,
+                            self.edge_id_type,
+                            self.edge_type_id_type,
+                        )
+                        for w, edata in persisted_keys_d.items()
+            }
+            del persisted_keys_d
+        else:
+            delayed_tasks_d = {
+                w: delayed(simpleDistributedGraphImpl._make_plc_graph)(
+                    Comms.get_session_id(),
+                    edata,
+                    graph_props,
+                    src_col_name,
+                    dst_col_name,
+                    store_transposed,
+                    self.vertex_type,
+                    self.weight_type,
+                    self.edge_id_type,
+                    self.edge_type_id_type,
+                )
+                for w, edata in zip(workers, ddf_keys_ls)
+            }
         self._plc_graph = {
             w: _client.compute(
                 delayed_task, workers=w, allow_other_workers=False, pure=False

BradReesWork removed the ? - Needs Triage (Need team to review and classify) label on Dec 5, 2023
rlratzel linked a pull request on Dec 6, 2023 that will close this issue
rapids-bot closed this as completed in #4046 on Dec 6, 2023
rapids-bot pushed a commit referencing this issue on Dec 6, 2023:
This PR introduces a short-term fix for #4037.


CC: @jnke2016 , @rlratzel

Authors:
  - Vibhu Jawa (https://github.com/VibhuJawa)

Approvers:
  - Rick Ratzel (https://github.com/rlratzel)
  - Joseph Nke (https://github.com/jnke2016)

URL: #4046