diff --git a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py
index edeeface4c4..14dc5d84f90 100644
--- a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py
+++ b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py
@@ -27,11 +27,12 @@
 import cugraph
 import warnings
 
-from cugraph.utilities.utils import import_optional, MissingModule
+import dask.array as dar
+import dask.dataframe as dd
+import dask.distributed as distributed
+import dask_cudf
 
-dd = import_optional("dask.dataframe")
-distributed = import_optional("dask.distributed")
-dask_cudf = import_optional("dask_cudf")
+from cugraph.utilities.utils import import_optional, MissingModule
 
 torch = import_optional("torch")
 torch_geometric = import_optional("torch_geometric")
@@ -367,6 +368,20 @@ def __infer_offsets(
             }
         )
 
+    def __dask_array_from_numpy(self, array: np.ndarray, npartitions: int):
+        """
+        Creates a dask array from a host numpy array, chunked so that a
+        1-D input is split into at most ``npartitions`` partitions.
+        """
+        # Round the chunk size up; rounding down could yield almost twice
+        # the requested number of partitions for short arrays.
+        chunk_size = max(1, -(-len(array) // npartitions))
+        return dar.from_array(
+            array,
+            meta=np.array([], dtype=array.dtype),
+            chunks=chunk_size,
+        )
+
     def __construct_graph(
         self,
         edge_info: Dict[Tuple[str, str, str], List[TensorType]],
@@ -464,22 +479,32 @@ def __construct_graph(
             ]
         )
 
-        df = pandas.DataFrame(
-            {
-                "src": pandas.Series(na_dst)
-                if order == "CSC"
-                else pandas.Series(na_src),
-                "dst": pandas.Series(na_src)
-                if order == "CSC"
-                else pandas.Series(na_dst),
-                "etp": pandas.Series(na_etp),
-            }
-        )
-        vertex_dtype = df.src.dtype
+        vertex_dtype = na_src.dtype
 
         if multi_gpu:
-            nworkers = len(distributed.get_client().scheduler_info()["workers"])
-            df = dd.from_pandas(df, npartitions=nworkers if len(df) > 32 else 1)
+            client = distributed.get_client()
+            nworkers = len(client.scheduler_info()["workers"])
+            npartitions = nworkers * 4
+
+            src_dar = self.__dask_array_from_numpy(na_src, npartitions)
+            del na_src
+
+            dst_dar = self.__dask_array_from_numpy(na_dst, npartitions)
+            del na_dst
+
+            etp_dar = self.__dask_array_from_numpy(na_etp, npartitions)
+            del na_etp
+
+            df = dd.from_dask_array(etp_dar, columns=["etp"])
+            df["src"] = dst_dar if order == "CSC" else src_dar
+            df["dst"] = src_dar if order == "CSC" else dst_dar
+
+            del src_dar
+            del dst_dar
+            del etp_dar
+
+            if df.etp.dtype != "int32":
+                raise ValueError("Edge type must be int32!")
 
             # Ensure the dataframe is constructed on each partition
             # instead of adding additional synchronization head from potential
@@ -487,9 +512,9 @@ def __construct_graph(
             def get_empty_df():
                 return cudf.DataFrame(
                     {
+                        "etp": cudf.Series([], dtype="int32"),
                         "src": cudf.Series([], dtype=vertex_dtype),
                         "dst": cudf.Series([], dtype=vertex_dtype),
-                        "etp": cudf.Series([], dtype="int32"),
                     }
                 )
 
@@ -500,9 +525,23 @@ def get_empty_df():
                 if len(f) > 0
                 else get_empty_df(),
                 meta=get_empty_df(),
-            ).reset_index(drop=True)
+            ).reset_index(
+                drop=True
+            )  # should be ok for dask
         else:
-            df = cudf.from_pandas(df).reset_index(drop=True)
+            df = pandas.DataFrame(
+                {
+                    "src": pandas.Series(na_dst)
+                    if order == "CSC"
+                    else pandas.Series(na_src),
+                    "dst": pandas.Series(na_src)
+                    if order == "CSC"
+                    else pandas.Series(na_dst),
+                    "etp": pandas.Series(na_etp),
+                }
+            )
+            df = cudf.from_pandas(df)
+            df.reset_index(drop=True, inplace=True)
 
         graph = cugraph.MultiGraph(directed=True)
         if multi_gpu:
@@ -521,6 +560,7 @@ def get_empty_df():
                 edge_type="etp",
             )
 
+        del df
         return graph
 
     @property
diff --git a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_loader.py b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_loader.py
index 55aebf305da..f5035a38621 100644
--- a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_loader.py
+++ b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_loader.py
@@ -15,7 +15,6 @@
 from cugraph_pyg.loader import CuGraphNeighborLoader
 from cugraph_pyg.data import CuGraphStore
 
-
 from cugraph.utilities.utils import import_optional, MissingModule
 
 torch = import_optional("torch")
diff --git a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py
index 13c9c90c7c2..be8f8245807 100644
--- a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py
+++ b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py
@@ -386,3 +386,46 @@ def test_mg_frame_handle(graph, dask_client):
     F, G, N = graph
     cugraph_store = CuGraphStore(F, G, N, multi_gpu=True)
     assert isinstance(cugraph_store._EXPERIMENTAL__CuGraphStore__graph._plc_graph, dict)
+
+
+@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
+def test_cugraph_loader_large_index(dask_client):
+    """
+    Builds a multi-GPU store from 100M random edges and checks that each
+    endpoint column of the resulting edge list holds the same vertex ids,
+    with the same multiplicities, as the index that was passed in.
+    """
+    num_vertices = 1_000_000
+    num_edges = 100_000_000
+
+    # Seeded so that a failure can be reproduced.
+    rng = np.random.default_rng(62)
+    large_index = (
+        rng.integers(0, num_vertices, (num_edges,)),
+        rng.integers(0, num_vertices, (num_edges,)),
+    )
+
+    large_features = rng.integers(0, 50, (num_vertices,))
+    F = cugraph.gnn.FeatureStore(backend="torch")
+    F.add_data(large_features, "N", "f")
+
+    store = CuGraphStore(
+        F,
+        {("N", "e", "N"): large_index},
+        {"N": num_vertices},
+        multi_gpu=True,
+    )
+
+    graph = store._subgraph()
+    assert isinstance(graph, cugraph.Graph)
+
+    el = graph.view_edge_list().compute()
+
+    # Compare per-vertex occurrence counts instead of the sum of the
+    # element-wise differences: differences of opposite sign cancel out,
+    # so a zero sum does not show that the columns match.
+    for column, expected in zip(("src", "dst"), large_index):
+        assert np.array_equal(
+            np.bincount(el[column].values_host, minlength=num_vertices),
+            np.bincount(expected, minlength=num_vertices),
+        )