Skip to content

Commit

Permalink
add mg negative sampling tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alexbarghi-nv committed Oct 18, 2024
1 parent b4cd8de commit 90e0956
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 75 deletions.
7 changes: 7 additions & 0 deletions python/cugraph-dgl/cugraph_dgl/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,13 @@ def global_uniform_negative_sampling(
num_samples_global = int(num_samples_global)

vertices = torch.tensor_split(vertices, world_size)[rank]

src_bias = cupy.array_split(src_bias, world_size)[rank]
dst_bias = (
src_bias
if can_edge_type[0] == can_edge_type[2]
else cupy.array_split(dst_bias, world_size)[rank]
)
else:
num_samples_global = num_samples

Expand Down
16 changes: 9 additions & 7 deletions python/cugraph-dgl/cugraph_dgl/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ def create_karate_bipartite(multi_gpu: bool = False):
node_x_1 = np.random.random((num_nodes_group_1,))
node_x_2 = np.random.random((num_nodes_group_2,))

if multi_gpu:
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()

node_x_1 = np.array_split(node_x_1, world_size)[rank]
node_x_2 = np.array_split(node_x_2, world_size)[rank]

graph.add_nodes(num_nodes_group_1, {"x": node_x_1}, "type1")
graph.add_nodes(num_nodes_group_2, {"x": node_x_2}, "type2")

Expand Down Expand Up @@ -114,8 +121,8 @@ def create_karate_bipartite(multi_gpu: bool = False):
world_size = torch.distributed.get_world_size()

edges_local = {
etype: edf.iloc[np.array_split(np.arange(edf), world_size)[rank]]
for etype, edf in edges
etype: edf.iloc[np.array_split(np.arange(len(edf)), world_size)[rank]]
for etype, edf in edges.items()
}
else:
edges_local = edges
Expand All @@ -129,8 +136,3 @@ def create_karate_bipartite(multi_gpu: bool = False):
@pytest.fixture
def karate_bipartite():
return create_karate_bipartite(False)


@pytest.fixture
def karate_bipartite_mg():
return create_karate_bipartite(True)
184 changes: 116 additions & 68 deletions python/cugraph-dgl/cugraph_dgl/tests/test_graph_mg.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
)

from .utils import init_pytorch_worker
from .conftest import create_karate_bipartite

pylibwholegraph = import_optional("pylibwholegraph")
torch = import_optional("torch")
Expand Down Expand Up @@ -122,8 +123,10 @@ def run_test_graph_make_homogeneous_graph_mg(rank, uid, world_size, direction):
assert (d_out_actual == d_out_exp).all()

cugraph_comms_shutdown()
torch.distributed.destroy_process_group()


@pytest.mark.skip(reason="bleh")
@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
@pytest.mark.skipif(
isinstance(pylibwholegraph, MissingModule), reason="wholegraph not available"
Expand Down Expand Up @@ -287,8 +290,10 @@ def run_test_graph_make_heterogeneous_graph_mg(rank, uid, world_size, direction)
assert len(f) > 0 # may be multiple, some could be on other GPU

cugraph_comms_shutdown()
torch.distributed.destroy_process_group()


@pytest.mark.skip(reason="bleh")
@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
@pytest.mark.skipif(
isinstance(pylibwholegraph, MissingModule), reason="wholegraph not available"
Expand All @@ -312,160 +317,145 @@ def test_graph_make_heterogeneous_graph_mg(direction):

def run_test_graph_find_simple_mg(rank, world_size, uid, direction):
init_pytorch_worker(rank, world_size, uid)
df = karate.get_edgelist()

total_num_nodes = max(df.src.max(), df.dst.max()) + 1

num_nodes_group_1 = total_num_nodes // 2
num_nodes_group_2 = total_num_nodes - num_nodes_group_1

node_x_1 = np.array_split(np.random.random((num_nodes_group_1,)), world_size)[rank]
node_x_2 = np.array_split(np.random.random((num_nodes_group_2,)), world_size)[rank]

graph = cugraph_dgl.Graph(is_multi_gpu=True)
graph.add_nodes(num_nodes_group_1, {"x": node_x_1}, "type1")
graph.add_nodes(num_nodes_group_2, {"x": node_x_2}, "type2")

edges_11 = df[(df.src < num_nodes_group_1) & (df.dst < num_nodes_group_1)]
edges_12 = df[(df.src < num_nodes_group_1) & (df.dst >= num_nodes_group_1)]
edges_21 = df[(df.src >= num_nodes_group_1) & (df.dst < num_nodes_group_1)]
edges_22 = df[(df.src >= num_nodes_group_1) & (df.dst >= num_nodes_group_1)]

edges_12.dst -= num_nodes_group_1
edges_21.src -= num_nodes_group_1
edges_22.dst -= num_nodes_group_1
edges_22.src -= num_nodes_group_1

edges_11_local = edges_11.iloc[
np.array_split(np.arange(len(edges_11)), world_size)[rank]
]
edges_12_local = edges_12.iloc[
np.array_split(np.arange(len(edges_12)), world_size)[rank]
]
edges_21_local = edges_21.iloc[
np.array_split(np.arange(len(edges_21)), world_size)[rank]
]
edges_22_local = edges_22.iloc[
np.array_split(np.arange(len(edges_22)), world_size)[rank]
]

graph.add_edges(
edges_11_local.src, edges_11_local.dst, etype=("type1", "e1", "type1")
)
graph.add_edges(
edges_12_local.src, edges_12_local.dst, etype=("type1", "e2", "type2")
)
graph.add_edges(
edges_21_local.src, edges_21_local.dst, etype=("type2", "e3", "type1")
)
graph.add_edges(
edges_22_local.src, edges_22_local.dst, etype=("type2", "e4", "type2")
)
graph, edges, _ = create_karate_bipartite(multi_gpu=True)

# force direction generation to make sure in case is tested
graph._graph(direction)

assert not graph.is_homogeneous
assert graph.is_multi_gpu

if len(edges_11) > 0:
if len(edges[("type1", "e1", "type1")]) > 0:
srcs, dsts = graph.find_edges(
torch.as_tensor([0, len(edges_11) - 1, 999], dtype=torch.int64),
torch.as_tensor(
[0, len(edges[("type1", "e1", "type1")]) - 1, 999], dtype=torch.int64
),
("type1", "e1", "type1"),
)
assert (
srcs[[0, 1]]
== torch.tensor(
[edges_11.src.iloc[0], edges_11.src.iloc[-1]],
[
edges[("type1", "e1", "type1")].src.iloc[0],
edges[("type1", "e1", "type1")].src.iloc[-1],
],
device="cuda",
dtype=torch.int64,
)
).all()
assert (
dsts[[0, 1]]
== torch.tensor(
[edges_11.dst.iloc[0], edges_11.dst.iloc[-1]],
[
edges[("type1", "e1", "type1")].dst.iloc[0],
edges[("type1", "e1", "type1")].dst.iloc[-1],
],
device="cuda",
dtype=torch.int64,
)
).all()
assert srcs[2] < 0 and dsts[2] < 0
if len(edges_12) > 0:
if len(edges[("type1", "e2", "type2")]) > 0:
srcs, dsts = graph.find_edges(
torch.as_tensor([0, len(edges_12) - 1, 999], dtype=torch.int64),
torch.as_tensor(
[0, len(edges[("type1", "e2", "type2")]) - 1, 999], dtype=torch.int64
),
("type1", "e2", "type2"),
)
assert (
srcs[[0, 1]]
== torch.tensor(
[edges_12.src.iloc[0], edges_12.src.iloc[-1]],
[
edges[("type1", "e2", "type2")].src.iloc[0],
edges[("type1", "e2", "type2")].src.iloc[-1],
],
device="cuda",
dtype=torch.int64,
)
).all()
assert (
dsts[[0, 1]]
== torch.tensor(
[edges_12.dst.iloc[0], edges_12.dst.iloc[-1]],
[
edges[("type1", "e2", "type2")].dst.iloc[0],
edges[("type1", "e2", "type2")].dst.iloc[-1],
],
device="cuda",
dtype=torch.int64,
)
).all()
assert srcs[2] < 0 and dsts[2] < 0
if len(edges_21) > 0:
if len(edges[("type2", "e3", "type1")]) > 0:
srcs, dsts = graph.find_edges(
torch.as_tensor([0, len(edges_21) - 1, 999], dtype=torch.int64),
torch.as_tensor(
[0, len(edges[("type2", "e3", "type1")]) - 1, 999], dtype=torch.int64
),
("type2", "e3", "type1"),
)
assert (
srcs[[0, 1]]
== torch.tensor(
[edges_21.src.iloc[0], edges_21.src.iloc[-1]],
[
edges[("type2", "e3", "type1")].src.iloc[0],
edges[("type2", "e3", "type1")].src.iloc[-1],
],
device="cuda",
dtype=torch.int64,
)
).all()
assert (
dsts[[0, 1]]
== torch.tensor(
[edges_21.dst.iloc[0], edges_21.dst.iloc[-1]],
[
edges[("type2", "e3", "type1")].dst.iloc[0],
edges[("type2", "e3", "type1")].dst.iloc[-1],
],
device="cuda",
dtype=torch.int64,
)
).all()
assert srcs[2] < 0 and dsts[2] < 0
if len(edges_22) > 0:
if len(edges[("type2", "e4", "type2")]) > 0:
srcs, dsts = graph.find_edges(
torch.as_tensor([0, len(edges_22) - 1, 999], dtype=torch.int64),
torch.as_tensor(
[0, len(edges[("type2", "e4", "type2")]) - 1, 999], dtype=torch.int64
),
("type2", "e4", "type2"),
)
assert (
srcs[[0, 1]]
== torch.tensor(
[edges_22.src.iloc[0], edges_22.src.iloc[-1]],
[
edges[("type2", "e4", "type2")].src.iloc[0],
edges[("type2", "e4", "type2")].src.iloc[-1],
],
device="cuda",
dtype=torch.int64,
)
).all()
assert (
dsts[[0, 1]]
== torch.tensor(
[edges_22.dst.iloc[0], edges_22.dst.iloc[-1]],
[
edges[("type2", "e4", "type2")].dst.iloc[0],
edges[("type2", "e4", "type2")].dst.iloc[-1],
],
device="cuda",
dtype=torch.int64,
)
).all()
assert srcs[2] < 0 and dsts[2] < 0

cugraph_comms_shutdown()
torch.distributed.destroy_process_group()


@pytest.mark.skip(reason="bleh")
@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
@pytest.mark.skipif(isinstance(dgl, MissingModule), reason="dgl not available")
@pytest.mark.parametrize("direction", ["out", "in"])
def test_graph_find_mg(direction):
df = karate.get_edgelist()
df.src = df.src.astype("int64")
df.dst = df.dst.astype("int64")

uid = cugraph_comms_create_unique_id()
world_size = torch.cuda.device_count()

Expand All @@ -478,3 +468,61 @@ def test_graph_find_mg(direction):
),
nprocs=world_size,
)


def run_test_uniform_negative_sample_mg(
rank, world_size, uid, exclude_self_loops, num_samples_per_worker
):
init_pytorch_worker(rank, world_size, uid)

graph, edges, _ = create_karate_bipartite(multi_gpu=True)

assert not graph.is_homogeneous
assert graph.is_multi_gpu

for etype in graph.canonical_etypes:
src_neg, dst_neg = graph.global_uniform_negative_sampling(
num_samples_per_worker,
exclude_self_loops=exclude_self_loops,
etype=etype,
)

assert len(src_neg) == len(dst_neg)
assert len(src_neg) <= num_samples_per_worker

assert (src_neg >= 0).all()
assert (dst_neg >= 0).all()

assert (src_neg < graph.num_nodes(etype[0])).all()
assert (dst_neg < graph.num_nodes(etype[2])).all()

if exclude_self_loops:
assert (src_neg == dst_neg).sum() == 0

for i in range(len(src_neg)):
s = int(src_neg[i])
d = int(dst_neg[i])
assert ((edges[etype].src == s) & (edges[etype].dst == d)).sum() == 0

cugraph_comms_shutdown()
torch.distributed.destroy_process_group()


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
@pytest.mark.skipif(isinstance(dgl, MissingModule), reason="dgl not available")
@pytest.mark.parametrize("exclude_self_loops", [True, False])
@pytest.mark.parametrize("num_samples_per_worker", [1, 2, 5])
def test_graph_uniform_negative_sample_mg(exclude_self_loops, num_samples_per_worker):
uid = cugraph_comms_create_unique_id()
world_size = torch.cuda.device_count()

torch.multiprocessing.spawn(
run_test_uniform_negative_sample_mg,
args=(
world_size,
uid,
exclude_self_loops,
num_samples_per_worker,
),
nprocs=world_size,
)

0 comments on commit 90e0956

Please sign in to comment.