Skip to content

Commit

Permalink
Update MG centrality tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nv-rliu committed Feb 26, 2024
1 parent 61a18cb commit a8eac3f
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@
# Parameters
# =============================================================================


DATASETS = [karate]
DEFAULT_EPSILON = 0.0001
DIRECTED_GRAPH_OPTIONS = [False, True]
ENDPOINTS_OPTIONS = [False, True]
NORMALIZED_OPTIONS = [False, True]
RESULT_DTYPE_OPTIONS = [np.float64]
SUBSET_SIZE_OPTIONS = [4, None]
SUBSET_SEED_OPTIONS = [42]
# FIXME: The "preset_gpu_count" from 21.08 and below are currently not
# supported and have been removed
WEIGHTED_GRAPH_OPTIONS = [False, True]
IS_DIRECTED = [False, True]
ENDPOINTS = [False, True]
IS_NORMALIZED = [False, True]
RESULT_DTYPES = [np.float64]
SUBSET_SIZES = [4, None]
SUBSET_SEEDS = [42]
IS_WEIGHTED = [False, True]


# =============================================================================
# Pytest Setup / Teardown - called for each test function
Expand All @@ -60,13 +60,13 @@ def setup_function():
@pytest.mark.parametrize(
"graph_file", DATASETS, ids=[f"dataset={d.get_path().stem}" for d in DATASETS]
)
@pytest.mark.parametrize("directed", DIRECTED_GRAPH_OPTIONS)
@pytest.mark.parametrize("subset_size", SUBSET_SIZE_OPTIONS)
@pytest.mark.parametrize("normalized", NORMALIZED_OPTIONS)
@pytest.mark.parametrize("directed", IS_DIRECTED)
@pytest.mark.parametrize("subset_size", SUBSET_SIZES)
@pytest.mark.parametrize("normalized", IS_NORMALIZED)
@pytest.mark.parametrize("weight", [None])
@pytest.mark.parametrize("endpoints", ENDPOINTS_OPTIONS)
@pytest.mark.parametrize("subset_seed", SUBSET_SEED_OPTIONS)
@pytest.mark.parametrize("result_dtype", RESULT_DTYPE_OPTIONS)
@pytest.mark.parametrize("endpoints", ENDPOINTS)
@pytest.mark.parametrize("subset_seed", SUBSET_SEEDS)
@pytest.mark.parametrize("result_dtype", RESULT_DTYPES)
def test_mg_betweenness_centrality(
graph_file,
directed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
# Parameters
# =============================================================================


DATASETS = [karate, netscience]
DIRECTED_GRAPH_OPTIONS = [False, True]
NORMALIZED_OPTIONS = [False, True]
IS_DIRECTED = [True, False]
IS_NORMALIZED = [True, False]
DEFAULT_EPSILON = 0.0001
SUBSET_SIZE_OPTIONS = [4, None]
# FIXME: The "preset_gpu_count" from 21.08 and below are not supported and have
# been removed
RESULT_DTYPE_OPTIONS = [np.float32, np.float64]
SUBSET_SIZES = [4, None]
RESULT_DTYPES = [np.float32, np.float64]


# =============================================================================
# Pytest Setup / Teardown - called for each test function
Expand All @@ -51,16 +51,17 @@ def setup_function():
# Tests
# =============================================================================


# FIXME: Fails for directed=False (BC scores are twice as large) and normalized=True.
@pytest.mark.mg
@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system")
@pytest.mark.parametrize(
"graph_file", DATASETS, ids=[f"dataset={d.get_path().stem}" for d in DATASETS]
)
@pytest.mark.parametrize("directed", DIRECTED_GRAPH_OPTIONS)
@pytest.mark.parametrize("subset_size", SUBSET_SIZE_OPTIONS)
@pytest.mark.parametrize("normalized", NORMALIZED_OPTIONS)
@pytest.mark.parametrize("result_dtype", RESULT_DTYPE_OPTIONS)
@pytest.mark.parametrize("directed", IS_DIRECTED)
@pytest.mark.parametrize("subset_size", SUBSET_SIZES)
@pytest.mark.parametrize("normalized", IS_NORMALIZED)
@pytest.mark.parametrize("result_dtype", RESULT_DTYPES)
def test_mg_edge_betweenness_centrality(
graph_file,
directed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
import cudf
import cugraph
import cugraph.dask as dcg
import dask_cudf
from cugraph.datasets import karate, dolphins
from cugraph.testing import utils


# =============================================================================
Expand All @@ -39,11 +37,11 @@ def setup_function():

DATASETS = [karate, dolphins]
IS_DIRECTED = [True, False]
NORMALIZED = [False, True]
ENDPOINTS = [False, True]
SUBSET_SEED = [42, None]
SUBSET_SIZE = [None, 15]
VERTEX_LIST_TYPE = [list, cudf]
IS_NORMALIZED = [True, False]
ENDPOINTS = [True, False]
SUBSET_SEEDS = [42, None]
SUBSET_SIZES = [None, 15]
VERTEX_LIST_TYPES = [list, cudf]

# =============================================================================
# Helper functions
Expand All @@ -57,15 +55,7 @@ def get_sg_graph(dataset, directed):


def get_mg_graph(dataset, directed):
input_data_path = dataset.get_path()
blocksize = dcg.get_chunksize(input_data_path)
ddf = dask_cudf.read_csv(
input_data_path,
blocksize=blocksize,
delimiter=dataset.metadata["delim"],
names=dataset.metadata["col_names"],
dtype=dataset.metadata["col_types"],
)
ddf = dataset.get_dask_edgelist()
dg = cugraph.Graph(directed=directed)
dg.from_dask_cudf_edgelist(
ddf,
Expand All @@ -79,113 +69,74 @@ def get_mg_graph(dataset, directed):
return dg


@pytest.fixture(scope="module")
def input_expected_output(input_combo):
"""
This fixture returns the inputs and expected results from the
betweenness_centrality algo (based on cuGraph betweenness_centrality), which
can be used for validation.
"""
# =============================================================================
# Tests
# =============================================================================

input_data_path = input_combo["graph_file"]
normalized = input_combo["normalized"]
endpoints = input_combo["endpoints"]
random_state = input_combo["subset_seed"]
subset_size = input_combo["subset_size"]
directed = input_combo["directed"]
vertex_list_type = input_combo["vertex_list_type"]

G = utils.generate_cugraph_graph_from_file(input_data_path, directed=directed)
@pytest.mark.mg
@pytest.mark.parametrize("dataset", DATASETS)
@pytest.mark.parametrize("directed", IS_DIRECTED)
@pytest.mark.parametrize("normalized", IS_NORMALIZED)
@pytest.mark.parametrize("endpoint", ENDPOINTS)
@pytest.mark.parametrize("subset_seed", SUBSET_SEEDS)
@pytest.mark.parametrize("subset_size", SUBSET_SIZES)
@pytest.mark.parametrize("v_list_type", VERTEX_LIST_TYPES)
def test_dask_mg_betweenness_centrality(
dataset,
directed,
normalized,
endpoint,
subset_seed,
subset_size,
v_list_type,
dask_client,
benchmark,
):
g = get_sg_graph(dataset, directed)
dataset.unload()
dg = get_mg_graph(dataset, directed)
random_state = subset_seed

if subset_size is None:
k = subset_size
elif isinstance(subset_size, int):
# Select random vertices
k = G.select_random_vertices(
k = g.select_random_vertices(
random_state=random_state, num_vertices=subset_size
)
if vertex_list_type is list:
if v_list_type is list:
k = k.to_arrow().to_pylist()

print("the seeds are \n", k)
if vertex_list_type is int:
if v_list_type is int:
# This internally samples k vertices in betweenness centrality.
# Since the nodes sampled by each implementation will be random,
# sample all vertices instead, which makes the test
# consistent.
k = len(G.nodes())

input_combo["k"] = k
k = len(g.nodes())

sg_cugraph_bc = cugraph.betweenness_centrality(
G, k=k, normalized=normalized, endpoints=endpoints, random_state=random_state
g, k=k, normalized=normalized, endpoints=endpoint, random_state=random_state
)
# Save the results back to the input_combo dictionary to prevent redundant
# cuGraph runs. Other tests using the input_combo fixture will look for
# them, and if not present they will have to re-run the same cuGraph call.
sg_cugraph_bc = sg_cugraph_bc.sort_values("vertex").reset_index(drop=True)

input_combo["sg_cugraph_results"] = sg_cugraph_bc
chunksize = dcg.get_chunksize(input_data_path)
ddf = dask_cudf.read_csv(
input_data_path,
chunksize=chunksize,
delimiter=" ",
names=["src", "dst", "value"],
dtype=["int32", "int32", "float32"],
)

dg = cugraph.Graph(directed=directed)
dg.from_dask_cudf_edgelist(
ddf,
source="src",
destination="dst",
edge_attr="value",
renumber=True,
store_transposed=True,
)

input_combo["MGGraph"] = dg

return input_combo


# =============================================================================
# Tests
# =============================================================================


# @pytest.mark.skipif(
# is_single_gpu(), reason="skipping MG testing on Single GPU system"
# )


@pytest.mark.mg
def test_dask_mg_betweenness_centrality(dask_client, benchmark, input_expected_output):

dg = input_expected_output["MGGraph"]
k = input_expected_output["k"]
endpoints = input_expected_output["endpoints"]
normalized = input_expected_output["normalized"]
random_state = input_expected_output["subset_seed"]
mg_bc_results = benchmark(
dcg.betweenness_centrality,
dg,
k=k,
normalized=normalized,
endpoints=endpoints,
endpoints=endpoint,
random_state=random_state,
)

mg_bc_results = (
mg_bc_results.compute().sort_values("vertex").reset_index(drop=True)
)["betweenness_centrality"].to_cupy()

sg_bc_results = (
input_expected_output["sg_cugraph_results"]
.sort_values("vertex")
.reset_index(drop=True)
)["betweenness_centrality"].to_cupy()
sg_bc_results = (sg_cugraph_bc.sort_values("vertex").reset_index(drop=True))[
"betweenness_centrality"
].to_cupy()

diff = cupy.isclose(mg_bc_results, sg_bc_results)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2018-2023, NVIDIA CORPORATION.
# Copyright (c) 2018-2024, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -15,13 +15,12 @@

import pytest

import cudf
import dask_cudf
import cugraph
from cugraph.dask.common.mg_utils import is_single_gpu
from cugraph.testing.utils import RAPIDS_DATASET_ROOT_DIR_PATH
from cugraph.datasets import karate_asymmetric, polbooks, email_Eu_core
from cudf.testing import assert_series_equal


# =============================================================================
# Pytest Setup / Teardown - called for each test function
# =============================================================================
Expand All @@ -31,45 +30,55 @@ def setup_function():
gc.collect()


# =============================================================================
# Parameters
# =============================================================================


DATASETS = [karate_asymmetric, polbooks, email_Eu_core]
IS_DIRECTED = [True, False]

DATA_PATH = [
(RAPIDS_DATASET_ROOT_DIR_PATH / "karate-asymmetric.csv").as_posix(),
(RAPIDS_DATASET_ROOT_DIR_PATH / "polbooks.csv").as_posix(),
(RAPIDS_DATASET_ROOT_DIR_PATH / "email-Eu-core.csv").as_posix(),
]

# =============================================================================
# Helper functions
# =============================================================================

@pytest.mark.mg
@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system")
@pytest.mark.parametrize("directed", IS_DIRECTED)
@pytest.mark.parametrize("data_file", DATA_PATH)
def test_dask_mg_degree(dask_client, directed, data_file):

input_data_path = data_file
chunksize = cugraph.dask.get_chunksize(input_data_path)

ddf = dask_cudf.read_csv(
input_data_path,
chunksize=chunksize,
delimiter=" ",
names=["src", "dst", "value"],
dtype=["int32", "int32", "float32"],
)

df = cudf.read_csv(
input_data_path,
delimiter=" ",
names=["src", "dst", "value"],
dtype=["int32", "int32", "float32"],
)
def get_sg_graph(dataset, directed):
G = dataset.get_graph(create_using=cugraph.Graph(directed=directed))

return G


def get_mg_graph(dataset, directed):
ddf = dataset.get_dask_edgelist()
dg = cugraph.Graph(directed=directed)
dg.from_dask_cudf_edgelist(ddf, "src", "dst")
dg.from_dask_cudf_edgelist(
ddf,
source="src",
destination="dst",
edge_attr="wgt",
renumber=True,
store_transposed=True,
)

return dg


# =============================================================================
# Tests
# =============================================================================


@pytest.mark.mg
@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system")
@pytest.mark.parametrize("dataset", DATASETS)
@pytest.mark.parametrize("directed", IS_DIRECTED)
def test_dask_mg_degree(dask_client, dataset, directed):
dg = get_mg_graph(dataset, directed)
dg.compute_renumber_edge_list()

g = cugraph.Graph(directed=directed)
g.from_cudf_edgelist(df, "src", "dst")
g = get_sg_graph(dataset, directed)

merge_df_in_degree = (
dg.in_degree()
Expand Down

0 comments on commit a8eac3f

Please sign in to comment.