From 83902645ff9ab1b2b62a02efd7b5d610bae69c90 Mon Sep 17 00:00:00 2001 From: Ralph Liu Date: Wed, 13 Mar 2024 12:06:14 -0700 Subject: [PATCH] Add call to deload internal dataset edge list --- .../test_batch_betweenness_centrality_mg.py | 11 +- ...st_batch_edge_betweenness_centrality_mg.py | 10 +- .../test_betweenness_centrality_mg.py | 5 + .../centrality/test_degree_centrality_mg.py | 5 + .../test_edge_betweenness_centrality_mg.py | 229 +++++++----------- .../test_eigenvector_centrality_mg.py | 50 ++-- .../centrality/test_katz_centrality_mg.py | 71 +++--- 7 files changed, 170 insertions(+), 211 deletions(-) diff --git a/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py index 7050d0c9e55..1c73ebb0216 100644 --- a/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py @@ -57,9 +57,7 @@ def setup_function(): @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") -@pytest.mark.parametrize( - "graph_file", DATASETS, ids=[f"dataset={d.get_path().stem}" for d in DATASETS] -) +@pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("directed", IS_DIRECTED) @pytest.mark.parametrize("subset_size", SUBSET_SIZES) @pytest.mark.parametrize("normalized", IS_NORMALIZED) @@ -68,7 +66,7 @@ def setup_function(): @pytest.mark.parametrize("subset_seed", SUBSET_SEEDS) @pytest.mark.parametrize("result_dtype", RESULT_DTYPES) def test_mg_betweenness_centrality( - graph_file, + dataset, directed, subset_size, normalized, @@ -79,7 +77,7 @@ def test_mg_betweenness_centrality( dask_client, ): sorted_df = calc_betweenness_centrality( - graph_file, + dataset, directed=directed, normalized=normalized, k=subset_size, @@ -95,3 +93,6 @@ def test_mg_betweenness_centrality( second_key="ref_bc", epsilon=DEFAULT_EPSILON, ) + + # Clean-up stored dataset edge-lists + dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py index 48364a4a79a..4530dd3da86 100644 --- a/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py @@ -55,15 +55,13 @@ def setup_function(): # FIXME: Fails for directed = False(bc score twice as much) and normalized = True. @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") -@pytest.mark.parametrize( - "graph_file", DATASETS, ids=[f"dataset={d.get_path().stem}" for d in DATASETS] -) +@pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("directed", IS_DIRECTED) @pytest.mark.parametrize("subset_size", SUBSET_SIZES) @pytest.mark.parametrize("normalized", IS_NORMALIZED) @pytest.mark.parametrize("result_dtype", RESULT_DTYPES) def test_mg_edge_betweenness_centrality( - graph_file, + dataset, directed, subset_size, normalized, @@ -71,7 +69,7 @@ def test_mg_edge_betweenness_centrality( dask_client, ): sorted_df = calc_edge_betweenness_centrality( - graph_file, + dataset, directed=directed, normalized=normalized, k=subset_size, @@ -86,3 +84,5 @@ def test_mg_edge_betweenness_centrality( second_key="ref_bc", epsilon=DEFAULT_EPSILON, ) + # Clean-up stored dataset edge-lists + dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py index 48fbe796bb4..c94c2dcaff6 100644 --- a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py @@ -49,12 +49,14 @@ def setup_function(): def get_sg_graph(dataset, directed): + dataset.unload() G = dataset.get_graph(create_using=cugraph.Graph(directed=directed)) return G def get_mg_graph(dataset, directed): + dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist( @@ -141,3 +143,6 @@ def test_dask_mg_betweenness_centrality( diff = cupy.isclose(mg_bc_results, sg_bc_results) assert diff.all() + + # Clean-up stored dataset edge-lists + dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py index 8606649c745..68daff9238c 100644 --- a/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py @@ -45,12 +45,14 @@ def setup_function(): def get_sg_graph(dataset, directed): + dataset.unload() G = dataset.get_graph(create_using=cugraph.Graph(directed=directed)) return G def get_mg_graph(dataset, directed): + dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist( @@ -116,3 +118,6 @@ def test_dask_mg_degree(dask_client, dataset, directed): check_names=False, check_dtype=False, ) + + # Clean-up stored dataset edge-lists + dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py index 6c066a947ac..c3a559da5c9 100644 --- a/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py @@ -14,14 +14,9 @@ import gc import pytest -import dask_cudf -from pylibcugraph.testing.utils import gen_fixture_params_product -from cugraph.datasets import karate, dolphins - import cugraph import cugraph.dask as dcg - -# from cugraph.dask.common.mg_utils import is_single_gpu +from cugraph.datasets import karate, dolphins # ============================================================================= @@ -33,79 +28,34 @@ def setup_function(): gc.collect() -IS_DIRECTED = [True, False] -INCLUDE_WEIGHTS = [False, True] -INCLUDE_EDGE_IDS = [False, True] -NORMALIZED_OPTIONS = [False, True] -SUBSET_SIZE_OPTIONS = [4, None] - - -# email_Eu_core is too expensive to test -datasets = [karate, dolphins] - - # ============================================================================= -# Pytest fixtures +# Parameters # ============================================================================= -fixture_params = gen_fixture_params_product( - (datasets, "graph_file"), - (IS_DIRECTED, "directed"), - (INCLUDE_WEIGHTS, "include_weights"), - (INCLUDE_EDGE_IDS, "include_edgeids"), - (NORMALIZED_OPTIONS, "normalized"), - (SUBSET_SIZE_OPTIONS, "subset_size"), -) - - -@pytest.fixture(scope="module", params=fixture_params) -def input_combo(request): - """ - Simply return the current combination of params as a dictionary for use in - tests or other parameterized fixtures. - """ - parameters = dict( - zip( - ( - "graph_file", - "directed", - "include_weights", - "include_edge_ids", - "normalized", - "subset_size", - "subset_seed", - ), - request.param, - ) - ) +DATASETS = [karate, dolphins] +IS_DIRECTED = [True, False] +IS_WEIGHTED = [True, False] +INCLUDE_EDGE_IDS = [True, False] +IS_NORMALIZED = [True, False] +SUBSET_SIZES = [4, None] - return parameters +# ============================================================================= +# Helper functions +# ============================================================================= -@pytest.fixture(scope="module") -def input_expected_output(input_combo): - """ - This fixture returns the inputs and expected results from the edge - betweenness centrality algo. - (based on cuGraph edge betweenness centrality) which can be used - for validation. - """ - directed = input_combo["directed"] - normalized = input_combo["normalized"] - k = input_combo["subset_size"] - subset_seed = 42 - edge_ids = input_combo["include_edge_ids"] - weight = input_combo["include_weights"] - df = input_combo["graph_file"].get_edgelist() +def get_sg_graph(dataset, directed, edge_ids): + dataset.unload() + df = dataset.get_edgelist() if edge_ids: if not directed: # Edge ids not supported for undirected graph - return - dtype = df.dtypes[0] + return None + dtype = df.dtypes.iloc[0] edge_id = "edge_id" - df["edge_id"] = df.index + df[edge_id] = df.index df = df.astype(dtype) else: @@ -115,30 +65,13 @@ def input_expected_output(input_combo): G.from_cudf_edgelist( df, source="src", destination="dst", weight="wgt", edge_id=edge_id ) - if isinstance(k, int): - k = G.select_random_vertices(subset_seed, k) - input_combo["k"] = k - # Save the results back to the input_combo dictionary to prevent redundant - # cuGraph runs. Other tests using the input_combo fixture will look for - # them, and if not present they will have to re-run the same cuGraph call. - sg_cugraph_edge_bc = ( - cugraph.edge_betweenness_centrality(G, k, normalized) - .sort_values(["src", "dst"]) - .reset_index(drop=True) - ) + return G - input_data_path = input_combo["graph_file"].get_path() - input_combo["sg_cugraph_results"] = sg_cugraph_edge_bc - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - chunksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) +def get_mg_graph(dataset, directed, edge_ids, weight): + dataset.unload() + ddf = dataset.get_dask_edgelist() if weight: weight = ddf @@ -154,20 +87,16 @@ def input_expected_output(input_combo): edge_id = None dg = cugraph.Graph(directed=directed) - dg.from_dask_cudf_edgelist( ddf, source="src", destination="dst", - weight="value", + weight="wgt", edge_id=edge_id, renumber=True, ) - input_combo["MGGraph"] = dg - input_combo["include_weights"] = weight - - return input_combo + return dg, weight # ============================================================================= @@ -175,57 +104,79 @@ def input_expected_output(input_combo): # ============================================================================= -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) @pytest.mark.mg +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("directed", IS_DIRECTED) +@pytest.mark.parametrize("weighted", IS_WEIGHTED) +@pytest.mark.parametrize("edge_ids", INCLUDE_EDGE_IDS) +@pytest.mark.parametrize("normalized", IS_NORMALIZED) +@pytest.mark.parametrize("subset_size", SUBSET_SIZES) def test_dask_mg_edge_betweenness_centrality( - dask_client, benchmark, input_expected_output + dask_client, + dataset, + directed, + weighted, + edge_ids, + normalized, + subset_size, + benchmark, ): - if input_expected_output is not None: - dg = input_expected_output["MGGraph"] - k = input_expected_output["k"] - normalized = input_expected_output["normalized"] - weight = input_expected_output["include_weights"] - if weight is not None: - with pytest.raises(NotImplementedError): - result_edge_bc = benchmark( - dcg.edge_betweenness_centrality, dg, k, normalized, weight=weight - ) - - else: + g = get_sg_graph(dataset, directed, edge_ids) + + if g is None: + pytest.skip("Edge_ids not supported for undirected graph") + + dg, weight = get_mg_graph(dataset, directed, edge_ids, weighted) + subset_seed = 42 + + k = subset_size + if isinstance(k, int): + k = g.select_random_vertices(subset_seed, k) + + sg_cugraph_edge_bc = ( + cugraph.edge_betweenness_centrality(g, k, normalized) + .sort_values(["src", "dst"]) + .reset_index(drop=True) + ) + + if weight is not None: + with pytest.raises(NotImplementedError): result_edge_bc = benchmark( dcg.edge_betweenness_centrality, dg, k, normalized, weight=weight ) - result_edge_bc = ( - result_edge_bc.compute() - .sort_values(["src", "dst"]) - .reset_index(drop=True) - .rename(columns={"betweenness_centrality": "mg_betweenness_centrality"}) - ) - if len(result_edge_bc.columns) > 3: - result_edge_bc = result_edge_bc.rename( - columns={"edge_id": "mg_edge_id"} - ) + else: + result_edge_bc = benchmark( + dcg.edge_betweenness_centrality, dg, k, normalized, weight=weight + ) + result_edge_bc = ( + result_edge_bc.compute() + .sort_values(["src", "dst"]) + .reset_index(drop=True) + .rename(columns={"betweenness_centrality": "mg_betweenness_centrality"}) + ) - expected_output = input_expected_output["sg_cugraph_results"].reset_index( - drop=True - ) - result_edge_bc["betweenness_centrality"] = expected_output[ - "betweenness_centrality" - ] - if len(expected_output.columns) > 3: - result_edge_bc["edge_id"] = expected_output["edge_id"] - edge_id_diff = result_edge_bc.query("mg_edge_id != edge_id") - assert len(edge_id_diff) == 0 - - edge_bc_diffs1 = result_edge_bc.query( - "mg_betweenness_centrality - betweenness_centrality > 0.01" - ) - edge_bc_diffs2 = result_edge_bc.query( - "betweenness_centrality - mg_betweenness_centrality < -0.01" - ) + if len(result_edge_bc.columns) > 3: + result_edge_bc = result_edge_bc.rename(columns={"edge_id": "mg_edge_id"}) + + expected_output = sg_cugraph_edge_bc.reset_index(drop=True) + result_edge_bc["betweenness_centrality"] = expected_output[ + "betweenness_centrality" + ] + if len(expected_output.columns) > 3: + result_edge_bc["edge_id"] = expected_output["edge_id"] + edge_id_diff = result_edge_bc.query("mg_edge_id != edge_id") + assert len(edge_id_diff) == 0 + + edge_bc_diffs1 = result_edge_bc.query( + "mg_betweenness_centrality - betweenness_centrality > 0.01" + ) + edge_bc_diffs2 = result_edge_bc.query( + "betweenness_centrality - mg_betweenness_centrality < -0.01" + ) + + assert len(edge_bc_diffs1) == 0 + assert len(edge_bc_diffs2) == 0 - assert len(edge_bc_diffs1) == 0 - assert len(edge_bc_diffs2) == 0 + # Clean-up stored dataset edge-lists + dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py index e2ce7d2c341..60d39273777 100644 --- a/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -16,11 +16,10 @@ import pytest import cudf -import dask_cudf import cugraph import cugraph.dask as dcg from cugraph.dask.common.mg_utils import is_single_gpu -from cugraph.testing.utils import DATASETS +from cugraph.datasets import karate_disjoint, dolphins, netscience # ============================================================================= @@ -32,28 +31,33 @@ def setup_function(): gc.collect() +# ============================================================================= +# Parameters +# ============================================================================= + + +DATASETS = [karate_disjoint, dolphins, netscience] IS_DIRECTED = [True, False] +# ============================================================================= +# Tests +# ============================================================================= + + @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("directed", IS_DIRECTED) -@pytest.mark.parametrize("input_data_path", DATASETS) -def test_dask_mg_eigenvector_centrality(dask_client, directed, input_data_path): - input_data_path = input_data_path.as_posix() +def test_dask_mg_eigenvector_centrality(dask_client, dataset, directed): + input_data_path = dataset.get_path() print(f"dataset={input_data_path}") - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - chunksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) + ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True) mg_res = dcg.eigenvector_centrality(dg, tol=1e-6) mg_res = mg_res.compute() + import networkx as nx from cugraph.testing import utils @@ -84,20 +88,15 @@ def test_dask_mg_eigenvector_centrality(dask_client, directed, input_data_path): err = err + 1 assert err == 0 + # Clean-up stored dataset edge-lists + dataset.unload() + @pytest.mark.mg def test_dask_mg_eigenvector_centrality_transposed_false(dask_client): - input_data_path = DATASETS[0] + dataset = DATASETS[0] - chunksize = dcg.get_chunksize(input_data_path) - - ddf = dask_cudf.read_csv( - input_data_path, - chunksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) + ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=False) @@ -110,3 +109,6 @@ def test_dask_mg_eigenvector_centrality_transposed_false(dask_client): with pytest.warns(UserWarning, match=warning_msg): dcg.eigenvector_centrality(dg) + + # Clean-up stored dataset edge-lists + dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py index 72b81ce50bb..d1a899eba06 100644 --- a/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -16,11 +16,10 @@ import pytest import cudf -import dask_cudf import cugraph import cugraph.dask as dcg from cugraph.dask.common.mg_utils import is_single_gpu -from cugraph.testing.utils import RAPIDS_DATASET_ROOT_DIR_PATH +from cugraph.datasets import karate # ============================================================================= @@ -32,25 +31,30 @@ def setup_function(): gc.collect() +# ============================================================================= +# Parameters +# ============================================================================= + + +DATASETS = [karate] IS_DIRECTED = [True, False] +# ============================================================================= +# Tests +# ============================================================================= + + @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("directed", IS_DIRECTED) -def test_dask_mg_katz_centrality(dask_client, directed): +def test_dask_mg_katz_centrality(dask_client, dataset, directed): - input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "karate.csv").as_posix() + input_data_path = dataset.get_path() print(f"dataset={input_data_path}") - chunksize = dcg.get_chunksize(input_data_path) - - ddf = dask_cudf.read_csv( - input_data_path, - chunksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) + + ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True) @@ -92,22 +96,16 @@ def test_dask_mg_katz_centrality(dask_client, directed): err = err + 1 assert err == 0 + # Clean-up stored dataset edge-lists + dataset.unload() + @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("directed", IS_DIRECTED) -def test_dask_mg_katz_centrality_nstart(dask_client, directed): - input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "karate.csv").as_posix() - print(f"dataset={input_data_path}") - chunksize = dcg.get_chunksize(input_data_path) - - ddf = dask_cudf.read_csv( - input_data_path, - chunksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) +def test_dask_mg_katz_centrality_nstart(dask_client, dataset, directed): + ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True) @@ -139,20 +137,14 @@ def test_dask_mg_katz_centrality_nstart(dask_client, directed): err = err + 1 assert err == 0 + # Clean-up stored dataset edge-lists + dataset.unload() -@pytest.mark.mg -def test_dask_mg_katz_centrality_transposed_false(dask_client): - input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "karate.csv").as_posix() - chunksize = dcg.get_chunksize(input_data_path) - - ddf = dask_cudf.read_csv( - input_data_path, - chunksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) +@pytest.mark.mg +@pytest.mark.parametrize("dataset", DATASETS) +def test_dask_mg_katz_centrality_transposed_false(dask_client, dataset): + ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=False) @@ -165,3 +157,6 @@ def test_dask_mg_katz_centrality_transposed_false(dask_client): with pytest.warns(UserWarning, match=warning_msg): dcg.katz_centrality(dg) + + # Clean-up stored dataset edge-lists + dataset.unload()