Merge branch 'branch-24.06' into test/futuredeprwarnings/errors
mroeschke authored May 14, 2024
2 parents abe8bf4 + b9c0fc6 commit 36ca20f
Showing 19 changed files with 322 additions and 704 deletions.
@@ -93,6 +93,3 @@ def test_mg_betweenness_centrality(
         second_key="ref_bc",
         epsilon=DEFAULT_EPSILON,
     )
-
-    # Clean-up stored dataset edge-lists
-    dataset.unload()
@@ -84,5 +84,3 @@ def test_mg_edge_betweenness_centrality(
         second_key="ref_bc",
         epsilon=DEFAULT_EPSILON,
     )
-    # Clean-up stored dataset edge-lists
-    dataset.unload()
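The two hunks above drop the per-test dataset.unload() cleanup. As a rough illustration (not part of this commit), the snippet below sketches the cugraph.datasets pattern these tests rely on: the Dataset object caches its edgelist after the first load, and unloading is an explicit, optional cleanup step rather than something each test has to repeat. Names like betweenness_once are illustrative only.

import cugraph
from cugraph.datasets import karate  # one of the bundled Dataset objects

def betweenness_once():
    # get_graph() downloads (if needed) and caches the edgelist on the
    # Dataset object, so repeated calls reuse the cached data.
    G = karate.get_graph(download=True, create_using=cugraph.Graph(directed=False))
    return cugraph.betweenness_centrality(G)

bc = betweenness_once()
# unload() frees the cached edgelist; after this change it is no longer
# called at the end of every individual test.
karate.unload()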
@@ -1,4 +1,4 @@
-# Copyright (c) 2020-2023, NVIDIA CORPORATION.:
+# Copyright (c) 2020-2024, NVIDIA CORPORATION.:
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at
@@ -111,11 +111,18 @@ def calc_betweenness_centrality(
     else:
         edge_attr = None
 
-    G = graph_file.get_graph(
-        download=True,
-        create_using=cugraph.Graph(directed=directed),
-        ignore_weights=not edgevals,
-    )
+    G = None
+    if multi_gpu_batch:
+        G = graph_file.get_dask_graph(
+            create_using=cugraph.Graph(directed=directed), ignore_weights=not edgevals
+        )
+        G.enable_batch()
+    else:
+        G = graph_file.get_graph(
+            download=True,
+            create_using=cugraph.Graph(directed=directed),
+            ignore_weights=not edgevals,
+        )
 
     M = G.to_pandas_edgelist().rename(
         columns={"src": "0", "dst": "1", "wgt": edge_attr}
@@ -130,8 +137,6 @@ def calc_betweenness_centrality(
     )
 
     assert G is not None and Gnx is not None
-    if multi_gpu_batch:
-        G.enable_batch()
 
     calc_func = None
     if k is not None and seed is not None:
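For context, a hedged sketch of the two code paths calc_betweenness_centrality now selects between: a plain single-GPU graph, or a Dask-backed graph with batch mode enabled. The helper name build_graph is illustrative, and the multi-GPU branch assumes a Dask CUDA cluster and client are already running (the test suite gets one from its dask_client fixture).

import cugraph
from cugraph.datasets import karate

def build_graph(multi_gpu_batch=False, directed=False):
    if multi_gpu_batch:
        # Dask-backed graph; enable_batch() prepares the replicated graph for
        # batch analytics such as betweenness centrality over sampled sources.
        G = karate.get_dask_graph(create_using=cugraph.Graph(directed=directed))
        G.enable_batch()
    else:
        G = karate.get_graph(
            download=True, create_using=cugraph.Graph(directed=directed)
        )
    return G

# Single-GPU path; k/seed sample a subset of source vertices as in the test above.
bc = cugraph.betweenness_centrality(build_graph(), k=4, seed=42)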
@@ -49,14 +49,12 @@ def setup_function():
 
 
 def get_sg_graph(dataset, directed):
-    dataset.unload()
     G = dataset.get_graph(create_using=cugraph.Graph(directed=directed))
 
     return G
 
 
 def get_mg_graph(dataset, directed):
-    dataset.unload()
     ddf = dataset.get_dask_edgelist()
     dg = cugraph.Graph(directed=directed)
     dg.from_dask_cudf_edgelist(
@@ -96,7 +94,6 @@ def test_dask_mg_betweenness_centrality(
     benchmark,
 ):
     g = get_sg_graph(dataset, directed)
-    dataset.unload()
    dg = get_mg_graph(dataset, directed)
     random_state = subset_seed
 
@@ -143,6 +140,3 @@ def test_dask_mg_betweenness_centrality(
     diff = cupy.isclose(mg_bc_results, sg_bc_results)
 
     assert diff.all()
-
-    # Clean-up stored dataset edge-lists
-    dataset.unload()
@@ -45,14 +45,12 @@ def setup_function():
 
 
 def get_sg_graph(dataset, directed):
-    dataset.unload()
     G = dataset.get_graph(create_using=cugraph.Graph(directed=directed))
 
     return G
 
 
 def get_mg_graph(dataset, directed):
-    dataset.unload()
     ddf = dataset.get_dask_edgelist()
     dg = cugraph.Graph(directed=directed)
     dg.from_dask_cudf_edgelist(
@@ -118,6 +116,3 @@ def test_dask_mg_degree(dask_client, dataset, directed):
         check_names=False,
         check_dtype=False,
     )
-
-    # Clean-up stored dataset edge-lists
-    dataset.unload()
@@ -47,7 +47,6 @@ def setup_function():
 
 
 def get_sg_graph(dataset, directed, edge_ids):
-    dataset.unload()
     df = dataset.get_edgelist()
     if edge_ids:
         if not directed:
@@ -71,7 +70,6 @@ def get_sg_graph(dataset, directed, edge_ids):
 
 
 def get_mg_graph(dataset, directed, edge_ids, weight):
-    dataset.unload()
     ddf = dataset.get_dask_edgelist()
 
     if weight:
@@ -178,6 +176,3 @@ def test_dask_mg_edge_betweenness_centrality(
 
     assert len(edge_bc_diffs1) == 0
     assert len(edge_bc_diffs2) == 0
-
-    # Clean-up stored dataset edge-lists
-    dataset.unload()
@@ -52,7 +52,6 @@
 def test_dask_mg_eigenvector_centrality(dask_client, dataset, directed):
     input_data_path = dataset.get_path()
     print(f"dataset={input_data_path}")
-    dataset.unload()
     ddf = dataset.get_dask_edgelist()
     dg = cugraph.Graph(directed=True)
     dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True)
@@ -89,15 +88,11 @@ def test_dask_mg_eigenvector_centrality(dask_client, dataset, directed):
             err = err + 1
     assert err == 0
 
-    # Clean-up stored dataset edge-lists
-    dataset.unload()
-
 
 @pytest.mark.mg
 def test_dask_mg_eigenvector_centrality_transposed_false(dask_client):
     dataset = DATASETS[0]
 
-    dataset.unload()
     ddf = dataset.get_dask_edgelist()
     dg = cugraph.Graph(directed=True)
     dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=False)
@@ -110,6 +105,3 @@ def test_dask_mg_eigenvector_centrality_transposed_false(dask_client):
 
     with pytest.warns(UserWarning, match=warning_msg):
         dcg.eigenvector_centrality(dg)
-
-    # Clean-up stored dataset edge-lists
-    dataset.unload()
12 changes: 0 additions & 12 deletions python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py
@@ -53,7 +53,6 @@ def test_dask_mg_katz_centrality(dask_client, dataset, directed):
     input_data_path = dataset.get_path()
     print(f"dataset={input_data_path}")
 
-    dataset.unload()
     ddf = dataset.get_dask_edgelist()
     dg = cugraph.Graph(directed=True)
     dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True)
@@ -95,16 +94,12 @@ def test_dask_mg_katz_centrality(dask_client, dataset, directed):
             err = err + 1
     assert err == 0
 
-    # Clean-up stored dataset edge-lists
-    dataset.unload()
-
 
 @pytest.mark.mg
 @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system")
 @pytest.mark.parametrize("dataset", DATASETS)
 @pytest.mark.parametrize("directed", IS_DIRECTED)
 def test_dask_mg_katz_centrality_nstart(dask_client, dataset, directed):
-    dataset.unload()
     ddf = dataset.get_dask_edgelist()
     dg = cugraph.Graph(directed=True)
     dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True)
@@ -136,14 +131,10 @@ def test_dask_mg_katz_centrality_nstart(dask_client, dataset, directed):
             err = err + 1
     assert err == 0
 
-    # Clean-up stored dataset edge-lists
-    dataset.unload()
-
 
 @pytest.mark.mg
 @pytest.mark.parametrize("dataset", DATASETS)
 def test_dask_mg_katz_centrality_transposed_false(dask_client, dataset):
-    dataset.unload()
     ddf = dataset.get_dask_edgelist()
     dg = cugraph.Graph(directed=True)
     dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=False)
@@ -156,6 +147,3 @@ def test_dask_mg_katz_centrality_transposed_false(dask_client, dataset):
 
     with pytest.warns(UserWarning, match=warning_msg):
         dcg.katz_centrality(dg)
-
-    # Clean-up stored dataset edge-lists
-    dataset.unload()
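Both the eigenvector and Katz *_transposed_false tests above keep the same shape: build the MG graph with store_transposed=False and assert that a UserWarning is raised. A hedged, self-contained sketch of that pattern follows; warning_msg is defined elsewhere in the real test files, so the match argument is omitted here and the test name is illustrative.

import pytest
import cugraph
import cugraph.dask as dcg
from cugraph.datasets import karate

@pytest.mark.mg
def test_transposed_false_warns(dask_client):
    ddf = karate.get_dask_edgelist()
    dg = cugraph.Graph(directed=True)
    # These algorithms expect store_transposed=True; per the tests above,
    # passing False makes the centrality call emit a UserWarning.
    dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=False)
    with pytest.warns(UserWarning):
        dcg.katz_centrality(dg)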
92 changes: 35 additions & 57 deletions python/cugraph/cugraph/tests/comms/test_comms_mg.py
@@ -16,10 +16,9 @@
 import pytest
 import cugraph.dask as dcg
 
-import cudf
-import dask_cudf
 import cugraph
-from cugraph.testing.utils import RAPIDS_DATASET_ROOT_DIR_PATH
+from cugraph.datasets import karate, dolphins
+
 
 # =============================================================================
 # Pytest Setup / Teardown - called for each test function
@@ -30,75 +29,54 @@ def setup_function():
     gc.collect()
 
 
+# =============================================================================
+# Parameters
+# =============================================================================
+
+
+DATASETS = [karate, dolphins]
 IS_DIRECTED = [True, False]
 
 
-# @pytest.mark.skipif(
-#     is_single_gpu(), reason="skipping MG testing on Single GPU system"
-# )
+# =============================================================================
+# Helper Functions
+# =============================================================================
+
+
+def get_pagerank_result(dataset, is_mg):
+    """Return the cugraph.pagerank result for an MG or SG graph"""
+
+    if is_mg:
+        dg = dataset.get_dask_graph(store_transposed=True)
+        return dcg.pagerank(dg).compute()
+    else:
+        g = dataset.get_graph(store_transposed=True)
+        return cugraph.pagerank(g)
+
+
+# =============================================================================
+# Tests
+# =============================================================================
+
+
 @pytest.mark.mg
 @pytest.mark.parametrize("directed", IS_DIRECTED)
 def test_dask_mg_pagerank(dask_client, directed):
 
     # Initialize and run pagerank on two distributed graphs
     # with same communicator
 
-    input_data_path1 = (RAPIDS_DATASET_ROOT_DIR_PATH / "karate.csv").as_posix()
+    input_data_path1 = karate.get_path()
     print(f"dataset1={input_data_path1}")
-    chunksize1 = dcg.get_chunksize(input_data_path1)
+    result_pr1 = get_pagerank_result(karate, is_mg=True)
 
-    input_data_path2 = (RAPIDS_DATASET_ROOT_DIR_PATH / "dolphins.csv").as_posix()
+    input_data_path2 = dolphins.get_path()
     print(f"dataset2={input_data_path2}")
-    chunksize2 = dcg.get_chunksize(input_data_path2)
-
-    ddf1 = dask_cudf.read_csv(
-        input_data_path1,
-        blocksize=chunksize1,
-        delimiter=" ",
-        names=["src", "dst", "value"],
-        dtype=["int32", "int32", "float32"],
-    )
-
-    dg1 = cugraph.Graph(directed=directed)
-    dg1.from_dask_cudf_edgelist(ddf1, "src", "dst")
-
-    result_pr1 = dcg.pagerank(dg1).compute()
-
-    ddf2 = dask_cudf.read_csv(
-        input_data_path2,
-        blocksize=chunksize2,
-        delimiter=" ",
-        names=["src", "dst", "value"],
-        dtype=["int32", "int32", "float32"],
-    )
-
-    dg2 = cugraph.Graph(directed=directed)
-    dg2.from_dask_cudf_edgelist(ddf2, "src", "dst")
-
-    result_pr2 = dcg.pagerank(dg2).compute()
+    result_pr2 = get_pagerank_result(dolphins, is_mg=True)
 
     # Calculate single GPU pagerank for verification of results
-    df1 = cudf.read_csv(
-        input_data_path1,
-        delimiter=" ",
-        names=["src", "dst", "value"],
-        dtype=["int32", "int32", "float32"],
-    )
-
-    g1 = cugraph.Graph(directed=directed)
-    g1.from_cudf_edgelist(df1, "src", "dst")
-    expected_pr1 = cugraph.pagerank(g1)
-
-    df2 = cudf.read_csv(
-        input_data_path2,
-        delimiter=" ",
-        names=["src", "dst", "value"],
-        dtype=["int32", "int32", "float32"],
-    )
-
-    g2 = cugraph.Graph(directed=directed)
-    g2.from_cudf_edgelist(df2, "src", "dst")
-    expected_pr2 = cugraph.pagerank(g2)
+    expected_pr1 = get_pagerank_result(karate, is_mg=False)
+    expected_pr2 = get_pagerank_result(dolphins, is_mg=False)
 
     # Compare and verify pagerank results
 
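A hedged usage sketch of the new get_pagerank_result() helper defined above: MG and SG results for the same dataset can now be produced symmetrically and compared. The column names follow cugraph's pagerank output ("vertex", "pagerank"); the merge-based comparison and the tolerance are illustrative, not the test's actual verification code.

from cugraph.datasets import karate

# Assumes a Dask client is active (the tests get one from the dask_client
# fixture) and that get_pagerank_result() from the hunk above is in scope.
mg_pr = get_pagerank_result(karate, is_mg=True)   # dask_cudf result, computed to cudf
sg_pr = get_pagerank_result(karate, is_mg=False)  # plain cudf result

compare = sg_pr.merge(mg_pr, on="vertex", suffixes=("_sg", "_mg"))
assert ((compare["pagerank_sg"] - compare["pagerank_mg"]).abs() < 1.0e-4).all()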
15 changes: 4 additions & 11 deletions python/cugraph/cugraph/tests/community/test_induced_subgraph_mg.py
@@ -17,7 +17,6 @@
 
 import cugraph
 import cugraph.dask as dcg
-import dask_cudf
 from cudf.testing.testing import assert_frame_equal
 from cugraph.dask.common.mg_utils import is_single_gpu
 from cugraph.datasets import karate, dolphins, email_Eu_core
@@ -36,11 +35,13 @@ def setup_function():
 # Parameters
 # =============================================================================
 
+
 DATASETS = [karate, dolphins, email_Eu_core]
 IS_DIRECTED = [True, False]
 NUM_VERTICES = [2, 5, 10, 20]
 OFFSETS = [None]
 
+
 # =============================================================================
 # Helper functions
 # =============================================================================
@@ -53,15 +54,7 @@ def get_sg_graph(dataset, directed):
 
 
 def get_mg_graph(dataset, directed):
-    input_data_path = dataset.get_path()
-    blocksize = dcg.get_chunksize(input_data_path)
-    ddf = dask_cudf.read_csv(
-        input_data_path,
-        blocksize=blocksize,
-        delimiter=dataset.metadata["delim"],
-        names=dataset.metadata["col_names"],
-        dtype=dataset.metadata["col_types"],
-    )
+    ddf = dataset.get_dask_edgelist()
     dg = cugraph.Graph(directed=directed)
     dg.from_dask_cudf_edgelist(
         ddf,
@@ -108,7 +101,7 @@ def test_mg_induced_subgraph(
 
     # FIXME: This parameter is not yet tested
     # mg_offsets = mg_offsets.compute().reset_index(drop=True)
-    mg_df, mg_offsets = result_induced_subgraph
+    mg_df, _ = result_induced_subgraph
 
     if mg_df is not None and sg_induced_subgraph is not None:
         # FIXME: 'edges()' or 'view_edgelist()' takes half the edges out if
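The get_mg_graph() change above is the same pattern applied throughout this commit: Dataset.get_dask_edgelist() replaces the manual get_path()/get_chunksize()/dask_cudf.read_csv() sequence, since the Dataset object already knows its delimiter, column names, and dtypes. A minimal sketch of the new loading path, assuming an active Dask client; the helper name load_mg_graph is illustrative.

import cugraph
from cugraph.datasets import karate

def load_mg_graph(dataset, directed=False):
    # One call instead of reading the CSV by hand; the edgelist comes back
    # with "src"/"dst" columns (plus a weight column for weighted datasets).
    ddf = dataset.get_dask_edgelist()
    dg = cugraph.Graph(directed=directed)
    dg.from_dask_cudf_edgelist(ddf, source="src", destination="dst")
    return dg

dg = load_mg_graph(karate)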