Skip to content

Commit

Permalink
fix csr/csc issue, wrap up tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alexbarghi-nv committed Jul 3, 2024
1 parent 92fd866 commit 6107d82
Show file tree
Hide file tree
Showing 8 changed files with 365 additions and 52 deletions.
5 changes: 3 additions & 2 deletions python/cugraph-dgl/cugraph_dgl/dataloading/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@
from cugraph_dgl.dataloading.neighbor_sampler import NeighborSampler

from cugraph_dgl.dataloading.dask_dataloader import DaskDataLoader
from cugraph_dgl.dataloading.dataloader import DataLoader as FutureDataLoader


def DataLoader(*args, **kwargs):
warnings.warn(
"DataLoader has been renamed to DaskDataLoader. "
"In Release 24.10, cugraph_dgl.dataloading.dataloader.DataLoader "
"In Release 24.10, cugraph_dgl.dataloading.FutureDataLoader "
"will take over the DataLoader name.",
FutureWarning
FutureWarning,
)
return DaskDataLoader(*args, **kwargs)
6 changes: 3 additions & 3 deletions python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,13 @@ def __iter__(self):
return self.__sampler.sample(
self.__graph,
self.__dataset,
self.__batch_size,
batch_size=self.__batch_size,
)

"""
start, end, blocks = out
start = start.to(self.__device)
end = end.to(self.__device)
blocks = [b.to(self.__device) for b in blocks]
"""
"""
15 changes: 10 additions & 5 deletions python/cugraph-dgl/cugraph_dgl/dataloading/neighbor_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from cugraph.utilities.utils import import_optional

import cugraph_dgl
from cugraph_dgl.typing import TensorType, DGLSamplerOutput
from cugraph_dgl.typing import DGLSamplerOutput
from cugraph_dgl.dataloading.sampler import Sampler, HomogeneousSampleReader

torch = import_optional("torch")
Expand Down Expand Up @@ -161,11 +161,14 @@ def __init__(
)

def sample(
self, g: "cugraph_dgl.Graph", indices: Iterator["torch.Tensor"], batch_size: int = 1
self,
g: "cugraph_dgl.Graph",
indices: Iterator["torch.Tensor"],
batch_size: int = 1,
) -> Iterator[DGLSamplerOutput]:
kwargs = dict(**self.__kwargs)

directory = kwargs.pop('directory', None)
directory = kwargs.pop("directory", None)
if directory is None:
warnings.warn("Setting a directory to store samples is recommended.")
self._tempdir = tempfile.TemporaryDirectory()
Expand All @@ -180,7 +183,7 @@ def sample(
ds = UniformNeighborSampler(
g._graph(self.edge_dir),
writer,
compression=self.sparse_format.upper(),
compression="CSR",
fanout=self._reversed_fanout_vals,
prior_sources_behavior="carryover",
deduplicate_sources=True,
Expand All @@ -192,7 +195,9 @@ def sample(
if g.is_homogeneous:
indices = torch.concat(list(indices))
ds.sample_from_nodes(indices, batch_size=batch_size)
return HomogeneousSampleReader(ds.get_reader(), self.output_format)
return HomogeneousSampleReader(
ds.get_reader(), self.output_format, self.edge_dir
)

raise ValueError(
"Sampling heterogeneous graphs is currently"
Expand Down
19 changes: 15 additions & 4 deletions python/cugraph-dgl/cugraph_dgl/dataloading/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import cugraph_dgl
from cugraph_dgl.nn import SparseGraph
from cugraph_dgl.typing import TensorType, DGLSamplerOutput
from cugraph_dgl.typing import DGLSamplerOutput
from cugraph_dgl.dataloading.utils.sampling_helpers import (
create_homogeneous_sampled_graphs_from_tensors_csc,
)
Expand Down Expand Up @@ -81,7 +81,12 @@ class HomogeneousSampleReader(SampleReader):
produced by the cuGraph distributed sampler.
"""

def __init__(self, base_reader: DistSampleReader, output_format: str = "dgl.Block"):
def __init__(
self,
base_reader: DistSampleReader,
output_format: str = "dgl.Block",
edge_dir="in",
):
"""
Constructs a new HomogeneousSampleReader
Expand All @@ -93,7 +98,11 @@ def __init__(self, base_reader: DistSampleReader, output_format: str = "dgl.Bloc
output_format: str
The output format for blocks (either "dgl.Block" or
"cugraph_dgl.nn.SparseGraph").
edge_dir: str
The direction sampling was performed in ("in" or "out").
"""

self.__edge_dir = edge_dir
super().__init__(base_reader, output_format=output_format)

def __decode_csc(
Expand Down Expand Up @@ -141,7 +150,6 @@ def __init__(self, sparse_format: str = "csc", output_format="dgl.Block"):
if sparse_format != "csc":
raise ValueError("Only CSC format is supported at this time")

self.__sparse_format = sparse_format
self.__output_format = output_format

@property
Expand All @@ -153,7 +161,10 @@ def sparse_format(self):
return self.__sparse_format

def sample(
self, g: cugraph_dgl.Graph, indices: Iterator["torch.Tensor"], batch_size: int = 1
self,
g: cugraph_dgl.Graph,
indices: Iterator["torch.Tensor"],
batch_size: int = 1,
) -> Iterator[
Tuple["torch.Tensor", "torch.Tensor", List[Union[SparseGraph, "dgl.Block"]]]
]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,21 +592,28 @@ def _create_homogeneous_blocks_from_csc(
]

blocks = []
seednodes_range=None
seednodes_range = None
for mfg in mfgs:
block_mfg = _create_homogeneous_dgl_block_from_tensor_d(
{'sources': mfg.src_ids(), 'destinations': mfg.dst_ids(), 'sources_range': mfg._num_src_nodes-1, 'destinations_range': mfg._num_dst_nodes-1},
{
"sources": mfg.src_ids(),
"destinations": mfg.dst_ids(),
"sources_range": mfg._num_src_nodes - 1,
"destinations_range": mfg._num_dst_nodes - 1,
},
renumber_map=renumber_map_list[b_id],
seednodes_range=seednodes_range
seednodes_range=seednodes_range,
)

seednodes_range = max(
mfg._num_src_nodes-1,
mfg._num_dst_nodes-1,
mfg._num_src_nodes - 1,
mfg._num_dst_nodes - 1,
)
blocks.append(block_mfg)
del mfgs

blocks.reverse()

output_batch.append(blocks)

output.append(output_batch)
Expand Down
34 changes: 18 additions & 16 deletions python/cugraph-dgl/cugraph_dgl/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,26 +191,28 @@ def add_nodes(
raise ValueError("The global number of nodes must match on all workers")

# Ensure the sum of the feature shapes equals the global number of nodes.
for feature_name, feature_tensor in data.items():
features_size = torch.tensor(
[int(feature_tensor.shape[0])], device="cuda", dtype=torch.int64
)
torch.distributed.all_reduce(
features_size, op=torch.distributed.ReduceOp.SUM
)
if features_size != global_num_nodes:
raise ValueError(
"The total length of the feature vector across workers must"
" match the global number of nodes but it does not match for "
f"{feature_name}."
if data is not None:
for feature_name, feature_tensor in data.items():
features_size = torch.tensor(
[int(feature_tensor.shape[0])], device="cuda", dtype=torch.int64
)
torch.distributed.all_reduce(
features_size, op=torch.distributed.ReduceOp.SUM
)
if features_size != global_num_nodes:
raise ValueError(
"The total length of the feature vector across workers must"
" match the global number of nodes but it does not "
f"match for {feature_name}."
)

self.__num_nodes_dict[ntype] = global_num_nodes

for feature_name, feature_tensor in data.items():
self.__ndata_storage[ntype, feature_name] = self.__ndata_storage_type(
_cast_to_torch_tensor(feature_tensor), **self.__wg_kwargs
)
if data is not None:
for feature_name, feature_tensor in data.items():
self.__ndata_storage[ntype, feature_name] = self.__ndata_storage_type(
_cast_to_torch_tensor(feature_tensor), **self.__wg_kwargs
)

self.__graph = None
self.__vertex_offsets = None
Expand Down
113 changes: 96 additions & 17 deletions python/cugraph-dgl/cugraph_dgl/tests/dataloading/test_dataloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,39 +11,118 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import cugraph_dgl.dataloading
import pytest

import cugraph_dgl
from cugraph_dgl.dataloading.dataloader import DataLoader
from cugraph_dgl.dataloading import NeighborSampler

from cugraph.datasets import karate
from cugraph.utilities.utils import import_optional, MissingModule

torch = import_optional('torch')
dgl = import_optional('dgl')
import numpy as np

torch = import_optional("torch")
dgl = import_optional("dgl")


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
@pytest.mark.skipif(isinstance(dgl, MissingModule), reason="dgl not available")
def test_dataloader_basic_homogeneous():
graph = cugraph_dgl.Graph(
is_multi_gpu=False
)
graph = cugraph_dgl.Graph(is_multi_gpu=False)

num_nodes = karate.number_of_nodes()
graph.add_nodes(
num_nodes,
data={'z': torch.arange(num_nodes)}
)
graph.add_nodes(num_nodes, data={"z": torch.arange(num_nodes)})

edf = karate.get_edgelist()
graph.add_edges(
u=edf['src'],
v=edf['dst'],
data={'q': torch.arange(karate.number_of_edges())}
u=edf["src"], v=edf["dst"], data={"q": torch.arange(karate.number_of_edges())}
)

sampler = cugraph_dgl.dataloading.NeighborSampler([5, 5, 5])
loader = cugraph_dgl.dataloading.FutureDataLoader(
graph, torch.arange(num_nodes), sampler, batch_size=2
)

for in_t, out_t, blocks in loader:
assert len(blocks) == 3
assert len(out_t) <= 2


def sample_dgl_graphs(g, train_nid, fanouts, batch_size=1):
# Single fanout to match cugraph
sampler = dgl.dataloading.NeighborSampler(fanouts)
dataloader = dgl.dataloading.DataLoader(
g,
train_nid,
sampler,
batch_size=batch_size,
shuffle=False,
drop_last=False,
num_workers=0,
)

dgl_output = {}
for batch_id, (input_nodes, output_nodes, blocks) in enumerate(dataloader):
dgl_output[batch_id] = {
"input_nodes": input_nodes,
"output_nodes": output_nodes,
"blocks": blocks,
}
return dgl_output


def sample_cugraph_dgl_graphs(cugraph_g, train_nid, fanouts, batch_size=1):
sampler = cugraph_dgl.dataloading.NeighborSampler(fanouts)

dataloader = cugraph_dgl.dataloading.FutureDataLoader(
cugraph_g,
train_nid,
sampler,
batch_size=batch_size,
drop_last=False,
shuffle=False,
)

sampler = NeighborSampler([5, 5, 5])
loader = DataLoader(graph, torch.arange(num_nodes), sampler, batch_size=2)
cugraph_dgl_output = {}
for batch_id, (input_nodes, output_nodes, blocks) in enumerate(dataloader):
cugraph_dgl_output[batch_id] = {
"input_nodes": input_nodes,
"output_nodes": output_nodes,
"blocks": blocks,
}
return cugraph_dgl_output


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
@pytest.mark.skipif(isinstance(dgl, MissingModule), reason="dgl not available")
@pytest.mark.parametrize("ix", [[1], [1, 0]])
@pytest.mark.parametrize("batch_size", [1, 2])
def test_same_homogeneousgraph_results(ix, batch_size):
src = torch.tensor([1, 2, 3, 4, 5, 6, 7, 8])
dst = torch.tensor([0, 0, 0, 0, 1, 1, 1, 1])

train_nid = torch.tensor(ix)
# Create a heterograph with 3 node types and 3 edges types.
dgl_g = dgl.graph((src, dst))

print(next(iter(loader)))
cugraph_g = cugraph_dgl.Graph(is_multi_gpu=False)
cugraph_g.add_nodes(9)
cugraph_g.add_edges(u=src, v=dst)

dgl_output = sample_dgl_graphs(dgl_g, train_nid, [2], batch_size=batch_size)
cugraph_output = sample_cugraph_dgl_graphs(cugraph_g, train_nid, [2], batch_size)

cugraph_output_nodes = cugraph_output[0]["output_nodes"].cpu().numpy()
dgl_output_nodes = dgl_output[0]["output_nodes"].cpu().numpy()

np.testing.assert_array_equal(
np.sort(cugraph_output_nodes), np.sort(dgl_output_nodes)
)
assert (
dgl_output[0]["blocks"][0].num_dst_nodes()
== cugraph_output[0]["blocks"][0].num_dst_nodes()
)
assert (
dgl_output[0]["blocks"][0].num_edges()
== cugraph_output[0]["blocks"][0].num_edges()
)
Loading

0 comments on commit 6107d82

Please sign in to comment.