Skip to content

Commit

Permalink
Revert edge_betweenness_centrality
Browse files Browse the repository at this point in the history
  • Loading branch information
nv-rliu committed Mar 11, 2024
1 parent b0e9141 commit 36182a8
Showing 1 changed file with 139 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@
import gc
import pytest

import dask_cudf
from pylibcugraph.testing.utils import gen_fixture_params_product
from cugraph.datasets import karate, dolphins

import cugraph
import cugraph.dask as dcg
from cugraph.datasets import karate, dolphins

# from cugraph.dask.common.mg_utils import is_single_gpu


# =============================================================================
Expand All @@ -28,34 +33,79 @@ def setup_function():
gc.collect()


# =============================================================================
# Parameters
# =============================================================================
IS_DIRECTED = [True, False]
INCLUDE_WEIGHTS = [False, True]
INCLUDE_EDGE_IDS = [False, True]
NORMALIZED_OPTIONS = [False, True]
SUBSET_SIZE_OPTIONS = [4, None]


DATASETS = [karate, dolphins]
IS_DIRECTED = [True, False]
IS_WEIGHTED = [True, False]
INCLUDE_EDGE_IDS = [True, False]
IS_NORMALIZED = [True, False]
SUBSET_SIZES = [4, None]
# email_Eu_core is too expensive to test
datasets = [karate, dolphins]


# =============================================================================
# Helper functions
# Pytest fixtures
# =============================================================================


def get_sg_graph(dataset, directed, edge_ids):
dataset.unload()
df = dataset.get_edgelist()
fixture_params = gen_fixture_params_product(
(datasets, "graph_file"),
(IS_DIRECTED, "directed"),
(INCLUDE_WEIGHTS, "include_weights"),
(INCLUDE_EDGE_IDS, "include_edgeids"),
(NORMALIZED_OPTIONS, "normalized"),
(SUBSET_SIZE_OPTIONS, "subset_size"),
)


@pytest.fixture(scope="module", params=fixture_params)
def input_combo(request):
"""
Simply return the current combination of params as a dictionary for use in
tests or other parameterized fixtures.
"""
parameters = dict(
zip(
(
"graph_file",
"directed",
"include_weights",
"include_edge_ids",
"normalized",
"subset_size",
"subset_seed",
),
request.param,
)
)

return parameters


@pytest.fixture(scope="module")
def input_expected_output(input_combo):
"""
This fixture returns the inputs and expected results from the edge
betweenness centrality algo.
(based on cuGraph edge betweenness centrality) which can be used
for validation.
"""
directed = input_combo["directed"]
normalized = input_combo["normalized"]
k = input_combo["subset_size"]
subset_seed = 42
edge_ids = input_combo["include_edge_ids"]
weight = input_combo["include_weights"]

df = input_combo["graph_file"].get_edgelist()
if edge_ids:
if not directed:
# Edge ids not supported for undirected graph
return None
dtype = df.dtypes.iloc[0]
return
dtype = df.dtypes[0]
edge_id = "edge_id"
df[edge_id] = df.index
df["edge_id"] = df.index
df = df.astype(dtype)

else:
Expand All @@ -65,13 +115,30 @@ def get_sg_graph(dataset, directed, edge_ids):
G.from_cudf_edgelist(
df, source="src", destination="dst", weight="wgt", edge_id=edge_id
)
if isinstance(k, int):
k = G.select_random_vertices(subset_seed, k)

return G
input_combo["k"] = k
# Save the results back to the input_combo dictionary to prevent redundant
# cuGraph runs. Other tests using the input_combo fixture will look for
# them, and if not present they will have to re-run the same cuGraph call.
sg_cugraph_edge_bc = (
cugraph.edge_betweenness_centrality(G, k, normalized)
.sort_values(["src", "dst"])
.reset_index(drop=True)
)

input_data_path = input_combo["graph_file"].get_path()

def get_mg_graph(dataset, directed, edge_ids, weight):
dataset.unload()
ddf = dataset.get_dask_edgelist()
input_combo["sg_cugraph_results"] = sg_cugraph_edge_bc
chunksize = dcg.get_chunksize(input_data_path)
ddf = dask_cudf.read_csv(
input_data_path,
chunksize=chunksize,
delimiter=" ",
names=["src", "dst", "value"],
dtype=["int32", "int32", "float32"],
)

if weight:
weight = ddf
Expand All @@ -87,93 +154,78 @@ def get_mg_graph(dataset, directed, edge_ids, weight):
edge_id = None

dg = cugraph.Graph(directed=directed)

dg.from_dask_cudf_edgelist(
ddf,
source="src",
destination="dst",
weight="wgt",
weight="value",
edge_id=edge_id,
renumber=True,
)

return dg, weight
input_combo["MGGraph"] = dg
input_combo["include_weights"] = weight

return input_combo


# =============================================================================
# Tests
# =============================================================================


# @pytest.mark.skipif(
# is_single_gpu(), reason="skipping MG testing on Single GPU system"
# )
@pytest.mark.mg
@pytest.mark.parametrize("dataset", DATASETS)
@pytest.mark.parametrize("directed", IS_DIRECTED)
@pytest.mark.parametrize("weighted", IS_WEIGHTED)
@pytest.mark.parametrize("edge_ids", INCLUDE_EDGE_IDS)
@pytest.mark.parametrize("normalized", IS_NORMALIZED)
@pytest.mark.parametrize("subset_size", SUBSET_SIZES)
def test_dask_mg_edge_betweenness_centrality(
dask_client,
dataset,
directed,
weighted,
edge_ids,
normalized,
subset_size,
benchmark,
dask_client, benchmark, input_expected_output
):
g = get_sg_graph(dataset, directed, edge_ids)

if g is None:
pytest.skip("Edge_ids not supported for undirected graph")

dg, weight = get_mg_graph(dataset, directed, edge_ids, weighted)
subset_seed = 42

k = subset_size
if isinstance(k, int):
k = g.select_random_vertices(subset_seed, k)

sg_cugraph_edge_bc = (
cugraph.edge_betweenness_centrality(g, k, normalized)
.sort_values(["src", "dst"])
.reset_index(drop=True)
)

if weight is not None:
with pytest.raises(NotImplementedError):
if input_expected_output is not None:
dg = input_expected_output["MGGraph"]
k = input_expected_output["k"]
normalized = input_expected_output["normalized"]
weight = input_expected_output["include_weights"]
if weight is not None:
with pytest.raises(NotImplementedError):
result_edge_bc = benchmark(
dcg.edge_betweenness_centrality, dg, k, normalized, weight=weight
)

else:
result_edge_bc = benchmark(
dcg.edge_betweenness_centrality, dg, k, normalized, weight=weight
)
result_edge_bc = (
result_edge_bc.compute()
.sort_values(["src", "dst"])
.reset_index(drop=True)
.rename(columns={"betweenness_centrality": "mg_betweenness_centrality"})
)

else:
result_edge_bc = benchmark(
dcg.edge_betweenness_centrality, dg, k, normalized, weight=weight
)
result_edge_bc = (
result_edge_bc.compute()
.sort_values(["src", "dst"])
.reset_index(drop=True)
.rename(columns={"betweenness_centrality": "mg_betweenness_centrality"})
)

if len(result_edge_bc.columns) > 3:
result_edge_bc = result_edge_bc.rename(columns={"edge_id": "mg_edge_id"})

expected_output = sg_cugraph_edge_bc.reset_index(drop=True)
result_edge_bc["betweenness_centrality"] = expected_output[
"betweenness_centrality"
]
if len(expected_output.columns) > 3:
result_edge_bc["edge_id"] = expected_output["edge_id"]
edge_id_diff = result_edge_bc.query("mg_edge_id != edge_id")
assert len(edge_id_diff) == 0
if len(result_edge_bc.columns) > 3:
result_edge_bc = result_edge_bc.rename(
columns={"edge_id": "mg_edge_id"}
)

edge_bc_diffs1 = result_edge_bc.query(
"mg_betweenness_centrality - betweenness_centrality > 0.01"
)
edge_bc_diffs2 = result_edge_bc.query(
"betweenness_centrality - mg_betweenness_centrality < -0.01"
)
expected_output = input_expected_output["sg_cugraph_results"].reset_index(
drop=True
)
result_edge_bc["betweenness_centrality"] = expected_output[
"betweenness_centrality"
]
if len(expected_output.columns) > 3:
result_edge_bc["edge_id"] = expected_output["edge_id"]
edge_id_diff = result_edge_bc.query("mg_edge_id != edge_id")
assert len(edge_id_diff) == 0

edge_bc_diffs1 = result_edge_bc.query(
"mg_betweenness_centrality - betweenness_centrality > 0.01"
)
edge_bc_diffs2 = result_edge_bc.query(
"betweenness_centrality - mg_betweenness_centrality < -0.01"
)

assert len(edge_bc_diffs1) == 0
assert len(edge_bc_diffs2) == 0
assert len(edge_bc_diffs1) == 0
assert len(edge_bc_diffs2) == 0

0 comments on commit 36182a8

Please sign in to comment.