[BUG] Fix Graph Construction From Pandas in cuGraph-PyG (#3985)
The current graph construction creates a single pandas dataframe, which for larger datasets (e.g. ogbn-papers100M) cannot be serialized.  This PR resolves the issue by breaking the dataframe up into scattered numpy arrays that are then reassembled.
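A rough sketch of the new construction path (illustrative only; the helper and variable names below are not necessarily those used in the PR): each edge-related numpy array is wrapped as a chunked dask array, and the chunks are then reassembled into a dask dataframe, so no single task ever has to serialize the full edge list.

```python
# Minimal sketch, assuming dask and numpy are available; names are
# illustrative, not the exact helpers added by this PR.
import dask.array as dar
import dask.dataframe as dd
import numpy as np


def dask_array_from_numpy(array: np.ndarray, npartitions: int) -> dar.Array:
    # Split the host array into roughly `npartitions` chunks so that no
    # single task has to pickle the whole array.
    return dar.from_array(
        array,
        meta=np.array([], dtype=array.dtype),
        chunks=max(1, len(array) // npartitions),
    )


npartitions = 8 * 4  # e.g. 8 workers, 4 partitions per worker
src = np.random.randint(0, 1_000_000, (1_000_000,))
dst = np.random.randint(0, 1_000_000, (1_000_000,))
etp = np.zeros((1_000_000,), dtype="int32")

# Reassemble the chunked arrays into a single dask dataframe instead of
# building one huge pandas.DataFrame on the client.
df = dd.from_dask_array(dask_array_from_numpy(etp, npartitions), columns=["etp"])
df["src"] = dask_array_from_numpy(src, npartitions)
df["dst"] = dask_array_from_numpy(dst, npartitions)
```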

Merge after #3978

Authors:
  - Alex Barghi (https://github.com/alexbarghi-nv)
  - Naim (https://github.com/naimnv)

Approvers:
  - Vibhu Jawa (https://github.com/VibhuJawa)
  - Brad Rees (https://github.com/BradReesWork)
  - Tingyu Wang (https://github.com/tingyu66)

URL: #3985
alexbarghi-nv authored Nov 20, 2023
1 parent 0684f9d commit 0f28b2e
Showing 3 changed files with 80 additions and 22 deletions.
75 changes: 54 additions & 21 deletions python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py
@@ -27,11 +27,12 @@
import cugraph
import warnings

from cugraph.utilities.utils import import_optional, MissingModule
import dask.array as dar
import dask.dataframe as dd
import dask.distributed as distributed
import dask_cudf

dd = import_optional("dask.dataframe")
distributed = import_optional("dask.distributed")
dask_cudf = import_optional("dask_cudf")
from cugraph.utilities.utils import import_optional, MissingModule

torch = import_optional("torch")
torch_geometric = import_optional("torch_geometric")
@@ -367,6 +368,13 @@ def __infer_offsets(
}
)

def __dask_array_from_numpy(self, array: np.ndarray, npartitions: int):
return dar.from_array(
array,
meta=np.array([], dtype=array.dtype),
chunks=max(1, len(array) // npartitions),
)

def __construct_graph(
self,
edge_info: Dict[Tuple[str, str, str], List[TensorType]],
@@ -464,32 +472,42 @@ def __construct_graph(
]
)

df = pandas.DataFrame(
{
"src": pandas.Series(na_dst)
if order == "CSC"
else pandas.Series(na_src),
"dst": pandas.Series(na_src)
if order == "CSC"
else pandas.Series(na_dst),
"etp": pandas.Series(na_etp),
}
)
vertex_dtype = df.src.dtype
vertex_dtype = na_src.dtype

if multi_gpu:
nworkers = len(distributed.get_client().scheduler_info()["workers"])
df = dd.from_pandas(df, npartitions=nworkers if len(df) > 32 else 1)
client = distributed.get_client()
nworkers = len(client.scheduler_info()["workers"])
npartitions = nworkers * 4

src_dar = self.__dask_array_from_numpy(na_src, npartitions)
del na_src

dst_dar = self.__dask_array_from_numpy(na_dst, npartitions)
del na_dst

etp_dar = self.__dask_array_from_numpy(na_etp, npartitions)
del na_etp

df = dd.from_dask_array(etp_dar, columns=["etp"])
df["src"] = dst_dar if order == "CSC" else src_dar
df["dst"] = src_dar if order == "CSC" else dst_dar

del src_dar
del dst_dar
del etp_dar

if df.etp.dtype != "int32":
raise ValueError("Edge type must be int32!")

# Ensure the dataframe is constructed on each partition
# instead of adding additional synchronization overhead from potential
# host-to-device copies.
def get_empty_df():
return cudf.DataFrame(
{
"etp": cudf.Series([], dtype="int32"),
"src": cudf.Series([], dtype=vertex_dtype),
"dst": cudf.Series([], dtype=vertex_dtype),
"etp": cudf.Series([], dtype="int32"),
}
)

@@ -500,9 +518,23 @@ def get_empty_df():
if len(f) > 0
else get_empty_df(),
meta=get_empty_df(),
).reset_index(drop=True)
).reset_index(
drop=True
) # should be ok for dask
else:
df = cudf.from_pandas(df).reset_index(drop=True)
df = pandas.DataFrame(
{
"src": pandas.Series(na_dst)
if order == "CSC"
else pandas.Series(na_src),
"dst": pandas.Series(na_src)
if order == "CSC"
else pandas.Series(na_dst),
"etp": pandas.Series(na_etp),
}
)
df = cudf.from_pandas(df)
df.reset_index(drop=True, inplace=True)

graph = cugraph.MultiGraph(directed=True)
if multi_gpu:
@@ -521,6 +553,7 @@ def get_empty_df():
edge_type="etp",
)

del df
return graph

@property
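In the multi-GPU branch above, the partitions are moved to the GPU with `map_partitions`, using an empty cudf frame as the dask `meta` so that empty partitions do not force extra host-to-device synchronization. A minimal sketch of that pattern (helper names here are illustrative, not part of the PR):

```python
# Sketch only: convert a dask dataframe of pandas partitions to cudf-backed
# partitions. Requires a CUDA-capable GPU with cudf installed.
import cudf
import dask.dataframe as dd


def to_gpu_partitions(ddf: dd.DataFrame, vertex_dtype) -> dd.DataFrame:
    def empty_gpu_frame():
        # The empty frame doubles as the dask `meta`, declaring the output
        # schema without materializing any data on the client.
        return cudf.DataFrame(
            {
                "src": cudf.Series([], dtype=vertex_dtype),
                "dst": cudf.Series([], dtype=vertex_dtype),
                "etp": cudf.Series([], dtype="int32"),
            }
        )

    # Each worker converts only its own partition; an empty partition returns
    # the pre-shaped empty cudf frame directly instead of round-tripping an
    # empty pandas frame through a host-to-device copy.
    return ddf.map_partitions(
        lambda f: cudf.from_pandas(f) if len(f) > 0 else empty_gpu_frame(),
        meta=empty_gpu_frame(),
    ).reset_index(drop=True)
```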
@@ -15,7 +15,6 @@

from cugraph_pyg.loader import CuGraphNeighborLoader
from cugraph_pyg.data import CuGraphStore

from cugraph.utilities.utils import import_optional, MissingModule

torch = import_optional("torch")
26 changes: 26 additions & 0 deletions python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py
@@ -386,3 +386,29 @@ def test_mg_frame_handle(graph, dask_client):
F, G, N = graph
cugraph_store = CuGraphStore(F, G, N, multi_gpu=True)
assert isinstance(cugraph_store._EXPERIMENTAL__CuGraphStore__graph._plc_graph, dict)


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
def test_cugraph_loader_large_index(dask_client):
large_index = (
np.random.randint(0, 1_000_000, (100_000_000,)),
np.random.randint(0, 1_000_000, (100_000_000,)),
)

large_features = np.random.randint(0, 50, (1_000_000,))
F = cugraph.gnn.FeatureStore(backend="torch")
F.add_data(large_features, "N", "f")

store = CuGraphStore(
F,
{("N", "e", "N"): large_index},
{"N": 1_000_000},
multi_gpu=True,
)

graph = store._subgraph()
assert isinstance(graph, cugraph.Graph)

el = graph.view_edge_list().compute()
assert (el["src"].values_host - large_index[0]).sum() == 0
assert (el["dst"].values_host - large_index[1]).sum() == 0
