diff --git a/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py index 478b7e655d5..0dde5ef476d 100644 --- a/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -14,14 +14,9 @@ import gc import pytest -import dask_cudf -from pylibcugraph.testing.utils import gen_fixture_params_product -from cugraph.datasets import karate, dolphins - import cugraph import cugraph.dask as dcg - -# from cugraph.dask.common.mg_utils import is_single_gpu +from cugraph.datasets import karate, dolphins # ============================================================================= @@ -33,79 +28,35 @@ def setup_function(): gc.collect() -IS_DIRECTED = [True, False] -INCLUDE_WEIGHTS = [False, True] -INCLUDE_EDGE_IDS = [False, True] -NORMALIZED_OPTIONS = [False, True] -SUBSET_SIZE_OPTIONS = [4, None] +# ============================================================================= +# Parameters +# ============================================================================= -# email_Eu_core is too expensive to test -datasets = [karate, dolphins] +DATASETS = [karate, dolphins] +IS_DIRECTED = [True, False] +IS_WEIGHTED = [True, False] +INCLUDE_EDGE_IDS = [True, False] +IS_NORMALIZED = [True, False] +SUBSET_SIZES = [4, None] # ============================================================================= -# Pytest fixtures +# Helper functions # ============================================================================= -fixture_params = gen_fixture_params_product( - (datasets, "graph_file"), - (IS_DIRECTED, "directed"), - (INCLUDE_WEIGHTS, "include_weights"), - (INCLUDE_EDGE_IDS, "include_edgeids"), - (NORMALIZED_OPTIONS, "normalized"), - (SUBSET_SIZE_OPTIONS, "subset_size"), -) - - -@pytest.fixture(scope="module", params=fixture_params) -def input_combo(request): - """ - Simply return the current combination of params as a dictionary for use in - tests or other parameterized fixtures. - """ - parameters = dict( - zip( - ( - "graph_file", - "directed", - "include_weights", - "include_edge_ids", - "normalized", - "subset_size", - "subset_seed", - ), - request.param, - ) - ) - - return parameters - - -@pytest.fixture(scope="module") -def input_expected_output(input_combo): - """ - This fixture returns the inputs and expected results from the edge - betweenness centrality algo. - (based on cuGraph edge betweenness centrality) which can be used - for validation. - """ - directed = input_combo["directed"] - normalized = input_combo["normalized"] - k = input_combo["subset_size"] - subset_seed = 42 - edge_ids = input_combo["include_edge_ids"] - weight = input_combo["include_weights"] +def get_sg_graph(dataset, directed, edge_ids): + dataset.unload() + df = dataset.get_edgelist() - df = input_combo["graph_file"].get_edgelist() if edge_ids: if not directed: # Edge ids not supported for undirected graph - return - dtype = df.dtypes[0] + return None + dtype = df.dtypes.iloc[0] edge_id = "edge_id" - df["edge_id"] = df.index + df[edge_id] = df.index df = df.astype(dtype) else: @@ -115,30 +66,13 @@ def input_expected_output(input_combo): G.from_cudf_edgelist( df, source="src", destination="dst", weight="wgt", edge_id=edge_id ) - if isinstance(k, int): - k = G.select_random_vertices(subset_seed, k) - input_combo["k"] = k - # Save the results back to the input_combo dictionary to prevent redundant - # cuGraph runs. Other tests using the input_combo fixture will look for - # them, and if not present they will have to re-run the same cuGraph call. - sg_cugraph_edge_bc = ( - cugraph.edge_betweenness_centrality(G, k, normalized) - .sort_values(["src", "dst"]) - .reset_index(drop=True) - ) + return G - input_data_path = input_combo["graph_file"].get_path() - input_combo["sg_cugraph_results"] = sg_cugraph_edge_bc - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - chunksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) +def get_mg_graph(dataset, directed, edge_ids, weight): + dataset.unload() + ddf = dataset.get_dask_edgelist() if weight: weight = ddf @@ -154,20 +88,16 @@ def input_expected_output(input_combo): edge_id = None dg = cugraph.Graph(directed=directed) - dg.from_dask_cudf_edgelist( ddf, source="src", destination="dst", - weight="value", + weight="wgt", edge_id=edge_id, renumber=True, ) - input_combo["MGGraph"] = dg - input_combo["include_weights"] = weight - - return input_combo + return dg, weight # ============================================================================= @@ -175,57 +105,76 @@ def input_expected_output(input_combo): # ============================================================================= -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) @pytest.mark.mg +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("directed", IS_DIRECTED) +@pytest.mark.parametrize("weighted", IS_WEIGHTED) +@pytest.mark.parametrize("edge_ids", INCLUDE_EDGE_IDS) +@pytest.mark.parametrize("normalized", IS_NORMALIZED) +@pytest.mark.parametrize("subset_size", SUBSET_SIZES) def test_dask_mg_edge_betweenness_centrality( - dask_client, benchmark, input_expected_output + dask_client, + dataset, + directed, + weighted, + edge_ids, + normalized, + subset_size, + benchmark, ): - if input_expected_output is not None: - dg = input_expected_output["MGGraph"] - k = input_expected_output["k"] - normalized = input_expected_output["normalized"] - weight = input_expected_output["include_weights"] - if weight is not None: - with pytest.raises(NotImplementedError): - result_edge_bc = benchmark( - dcg.edge_betweenness_centrality, dg, k, normalized, weight=weight - ) - - else: + g = get_sg_graph(dataset, directed, edge_ids) + + if g is None: + pytest.skip("Edge_ids not supported for undirected graph") + + dg, weight = get_mg_graph(dataset, directed, edge_ids, weighted) + subset_seed = 42 + + k = subset_size + if isinstance(k, int): + k = g.select_random_vertices(subset_seed, k) + + sg_cugraph_edge_bc = ( + cugraph.edge_betweenness_centrality(g, k, normalized) + .sort_values(["src", "dst"]) + .reset_index(drop=True) + ) + + if weight is not None: + with pytest.raises(NotImplementedError): result_edge_bc = benchmark( dcg.edge_betweenness_centrality, dg, k, normalized, weight=weight ) - result_edge_bc = ( - result_edge_bc.compute() - .sort_values(["src", "dst"]) - .reset_index(drop=True) - .rename(columns={"betweenness_centrality": "mg_betweenness_centrality"}) - ) - if len(result_edge_bc.columns) > 3: - result_edge_bc = result_edge_bc.rename( - columns={"edge_id": "mg_edge_id"} - ) + else: + result_edge_bc = benchmark( + dcg.edge_betweenness_centrality, dg, k, normalized, weight=weight + ) + result_edge_bc = ( + result_edge_bc.compute() + .sort_values(["src", "dst"]) + .reset_index(drop=True) + .rename(columns={"betweenness_centrality": "mg_betweenness_centrality"}) + ) - expected_output = input_expected_output["sg_cugraph_results"].reset_index( - drop=True - ) - result_edge_bc["betweenness_centrality"] = expected_output[ - "betweenness_centrality" - ] - if len(expected_output.columns) > 3: - result_edge_bc["edge_id"] = expected_output["edge_id"] - edge_id_diff = result_edge_bc.query("mg_edge_id != edge_id") - assert len(edge_id_diff) == 0 - - edge_bc_diffs1 = result_edge_bc.query( - "mg_betweenness_centrality - betweenness_centrality > 0.01" - ) - edge_bc_diffs2 = result_edge_bc.query( - "betweenness_centrality - mg_betweenness_centrality < -0.01" - ) + if len(result_edge_bc.columns) > 3: + result_edge_bc = result_edge_bc.rename(columns={"edge_id": "mg_edge_id"}) + + expected_output = sg_cugraph_edge_bc.reset_index(drop=True) + result_edge_bc["betweenness_centrality"] = expected_output[ + "betweenness_centrality" + ] + if len(expected_output.columns) > 3: + result_edge_bc["edge_id"] = expected_output["edge_id"] + edge_id_diff = result_edge_bc.query("mg_edge_id != edge_id") + assert len(edge_id_diff) == 0 + + edge_bc_diffs1 = result_edge_bc.query( + "mg_betweenness_centrality - betweenness_centrality > 0.01" + ) + edge_bc_diffs2 = result_edge_bc.query( + "betweenness_centrality - mg_betweenness_centrality < -0.01" + ) - assert len(edge_bc_diffs1) == 0 - assert len(edge_bc_diffs2) == 0 + assert len(edge_bc_diffs1) == 0 + assert len(edge_bc_diffs2) == 0