Merge remote-tracking branch 'upstream/branch-23.12' into sssp
rlratzel committed Nov 20, 2023
2 parents 3f44894 + 0f28b2e commit 70ea650
Showing 41 changed files with 459 additions and 904 deletions.
4 changes: 4 additions & 0 deletions datasets/README.md
@@ -120,9 +120,13 @@ The benchmark datasets are described below:
| soc-twitter-2010 | 21,297,772 | 265,025,809 | No | No |

**cit-Patents** : A citation graph that includes all citations made by patents granted between 1975 and 1999, totaling 16,522,438 citations.

**soc-LiveJournal** : A graph of the LiveJournal social network.

**europe_osm** : A graph of OpenStreetMap data for Europe.

**hollywood** : A graph of movie actors where vertices are actors, and two actors are joined by an edge whenever they appeared in a movie together.

**soc-twitter-2010** : A network of follower relationships from a snapshot of Twitter in 2010, where an edge from i to j indicates that j is a follower of i.

_NOTE: the benchmark datasets were converted to CSV from the original formats described at the reference URL below; in the conversion, edge weights and isolated vertices were discarded._
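A minimal sketch of loading one of these converted CSV edge lists with cuDF/cuGraph; the file name, delimiter, and column names here are illustrative assumptions, not something this commit specifies:

```python
import cudf
import cugraph

# Hypothetical file and layout: two integer columns and no weight column,
# consistent with the note that edge weights were discarded.
edges = cudf.read_csv(
    "soc-livejournal1.csv",
    delimiter=" ",
    names=["src", "dst"],
    dtype=["int32", "int32"],
)

G = cugraph.Graph(directed=True)
G.from_cudf_edgelist(edges, source="src", destination="dst")
print(G.number_of_vertices(), G.number_of_edges())
```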
75 changes: 54 additions & 21 deletions python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py
@@ -27,11 +27,12 @@
import cugraph
import warnings

from cugraph.utilities.utils import import_optional, MissingModule
import dask.array as dar
import dask.dataframe as dd
import dask.distributed as distributed
import dask_cudf

dd = import_optional("dask.dataframe")
distributed = import_optional("dask.distributed")
dask_cudf = import_optional("dask_cudf")
from cugraph.utilities.utils import import_optional, MissingModule

torch = import_optional("torch")
torch_geometric = import_optional("torch_geometric")
@@ -367,6 +368,13 @@ def __infer_offsets(
            }
        )

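    # Wraps a host (NumPy) array in a Dask array with roughly one chunk per
    # requested partition, so each worker later materializes only its own
    # slice of the edge list instead of the whole array.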
    def __dask_array_from_numpy(self, array: np.ndarray, npartitions: int):
        return dar.from_array(
            array,
            meta=np.array([], dtype=array.dtype),
            chunks=max(1, len(array) // npartitions),
        )

    def __construct_graph(
        self,
        edge_info: Dict[Tuple[str, str, str], List[TensorType]],
@@ -464,32 +472,42 @@ def __construct_graph(
            ]
        )

        df = pandas.DataFrame(
            {
                "src": pandas.Series(na_dst)
                if order == "CSC"
                else pandas.Series(na_src),
                "dst": pandas.Series(na_src)
                if order == "CSC"
                else pandas.Series(na_dst),
                "etp": pandas.Series(na_etp),
            }
        )
        vertex_dtype = df.src.dtype
        vertex_dtype = na_src.dtype

        if multi_gpu:
            nworkers = len(distributed.get_client().scheduler_info()["workers"])
            df = dd.from_pandas(df, npartitions=nworkers if len(df) > 32 else 1)
            client = distributed.get_client()
            nworkers = len(client.scheduler_info()["workers"])
            npartitions = nworkers * 4

            src_dar = self.__dask_array_from_numpy(na_src, npartitions)
            del na_src

            dst_dar = self.__dask_array_from_numpy(na_dst, npartitions)
            del na_dst

            etp_dar = self.__dask_array_from_numpy(na_etp, npartitions)
            del na_etp

            df = dd.from_dask_array(etp_dar, columns=["etp"])
            df["src"] = dst_dar if order == "CSC" else src_dar
            df["dst"] = src_dar if order == "CSC" else dst_dar

            del src_dar
            del dst_dar
            del etp_dar

            if df.etp.dtype != "int32":
                raise ValueError("Edge type must be int32!")

            # Ensure the dataframe is constructed on each partition
            # instead of adding synchronization overhead from potential
            # host-to-device copies.
            def get_empty_df():
                return cudf.DataFrame(
                    {
                        "etp": cudf.Series([], dtype="int32"),
                        "src": cudf.Series([], dtype=vertex_dtype),
                        "dst": cudf.Series([], dtype=vertex_dtype),
                        "etp": cudf.Series([], dtype="int32"),
                    }
                )

@@ -500,9 +518,23 @@ def get_empty_df():
                if len(f) > 0
                else get_empty_df(),
                meta=get_empty_df(),
            ).reset_index(drop=True)
            ).reset_index(
                drop=True
            )  # should be ok for dask
        else:
            df = cudf.from_pandas(df).reset_index(drop=True)
            df = pandas.DataFrame(
                {
                    "src": pandas.Series(na_dst)
                    if order == "CSC"
                    else pandas.Series(na_src),
                    "dst": pandas.Series(na_src)
                    if order == "CSC"
                    else pandas.Series(na_dst),
                    "etp": pandas.Series(na_etp),
                }
            )
            df = cudf.from_pandas(df)
            df.reset_index(drop=True, inplace=True)

        graph = cugraph.MultiGraph(directed=True)
        if multi_gpu:
@@ -521,6 +553,7 @@ def get_empty_df():
                edge_type="etp",
            )

        del df
        return graph

    @property
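The change above replaces the old pandas-then-`dd.from_pandas` path with per-partition construction from Dask arrays. A minimal, self-contained sketch of that pattern; the function and variable names here are illustrative, not this module's API:

```python
import numpy as np
import dask.array as dar
import dask.dataframe as dd


def edgelist_from_host_arrays(
    src: np.ndarray, dst: np.ndarray, etp: np.ndarray, npartitions: int
) -> dd.DataFrame:
    """Build a distributed edge list without one large client-side DataFrame."""
    chunks = max(1, len(src) // npartitions)

    def to_dask(a: np.ndarray) -> dar.Array:
        # Roughly one chunk per partition; meta preserves the dtype for
        # empty chunks.
        return dar.from_array(a, meta=np.array([], dtype=a.dtype), chunks=chunks)

    # Each column is a lazily-evaluated Dask array, so no single process
    # ever holds the full edge list in memory at once.
    df = dd.from_dask_array(to_dask(etp), columns=["etp"])
    df["src"] = to_dask(src)
    df["dst"] = to_dask(dst)
    return df
```

Compared with `dd.from_pandas(pandas.DataFrame(...))`, this avoids materializing and scattering a single host DataFrame holding every edge, which is what made peak memory scale with the full graph size.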
2 changes: 1 addition & 1 deletion python/cugraph-pyg/cugraph_pyg/tests/conftest.py
@@ -24,7 +24,7 @@
import torch
import numpy as np
from cugraph.gnn import FeatureStore
from cugraph.experimental.datasets import karate
from cugraph.datasets import karate

import tempfile

@@ -15,7 +15,6 @@

from cugraph_pyg.loader import CuGraphNeighborLoader
from cugraph_pyg.data import CuGraphStore

from cugraph.utilities.utils import import_optional, MissingModule

torch = import_optional("torch")
26 changes: 26 additions & 0 deletions python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py
@@ -386,3 +386,29 @@ def test_mg_frame_handle(graph, dask_client):
    F, G, N = graph
    cugraph_store = CuGraphStore(F, G, N, multi_gpu=True)
    assert isinstance(cugraph_store._EXPERIMENTAL__CuGraphStore__graph._plc_graph, dict)


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
def test_cugraph_loader_large_index(dask_client):
    large_index = (
        np.random.randint(0, 1_000_000, (100_000_000,)),
        np.random.randint(0, 1_000_000, (100_000_000,)),
    )

    large_features = np.random.randint(0, 50, (1_000_000,))
    F = cugraph.gnn.FeatureStore(backend="torch")
    F.add_data(large_features, "N", "f")

    store = CuGraphStore(
        F,
        {("N", "e", "N"): large_index},
        {"N": 1_000_000},
        multi_gpu=True,
    )

    graph = store._subgraph()
    assert isinstance(graph, cugraph.Graph)

    el = graph.view_edge_list().compute()
    assert (el["src"].values_host - large_index[0]).sum() == 0
    assert (el["dst"].values_host - large_index[1]).sum() == 0
@@ -17,7 +17,7 @@

import cugraph
from cugraph.experimental import PropertyGraph, MGPropertyGraph
from cugraph.experimental import datasets
from cugraph import datasets
from cugraph.generators import rmat


2 changes: 1 addition & 1 deletion python/cugraph/cugraph/dask/community/leiden.py
@@ -125,7 +125,7 @@ def leiden(
Examples
--------
>>> from cugraph.experimental.datasets import karate
>>> from cugraph.datasets import karate
>>> G = karate.get_graph(fetch=True)
>>> parts, modularity_score = cugraph.leiden(G)
2 changes: 1 addition & 1 deletion python/cugraph/cugraph/dask/community/louvain.py
@@ -129,7 +129,7 @@ def louvain(
Examples
--------
>>> from cugraph.experimental.datasets import karate
>>> from cugraph.datasets import karate
>>> G = karate.get_graph(fetch=True)
>>> parts = cugraph.louvain(G)
10 changes: 10 additions & 0 deletions python/cugraph/cugraph/datasets/__init__.py
@@ -39,3 +39,13 @@
small_tree = Dataset(meta_path / "small_tree.yaml")
toy_graph = Dataset(meta_path / "toy_graph.yaml")
toy_graph_undirected = Dataset(meta_path / "toy_graph_undirected.yaml")

# Benchmarking datasets: be mindful of memory usage
# 250 MB
soc_livejournal = Dataset(meta_path / "soc-livejournal1.yaml")
# 965 MB
cit_patents = Dataset(meta_path / "cit-patents.yaml")
# 1.8 GB
europe_osm = Dataset(meta_path / "europe_osm.yaml")
# 1.5 GB
hollywood = Dataset(meta_path / "hollywood.yaml")
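
A hedged usage sketch for the newly registered benchmark datasets, assuming the `cugraph.datasets` `Dataset.get_graph` API with a `download` keyword (the exact keyword may differ between releases; older experimental docstrings in this diff still show `fetch=True`):

```python
from cugraph.datasets import cit_patents

# First call downloads the ~965 MB CSV to the local datasets directory,
# then constructs a cugraph Graph from it.
G = cit_patents.get_graph(download=True)
print(G.number_of_vertices(), G.number_of_edges())
```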