Skip to content

Commit

Permalink
Refactor Several MG Tests (#4244)
Browse files Browse the repository at this point in the history
Addresses #4187 

This PR makes improvements to old MG testing conventions used in these testing directories:
 - centrality
 - comms
 - community
 - components
 - core
 - internals

The goals of this PR is to improve readability and use of the MG tests. Changes are similar to #4197 
> Instead of using nested fixtures. eg. `input_expected_output -> input_combo -> fixture_params` which can be confusing, the `@pytest.mark.parametrize` marker is used to iterate through combinations of parameters used for testing. The fixtures are also replaced by functions used to create SG and MG graphs.

By using the Datasets API (which now supports MG Graphs thanks to @huiyuxie), the number of imports and `testing.utils` functions can be significantly reduced.

Authors:
  - Ralph Liu (https://github.com/nv-rliu)

Approvers:
  - Rick Ratzel (https://github.com/rlratzel)

URL: #4244
  • Loading branch information
nv-rliu authored May 14, 2024
1 parent 45371cb commit b9c0fc6
Show file tree
Hide file tree
Showing 19 changed files with 322 additions and 704 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,3 @@ def test_mg_betweenness_centrality(
second_key="ref_bc",
epsilon=DEFAULT_EPSILON,
)

# Clean-up stored dataset edge-lists
dataset.unload()
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,3 @@ def test_mg_edge_betweenness_centrality(
second_key="ref_bc",
epsilon=DEFAULT_EPSILON,
)
# Clean-up stored dataset edge-lists
dataset.unload()
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.:
# Copyright (c) 2020-2024, NVIDIA CORPORATION.:
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -111,11 +111,18 @@ def calc_betweenness_centrality(
else:
edge_attr = None

G = graph_file.get_graph(
download=True,
create_using=cugraph.Graph(directed=directed),
ignore_weights=not edgevals,
)
G = None
if multi_gpu_batch:
G = graph_file.get_dask_graph(
create_using=cugraph.Graph(directed=directed), ignore_weights=not edgevals
)
G.enable_batch()
else:
G = graph_file.get_graph(
download=True,
create_using=cugraph.Graph(directed=directed),
ignore_weights=not edgevals,
)

M = G.to_pandas_edgelist().rename(
columns={"src": "0", "dst": "1", "wgt": edge_attr}
Expand All @@ -130,8 +137,6 @@ def calc_betweenness_centrality(
)

assert G is not None and Gnx is not None
if multi_gpu_batch:
G.enable_batch()

calc_func = None
if k is not None and seed is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,12 @@ def setup_function():


def get_sg_graph(dataset, directed):
dataset.unload()
G = dataset.get_graph(create_using=cugraph.Graph(directed=directed))

return G


def get_mg_graph(dataset, directed):
dataset.unload()
ddf = dataset.get_dask_edgelist()
dg = cugraph.Graph(directed=directed)
dg.from_dask_cudf_edgelist(
Expand Down Expand Up @@ -96,7 +94,6 @@ def test_dask_mg_betweenness_centrality(
benchmark,
):
g = get_sg_graph(dataset, directed)
dataset.unload()
dg = get_mg_graph(dataset, directed)
random_state = subset_seed

Expand Down Expand Up @@ -143,6 +140,3 @@ def test_dask_mg_betweenness_centrality(
diff = cupy.isclose(mg_bc_results, sg_bc_results)

assert diff.all()

# Clean-up stored dataset edge-lists
dataset.unload()
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,12 @@ def setup_function():


def get_sg_graph(dataset, directed):
dataset.unload()
G = dataset.get_graph(create_using=cugraph.Graph(directed=directed))

return G


def get_mg_graph(dataset, directed):
dataset.unload()
ddf = dataset.get_dask_edgelist()
dg = cugraph.Graph(directed=directed)
dg.from_dask_cudf_edgelist(
Expand Down Expand Up @@ -118,6 +116,3 @@ def test_dask_mg_degree(dask_client, dataset, directed):
check_names=False,
check_dtype=False,
)

# Clean-up stored dataset edge-lists
dataset.unload()
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ def setup_function():


def get_sg_graph(dataset, directed, edge_ids):
dataset.unload()
df = dataset.get_edgelist()
if edge_ids:
if not directed:
Expand All @@ -71,7 +70,6 @@ def get_sg_graph(dataset, directed, edge_ids):


def get_mg_graph(dataset, directed, edge_ids, weight):
dataset.unload()
ddf = dataset.get_dask_edgelist()

if weight:
Expand Down Expand Up @@ -178,6 +176,3 @@ def test_dask_mg_edge_betweenness_centrality(

assert len(edge_bc_diffs1) == 0
assert len(edge_bc_diffs2) == 0

# Clean-up stored dataset edge-lists
dataset.unload()
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ def setup_function():
def test_dask_mg_eigenvector_centrality(dask_client, dataset, directed):
input_data_path = dataset.get_path()
print(f"dataset={input_data_path}")
dataset.unload()
ddf = dataset.get_dask_edgelist()
dg = cugraph.Graph(directed=True)
dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True)
Expand Down Expand Up @@ -89,15 +88,11 @@ def test_dask_mg_eigenvector_centrality(dask_client, dataset, directed):
err = err + 1
assert err == 0

# Clean-up stored dataset edge-lists
dataset.unload()


@pytest.mark.mg
def test_dask_mg_eigenvector_centrality_transposed_false(dask_client):
dataset = DATASETS[0]

dataset.unload()
ddf = dataset.get_dask_edgelist()
dg = cugraph.Graph(directed=True)
dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=False)
Expand All @@ -110,6 +105,3 @@ def test_dask_mg_eigenvector_centrality_transposed_false(dask_client):

with pytest.warns(UserWarning, match=warning_msg):
dcg.eigenvector_centrality(dg)

# Clean-up stored dataset edge-lists
dataset.unload()
12 changes: 0 additions & 12 deletions python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ def test_dask_mg_katz_centrality(dask_client, dataset, directed):
input_data_path = dataset.get_path()
print(f"dataset={input_data_path}")

dataset.unload()
ddf = dataset.get_dask_edgelist()
dg = cugraph.Graph(directed=True)
dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True)
Expand Down Expand Up @@ -95,16 +94,12 @@ def test_dask_mg_katz_centrality(dask_client, dataset, directed):
err = err + 1
assert err == 0

# Clean-up stored dataset edge-lists
dataset.unload()


@pytest.mark.mg
@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system")
@pytest.mark.parametrize("dataset", DATASETS)
@pytest.mark.parametrize("directed", IS_DIRECTED)
def test_dask_mg_katz_centrality_nstart(dask_client, dataset, directed):
dataset.unload()
ddf = dataset.get_dask_edgelist()
dg = cugraph.Graph(directed=True)
dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True)
Expand Down Expand Up @@ -136,14 +131,10 @@ def test_dask_mg_katz_centrality_nstart(dask_client, dataset, directed):
err = err + 1
assert err == 0

# Clean-up stored dataset edge-lists
dataset.unload()


@pytest.mark.mg
@pytest.mark.parametrize("dataset", DATASETS)
def test_dask_mg_katz_centrality_transposed_false(dask_client, dataset):
dataset.unload()
ddf = dataset.get_dask_edgelist()
dg = cugraph.Graph(directed=True)
dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=False)
Expand All @@ -156,6 +147,3 @@ def test_dask_mg_katz_centrality_transposed_false(dask_client, dataset):

with pytest.warns(UserWarning, match=warning_msg):
dcg.katz_centrality(dg)

# Clean-up stored dataset edge-lists
dataset.unload()
92 changes: 35 additions & 57 deletions python/cugraph/cugraph/tests/comms/test_comms_mg.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
import pytest
import cugraph.dask as dcg

import cudf
import dask_cudf
import cugraph
from cugraph.testing.utils import RAPIDS_DATASET_ROOT_DIR_PATH
from cugraph.datasets import karate, dolphins


# =============================================================================
# Pytest Setup / Teardown - called for each test function
Expand All @@ -30,75 +29,54 @@ def setup_function():
gc.collect()


# =============================================================================
# Parameters
# =============================================================================


DATASETS = [karate, dolphins]
IS_DIRECTED = [True, False]


# @pytest.mark.skipif(
# is_single_gpu(), reason="skipping MG testing on Single GPU system"
# )
# =============================================================================
# Helper Functions
# =============================================================================


def get_pagerank_result(dataset, is_mg):
"""Return the cugraph.pagerank result for an MG or SG graph"""

if is_mg:
dg = dataset.get_dask_graph(store_transposed=True)
return dcg.pagerank(dg).compute()
else:
g = dataset.get_graph(store_transposed=True)
return cugraph.pagerank(g)


# =============================================================================
# Tests
# =============================================================================


@pytest.mark.mg
@pytest.mark.parametrize("directed", IS_DIRECTED)
def test_dask_mg_pagerank(dask_client, directed):

# Initialize and run pagerank on two distributed graphs
# with same communicator

input_data_path1 = (RAPIDS_DATASET_ROOT_DIR_PATH / "karate.csv").as_posix()
input_data_path1 = karate.get_path()
print(f"dataset1={input_data_path1}")
chunksize1 = dcg.get_chunksize(input_data_path1)
result_pr1 = get_pagerank_result(karate, is_mg=True)

input_data_path2 = (RAPIDS_DATASET_ROOT_DIR_PATH / "dolphins.csv").as_posix()
input_data_path2 = dolphins.get_path()
print(f"dataset2={input_data_path2}")
chunksize2 = dcg.get_chunksize(input_data_path2)

ddf1 = dask_cudf.read_csv(
input_data_path1,
blocksize=chunksize1,
delimiter=" ",
names=["src", "dst", "value"],
dtype=["int32", "int32", "float32"],
)

dg1 = cugraph.Graph(directed=directed)
dg1.from_dask_cudf_edgelist(ddf1, "src", "dst")

result_pr1 = dcg.pagerank(dg1).compute()

ddf2 = dask_cudf.read_csv(
input_data_path2,
blocksize=chunksize2,
delimiter=" ",
names=["src", "dst", "value"],
dtype=["int32", "int32", "float32"],
)

dg2 = cugraph.Graph(directed=directed)
dg2.from_dask_cudf_edgelist(ddf2, "src", "dst")

result_pr2 = dcg.pagerank(dg2).compute()
result_pr2 = get_pagerank_result(dolphins, is_mg=True)

# Calculate single GPU pagerank for verification of results
df1 = cudf.read_csv(
input_data_path1,
delimiter=" ",
names=["src", "dst", "value"],
dtype=["int32", "int32", "float32"],
)

g1 = cugraph.Graph(directed=directed)
g1.from_cudf_edgelist(df1, "src", "dst")
expected_pr1 = cugraph.pagerank(g1)

df2 = cudf.read_csv(
input_data_path2,
delimiter=" ",
names=["src", "dst", "value"],
dtype=["int32", "int32", "float32"],
)

g2 = cugraph.Graph(directed=directed)
g2.from_cudf_edgelist(df2, "src", "dst")
expected_pr2 = cugraph.pagerank(g2)
expected_pr1 = get_pagerank_result(karate, is_mg=False)
expected_pr2 = get_pagerank_result(dolphins, is_mg=False)

# Compare and verify pagerank results

Expand Down
15 changes: 4 additions & 11 deletions python/cugraph/cugraph/tests/community/test_induced_subgraph_mg.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import cugraph
import cugraph.dask as dcg
import dask_cudf
from cudf.testing.testing import assert_frame_equal
from cugraph.dask.common.mg_utils import is_single_gpu
from cugraph.datasets import karate, dolphins, email_Eu_core
Expand All @@ -36,11 +35,13 @@ def setup_function():
# Parameters
# =============================================================================


DATASETS = [karate, dolphins, email_Eu_core]
IS_DIRECTED = [True, False]
NUM_VERTICES = [2, 5, 10, 20]
OFFSETS = [None]


# =============================================================================
# Helper functions
# =============================================================================
Expand All @@ -53,15 +54,7 @@ def get_sg_graph(dataset, directed):


def get_mg_graph(dataset, directed):
input_data_path = dataset.get_path()
blocksize = dcg.get_chunksize(input_data_path)
ddf = dask_cudf.read_csv(
input_data_path,
blocksize=blocksize,
delimiter=dataset.metadata["delim"],
names=dataset.metadata["col_names"],
dtype=dataset.metadata["col_types"],
)
ddf = dataset.get_dask_edgelist()
dg = cugraph.Graph(directed=directed)
dg.from_dask_cudf_edgelist(
ddf,
Expand Down Expand Up @@ -108,7 +101,7 @@ def test_mg_induced_subgraph(

# FIXME: This parameter is not yet tested
# mg_offsets = mg_offsets.compute().reset_index(drop=True)
mg_df, mg_offsets = result_induced_subgraph
mg_df, _ = result_induced_subgraph

if mg_df is not None and sg_induced_subgraph is not None:
# FIXME: 'edges()' or 'view_edgelist()' takes half the edges out if
Expand Down
Loading

0 comments on commit b9c0fc6

Please sign in to comment.