Skip to content

Commit

Permalink
Support Negative Sampling in pylibcugraph and cuGraph-PyG (#4660)
Browse files Browse the repository at this point in the history
Adds support for negative sampling in `pylibcugraph` and `cugraph-pyg`.

Closes rapidsai/cugraph-gnn#39
Merge after #4641

Authors:
  - Alex Barghi (https://github.com/alexbarghi-nv)

Approvers:
  - Chuck Hastings (https://github.com/ChuckHastings)
  - Rick Ratzel (https://github.com/rlratzel)

URL: #4660
  • Loading branch information
alexbarghi-nv authored Sep 30, 2024
1 parent 0f4fe8f commit 4dfd3b3
Show file tree
Hide file tree
Showing 36 changed files with 3,415 additions and 466 deletions.
21 changes: 15 additions & 6 deletions cpp/src/c_api/graph_generators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,32 +124,41 @@ cugraph_error_code_t cugraph_generate_rmat_edgelists(
extern "C" cugraph_type_erased_device_array_view_t* cugraph_coo_get_sources(cugraph_coo_t* coo)
{
auto internal_pointer = reinterpret_cast<cugraph::c_api::cugraph_coo_t*>(coo);
return reinterpret_cast<cugraph_type_erased_device_array_view_t*>(internal_pointer->src_->view());
return (internal_pointer->src_) ? reinterpret_cast<cugraph_type_erased_device_array_view_t*>(
internal_pointer->src_->view())
: nullptr;
}

extern "C" cugraph_type_erased_device_array_view_t* cugraph_coo_get_destinations(cugraph_coo_t* coo)
{
auto internal_pointer = reinterpret_cast<cugraph::c_api::cugraph_coo_t*>(coo);
return reinterpret_cast<cugraph_type_erased_device_array_view_t*>(internal_pointer->dst_->view());
return (internal_pointer->dst_) ? reinterpret_cast<cugraph_type_erased_device_array_view_t*>(
internal_pointer->dst_->view())
: nullptr;
}

extern "C" cugraph_type_erased_device_array_view_t* cugraph_coo_get_edge_weights(cugraph_coo_t* coo)
{
auto internal_pointer = reinterpret_cast<cugraph::c_api::cugraph_coo_t*>(coo);
return reinterpret_cast<cugraph_type_erased_device_array_view_t*>(internal_pointer->wgt_->view());
return (internal_pointer->wgt_) ? reinterpret_cast<cugraph_type_erased_device_array_view_t*>(
internal_pointer->wgt_->view())
: nullptr;
}

extern "C" cugraph_type_erased_device_array_view_t* cugraph_coo_get_edge_id(cugraph_coo_t* coo)
{
auto internal_pointer = reinterpret_cast<cugraph::c_api::cugraph_coo_t*>(coo);
return reinterpret_cast<cugraph_type_erased_device_array_view_t*>(internal_pointer->id_->view());
return (internal_pointer->id_) ? reinterpret_cast<cugraph_type_erased_device_array_view_t*>(
internal_pointer->id_->view())
: nullptr;
}

extern "C" cugraph_type_erased_device_array_view_t* cugraph_coo_get_edge_type(cugraph_coo_t* coo)
{
auto internal_pointer = reinterpret_cast<cugraph::c_api::cugraph_coo_t*>(coo);
return reinterpret_cast<cugraph_type_erased_device_array_view_t*>(
internal_pointer->type_->view());
return (internal_pointer->type_) ? reinterpret_cast<cugraph_type_erased_device_array_view_t*>(
internal_pointer->type_->view())
: nullptr;
}

extern "C" size_t cugraph_coo_list_size(const cugraph_coo_list_t* coo_list)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,8 @@ def sample(

if g.is_homogeneous:
indices = torch.concat(list(indices))
ds.sample_from_nodes(indices.long(), batch_size=batch_size)
return HomogeneousSampleReader(
ds.get_reader(), self.output_format, self.edge_dir
)
reader = ds.sample_from_nodes(indices.long(), batch_size=batch_size)
return HomogeneousSampleReader(reader, self.output_format, self.edge_dir)

raise ValueError(
"Sampling heterogeneous graphs is currently"
Expand Down
15 changes: 9 additions & 6 deletions python/cugraph-dgl/cugraph_dgl/dataloading/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
create_homogeneous_sampled_graphs_from_tensors_csc,
)

from cugraph.gnn import DistSampleReader

from cugraph.utilities.utils import import_optional

Expand All @@ -33,14 +32,18 @@ class SampleReader:
Iterator that processes results from the cuGraph distributed sampler.
"""

def __init__(self, base_reader: DistSampleReader, output_format: str = "dgl.Block"):
def __init__(
self,
base_reader: Iterator[Tuple[Dict[str, "torch.Tensor"], int, int]],
output_format: str = "dgl.Block",
):
"""
Constructs a new SampleReader.
Parameters
----------
base_reader: DistSampleReader
The reader responsible for loading saved samples produced by
base_reader: Iterator[Tuple[Dict[str, "torch.Tensor"], int, int]]
The iterator responsible for loading saved samples produced by
the cuGraph distributed sampler.
"""
self.__output_format = output_format
Expand Down Expand Up @@ -83,7 +86,7 @@ class HomogeneousSampleReader(SampleReader):

def __init__(
self,
base_reader: DistSampleReader,
base_reader: Iterator[Tuple[Dict[str, "torch.Tensor"], int, int]],
output_format: str = "dgl.Block",
edge_dir="in",
):
Expand All @@ -92,7 +95,7 @@ def __init__(
Parameters
----------
base_reader: DistSampleReader
base_reader: Iterator[Tuple[Dict[str, "torch.Tensor"], int, int]]
The reader responsible for loading saved samples produced by
the cuGraph distributed sampler.
output_format: str
Expand Down
3 changes: 0 additions & 3 deletions python/cugraph-dgl/cugraph_dgl/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,9 +620,6 @@ def _get_n_emb(
)

try:
print(
u,
)
return self.__ndata_storage[ntype, emb_name].fetch(
_cast_to_torch_tensor(u), "cuda"
)
Expand Down
7 changes: 6 additions & 1 deletion python/cugraph-pyg/cugraph_pyg/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2023, NVIDIA CORPORATION.
# Copyright (c) 2019-2024, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -12,3 +12,8 @@
# limitations under the License.

from cugraph_pyg._version import __git_commit__, __version__

import cugraph_pyg.data
import cugraph_pyg.loader
import cugraph_pyg.sampler
import cugraph_pyg.nn
17 changes: 11 additions & 6 deletions python/cugraph-pyg/cugraph_pyg/data/graph_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,18 @@ def _num_vertices(self) -> Dict[str, int]:
else edge_attr.size[1]
)
else:
if edge_attr.edge_type[0] not in num_vertices:
if edge_attr.edge_type[0] != edge_attr.edge_type[2]:
if edge_attr.edge_type[0] not in num_vertices:
num_vertices[edge_attr.edge_type[0]] = int(
self.__edge_indices[edge_attr.edge_type][0].max() + 1
)
if edge_attr.edge_type[2] not in num_vertices:
num_vertices[edge_attr.edge_type[1]] = int(
self.__edge_indices[edge_attr.edge_type][1].max() + 1
)
elif edge_attr.edge_type[0] not in num_vertices:
num_vertices[edge_attr.edge_type[0]] = int(
self.__edge_indices[edge_attr.edge_type][0].max() + 1
)
if edge_attr.edge_type[2] not in num_vertices:
num_vertices[edge_attr.edge_type[1]] = int(
self.__edge_indices[edge_attr.edge_type][1].max() + 1
self.__edge_indices[edge_attr.edge_type].max() + 1
)

if self.is_multi_gpu:
Expand Down
24 changes: 18 additions & 6 deletions python/cugraph-pyg/cugraph_pyg/examples/gcn_dist_mnmg.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ def run_train(
wall_clock_start,
tempdir=None,
num_layers=3,
in_memory=False,
seeds_per_call=-1,
):
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=0.0005)

Expand All @@ -196,20 +198,23 @@ def run_train(
from cugraph_pyg.loader import NeighborLoader

ix_train = split_idx["train"].cuda()
train_path = os.path.join(tempdir, f"train_{global_rank}")
os.mkdir(train_path)
train_path = None if in_memory else os.path.join(tempdir, f"train_{global_rank}")
if train_path:
os.mkdir(train_path)
train_loader = NeighborLoader(
data,
input_nodes=ix_train,
directory=train_path,
shuffle=True,
drop_last=True,
local_seeds_per_call=seeds_per_call if seeds_per_call > 0 else None,
**kwargs,
)

ix_test = split_idx["test"].cuda()
test_path = os.path.join(tempdir, f"test_{global_rank}")
os.mkdir(test_path)
test_path = None if in_memory else os.path.join(tempdir, f"test_{global_rank}")
if test_path:
os.mkdir(test_path)
test_loader = NeighborLoader(
data,
input_nodes=ix_test,
Expand All @@ -221,14 +226,16 @@ def run_train(
)

ix_valid = split_idx["valid"].cuda()
valid_path = os.path.join(tempdir, f"valid_{global_rank}")
os.mkdir(valid_path)
valid_path = None if in_memory else os.path.join(tempdir, f"valid_{global_rank}")
if valid_path:
os.mkdir(valid_path)
valid_loader = NeighborLoader(
data,
input_nodes=ix_valid,
directory=valid_path,
shuffle=True,
drop_last=True,
local_seeds_per_call=seeds_per_call if seeds_per_call > 0 else None,
**kwargs,
)

Expand Down Expand Up @@ -347,6 +354,9 @@ def parse_args():
parser.add_argument("--skip_partition", action="store_true")
parser.add_argument("--wg_mem_type", type=str, default="distributed")

parser.add_argument("--in_memory", action="store_true", default=False)
parser.add_argument("--seeds_per_call", type=int, default=-1)

return parser.parse_args()


Expand Down Expand Up @@ -429,6 +439,8 @@ def parse_args():
wall_clock_start,
tempdir,
args.num_layers,
args.in_memory,
args.seeds_per_call,
)
else:
warnings.warn("This script should be run with 'torchrun`. Exiting.")
24 changes: 20 additions & 4 deletions python/cugraph-pyg/cugraph_pyg/examples/gcn_dist_sg.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,28 @@ def test(loader: NeighborLoader, val_steps: Optional[int] = None):


def create_loader(
data, num_neighbors, input_nodes, replace, batch_size, samples_dir, stage_name
data,
num_neighbors,
input_nodes,
replace,
batch_size,
samples_dir,
stage_name,
local_seeds_per_call,
):
directory = os.path.join(samples_dir, stage_name)
os.mkdir(directory)
if samples_dir is not None:
directory = os.path.join(samples_dir, stage_name)
os.mkdir(directory)
else:
directory = None
return NeighborLoader(
data,
num_neighbors=num_neighbors,
input_nodes=input_nodes,
replace=replace,
batch_size=batch_size,
directory=directory,
local_seeds_per_call=local_seeds_per_call,
)


Expand Down Expand Up @@ -147,6 +158,8 @@ def parse_args():
parser.add_argument("--tempdir_root", type=str, default=None)
parser.add_argument("--dataset_root", type=str, default="dataset")
parser.add_argument("--dataset", type=str, default="ogbn-products")
parser.add_argument("--in_memory", action="store_true", default=False)
parser.add_argument("--seeds_per_call", type=int, default=-1)

return parser.parse_args()

Expand All @@ -170,7 +183,10 @@ def parse_args():
"num_neighbors": [args.fan_out] * args.num_layers,
"replace": False,
"batch_size": args.batch_size,
"samples_dir": samples_dir,
"samples_dir": None if args.in_memory else samples_dir,
"local_seeds_per_call": None
if args.seeds_per_call <= 0
else args.seeds_per_call,
}

train_loader = create_loader(
Expand Down
23 changes: 17 additions & 6 deletions python/cugraph-pyg/cugraph_pyg/examples/gcn_dist_snmg.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ def run_train(
wall_clock_start,
tempdir=None,
num_layers=3,
in_memory=False,
seeds_per_call=-1,
):

init_pytorch_worker(
Expand Down Expand Up @@ -119,20 +121,23 @@ def run_train(
dist.barrier()

ix_train = torch.tensor_split(split_idx["train"], world_size)[rank].cuda()
train_path = os.path.join(tempdir, f"train_{rank}")
os.mkdir(train_path)
train_path = None if in_memory else os.path.join(tempdir, f"train_{rank}")
if train_path:
os.mkdir(train_path)
train_loader = NeighborLoader(
(feature_store, graph_store),
input_nodes=ix_train,
directory=train_path,
shuffle=True,
drop_last=True,
local_seeds_per_call=seeds_per_call if seeds_per_call > 0 else None,
**kwargs,
)

ix_test = torch.tensor_split(split_idx["test"], world_size)[rank].cuda()
test_path = os.path.join(tempdir, f"test_{rank}")
os.mkdir(test_path)
test_path = None if in_memory else os.path.join(tempdir, f"test_{rank}")
if test_path:
os.mkdir(test_path)
test_loader = NeighborLoader(
(feature_store, graph_store),
input_nodes=ix_test,
Expand All @@ -144,14 +149,16 @@ def run_train(
)

ix_valid = torch.tensor_split(split_idx["valid"], world_size)[rank].cuda()
valid_path = os.path.join(tempdir, f"valid_{rank}")
os.mkdir(valid_path)
valid_path = None if in_memory else os.path.join(tempdir, f"valid_{rank}")
if valid_path:
os.mkdir(valid_path)
valid_loader = NeighborLoader(
(feature_store, graph_store),
input_nodes=ix_valid,
directory=valid_path,
shuffle=True,
drop_last=True,
local_seeds_per_call=seeds_per_call if seeds_per_call > 0 else None,
**kwargs,
)

Expand Down Expand Up @@ -269,6 +276,8 @@ def run_train(
parser.add_argument("--tempdir_root", type=str, default=None)
parser.add_argument("--dataset_root", type=str, default="dataset")
parser.add_argument("--dataset", type=str, default="ogbn-products")
parser.add_argument("--in_memory", action="store_true", default=False)
parser.add_argument("--seeds_per_call", type=int, default=-1)

parser.add_argument(
"--n_devices",
Expand Down Expand Up @@ -322,6 +331,8 @@ def run_train(
wall_clock_start,
tempdir,
args.num_layers,
args.in_memory,
args.seeds_per_call,
),
nprocs=world_size,
join=True,
Expand Down
Loading

0 comments on commit 4dfd3b3

Please sign in to comment.